summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@nym.se>2014-09-02 09:43:42 +0200
committerJakob Borg <jakob@nym.se>2014-09-02 09:44:07 +0200
commitf633bdddf0c5ffbfe954b2d409fd121051c80eb7 (patch)
treea930a9f9f4fe169e526e445b30e049edf8ef02c1
parentde0b91d1570ae5a6a3d1d26f725e07a6b82d59cf (diff)
Update goleveldb
-rw-r--r--Godeps/Godeps.json2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go11
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go38
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go23
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go6
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go11
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go9
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go70
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go10
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go14
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go17
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go26
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go2
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go217
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go4
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go (renamed from Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go)103
16 files changed, 384 insertions, 179 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 065c61690c..f8368e416b 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -49,7 +49,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
- "Rev": "59d87758aeaab5ab6ed289c773349500228a1557"
+ "Rev": "2b99e8d4757bf06eeab1b0485d80b8ae1c088874"
},
{
"ImportPath": "github.com/vitrun/qart/coding",
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
index fe398f03ae..49f82f0fbc 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go
@@ -40,10 +40,21 @@ type Cache interface {
// Size returns entire alive cache objects size.
Size() int
+ // NumObjects returns number of alive objects.
+ NumObjects() int
+
// GetNamespace gets cache namespace with the given id.
// GetNamespace is never return nil.
GetNamespace(id uint64) Namespace
+ // PurgeNamespace purges cache namespace with the given id from this cache tree.
+ // Also read Namespace.Purge.
+ PurgeNamespace(id uint64, fin PurgeFin)
+
+ // ZapNamespace detaches cache namespace with the given id from this cache tree.
+ // Also read Namespace.Zap.
+ ZapNamespace(id uint64)
+
// Purge purges all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Purge method on all cache namespace.
Purge(fin PurgeFin)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
index a1504b159f..a29b5d0882 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go
@@ -15,11 +15,11 @@ import (
// lruCache represent a LRU cache state.
type lruCache struct {
- mu sync.Mutex
- recent lruNode
- table map[uint64]*lruNs
- capacity int
- used, size int
+ mu sync.Mutex
+ recent lruNode
+ table map[uint64]*lruNs
+ capacity int
+ used, size, alive int
}
// NewLRUCache creates a new initialized LRU cache with the given capacity.
@@ -51,6 +51,12 @@ func (c *lruCache) Size() int {
return c.size
}
+func (c *lruCache) NumObjects() int {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.alive
+}
+
// SetCapacity set cache capacity.
func (c *lruCache) SetCapacity(capacity int) {
c.mu.Lock()
@@ -77,6 +83,23 @@ func (c *lruCache) GetNamespace(id uint64) Namespace {
return ns
}
+func (c *lruCache) ZapNamespace(id uint64) {
+ c.mu.Lock()
+ if ns, exist := c.table[id]; exist {
+ ns.zapNB()
+ delete(c.table, id)
+ }
+ c.mu.Unlock()
+}
+
+func (c *lruCache) PurgeNamespace(id uint64, fin PurgeFin) {
+ c.mu.Lock()
+ if ns, exist := c.table[id]; exist {
+ ns.purgeNB(fin)
+ }
+ c.mu.Unlock()
+}
+
// Purge purge entire cache.
func (c *lruCache) Purge(fin PurgeFin) {
c.mu.Lock()
@@ -158,11 +181,12 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
}
ns.table[key] = node
+ ns.lru.size += charge
+ ns.lru.alive++
if charge > 0 {
node.ref++
node.rInsert(&ns.lru.recent)
ns.lru.used += charge
- ns.lru.size += charge
ns.lru.evict()
}
}
@@ -322,8 +346,10 @@ func (n *lruNode) derefNB() {
// Remove elemement.
delete(n.ns.table, n.key)
n.ns.lru.size -= n.charge
+ n.ns.lru.alive--
n.fin()
}
+ n.value = nil
} else if n.ref < 0 {
panic("leveldb/cache: lruCache: negative node reference")
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
index 59b9017d3d..979d0ac4ad 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
@@ -14,6 +14,7 @@ import (
"runtime"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -35,7 +36,7 @@ type DB struct {
// MemDB.
memMu sync.RWMutex
- memPool *util.Pool
+ memPool chan *memdb.DB
mem, frozenMem *memDB
journal *journal.Writer
journalWriter storage.Writer
@@ -47,6 +48,9 @@ type DB struct {
snapsMu sync.Mutex
snapsRoot snapshotElement
+ // Stats.
+ aliveSnaps, aliveIters int32
+
// Write.
writeC chan *Batch
writeMergedC chan bool
@@ -80,7 +84,7 @@ func openDB(s *session) (*DB, error) {
// Initial sequence
seq: s.stSeq,
// MemDB
- memPool: util.NewPool(1),
+ memPool: make(chan *memdb.DB, 1),
// Write
writeC: make(chan *Batch),
writeMergedC: make(chan bool),
@@ -122,6 +126,7 @@ func openDB(s *session) (*DB, error) {
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
+ go db.mpoolDrain()
s.logf("db@open done T·%v", time.Since(start))
@@ -568,7 +573,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
}
defer m.decref()
- mk, mv, me := m.db.Find(ikey)
+ mk, mv, me := m.mdb.Find(ikey)
if me == nil {
ukey, _, t, ok := parseIkey(mk)
if ok && db.s.icmp.uCompare(ukey, key) == 0 {
@@ -657,6 +662,14 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
// Returns sstables list for each level.
// leveldb.blockpool
// Returns block pool stats.
+// leveldb.cachedblock
+// Returns size of cached block.
+// leveldb.openedtables
+// Returns number of opened tables.
+// leveldb.alivesnaps
+// Returns number of alive snapshots.
+// leveldb.aliveiters
+// Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
err = db.ok()
if err != nil {
@@ -712,6 +725,10 @@ func (db *DB) GetProperty(name string) (value string, err error) {
}
case p == "openedtables":
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
+ case p == "alivesnaps":
+ value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
+ case p == "aliveiters":
+ value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
index e5e74d7e1f..b38cdedc57 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go
@@ -221,10 +221,10 @@ func (db *DB) memCompaction() {
c := newCMem(db.s)
stats := new(cStatsStaging)
- db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size()))
+ db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
// Don't compact empty memdb.
- if mem.db.Len() == 0 {
+ if mem.mdb.Len() == 0 {
db.logf("mem@flush skipping")
// drop frozen mem
db.dropFrozenMem()
@@ -242,7 +242,7 @@ func (db *DB) memCompaction() {
db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
- return c.flush(mem.db, -1)
+ return c.flush(mem.mdb, -1)
}, func() error {
for _, r := range c.rec.addedTables {
db.logf("mem@flush rollback @%d", r.num)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
index d028768d50..c34c7abae1 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go
@@ -10,6 +10,7 @@ import (
"errors"
"runtime"
"sync"
+ "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -38,11 +39,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
- emi := em.db.NewIterator(slice)
+ emi := em.mdb.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
- fmi := fm.db.NewIterator(slice)
+ fmi := fm.mdb.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}
@@ -66,6 +67,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
}
rawIter := db.newRawIterator(islice, ro)
iter := &dbIter{
+ db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
@@ -73,6 +75,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
key: make([]byte, 0),
value: make([]byte, 0),
}
+ atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
return iter
}
@@ -89,6 +92,7 @@ const (
// dbIter represent an interator states over a database session.
type dbIter struct {
+ db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
@@ -303,6 +307,7 @@ func (i *dbIter) Release() {
if i.releaser != nil {
i.releaser.Release()
+ i.releaser = nil
}
i.dir = dirReleased
@@ -310,6 +315,8 @@ func (i *dbIter) Release() {
i.value = nil
i.iter.Release()
i.iter = nil
+ atomic.AddInt32(&i.db.aliveIters, -1)
+ i.db = nil
}
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
index 31340bdd00..fb1ce85b9e 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
@@ -9,6 +9,7 @@ package leveldb
import (
"runtime"
"sync"
+ "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -81,7 +82,7 @@ func (db *DB) minSeq() uint64 {
type Snapshot struct {
db *DB
elem *snapshotElement
- mu sync.Mutex
+ mu sync.RWMutex
released bool
}
@@ -91,6 +92,7 @@ func (db *DB) newSnapshot() *Snapshot {
db: db,
elem: db.acquireSnapshot(),
}
+ atomic.AddInt32(&db.aliveSnaps, 1)
runtime.SetFinalizer(snap, (*Snapshot).Release)
return snap
}
@@ -105,8 +107,8 @@ func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err er
if err != nil {
return
}
- snap.mu.Lock()
- defer snap.mu.Unlock()
+ snap.mu.RLock()
+ defer snap.mu.RUnlock()
if snap.released {
err = ErrSnapshotReleased
return
@@ -160,6 +162,7 @@ func (snap *Snapshot) Release() {
snap.released = true
snap.db.releaseSnapshot(snap.elem)
+ atomic.AddInt32(&snap.db.aliveSnaps, -1)
snap.db = nil
snap.elem = nil
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
index 807cc6c7ac..24ecab504d 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go
@@ -8,16 +8,16 @@ package leveldb
import (
"sync/atomic"
+ "time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
- "github.com/syndtr/goleveldb/leveldb/util"
)
type memDB struct {
- pool *util.Pool
- db *memdb.DB
- ref int32
+ db *DB
+ mdb *memdb.DB
+ ref int32
}
func (m *memDB) incref() {
@@ -26,7 +26,13 @@ func (m *memDB) incref() {
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
- m.pool.Put(m)
+ // Only put back memdb with std capacity.
+ if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
+ m.mdb.Reset()
+ m.db.mpoolPut(m.mdb)
+ }
+ m.db = nil
+ m.mdb = nil
} else if ref < 0 {
panic("negative memdb ref")
}
@@ -42,6 +48,41 @@ func (db *DB) addSeq(delta uint64) {
atomic.AddUint64(&db.seq, delta)
}
+func (db *DB) mpoolPut(mem *memdb.DB) {
+ defer func() {
+ recover()
+ }()
+ select {
+ case db.memPool <- mem:
+ default:
+ }
+}
+
+func (db *DB) mpoolGet() *memdb.DB {
+ select {
+ case mem := <-db.memPool:
+ return mem
+ default:
+ return nil
+ }
+}
+
+func (db *DB) mpoolDrain() {
+ ticker := time.NewTicker(30 * time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ select {
+ case <-db.memPool:
+ default:
+ }
+ case _, _ = <-db.closeC:
+ close(db.memPool)
+ return
+ }
+ }
+}
+
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memDB, err error) {
@@ -70,18 +111,15 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
db.journalWriter = w
db.journalFile = file
db.frozenMem = db.mem
- mem, ok := db.memPool.Get().(*memDB)
- if ok && mem.db.Capacity() >= n {
- mem.db.Reset()
- mem.incref()
- } else {
- mem = &memDB{
- pool: db.memPool,
- db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)),
- ref: 1,
- }
+ mdb := db.mpoolGet()
+ if mdb == nil || mdb.Capacity() < n {
+ mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
+ }
+ mem = &memDB{
+ db: db,
+ mdb: mdb,
+ ref: 2,
}
- mem.incref()
db.mem = mem
// The seq only incremented by the writer. And whoever called newMem
// should hold write lock, so no need additional synchronization here.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
index 7f15b4b656..5aadc12d79 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go
@@ -1577,7 +1577,11 @@ func TestDb_BloomFilter(t *testing.T) {
return fmt.Sprintf("key%06d", i)
}
- n := 10000
+ const (
+ n = 10000
+ indexOverheat = 19898
+ filterOverheat = 19799
+ )
// Populate multiple layers
for i := 0; i < n; i++ {
@@ -1601,7 +1605,7 @@ func TestDb_BloomFilter(t *testing.T) {
cnt := int(h.stor.ReadCounter())
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
- if min, max := n, n+2*n/100; cnt < min || cnt > max {
+ if min, max := n+indexOverheat+filterOverheat, n+indexOverheat+filterOverheat+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}
@@ -1612,7 +1616,7 @@ func TestDb_BloomFilter(t *testing.T) {
}
cnt = int(h.stor.ReadCounter())
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
- if max := 3 * n / 100; cnt > max {
+ if max := 3*n/100 + indexOverheat + filterOverheat; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
index 7551085904..82725a9eea 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go
@@ -75,7 +75,7 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
mem = nil
}
}()
- nn = mem.db.Free()
+ nn = mem.mdb.Free()
switch {
case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
delayed = true
@@ -90,13 +90,13 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
}
default:
// Allow memdb to grow if it has no entry.
- if mem.db.Len() == 0 {
+ if mem.mdb.Len() == 0 {
nn = n
} else {
mem.decref()
mem, err = db.rotateMem(n)
if err == nil {
- nn = mem.db.Free()
+ nn = mem.mdb.Free()
} else {
nn = 0
}
@@ -190,7 +190,7 @@ drain:
return
case db.journalC <- b:
// Write into memdb
- b.memReplay(mem.db)
+ b.memReplay(mem.mdb)
}
// Wait for journal writer
select {
@@ -200,7 +200,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
- b.revertMemReplay(mem.db)
+ b.revertMemReplay(mem.mdb)
return
}
}
@@ -209,7 +209,7 @@ drain:
if err != nil {
return
}
- b.memReplay(mem.db)
+ b.memReplay(mem.mdb)
}
// Set last seq number.
@@ -271,7 +271,7 @@ func (db *DB) CompactRange(r util.Range) error {
// Check for overlaps in memdb.
mem := db.getEffectiveMem()
defer mem.decref()
- if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) {
+ if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
index 2411844817..21e7a8a7fc 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go
@@ -30,13 +30,16 @@ const (
type noCache struct{}
-func (noCache) SetCapacity(capacity int) {}
-func (noCache) Capacity() int { return 0 }
-func (noCache) Used() int { return 0 }
-func (noCache) Size() int { return 0 }
-func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
-func (noCache) Purge(fin cache.PurgeFin) {}
-func (noCache) Zap() {}
+func (noCache) SetCapacity(capacity int) {}
+func (noCache) Capacity() int { return 0 }
+func (noCache) Used() int { return 0 }
+func (noCache) Size() int { return 0 }
+func (noCache) NumObjects() int { return 0 }
+func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
+func (noCache) PurgeNamespace(id uint64, fin cache.PurgeFin) {}
+func (noCache) ZapNamespace(id uint64) {}
+func (noCache) Purge(fin cache.PurgeFin) {}
+func (noCache) Zap() {}
var NoCache cache.Cache = noCache{}
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
index 1c3ff32494..a1b04d827f 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go
@@ -7,7 +7,6 @@
package leveldb
import (
- "io"
"sort"
"sync/atomic"
@@ -323,15 +322,6 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return
}
-type tableWrapper struct {
- *table.Reader
- closer io.Closer
-}
-
-func (tr tableWrapper) Release() {
- tr.closer.Close()
-}
-
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
@@ -347,7 +337,7 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
if bc := t.s.o.GetBlockCache(); bc != nil {
bcacheNS = bc.GetNamespace(num)
}
- return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r}
+ return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o)
})
if ch == nil && err == nil {
err = ErrClosed
@@ -363,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b
return nil, nil, err
}
defer ch.Release()
- return ch.Value().(tableWrapper).Find(key, ro)
+ return ch.Value().(*table.Reader).Find(key, ro)
}
// Returns approximate offset of the given key.
@@ -372,10 +362,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
if err != nil {
return
}
- _offset, err := ch.Value().(tableWrapper).OffsetOf(key)
- offset = uint64(_offset)
- ch.Release()
- return
+ defer ch.Release()
+ offset_, err := ch.Value().(*table.Reader).OffsetOf(key)
+ return uint64(offset_), err
}
// Creates an iterator from the given table.
@@ -384,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
if err != nil {
return iterator.NewEmptyIterator(err)
}
- iter := ch.Value().(tableWrapper).NewIterator(slice, ro)
+ iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
iter.SetReleaser(ch)
return iter
}
@@ -401,7 +390,7 @@ func (t *tOps) remove(f *tFile) {
t.s.logf("table@remove removed @%d", num)
}
if bc := t.s.o.GetBlockCache(); bc != nil {
- bc.GetNamespace(num).Zap()
+ bc.ZapNamespace(num)
}
}
})
@@ -411,6 +400,7 @@ func (t *tOps) remove(f *tFile) {
// regadless still used or not.
func (t *tOps) close() {
t.cache.Zap()
+ t.bpool.Close()
}
// Creates new initialized table ops instance.
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
index ca598f4f51..9032751b96 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go
@@ -40,7 +40,7 @@ var _ = testutil.Defer(func() {
data := bw.buf.Bytes()
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
return &block{
- cmp: comparer.DefaultComparer,
+ tr: &Reader{cmp: comparer.DefaultComparer},
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
index f397ac4f86..5ec2b3e539 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go
@@ -37,8 +37,7 @@ func max(x, y int) int {
}
type block struct {
- bpool *util.BufferPool
- cmp comparer.BasicComparer
+ tr *Reader
data []byte
restartsLen int
restartsOffset int
@@ -47,31 +46,25 @@ type block struct {
}
func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) {
- n := b.restartsOffset
- data := b.data
- cmp := b.cmp
-
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):]))
- offset += 1 // shared always zero, since this is a restart point
- v1, n1 := binary.Uvarint(data[offset:]) // key length
- _, n2 := binary.Uvarint(data[offset+n1:]) // value length
+ offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
+ offset += 1 // shared always zero, since this is a restart point
+ v1, n1 := binary.Uvarint(b.data[offset:]) // key length
+ _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
- return cmp.Compare(data[m:m+int(v1)], key) > 0
+ return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0
}) + rstart - 1
if index < rstart {
// The smallest key is greater-than key sought.
index = rstart
}
- offset = int(binary.LittleEndian.Uint32(data[n+4*index:]))
+ offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
return
}
func (b *block) restartIndex(rstart, rlimit, offset int) int {
- n := b.restartsOffset
- data := b.data
return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
- return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset
+ return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
}) + rstart - 1
}
@@ -141,10 +134,10 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas
}
func (b *block) Release() {
- if b.bpool != nil {
- b.bpool.Put(b.data)
- b.bpool = nil
+ if b.tr.bpool != nil {
+ b.tr.bpool.Put(b.data)
}
+ b.tr = nil
b.data = nil
}
@@ -270,7 +263,7 @@ func (i *blockIter) Seek(key []byte) bool {
i.dir = dirForward
}
for i.Next() {
- if i.block.cmp.Compare(i.key, key) >= 0 {
+ if i.block.tr.cmp.Compare(i.key, key) >= 0 {
return true
}
}
@@ -479,7 +472,7 @@ func (i *blockIter) Error() error {
}
type filterBlock struct {
- filter filter.Filter
+ tr *Reader
data []byte
oOffset int
baseLg uint
@@ -493,7 +486,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
n := int(binary.LittleEndian.Uint32(o))
m := int(binary.LittleEndian.Uint32(o[4:]))
if n < m && m <= b.oOffset {
- return b.filter.Contains(b.data[n:m], key)
+ return b.tr.filter.Contains(b.data[n:m], key)
} else if n == m {
return false
}
@@ -501,10 +494,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
return true
}
+func (b *filterBlock) Release() {
+ if b.tr.bpool != nil {
+ b.tr.bpool.Put(b.data)
+ }
+ b.tr = nil
+ b.data = nil
+}
+
type indexIter struct {
- blockIter
- tableReader *Reader