author     Lars K.W. Gohlke <lkwg82@gmx.de>   2016-04-26 14:01:46 +0000
committer  Jakob Borg <jakob@nym.se>          2016-04-26 14:01:46 +0000
commit     236f121c4e6882e31fd96e3ba5aa39fb01947115 (patch)
tree       dab9a187586fd3d13f143af56d8917d4177b3b02
parent     2467678bd4f2aa0fecd18ba1bd3df4ae867c627a (diff)
lib/model: Refactor out folder and folderscan types, simplify somewhat
GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3007
-rw-r--r--  lib/model/folder.go          53
-rw-r--r--  lib/model/folderscan.go      49
-rw-r--r--  lib/model/folderstate.go      8
-rw-r--r--  lib/model/model.go            6
-rw-r--r--  lib/model/rofolder.go       139
-rw-r--r--  lib/model/rwfolder.go       596
-rw-r--r--  lib/model/rwfolder_test.go   55
7 files changed, 447 insertions, 459 deletions
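
For orientation before the per-file diffs: this commit extracts the scan plumbing that was duplicated between the read-only and read-write folders into a shared folder type (embedding the renamed stateTracker) plus a folderscan helper. Below is a minimal sketch of the resulting composition, abbreviated from the hunks that follow; it is not part of the commit, and Model is stubbed out here.

package model

import "time"

// Sketch only: stand-ins for types whose full definitions appear in the
// hunks below; Model itself is not changed by this commit.
type Model struct{}

type rescanRequest struct {
	subdirs []string
	err     chan error
}

type stateTracker struct{ folderID string } // field renamed from "folder"; mutex and state omitted

type folderscan struct { // new in folderscan.go
	interval time.Duration
	timer    *time.Timer
	now      chan rescanRequest
	delay    chan time.Duration
}

type folder struct { // new in folder.go, embedded by both folder kinds
	stateTracker
	scan  folderscan
	model *Model
	stop  chan struct{}
}

type roFolder struct{ folder } // rofolder.go needs nothing beyond the shared behaviour

type rwFolder struct { // rwfolder.go keeps only the puller-specific state
	folder
	// queue, pullTimer, dbUpdates, remoteIndex, errors, ...
}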
diff --git a/lib/model/folder.go b/lib/model/folder.go
new file mode 100644
index 0000000000..8f15adc8cc
--- /dev/null
+++ b/lib/model/folder.go
@@ -0,0 +1,53 @@
+// Copyright (C) 2014 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package model
+
+import "time"
+
+type folder struct {
+ stateTracker
+ scan folderscan
+ model *Model
+ stop chan struct{}
+}
+
+func (f *folder) IndexUpdated() {
+}
+
+func (f *folder) DelayScan(next time.Duration) {
+ f.scan.Delay(next)
+}
+
+func (f *folder) Scan(subdirs []string) error {
+ return f.scan.Scan(subdirs)
+}
+func (f *folder) Stop() {
+ close(f.stop)
+}
+
+func (f *folder) Jobs() ([]string, []string) {
+ return nil, nil
+}
+
+func (f *folder) BringToFront(string) {}
+
+func (f *folder) scanSubdirsIfHealthy(subDirs []string) error {
+ if err := f.model.CheckFolderHealth(f.folderID); err != nil {
+ l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err)
+ return err
+ }
+ l.Debugln(f, "Scanning subdirectories")
+ if err := f.model.internalScanFolderSubdirs(f.folderID, subDirs); err != nil {
+ // Potentially sets the error twice, once in the scanner just
+ // by doing a check, and once here, if the error returned is
+ // the same one as returned by CheckFolderHealth, though
+ // duplicate set is handled by setError.
+ f.setError(err)
+ return err
+ }
+ return nil
+}
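
The "potentially sets the error twice" comment above is harmless because stateTracker.setError deduplicates: as the folderstate.go hunk further down shows, it only records a transition when the folder is not already in FolderError with the same message. A rough, abbreviated sketch of that guard, assuming stdlib sync in place of lib/sync and omitting the StateChanged event the real code emits:

package model

import "sync"

type folderState int

const (
	FolderIdle folderState = iota
	FolderError
)

// Abbreviated stateTracker; see folderstate.go below for the real fields.
type stateTracker struct {
	folderID string
	mut      sync.Mutex
	current  folderState
	err      error
}

func (s *stateTracker) setError(err error) {
	s.mut.Lock()
	defer s.mut.Unlock()
	// Same guard as in folderstate.go: re-setting an identical error while
	// already in FolderError changes nothing, so a duplicate set is a no-op.
	if s.current == FolderError && s.err != nil && s.err.Error() == err.Error() {
		return
	}
	s.current = FolderError
	s.err = err
}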
diff --git a/lib/model/folderscan.go b/lib/model/folderscan.go
new file mode 100644
index 0000000000..65ed31aa52
--- /dev/null
+++ b/lib/model/folderscan.go
@@ -0,0 +1,49 @@
+// Copyright (C) 2016 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "math/rand"
+ "time"
+)
+
+type rescanRequest struct {
+ subdirs []string
+ err chan error
+}
+
+// bundle all folder scan activity
+type folderscan struct {
+ interval time.Duration
+ timer *time.Timer
+ now chan rescanRequest
+ delay chan time.Duration
+}
+
+func (s *folderscan) reschedule() {
+ if s.interval == 0 {
+ return
+ }
+ // Sleep a random time between 3/4 and 5/4 of the configured interval.
+ sleepNanos := (s.interval.Nanoseconds()*3 + rand.Int63n(2*s.interval.Nanoseconds())) / 4
+ interval := time.Duration(sleepNanos) * time.Nanosecond
+ l.Debugln(s, "next rescan in", interval)
+ s.timer.Reset(interval)
+}
+
+func (s *folderscan) Scan(subdirs []string) error {
+ req := rescanRequest{
+ subdirs: subdirs,
+ err: make(chan error),
+ }
+ s.now <- req
+ return <-req.err
+}
+
+func (s *folderscan) Delay(next time.Duration) {
+ s.delay <- next
+}
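
folderscan centralizes two behaviours that rofolder.go and rwfolder.go previously each implemented: jittered rescheduling (a random delay between 3/4 and 5/4 of the configured interval, so a 60 s interval schedules the next scan between 45 s and 75 s) and a blocking scan request handed to the folder's Serve loop over a channel. The following is a self-contained sketch of both patterns, not taken from the commit, with a trivial goroutine standing in for the Serve loop:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type rescanRequest struct {
	subdirs []string
	err     chan error
}

// jitteredInterval mirrors folderscan.reschedule: a random duration in
// [3/4, 5/4) of the configured interval.
func jitteredInterval(interval time.Duration) time.Duration {
	sleepNanos := (interval.Nanoseconds()*3 + rand.Int63n(2*interval.Nanoseconds())) / 4
	return time.Duration(sleepNanos) * time.Nanosecond
}

func main() {
	// For a 60s rescan interval this prints something between 45s and 75s.
	fmt.Println(jitteredInterval(60 * time.Second))

	// Scan is a blocking request/reply over an unbuffered channel: the
	// caller hands over a rescanRequest and waits for the serving goroutine
	// (the folder's Serve loop in the real code) to answer on req.err.
	now := make(chan rescanRequest)
	go func() {
		req := <-now
		req.err <- nil // stand-in for scanSubdirsIfHealthy(req.subdirs)
	}()

	req := rescanRequest{subdirs: nil, err: make(chan error)}
	now <- req
	fmt.Println("scan result:", <-req.err)
}

Because the now channel is unbuffered, a forced rescan is naturally serialized with whatever the Serve loop is currently doing; the rwfolder.go comment below about running the scanner from within the puller loop makes the same point.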
diff --git a/lib/model/folderstate.go b/lib/model/folderstate.go
index dfacdc47d1..f68bee355f 100644
--- a/lib/model/folderstate.go
+++ b/lib/model/folderstate.go
@@ -38,7 +38,7 @@ func (s folderState) String() string {
}
type stateTracker struct {
- folder string
+ folderID string
mut sync.Mutex
current folderState
@@ -61,7 +61,7 @@ func (s *stateTracker) setState(newState folderState) {
*/
eventData := map[string]interface{}{
- "folder": s.folder,
+ "folder": s.folderID,
"to": newState.String(),
"from": s.current.String(),
}
@@ -92,7 +92,7 @@ func (s *stateTracker) setError(err error) {
s.mut.Lock()
if s.current != FolderError || s.err.Error() != err.Error() {
eventData := map[string]interface{}{
- "folder": s.folder,
+ "folder": s.folderID,
"to": FolderError.String(),
"from": s.current.String(),
"error": err.Error(),
@@ -116,7 +116,7 @@ func (s *stateTracker) clearError() {
s.mut.Lock()
if s.current == FolderError {
eventData := map[string]interface{}{
- "folder": s.folder,
+ "folder": s.folderID,
"to": FolderIdle.String(),
"from": s.current.String(),
}
diff --git a/lib/model/model.go b/lib/model/model.go
index 83d2ee3b50..558d598371 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -172,7 +172,7 @@ func (m *Model) StartFolderRW(folder string) {
if ok {
panic("cannot start already running folder " + folder)
}
- p := newRWFolder(m, m.shortID, cfg)
+ p := newRWFolder(m, cfg)
m.folderRunners[folder] = p
if len(cfg.Versioning.Type) > 0 {
@@ -243,7 +243,7 @@ func (m *Model) StartFolderRO(folder string) {
if ok {
panic("cannot start already running folder " + folder)
}
- s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second)
+ s := newROFolder(m, cfg)
m.folderRunners[folder] = s
token := m.Add(s)
@@ -1360,7 +1360,7 @@ func (m *Model) ScanFolderSubs(folder string, subs []string) error {
return runner.Scan(subs)
}
-func (m *Model) internalScanFolderSubs(folder string, subs []string) error {
+func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
for i, sub := range subs {
sub = osutil.NativeFilename(sub)
if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) {
diff --git a/lib/model/rofolder.go b/lib/model/rofolder.go
index f1d65f94b3..5a146191ef 100644
--- a/lib/model/rofolder.go
+++ b/lib/model/rofolder.go
@@ -8,151 +8,88 @@ package model
import (
"fmt"
- "math/rand"
"time"
+ "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/sync"
)
type roFolder struct {
- stateTracker
-
- folder string
- intv time.Duration
- timer *time.Timer
- model *Model
- stop chan struct{}
- scanNow chan rescanRequest
- delayScan chan time.Duration
+ folder
}
-type rescanRequest struct {
- subs []string
- err chan error
-}
-
-func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
+func newROFolder(model *Model, cfg config.FolderConfiguration) *roFolder {
return &roFolder{
- stateTracker: stateTracker{
- folder: folder,
- mut: sync.NewMutex(),
+ folder: folder{
+ stateTracker: stateTracker{
+ folderID: cfg.ID,
+ mut: sync.NewMutex(),
+ },
+ scan: folderscan{
+ interval: time.Duration(cfg.RescanIntervalS) * time.Second,
+ timer: time.NewTimer(time.Millisecond),
+ now: make(chan rescanRequest),
+ delay: make(chan time.Duration),
+ },
+ stop: make(chan struct{}),
+ model: model,
},
- folder: folder,
- intv: interval,
- timer: time.NewTimer(time.Millisecond),
- model: model,
- stop: make(chan struct{}),
- scanNow: make(chan rescanRequest),
- delayScan: make(chan time.Duration),
}
}
-func (s *roFolder) Serve() {
- l.Debugln(s, "starting")
- defer l.Debugln(s, "exiting")
+func (f *roFolder) Serve() {
+ l.Debugln(f, "starting")
+ defer l.Debugln(f, "exiting")
defer func() {
- s.timer.Stop()
+ f.scan.timer.Stop()
}()
- reschedule := func() {
- if s.intv == 0 {
- return
- }
- // Sleep a random time between 3/4 and 5/4 of the configured interval.
- sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4
- s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
- }
-
initialScanCompleted := false
for {
select {
- case <-s.stop:
+ case <-f.stop:
return
- case <-s.timer.C:
- if err := s.model.CheckFolderHealth(s.folder); err != nil {
- l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
- reschedule()
+ case <-f.scan.timer.C:
+ if err := f.model.CheckFolderHealth(f.folderID); err != nil {
+ l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err)
+ f.scan.reschedule()
continue
}
- l.Debugln(s, "rescan")
+ l.Debugln(f, "rescan")
- if err := s.model.internalScanFolderSubs(s.folder, nil); err != nil {
+ if err := f.model.internalScanFolderSubdirs(f.folderID, nil); err != nil {
// Potentially sets the error twice, once in the scanner just
// by doing a check, and once here, if the error returned is
// the same one as returned by CheckFolderHealth, though
// duplicate set is handled by setError.
- s.setError(err)
- reschedule()
+ f.setError(err)
+ f.scan.reschedule()
continue
}
if !initialScanCompleted {
- l.Infoln("Completed initial scan (ro) of folder", s.folder)
+ l.Infoln("Completed initial scan (ro) of folder", f.folderID)
initialScanCompleted = true
}
- if s.intv == 0 {
+ if f.scan.interval == 0 {
continue
}
- reschedule()
+ f.scan.reschedule()
- case req := <-s.scanNow:
- if err := s.model.CheckFolderHealth(s.folder); err != nil {
- l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
- req.err <- err
- continue
- }
+ case req := <-f.scan.now:
+ req.err <- f.scanSubdirsIfHealthy(req.subdirs)
- l.Debugln(s, "forced rescan")
-
- if err := s.model.internalScanFolderSubs(s.folder, req.subs); err != nil {
- // Potentially sets the error twice, once in the scanner just
- // by doing a check, and once here, if the error returned is
- // the same one as returned by CheckFolderHealth, though
- // duplicate set is handled by setError.
- s.setError(err)
- req.err <- err
- continue
- }
-
- req.err <- nil
-
- case next := <-s.delayScan:
- s.timer.Reset(next)
+ case next := <-f.scan.delay:
+ f.scan.timer.Reset(next)
}
}
}
-func (s *roFolder) Stop() {
- close(s.stop)
-}
-
-func (s *roFolder) IndexUpdated() {
-}
-
-func (s *roFolder) Scan(subs []string) error {
- req := rescanRequest{
- subs: subs,
- err: make(chan error),
- }
- s.scanNow <- req
- return <-req.err
-}
-
-func (s *roFolder) String() string {
- return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
-}
-
-func (s *roFolder) BringToFront(string) {}
-
-func (s *roFolder) Jobs() ([]string, []string) {
- return nil, nil
-}
-
-func (s *roFolder) DelayScan(next time.Duration) {
- s.delayScan <- next
+func (f *roFolder) String() string {
+ return fmt.Sprintf("roFolder/%s@%p", f.folderID, f)
}
diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go
index 44bfac512e..b1751214a7 100644
--- a/lib/model/rwfolder.go
+++ b/lib/model/rwfolder.go
@@ -74,114 +74,108 @@ type dbUpdateJob struct {
}
type rwFolder struct {
- stateTracker
+ folder
- model *Model
- progressEmitter *ProgressEmitter
virtualMtimeRepo *db.VirtualMtimeRepo
+ dir string
+ versioner versioner.Versioner
+ ignorePerms bool
+ copiers int
+ pullers int
+ order config.PullOrder
+ maxConflicts int
+ sleep time.Duration
+ pause time.Duration
+ allowSparse bool
+ checkFreeSpace bool
- folder string
- dir string
- scanIntv time.Duration
- versioner versioner.Versioner
- ignorePerms bool
- copiers int
- pullers int
- shortID protocol.ShortID
- order config.PullOrder
- maxConflicts int
- sleep time.Duration
- pause time.Duration
- allowSparse bool
- checkFreeSpace bool
-
- stop chan struct{}
queue *jobQueue
dbUpdates chan dbUpdateJob
- scanTimer *time.Timer
pullTimer *time.Timer
- delayScan chan time.Duration
- scanNow chan rescanRequest
remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
errors map[string]string // path -> error string
errorsMut sync.Mutex
}
-func newRWFolder(m *Model, shortID protocol.ShortID, cfg config.FolderConfiguration) *rwFolder {
- p := &rwFolder{
- stateTracker: stateTracker{
- folder: cfg.ID,
- mut: sync.NewMutex(),
+func newRWFolder(model *Model, cfg config.FolderConfiguration) *rwFolder {
+ f := &rwFolder{
+ folder: folder{
+ stateTracker: stateTracker{
+ folderID: cfg.ID,
+ mut: sync.NewMutex(),
+ },
+ scan: folderscan{
+ interval: time.Duration(cfg.RescanIntervalS) * time.Second,
+ timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
+ now: make(chan rescanRequest),
+ delay: make(chan time.Duration),
+ },
+ stop: make(chan struct{}),
+ model: model,
},
- model: m,
- progressEmitter: m.progressEmitter,
- virtualMtimeRepo: db.NewVirtualMtimeRepo(m.db, cfg.ID),
-
- folder: cfg.ID,
- dir: cfg.Path(),
- scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
- ignorePerms: cfg.IgnorePerms,
- copiers: cfg.Copiers,
- pullers: cfg.Pullers,
- shortID: shortID,
- order: cfg.Order,
- maxConflicts: cfg.MaxConflicts,
- allowSparse: !cfg.DisableSparseFiles,
- checkFreeSpace: cfg.MinDiskFreePct != 0,
-
- stop: make(chan struct{}),
+ virtualMtimeRepo: db.NewVirtualMtimeRepo(model.db, cfg.ID),
+ dir: cfg.Path(),
+ ignorePerms: cfg.IgnorePerms,
+ copiers: cfg.Copiers,
+ pullers: cfg.Pullers,
+ order: cfg.Order,
+ maxConflicts: cfg.MaxConflicts,
+ allowSparse: !cfg.DisableSparseFiles,
+ checkFreeSpace: cfg.MinDiskFreePct != 0,
+
queue: newJobQueue(),
pullTimer: time.NewTimer(time.Second),
- scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
- delayScan: make(chan time.Duration),
- scanNow: make(chan rescanRequest),
remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
errorsMut: sync.NewMutex(),
}
- if p.copiers == 0 {
- p.copiers = defaultCopiers
+ f.configureCopiersAndPullers(cfg)
+
+ return f
+}
+
+func (f *rwFolder) configureCopiersAndPullers(config config.FolderConfiguration) {
+ if f.copiers == 0 {
+ f.copiers = defaultCopiers
}
- if p.pullers == 0 {
- p.pullers = defaultPullers
+ if f.pullers == 0 {
+ f.pullers = defaultPullers
}
- if cfg.PullerPauseS == 0 {
- p.pause = defaultPullerPause
+ if config.PullerPauseS == 0 {
+ f.pause = defaultPullerPause
} else {
- p.pause = time.Duration(cfg.PullerPauseS) * time.Second
+ f.pause = time.Duration(config.PullerPauseS) * time.Second
}
- if cfg.PullerSleepS == 0 {
- p.sleep = defaultPullerSleep
+ if config.PullerSleepS == 0 {
+ f.sleep = defaultPullerSleep
} else {
- p.sleep = time.Duration(cfg.PullerSleepS) * time.Second
+ f.sleep = time.Duration(config.PullerSleepS) * time.Second
}
-
- return p
}
// Helper function to check whether either the ignorePerm flag has been
// set on the local host or the FlagNoPermBits has been set on the file/dir
// which is being pulled.
-func (p *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
- return p.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0
+func (f *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
+ return f.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0
}
// Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error.
-func (p *rwFolder) Serve() {
- l.Debugln(p, "starting")
- defer l.Debugln(p, "exiting")
+func (f *rwFolder) Serve() {
+ l.Debugln(f, "starting")
+ defer l.Debugln(f, "exiting")
defer func() {
- p.pullTimer.Stop()
- p.scanTimer.Stop()
+ f.pullTimer.Stop()
+ f.scan.timer.Stop()
// TODO: Should there be an actual FolderStopped state?
- p.setState(FolderIdle)
+ f.setState(FolderIdle)
}()
var prevVer int64
@@ -192,65 +186,65 @@ func (p *rwFolder) Serve() {
for {
select {
- case <-p.stop:
+ case <-f.stop:
return
- case <-p.remoteIndex:
+ case <-f.remoteIndex:
prevVer = 0
- p.pullTimer.Reset(0)
- l.Debugln(p, "remote index updated, rescheduling pull")
+ f.pullTimer.Reset(0)
+ l.Debugln(f, "remote index updated, rescheduling pull")
- case <-p.pullTimer.C:
+ case <-f.pullTimer.C:
if !initialScanCompleted {
- l.Debugln(p, "skip (initial)")
- p.pullTimer.Reset(p.sleep)
+ l.Debugln(f, "skip (initial)")
+ f.pullTimer.Reset(f.sleep)
continue
}
- p.model.fmut.RLock()
- curIgnores := p.model.folderIgnores[p.folder]
- p.model.fmut.RUnlock()
+ f.model.fmut.RLock()
+ curIgnores := f.model.folderIgnores[f.folderID]
+ f.model.fmut.RUnlock()
if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
// The ignore patterns have changed. We need to re-evaluate if
// there are files we need now that were ignored before.
- l.Debugln(p, "ignore patterns have changed, resetting prevVer")
+ l.Debugln(f, "ignore patterns have changed, resetting prevVer")
prevVer = 0
prevIgnoreHash = newHash
}
// RemoteLocalVersion() is a fast call, doesn't touch the database.
- curVer, ok := p.model.RemoteLocalVersion(p.folder)
+ curVer, ok := f.model.RemoteLocalVersion(f.folderID)
if !ok || curVer == prevVer {
- l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok)
- p.pullTimer.Reset(p.sleep)
+ l.Debugln(f, "skip (curVer == prevVer)", prevVer, ok)
+ f.pullTimer.Reset(f.sleep)
continue
}
- if err := p.model.CheckFolderHealth(p.folder); err != nil {
- l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
- p.pullTimer.Reset(p.sleep)
+ if err := f.model.CheckFolderHealth(f.folderID); err != nil {
+ l.Infoln("Skipping folder", f.folderID, "pull due to folder error:", err)
+ f.pullTimer.Reset(f.sleep)
continue
}
- l.Debugln(p, "pulling", prevVer, curVer)
+ l.Debugln(f, "pulling", prevVer, curVer)
- p.setState(FolderSyncing)
- p.clearErrors()
+ f.setState(FolderSyncing)
+ f.clearErrors()
tries := 0
for {
tries++
- changed := p.pullerIteration(curIgnores)
- l.Debugln(p, "changed", changed)
+ changed := f.pullerIteration(curIgnores)
+ l.Debugln(f, "changed", changed)
if changed == 0 {
// No files were changed by the puller, so we are in
// sync. Remember the local version number and
// schedule a resync a little bit into the future.
- if lv, ok := p.model.RemoteLocalVersion(p.folder); ok && lv < curVer {
+ if lv, ok := f.model.RemoteLocalVersion(f.folderID); ok && lv < curVer {
// There's a corner case where the device we needed
// files from disconnected during the puller
// iteration. The files will have been removed from
@@ -259,12 +253,12 @@ func (p *rwFolder) Serve() {
// version that includes those files in curVer. So we
// catch the case that localVersion might have
// decreased here.
- l.Debugln(p, "adjusting curVer", lv)
+ l.Debugln(f, "adjusting curVer", lv)
curVer = lv
}
prevVer = curVer
- l.Debugln(p, "next pull in", p.sleep)
- p.pullTimer.Reset(p.sleep)
+ l.Debugln(f, "next pull in", f.sleep)
+ f.pullTimer.Reset(f.sleep)
break
}
@@ -273,81 +267,48 @@ func (p *rwFolder) Serve() {
// we're not making it. Probably there are write
// errors preventing us. Flag this with a warning and
// wait a bit longer before retrying.
- l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, p.pause)
- l.Debugln(p, "next pull in", p.pause)
+ l.Infof("Folder %q isn't making progress. Pausing puller for %v.", f.folderID, f.pause)
+ l.Debugln(f, "next pull in", f.pause)
- if folderErrors := p.currentErrors(); len(folderErrors) > 0 {
+ if folderErrors := f.currentErrors(); len(folderErrors) > 0 {
events.Default.Log(events.FolderErrors, map[string]interface{}{
- "folder": p.folder,
+ "folder": f.folderID,
"errors": folderErrors,
})
}
- p.pullTimer.Reset(p.pause)
+ f.pullTimer.Reset(f.pause)
break
}
}
- p.setState(FolderIdle)
+ f.setState(FolderIdle)
// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
// same time.
- case <-p.scanTimer.C:
- err := p.scanSubsIfHealthy(nil)
- p.rescheduleScan()
+ case <-f.scan.timer.C:
+ err := f.scanSubdirsIfHealthy(nil)
+ f.scan.reschedule()
if err != nil {
continue
}
if !initialScanCompleted {
- l.Infoln("Completed initial scan (rw) of folder", p.folder)
+ l.Infoln("Completed initial scan (rw) of folder", f.folderID)
initialScanCompleted = true
}
- case req := <-p.scanNow:
- req.err <- p.scanSubsIfHealthy(req.subs)
+ case req := <-f.scan.now:
+ req.err <- f.scanSubdirsIfHealthy(req.subdirs)
- case next := <-p.delayScan:
- p.scanTimer.Reset(next)
+ case next := <-f.scan.delay:
+ f.scan.timer.Reset(next)
}
}
}
-func (p *rwFolder) rescheduleScan() {
- if p.scanIntv == 0 {
- // We should not run scans, so it should not be rescheduled.
- return
- }
- // Sleep a random time between 3/4 and 5/4 of the configured interval.
- sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
- intv := time.Duration(sleepNanos) * time.Nanosecond
- l.Debugln(p, "next rescan in", intv)
- p.scanTimer.Reset(intv)
-}
-
-func (p *rwFolder) scanSubsIfHealthy(subs []string) error {
- if err := p.model.CheckFolderHealth(p.folder); err != nil {
- l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
- return err
- }
- l.Debugln(p, "Scanning subdirectories")
- if err := p.model.internalScanFolderSubs(p.folder, subs); err != nil {
- // Potentially sets the error twice, once in the scanner just
- // by doing a check, and once here, if the error returned is
- // the same one as returned by CheckFolderHealth, though
- // duplicate set is handled by setError.
- p.setError(err)
- return err
- }
- return nil
-}
-
-func (p *rwFolder) Stop() {
- close(p.stop)
-}
-
-func (p *rwFolder) IndexUpdated() {
+func (f *rwFolder) IndexUpdated() {
select {
- case p.remoteIndex <- struct{}{}:
+ case f.remoteIndex <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
@@ -356,24 +317,15 @@ func (p *rwFolder) IndexUpdated() {
}
}
-func (p *rwFolder) Scan(subs []string) error {
- req := rescanRequest{
- subs: subs,
- err: make(chan error),
- }
- p.scanNow <- req
- return <-req.err
-}
-
-func (p *rwFolder) String() string {
- return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
+func (f *rwFolder) String() string {
+ return fmt.Sprintf("rwFolder/%s@%p", f.folderID, f)
}
// pullerIteration runs a single puller iteration for the given folder and
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the folder.
-func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
+func (f *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
pullChan := make(chan pullBlockState)
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
@@ -383,30 +335,30 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
pullWg := sync.NewWaitGroup()
doneWg := sync.NewWaitGroup()
- l.Debugln(p, "c", p.copiers, "p", p.pullers)
+ l.Debugln(f, "c", f.copiers, "p", f.pullers)
- p.dbUpdates = make(chan dbUpdateJob)
+ f.dbUpdates = make(chan dbUpdateJob)
updateWg.Add(1)
go func() {
// dbUpdaterRoutine finishes when p.dbUpdates is closed
- p.dbUpdaterRoutine()
+ f.dbUpdaterRoutine()
updateWg.Done()
}()
- for i := 0; i < p.copiers; i++ {
+ for i := 0; i < f.copiers; i++ {
copyWg.Add(1)
go func() {
// copierRoutine finishes when copyChan is closed
- p.copierRoutine(copyChan, pullChan, finisherChan)
+ f.copierRoutine(copyChan, pullChan, finisherChan)
copyWg.Done()
}()
}
- for i := 0; i < p.pullers; i++ {
+ for i := 0; i < f.pullers; i++ {
pullWg.Add(1)
go func() {
// pullerRoutine finishes when pullChan is closed
- p.pullerRoutine(pullChan, finisherChan)
+ f.pullerRoutine(pullChan, finisherChan)
pullWg.Done()
}()
}
@@ -414,13 +366,13 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
go func() {
- p.finisherRoutine(finisherChan)
+ f.finisherRoutine(finisherChan)
doneWg.Done()
}()
- p.model.fmut.RLock()
- folderFiles := p.model.folderFiles[p.folder]
- p.model.fmut.RUnlock()
+ f.model.fmut.RLock()
+ folderFiles := f.model.folderFiles[f.folderID]
+ f.model.fmut.RUnlock()
// !!!
// WithNeed takes a database snapshot (by necessity). By the time we've
@@ -434,15 +386,15 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
dirDeletions := []protocol.FileInfo{}
buckets := map[string][]protocol.FileInfo{}
- handleFile := func(f protocol.FileInfo) bool {
+ handleFile := func(fi protocol.FileInfo) bool {
switch {
- case f.IsDeleted():
+ case fi.IsDeleted():
// A deleted file, directory or symlink
- if f.IsDirectory() {
- dirDeletions = append(dirDeletions, f)
+ if fi.IsDirectory() {
+ dirDeletions = append(dirDeletions, fi)
} else {
- fileDeletions[f.Name] = f
- df, ok := p.model.CurrentFolderFile(p.folder, f.Name)
+ fileDeletions[fi.Name] = fi
+ df, ok := f.model.CurrentFolderFile(f.folderID, fi.Name)
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
// WithNeed, furthermore, the file can simply be of the wrong
@@ -453,10 +405,10 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
buckets[key] = append(buckets[key], df)
}
}
- case f.IsDirectory() && !f.IsSymlink():
+ case fi.IsDirectory() && !fi.IsSymlink():
// A new or changed directory
- l.Debugln("Creating directory", f.Name)
- p.handleDir(f)
+ l.Debugln("Creating directory", fi.Name)
+ f.handleDir(fi)
default:
return false
}
@@ -475,12 +427,12 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
return true
}
- l.Debugln(p, "handling", file.Name)
+ l.Debugln(f, "handling", file.Name)
if !handleFile(file) {
// A new or changed file or symlink. This is the only case where we
// do stuff concurrently in the background
- p.queue.Push(file.Name, file.Size(), file.Modified)
+ f.queue.Push(file.Name, file.Size(), file.Modified)
}
changed++
@@ -489,19 +441,19 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
// Reorder the file queue according to configuration
- switch p.order {
+ switch f.order {
case config.OrderRandom:
- p.queue.Shuffle()
+ f.queue.Shuffle()
case config.OrderAlphabetic:
- // The queue is already in alphabetic order.
+ // The queue is already in alphabetic order.
case config.OrderSmallestFirst:
- p.queue.SortSmallestFirst()
+ f.queue.SortSmallestFirst()
case config.OrderLargestFirst:
- p.queue.SortLargestFirst()
+ f.queue.SortLargestFirst()
case config.OrderOldestFirst:
- p.queue.SortOldestFirst()
+ f.queue.SortOldestFirst()
case config.OrderNewestFirst:
- p.queue.SortNewestFirst()
+ f.queue.SortNewestFirst()
}
// Process the file queue
@@ -509,35 +461,35 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
nextFile:
for {
select {
- case <-p.stop:
+ case <-f.stop:
// Stop processing files if the puller has been told to stop.
break
default:
}
- fileName, ok := p.queue.Pop()
+ fileName, ok := f.queue.Pop()
if !ok {
break
}
- f, ok := p.model.CurrentGlobalFile(p.folder, fileName)
+ fi, ok := f.model.CurrentGlobalFile(f.folderID, fileName)
if !ok {
// File is no longer in the index. Mark it as done and drop it.
- p.queue.Done(fileName)
+ f.queue.Done(fileName)
continue
}
// Handles races where an index update arrives changing what the file
// is between queueing and retrieving it from the queue, effectively
// changing how the file should be handled.
- if handleFile(f) {
+ if handleFile(fi) {
continue
}
- if !f.IsSymlink() {
- key := string(f.Blocks[0].Hash)
+ if !fi.IsSymlink() {
+ key := string(fi.Blocks[0].Hash)
for i, candidate := range buckets[key] {
- if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
+ if scanner.BlocksEqual(candidate.Blocks, fi.Blocks) {
// Remove the candidate from the bucket
lidx := len(buckets[key]) - 1
buckets[key][i] = buckets[key][lidx]
@@ -550,16 +502,16 @@ nextFile:
// Remove the pending deletion (as we perform it by renaming)
delete(fileDeletions, candidate.Name)
- p.renameFile(desired, f)
+ f.renameFile(desired, fi)
- p.queue.Done(fileName)
+ f.queue.Done(fileName)
continue nextFile
}
}
}
// Not a rename or a symlink, deal with it.
- p.handleFile(f, copyChan, finisherChan)
+ f.handleFile(fi, copyChan, finisherChan)
}
// Signal copy and puller routines that we are done with the in data for
@@ -577,27 +529,27 @@ nextFile:
for _, file := range fileDeletions {
l.Debugln("Deleting file", file.Name)