diff --git a/leveldb/db.go b/leveldb/db.go
index b2724cd9..8311bbc8 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -304,7 +304,7 @@ func recoverTable(s *session, o *opt.Options) error {
 		noSync = o.GetNoSync()
 		rec    = &sessionRecord{}
-		bpool  = util.NewBufferPool(o.GetBlockSize() + 5)
+		bpool  = util.NewBufferPool(o.GetBlockSize() * 2)
 	)
 	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
 		tmpFd = s.newTemp()
@@ -329,7 +329,7 @@ func recoverTable(s *session, o *opt.Options) error {
 		}()
 
 		// Copy entries.
-		tw := table.NewWriter(writer, o, nil, 0)
+		tw := table.NewWriter(writer, o, nil)
 		for iter.Next() {
 			key := iter.Key()
 			if validInternalKey(key) {
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index cc275ace..1804e279 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -390,7 +390,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
 
 		// Create new table.
 		var err error
-		b.tw, err = b.s.tops.create(b.tableSize)
+		b.tw, err = b.s.tops.create()
 		if err != nil {
 			return err
 		}
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index b25157a7..24610380 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2611,7 +2611,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
 		value = bytes.Repeat([]byte{'0'}, 100)
 	)
 	for i := 0; i < 2; i++ {
-		tw, err := s.tops.create(0)
+		tw, err := s.tops.create()
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/leveldb/table.go b/leveldb/table.go
index d0fab40c..06dc8f0c 100644
--- a/leveldb/table.go
+++ b/leveldb/table.go
@@ -354,7 +354,7 @@ type tOps struct {
 }
 
 // Creates an empty table and returns table writer.
-func (t *tOps) create(tSize int) (*tWriter, error) {
+func (t *tOps) create() (*tWriter, error) {
 	fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
 	fw, err := t.s.stor.Create(fd)
 	if err != nil {
@@ -364,13 +364,13 @@ func (t *tOps) create(tSize int) (*tWriter, error) {
 		t:  t,
 		fd: fd,
 		w:  fw,
-		tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer, tSize),
+		tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer),
 	}, nil
 }
 
 // Builds table from src iterator.
 func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
-	w, err := t.create(0)
+	w, err := t.create()
 	if err != nil {
 		return
 	}
@@ -515,7 +515,7 @@ func newTableOps(s *session) *tOps {
 		blockCache = cache.NewCache(blockCacher)
 	}
 	if !s.o.GetDisableBufferPool() {
-		blockBuffer = util.NewBufferPool(s.o.GetBlockSize() + 5)
+		blockBuffer = util.NewBufferPool(s.o.GetBlockSize() * 2)
 	}
 	return &tOps{
 		s: s,
diff --git a/leveldb/table/table_test.go b/leveldb/table/table_test.go
index d1d864dc..a93d1d3f 100644
--- a/leveldb/table/table_test.go
+++ b/leveldb/table/table_test.go
@@ -47,7 +47,7 @@ var _ = testutil.Defer(func() {
 			)
 
 			// Building the table.
-			tw := NewWriter(buf, o, nil, 0)
+			tw := NewWriter(buf, o, nil)
 			err := tw.Append([]byte("k01"), []byte("hello"))
 			Expect(err).ShouldNot(HaveOccurred())
 			err = tw.Append([]byte("k02"), []byte("hello2"))
@@ -98,7 +98,7 @@ var _ = testutil.Defer(func() {
 			buf := &bytes.Buffer{}
 
 			// Building the table.
-			tw := NewWriter(buf, o, nil, 0)
+			tw := NewWriter(buf, o, nil)
 			kv.Iterate(func(i int, key, value []byte) {
 				Expect(tw.Append(key, value)).ShouldNot(HaveOccurred())
 			})
diff --git a/leveldb/table/writer.go b/leveldb/table/writer.go
index ea89d600..2ccb01af 100644
--- a/leveldb/table/writer.go
+++ b/leveldb/table/writer.go
@@ -389,15 +389,8 @@ func (w *Writer) Close() error {
 // NewWriter creates a new initialized table writer for the file.
 //
 // Table writer is not safe for concurrent use.
-func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
-	var bufBytes []byte
-	if pool == nil {
-		bufBytes = make([]byte, size)
-	} else {
-		bufBytes = pool.Get(size)
-	}
-	bufBytes = bufBytes[:0]
-
+func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool) *Writer {
+	bufBytes := pool.Get(0)
 	w := &Writer{
 		writer: f,
 		cmp:    o.GetComparer(),
diff --git a/leveldb/util/buffer_pool.go b/leveldb/util/buffer_pool.go
index 4f512f6d..bd896727 100644
--- a/leveldb/util/buffer_pool.go
+++ b/leveldb/util/buffer_pool.go
@@ -7,15 +7,18 @@
 package util
 
 import (
+	"encoding/binary"
 	"fmt"
+	"reflect"
 	"sync"
 	"sync/atomic"
+	"unsafe"
 )
 
 // BufferPool is a 'buffer pool'.
 type BufferPool struct {
-	pool     [6]sync.Pool
-	baseline [5]int
+	pool     byteSlicePool
+	baseline int
 
 	get     uint32
 	put     uint32
@@ -25,15 +28,6 @@ type BufferPool struct {
 	miss    uint32
 }
 
-func (p *BufferPool) poolNum(n int) int {
-	for i, x := range p.baseline {
-		if n <= x {
-			return i
-		}
-	}
-	return len(p.baseline)
-}
-
 // Get returns buffer with length of n.
 func (p *BufferPool) Get(n int) []byte {
 	if p == nil {
@@ -41,45 +35,29 @@ func (p *BufferPool) Get(n int) []byte {
 	}
 	atomic.AddUint32(&p.get, 1)
 
-	poolNum := p.poolNum(n)
-
-	b := p.pool[poolNum].Get().(*[]byte)
-
-	if cap(*b) == 0 {
+	b := p.pool.Get()
+	if cap(b) == 0 {
 		// If we grabbed nothing, increment the miss stats.
 		atomic.AddUint32(&p.miss, 1)
-
-		if poolNum == len(p.baseline) {
-			*b = make([]byte, n)
-			return *b
-		}
-
-		*b = make([]byte, p.baseline[poolNum])
-		*b = (*b)[:n]
-		return *b
 	} else {
 		// If there is enough capacity in the bytes grabbed, resize the length
 		// to n and return.
-		if n < cap(*b) {
+		if n < cap(b) {
 			atomic.AddUint32(&p.less, 1)
-			*b = (*b)[:n]
-			return *b
-		} else if n == cap(*b) {
+			return b[:n]
+		} else if n == cap(b) {
 			atomic.AddUint32(&p.equal, 1)
-			*b = (*b)[:n]
-			return *b
-		} else if n > cap(*b) {
+			return b[:n]
+		} else if n > cap(b) {
 			atomic.AddUint32(&p.greater, 1)
+			p.pool.Put(b)
 		}
 	}
-
-	if poolNum == len(p.baseline) {
-		*b = make([]byte, n)
-		return *b
+	if n >= p.baseline {
+		return make([]byte, n)
 	}
-
-	*b = make([]byte, p.baseline[poolNum])
-	*b = (*b)[:n]
-	return *b
+	return make([]byte, n, p.baseline)
 }
 
 // Put adds given buffer to the pool.
@@ -88,48 +66,48 @@ func (p *BufferPool) Put(b []byte) {
 		return
 	}
 
-	poolNum := p.poolNum(cap(b))
-
 	atomic.AddUint32(&p.put, 1)
-	p.pool[poolNum].Put(&b)
+	p.pool.Put(b)
 }
 
 func (p *BufferPool) String() string {
 	if p == nil {
 		return "<nil>"
 	}
-	return fmt.Sprintf("BufferPool{B·%d G·%d P·%d <·%d =·%d >·%d M·%d}",
-		p.baseline, p.get, p.put, p.less, p.equal, p.greater, p.miss)
+	return fmt.Sprintf("BufferPool{G·%d P·%d <·%d =·%d >·%d M·%d}",
+		p.get, p.put, p.less, p.equal, p.greater, p.miss)
 }
 
 // NewBufferPool creates a new initialized 'buffer pool'.
 func NewBufferPool(baseline int) *BufferPool {
-	if baseline <= 0 {
-		panic("baseline can't be <= 0")
-	}
-	bufPool := &BufferPool{
-		baseline: [...]int{baseline / 4, baseline / 2, baseline, baseline * 2, baseline * 4},
-		pool: [6]sync.Pool{
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-			{
-				New: func() interface{} { return new([]byte) },
-			},
-		},
+	return &BufferPool{baseline: baseline}
+}
+
+// byteSlicePool pools byte-slices to avoid extra allocations.
+type byteSlicePool struct {
+	pool sync.Pool
+}
+
+func (p *byteSlicePool) Get() (s []byte) {
+	if ptr, ok := p.pool.Get().(unsafe.Pointer); ok {
+		sh := (*reflect.SliceHeader)(unsafe.Pointer(&s))
+		// temporarily set len(s) to 8
+		sh.Data = uintptr(ptr)
+		sh.Cap = 8
+		sh.Len = 8
+		// extract and set actual cap
+		sh.Cap = int(binary.BigEndian.Uint64(s))
+		sh.Len = 0
 	}
+	return
+}
 
-	return bufPool
+func (p *byteSlicePool) Put(b []byte) {
+	if cap(b) >= 8 {
+		// save the slice cap in first 8 bytes
+		b = b[:8]
+		binary.BigEndian.PutUint64(b, uint64(cap(b)))
+		// pool the pointer of the first byte
+		p.pool.Put(unsafe.Pointer(&b[0]))
+	}
 }
diff --git a/leveldb/util/buffer_pool_test.go b/leveldb/util/buffer_pool_test.go
new file mode 100644
index 00000000..a7375fd5
--- /dev/null
+++ b/leveldb/util/buffer_pool_test.go
@@ -0,0 +1,19 @@
+// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package util
+
+import "testing"
+
+func BenchmarkBufferPool(b *testing.B) {
+	const n = 100
+	pool := NewBufferPool(n)
+
+	for i := 0; i < b.N; i++ {
+		buf := pool.Get(n)
+		pool.Put(buf)
+	}
+}
diff --git a/manualtest/dbstress/main.go b/manualtest/dbstress/main.go
index 3e2c8d77..33c6589c 100644
--- a/manualtest/dbstress/main.go
+++ b/manualtest/dbstress/main.go
@@ -309,7 +309,7 @@ func main() {
 	flag.Parse()
 
 	if enableBufferPool {
-		bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
+		bpool = util.NewBufferPool(opt.DefaultBlockSize * 2)
 	}
 
 	log.Printf("Test DB stored at %q", dbPath)