summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2021-06-07 10:52:06 +0200
committerGitHub <noreply@github.com>2021-06-07 10:52:06 +0200
commitce65aea0abf0b79f54b351f637ad6a15d1350ad4 (patch)
tree781261c727327497edb12ebec8b1cae7547d96fe
parent45edad867c5a05745f9e5004946db1e3a88d7e6e (diff)
lib/db: Use a more concurrent GC (fixes #7722) (#7750)
This changes the GC mechanism so that the first pass (which reads all FileInfos to populate bloom filters with block & version hashes) can happen concurrently with normal database operations. The big gcMut still exists, and we grab it temporarily to block all other modifications while we set up the bloom filters. We then release the lock and let other things happen, with those other things also updating the bloom filters as required. Once the first phase is done we again grab the gcMut, knowing that we are the sole modifier of the database, and do the cleanup. I also removed the final compaction step.
-rw-r--r--lib/db/lowlevel.go107
-rw-r--r--lib/db/transactions.go8
2 files changed, 78 insertions, 37 deletions
diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go
index 51608dc49f..21f77fd0e0 100644
--- a/lib/db/lowlevel.go
+++ b/lib/db/lowlevel.go
@@ -70,6 +70,9 @@ type Lowlevel struct {
recheckInterval time.Duration
oneFileSetCreated chan struct{}
evLogger events.Logger
+
+ blockFilter *bloomFilter
+ versionFilter *bloomFilter
}
func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
@@ -686,28 +689,30 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
// Indirection GC needs to run when there are no modifications to the
// FileInfos or indirected items.
- db.gcMut.Lock()
- defer db.gcMut.Unlock()
+ l.Debugln("Starting database GC")
- l.Debugln("Started database GC")
+ // Create a new set of bloom filters, while holding the gcMut which
+ // guarantees that no other modifications are happening concurrently.
- var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int
+ db.gcMut.Lock()
+ capacity := indirectGCBloomCapacity
+ if db.gcKeyCount > capacity {
+ capacity = db.gcKeyCount
+ }
+ db.blockFilter = newBloomFilter(capacity)
+ db.versionFilter = newBloomFilter(capacity)
+ db.gcMut.Unlock()
- // Only print something if the process takes more than "a moment".
- logWait := make(chan struct{})
- logTimer := time.AfterFunc(10*time.Second, func() {
- l.Infoln("Database GC started - many Syncthing operations will be unresponsive until it's finished")
- close(logWait)
- })
defer func() {
- if logTimer.Stop() || err != nil {
- return
- }
- <-logWait // Make sure messages are sent in order.
- l.Infof("Database GC done (discarded/remaining: %v/%v blocks, %v/%v versions)",
- discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+ // Forget the bloom filters on the way out.
+ db.gcMut.Lock()
+ db.blockFilter = nil
+ db.versionFilter = nil
+ db.gcMut.Unlock()
}()
+ var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int
+
t, err := db.newReadWriteTransaction()
if err != nil {
return err
@@ -719,16 +724,13 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
// items. For simplicity's sake we track just one count, which is the
// highest of the various indirected items.
- capacity := indirectGCBloomCapacity
- if db.gcKeyCount > capacity {
- capacity = db.gcKeyCount
- }
- blockFilter := newBloomFilter(capacity)
- versionFilter := newBloomFilter(capacity)
-
// Iterate the FileInfos, unmarshal the block and version hashes and
// add them to the filter.
+ // This happens concurrently with normal database modifications, though
+ // those modifications will now also add their blocks and versions to
+ // the bloom filters.
+
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
if err != nil {
return err
@@ -745,18 +747,35 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
if err := hashes.Unmarshal(it.Value()); err != nil {
return err
}
- if len(hashes.BlocksHash) > 0 {
- blockFilter.add(hashes.BlocksHash)
- }
- if len(hashes.VersionHash) > 0 {
- versionFilter.add(hashes.VersionHash)
- }
+ db.recordIndirectionHashes(hashes)
}
it.Release()
if err := it.Error(); err != nil {
return err
}
+ // For the next phase we grab the GC lock again and hold it for the rest
+ // of the method call. Now there can't be any further modifications to
+ // the database or the bloom filters.
+
+ db.gcMut.Lock()
+ defer db.gcMut.Unlock()
+
+ // Only print something if the process takes more than "a moment".
+ logWait := make(chan struct{})
+ logTimer := time.AfterFunc(10*time.Second, func() {
+ l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished")
+ close(logWait)
+ })
+ defer func() {
+ if logTimer.Stop() {
+ return
+ }
+ <-logWait // Make sure messages are sent in order.
+ l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)",
+ discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+ }()
+
// Iterate over block lists, removing keys with hashes that don't match
// the filter.
@@ -773,7 +792,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
}
key := blockListKey(it.Key())
- if blockFilter.has(key.Hash()) {
+ if db.blockFilter.has(key.Hash()) {
matchedBlocks++
continue
}
@@ -802,7 +821,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
}
key := versionKey(it.Key())
- if versionFilter.has(key.Hash()) {
+ if db.versionFilter.has(key.Hash()) {
matchedVersions++
continue
}
@@ -826,17 +845,31 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
return err
}
- l.Debugf("Finished GC, starting compaction (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+ l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+
+ return nil
+}
+
+func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) {
+ db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash})
+}
- return db.Compact()
+func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) {
+ // must be called with gcMut held (at least read-held)
+ if db.blockFilter != nil && len(hs.BlocksHash) > 0 {
+ db.blockFilter.add(hs.BlocksHash)
+ }
+ if db.versionFilter != nil && len(hs.VersionHash) > 0 {
+ db.versionFilter.add(hs.VersionHash)
+ }
}
-func newBloomFilter(capacity int) bloomFilter {
+func newBloomFilter(capacity int) *bloomFilter {
var buf [16]byte
io.ReadFull(rand.Reader, buf[:])
- return bloomFilter{
- f: blobloom.NewOptimized(blobloom.Config{
+ return &bloomFilter{
+ f: blobloom.NewSyncOptimized(blobloom.Config{
Capacity: uint64(capacity),
FPRate: indirectGCBloomFalsePositiveRate,
MaxBits: 8 * indirectGCBloomMaxBytes,
@@ -848,7 +881,7 @@ func newBloomFilter(capacity int) bloomFilter {
}
type bloomFilter struct {
- f *blobloom.Filter
+ f *blobloom.SyncFilter
k0, k1 uint64 // Random key for SipHash.
}
diff --git a/lib/db/transactions.go b/lib/db/transactions.go
index 4054a85d3a..7e41b9655b 100644
--- a/lib/db/transactions.go
+++ b/lib/db/transactions.go
@@ -534,6 +534,11 @@ func (t *readOnlyTransaction) withNeedLocal(folder []byte, truncate bool, fn Ite
type readWriteTransaction struct {
backend.WriteTransaction
readOnlyTransaction
+ indirectionTracker
+}
+
+type indirectionTracker interface {
+ recordIndirectionHashesForFile(f *protocol.FileInfo)
}
func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWriteTransaction, error) {
@@ -547,6 +552,7 @@ func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWr
ReadTransaction: tran,
keyer: db.keyer,
},
+ indirectionTracker: db,
}, nil
}
@@ -606,6 +612,8 @@ func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
fi.VersionHash = nil
}
+ t.indirectionTracker.recordIndirectionHashesForFile(&fi)
+
fiBs := mustMarshal(&fi)
return t.Put(fkey, fiBs)
}