diff options
author | Jakob Borg <jakob@nym.se> | 2014-03-29 18:53:48 +0100 |
---|---|---|
committer | Jakob Borg <jakob@nym.se> | 2014-03-30 21:28:13 +0200 |
commit | 5eb5a056bfbdd0a7a3ee94d12aaf9b1e7ab3c79e (patch) | |
tree | 56592068bc76bfd1b579dd5b9e01ba108d969500 | |
parent | 2a5c0646c09e0d737ebdef16f6b5bb6ddd41b0fa (diff) |
Basic support for synchronizing multiple repositories (fixes #35)
-rwxr-xr-x | build.sh | 6 | ||||
-rw-r--r-- | cmd/stcli/main.go | 8 | ||||
-rw-r--r-- | cmd/syncthing/config.go | 15 | ||||
-rw-r--r-- | cmd/syncthing/main.go | 81 | ||||
-rw-r--r-- | cmd/syncthing/model.go | 407 | ||||
-rw-r--r-- | cmd/syncthing/model_test.go | 18 | ||||
-rw-r--r-- | cmd/syncthing/puller.go | 84 | ||||
-rw-r--r-- | integration/.gitignore | 4 | ||||
-rw-r--r-- | integration/h1/config.xml | 8 | ||||
-rw-r--r-- | integration/h2/config.xml | 16 | ||||
-rw-r--r-- | integration/h3/config.xml | 8 | ||||
-rwxr-xr-x | integration/test.sh | 39 | ||||
-rw-r--r-- | protocol/common_test.go | 4 | ||||
-rw-r--r-- | protocol/nativemodel_darwin.go | 8 | ||||
-rw-r--r-- | protocol/nativemodel_unix.go | 8 | ||||
-rw-r--r-- | protocol/nativemodel_windows.go | 8 | ||||
-rw-r--r-- | protocol/protocol.go | 38 | ||||
-rw-r--r-- | protocol/wireformat.go | 8 |
18 files changed, 479 insertions, 289 deletions
@@ -14,7 +14,7 @@ build() { go get -d ./cmd/syncthing godep= fi - ${godep} go build -ldflags "-w -X main.Version $version" ./cmd/syncthing + ${godep} go build $* -ldflags "-w -X main.Version $version" ./cmd/syncthing ${godep} go build -ldflags "-w -X main.Version $version" ./cmd/stcli } @@ -61,6 +61,10 @@ case "$1" in build ;; + race) + build -race + ;; + test) test ;; diff --git a/cmd/stcli/main.go b/cmd/stcli/main.go index e52b37f056..6e7ec11fe8 100644 --- a/cmd/stcli/main.go +++ b/cmd/stcli/main.go @@ -78,8 +78,8 @@ func prtIndex(files []protocol.FileInfo) { } } -func (m Model) Index(nodeID string, files []protocol.FileInfo) { - log.Printf("Received index") +func (m Model) Index(nodeID string, repo string, files []protocol.FileInfo) { + log.Printf("Received index for repo %q", repo) if cmd == "idx" { prtIndex(files) if get != "" { @@ -117,8 +117,8 @@ func getFile(f protocol.FileInfo) { fd.Close() } -func (m Model) IndexUpdate(nodeID string, files []protocol.FileInfo) { - log.Println("Received index update") +func (m Model) IndexUpdate(nodeID string, repo string, files []protocol.FileInfo) { + log.Printf("Received index update for repo %q", repo) if cmd == "idx" { prtIndex(files) if exit { diff --git a/cmd/syncthing/config.go b/cmd/syncthing/config.go index b8bdf40940..f0c7ecce3d 100644 --- a/cmd/syncthing/config.go +++ b/cmd/syncthing/config.go @@ -19,6 +19,7 @@ type Configuration struct { } type RepositoryConfiguration struct { + ID string `xml:"id,attr"` Directory string `xml:"directory,attr"` Nodes []NodeConfiguration `xml:"node"` } @@ -181,6 +182,20 @@ func readConfigXML(rd io.Reader) (Configuration, error) { fillNilSlices(&cfg.Options) cfg.Options.ListenAddress = uniqueStrings(cfg.Options.ListenAddress) + + var seenRepos = map[string]bool{} + for i := range cfg.Repositories { + if cfg.Repositories[i].ID == "" { + cfg.Repositories[i].ID = "default" + } + + id := cfg.Repositories[i].ID + if seenRepos[id] { + panic("duplicate repository ID " + id) + } + seenRepos[id] = true + } + return cfg, err } diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 6acf7eb762..02d273b112 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -1,7 +1,6 @@ package main import ( - "compress/gzip" "crypto/tls" "flag" "fmt" @@ -20,7 +19,6 @@ import ( "github.com/calmh/ini" "github.com/calmh/syncthing/discover" "github.com/calmh/syncthing/protocol" - "github.com/calmh/syncthing/scanner" ) const BlockSize = 128 * 1024 @@ -166,11 +164,6 @@ func main() { infof("Edit %s to taste or use the GUI\n", cfgFile) } - // Make sure the local node is in the node list. - cfg.Repositories[0].Nodes = cleanNodeList(cfg.Repositories[0].Nodes, myID) - - var dir = expandTilde(cfg.Repositories[0].Directory) - if profiler := os.Getenv("STPROFILER"); len(profiler) > 0 { go func() { dlog.Println("Starting profiler on", profiler) @@ -194,12 +187,18 @@ func main() { MinVersion: tls.VersionTLS12, } - ensureDir(dir, -1) - m := NewModel(dir, cfg.Options.MaxChangeKbps*1000) + m := NewModel(cfg.Options.MaxChangeKbps * 1000) if cfg.Options.MaxSendKbps > 0 { m.LimitRate(cfg.Options.MaxSendKbps) } + for i := range cfg.Repositories { + cfg.Repositories[i].Nodes = cleanNodeList(cfg.Repositories[i].Nodes, myID) + dir := expandTilde(cfg.Repositories[i].Directory) + ensureDir(dir, -1) + m.AddRepo(cfg.Repositories[i].ID, dir, cfg.Repositories[i].Nodes) + } + // GUI if cfg.Options.GUIEnabled && cfg.Options.GUIAddress != "" { addr, err := net.ResolveTCPAddr("tcp", cfg.Options.GUIAddress) @@ -233,19 +232,9 @@ func main() { if verbose { infoln("Populating repository index") } - loadIndex(m) - - sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} - w := &scanner.Walker{ - Dir: m.dir, - IgnoreFile: ".stignore", - FollowSymlinks: cfg.Options.FollowSymlinks, - BlockSize: BlockSize, - TempNamer: defTempNamer, - Suppressor: sup, - CurrentFiler: m, - } - updateLocalModel(m, w) + m.LoadIndexes(confDir) + m.ScanRepos() + m.SaveIndexes(confDir) connOpts := map[string]string{ "clientId": "syncthing", @@ -467,54 +456,6 @@ func discovery() *discover.Discoverer { return disc } -func updateLocalModel(m *Model, w *scanner.Walker) { - files, _ := w.Walk() - m.ReplaceLocal(files) - saveIndex(m) -} - -func saveIndex(m *Model) { - name := m.RepoID() + ".idx.gz" - fullName := filepath.Join(confDir, name) - idxf, err := os.Create(fullName + ".tmp") - if err != nil { - return - } - - gzw := gzip.NewWriter(idxf) - - protocol.IndexMessage{ - Repository: "local", - Files: m.ProtocolIndex(), - }.EncodeXDR(gzw) - gzw.Close() - idxf.Close() - - Rename(fullName+".tmp", fullName) -} - -func loadIndex(m *Model) { - name := m.RepoID() + ".idx.gz" - idxf, err := os.Open(filepath.Join(confDir, name)) - if err != nil { - return - } - defer idxf.Close() - - gzr, err := gzip.NewReader(idxf) - if err != nil { - return - } - defer gzr.Close() - - var im protocol.IndexMessage - err = im.DecodeXDR(gzr) - if err != nil || im.Repository != "local" { - return - } - m.SeedLocal(im.Files) -} - func ensureDir(dir string, mode int) { fi, err := os.Stat(dir) if os.IsNotExist(err) { diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 7c53cca67b..0e1d61b044 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -1,6 +1,7 @@ package main import ( + "compress/gzip" "crypto/sha1" "errors" "fmt" @@ -20,28 +21,26 @@ import ( ) type Model struct { - dir string - cm *cid.Map - fs *files.Set + repoDirs map[string]string // repo -> dir + repoFiles map[string]*files.Set // repo -> files + repoNodes map[string][]string // repo -> nodeIDs + nodeRepos map[string][]string // nodeID -> repos + rmut sync.RWMutex // protects the above + + cm *cid.Map protoConn map[string]protocol.Connection rawConn map[string]io.Closer pmut sync.RWMutex // protects protoConn and rawConn - initOnce sync.Once - sup suppressor limitRequestRate chan struct{} - imut sync.Mutex // protects Index + addedRepo bool + started bool } -const ( - idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification - idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long -) - var ( ErrNoSuchFile = errors.New("no such file") ErrInvalid = errors.New("file is invalid") @@ -50,11 +49,13 @@ var ( // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local repository in any way. -func NewModel(dir string, maxChangeBw int) *Model { +func NewModel(maxChangeBw int) *Model { m := &Model{ - dir: dir, + repoDirs: make(map[string]string), + repoFiles: make(map[string]*files.Set), + repoNodes: make(map[string][]string), + nodeRepos: make(map[string][]string), cm: cid.NewMap(), - fs: files.NewSet(), protoConn: make(map[string]protocol.Connection), rawConn: make(map[string]io.Closer), sup: suppressor{threshold: int64(maxChangeBw)}, @@ -83,25 +84,32 @@ func (m *Model) LimitRate(kbps int) { // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. func (m *Model) StartRW(threads int) { - m.initOnce.Do(func() { - newPuller("default", m.dir, m, threads) - }) + m.rmut.Lock() + defer m.rmut.Unlock() + + if !m.addedRepo { + panic("cannot start without repo") + } + m.started = true + for repo, dir := range m.repoDirs { + newPuller(repo, dir, m, threads) + } } // StartRO starts read only processing on the current model. When in // read only mode the model will announce files to the cluster but not // pull in any external changes. func (m *Model) StartRO() { - m.initOnce.Do(func() { - newPuller("default", m.dir, m, 0) // zero threads => read only - }) -} + m.rmut.Lock() + defer m.rmut.Unlock() -// Generation returns an opaque integer that is guaranteed to increment on -// every change to the local repository or global model. -func (m *Model) Generation() uint64 { - c := m.fs.Changes(cid.LocalID) - return c + if !m.addedRepo { + panic("cannot start without repo") + } + m.started = true + for repo, dir := range m.repoDirs { + newPuller(repo, dir, m, 0) // zero threads => read only + } } type ConnectionInfo struct { @@ -119,13 +127,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { } m.pmut.RLock() - - var tot int64 - for _, f := range m.fs.Global() { - if f.Flags&protocol.FlagDeleted == 0 { - tot += f.Size - } - } + m.rmut.RLock() var res = make(map[string]ConnectionInfo) for node, conn := range m.protoConn { @@ -138,10 +140,21 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { ci.Address = nc.RemoteAddr().String() } - var have = tot - for _, f := range m.fs.Need(m.cm.Get(node)) { - if f.Flags&protocol.FlagDeleted == 0 { - have -= f.Size + var tot int64 + var have int64 + + for _, repo := range m.nodeRepos[node] { + for _, f := range m.repoFiles[repo].Global() { + if f.Flags&protocol.FlagDeleted == 0 { + tot += f.Size + have += f.Size + } + } + + for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) { + if f.Flags&protocol.FlagDeleted == 0 { + have -= f.Size + } } } @@ -153,6 +166,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { res[node] = ci } + m.rmut.RUnlock() m.pmut.RUnlock() return res @@ -173,32 +187,54 @@ func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) { // GlobalSize returns the number of files, deleted files and total bytes for all // files in the global model. func (m *Model) GlobalSize() (files, deleted int, bytes int64) { - fs := m.fs.Global() + m.rmut.RLock() + var fs []scanner.File + for _, rf := range m.repoFiles { + fs = append(fs, rf.Global()...) + } + m.rmut.RUnlock() return sizeOf(fs) } // LocalSize returns the number of files, deleted files and total bytes for all // files in the local repository. func (m *Model) LocalSize() (files, deleted int, bytes int64) { - fs := m.fs.Have(cid.LocalID) + m.rmut.RLock() + var fs []scanner.File + for _, rf := range m.repoFiles { + fs = append(fs, rf.Have(cid.LocalID)...) + } + m.rmut.RUnlock() return sizeOf(fs) } // InSyncSize returns the number and total byte size of the local files that // are in sync with the global model. func (m *Model) InSyncSize() (files int, bytes int64) { - gf := m.fs.Global() - hf := m.fs.Need(cid.LocalID) + var gf []scanner.File + var nf []scanner.File + + m.rmut.RLock() + for _, rf := range m.repoFiles { + gf = append(gf, rf.Global()...) + nf = append(nf, rf.Need(cid.LocalID)...) + } + m.rmut.RUnlock() gn, _, gb := sizeOf(gf) - hn, _, hb := sizeOf(hf) + nn, _, nb := sizeOf(nf) - return gn - hn, gb - hb + return gn - nn, gb - nb } // NeedFiles returns the list of currently needed files and the total size. func (m *Model) NeedFiles() ([]scanner.File, int64) { - nf := m.fs.Need(cid.LocalID) + var nf []scanner.File + m.rmut.RLock() + for _, rf := range m.repoFiles { + nf = append(nf, rf.Need(cid.LocalID)...) + } + m.rmut.RUnlock() var bytes int64 for _, f := range nf { @@ -210,24 +246,34 @@ func (m *Model) NeedFiles() ([]scanner.File, int64) { // Index is called when a new node is connected and we receive their full index. // Implements the protocol.Model interface. -func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { +func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) { + if debugNet { + dlog.Printf("IDX(in): %s / %q: %d files", nodeID, repo, len(fs)) + } + var files = make([]scanner.File, len(fs)) for i := range fs { lamport.Default.Tick(fs[i].Version) files[i] = fileFromFileInfo(fs[i]) } - cid := m.cm.Get(nodeID) - m.fs.Replace(cid, files) - - if debugNet { - dlog.Printf("IDX(in): %s: %d files", nodeID, len(fs)) + id := m.cm.Get(nodeID) + m.rmut.RLock() + if r, ok := m.repoFiles[repo]; ok { + r.Replace(id, files) + } else { + warnf("Index from %s for nonexistant repo %q; dropping", nodeID, repo) } + m.rmut.RUnlock() } // IndexUpdate is called for incremental updates to connected nodes' indexes. // Implements the protocol.Model interface. -func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { +func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) { + if debugNet { + dlog.Printf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs)) + } + var files = make([]scanner.File, len(fs)) for i := range fs { lamport.Default.Tick(fs[i].Version) @@ -235,11 +281,13 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) { } id := m.cm.Get(nodeID) - m.fs.Update(id, files) - - if debugNet { - dlog.Printf("IDXUP(in): %s: %d files", nodeID, len(files)) + m.rmut.RLock() + if r, ok := m.repoFiles[repo]; ok { + r.Update(id, files) + } else { + warnf("Index update from %s for nonexistant repo %q; dropping", nodeID, repo) } + m.rmut.RUnlock() } // Close removes the peer from the model and closes the underlying connection if possible. @@ -255,7 +303,11 @@ func (m *Model) Close(node string, err error) { } cid := m.cm.Get(node) - m.fs.Replace(cid, nil) + m.rmut.RLock() + for _, repo := range m.nodeRepos[node] { + m.repoFiles[repo].Replace(cid, nil) + } + m.rmut.RUnlock() m.cm.Clear(node) m.pmut.Lock() @@ -272,19 +324,31 @@ func (m *Model) Close(node string, err error) { // Implements the protocol.Model interface. func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) { // Verify that the requested file exists in the local model. - lf := m.fs.Get(cid.LocalID, name) + m.rmut.RLock() + r, ok := m.repoFiles[repo] + m.rmut.RUnlock() + + if !ok { + warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo) + return nil, ErrNoSuchFile + } + + lf := r.Get(cid.LocalID, name) if offset > lf.Size { warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) return nil, ErrNoSuchFile } + if lf.Suppressed { return nil, ErrInvalid } if debugNet && nodeID != "<local>" { - dlog.Printf("REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) + dlog.Printf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size) } - fn := filepath.Join(m.dir, name) + m.rmut.RLock() + fn := filepath.Join(m.repoDirs[repo], name) + m.rmut.RUnlock() fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? if err != nil { return nil, err @@ -307,24 +371,34 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by } // ReplaceLocal replaces the local repository index with the given list of files. -func (m *Model) ReplaceLocal(fs []scanner.File) { - m.fs.ReplaceWithDelete(cid.LocalID, fs) +func (m *Model) ReplaceLocal(repo string, fs []scanner.File) { + m.rmut.RLock() + m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs) + m.rmut.RUnlock() } -// ReplaceLocal replaces the local repository index with the given list of files. -func (m *Model) SeedLocal(fs []protocol.FileInfo) { +func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) { var sfs = make([]scanner.File, len(fs)) for i := 0; i < len(fs); i++ { lamport.Default.Tick(fs[i].Version) sfs[i] = fileFromFileInfo(fs[i]) } - m.fs.Replace(cid.LocalID, sfs) + m.rmut.RLock() + m.repoFiles[repo].Replace(cid.LocalID, sfs) + m.rmut.RUnlock() +} + +type cFiler struct { + m *Model + r string } // Implements scanner.CurrentFiler -func (m *Model) CurrentFile(file string) scanner.File { - f := m.fs.Get(cid.LocalID, file) +func (cf cFiler) CurrentFile(file string) scanner.File { + cf.m.rmut.RLock() + f := cf.m.repoFiles[cf.r].Get(cid.LocalID, file) + cf.m.rmut.RUnlock() return f } @@ -336,11 +410,6 @@ func (m *Model) ConnectedTo(nodeID string) bool { return ok } -// RepoID returns a unique ID representing the current repository location. -func (m *Model) RepoID() string { - return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir))) -} - // AddConnection adds a new peer connection to the model. An initial index will // be sent to the connected peer, thereafter index updates whenever the local // repository changes. @@ -358,20 +427,27 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) m.pmut.Unlock() go func() { - idx := m.ProtocolIndex() - if debugNet { - dlog.Printf("IDX(out/initial): %s: %d files", nodeID, len(idx)) + m.rmut.RLock() + repos := m.nodeRepos[nodeID] + m.rmut.RUnlock() + for _, repo := range repos { + idx := m.ProtocolIndex(repo) + if debugNet { + dlog.Printf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx)) + } + protoConn.Index(repo, idx) } - protoConn.Index("default", idx) }() } // ProtocolIndex returns the current local index in protocol data types. // Must be called with the read lock held. -func (m *Model) ProtocolIndex() []protocol.FileInfo { +func (m *Model) ProtocolIndex(repo string) []protocol.FileInfo { var index []protocol.FileInfo - fs := m.fs.Have(cid.LocalID) + m.rmut.RLock() + fs := m.repoFiles[repo].Have(cid.LocalID) + m.rmut.RUnlock() for _, f := range fs { mf := fileInfoFromFile(f) @@ -380,7 +456,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { if mf.Flags&protocol.FlagDeleted != 0 { flagComment = " (deleted)" } - dlog.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) + dlog.Printf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks)) } index = append(index, mf) } @@ -388,11 +464,13 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { return index } -func (m *Model) updateLocal(f scanner.File) { - m.fs.Update(cid.LocalID, []scanner.File{f}) +func (m *Model) updateLocal(repo string, f scanner.File) { + m.rmut.RLock() + m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f}) + m.rmut.RUnlock() } -func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash []byte) ([]byte, error) { +func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) { m.pmut.RLock() nc, ok := m.protoConn[nodeID] m.pmut.RUnlock() @@ -402,50 +480,161 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash } if debugNet { - dlog.Printf("REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + dlog.Printf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash) } - return nc.Request("default", name, offset, size) + return nc.Request(repo, name, offset, size) } func (m *Model) broadcastIndexLoop() { - var lastChange uint64 + var lastChange = map[string]uint64{} for { time.Sleep(5 * time.Second) - c := m.fs.Changes(cid.LocalID) - if c == lastChange { - continue + m.pmut.RLock() + m.rmut.RLock() + + for repo, fs := range m.repoFiles { + c := fs.Changes(cid.LocalID) + if c == lastChange[repo] { + continue + } + lastChange[repo] = c + + idx := m.ProtocolIndex(repo) + m.saveIndex(repo, confDir, idx) + + var indexWg sync.WaitGroup + for _, nodeID := range m.repoNodes[repo] { + if conn, ok := m.protoConn[nodeID]; ok { + indexWg.Add(1) + if debugNet { + dlog.Printf("IDX(out/loop): %s: %d files", nodeID, len(idx)) + } + go func() { + conn.Index(repo, idx) + indexWg.Done() + }() + } + } + + indexWg.Wait() } - lastChange = c - saveIndex(m) // This should be cleaned up we don't do a lot of processing twice + m.rmut.RUnlock() + m.pmut.RUnlock() + } +} + +func (m *Model) AddRepo(id, dir string, nodes []NodeConfiguration) { + if m.started { + panic("cannot add repo to started model") + } + if len(id) == 0 { + panic("cannot add empty repo id") + } + + m.rmut.Lock() + m.repoDirs[id] = dir + m.repoFiles[id] = files.NewSet() - fs := m.fs.Have(cid.LocalID) + m.repoNodes[id] = make([]string, len(nodes)) + for i, node := range nodes { + m.repoNodes[id][i] = node.NodeID + m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], id) + } - var indexWg sync.WaitGroup - indexWg.Add(len(m.protoConn)) + m.addedRepo = true + m.rmut.Unlock() +} - var idx = make([]protocol.FileInfo, len(fs)) - for i, f := range fs { - idx[i] = fileInfoFromFile(f) - } +func (m *Model) ScanRepos() { + m.rmut.RLock() + for repo := range m.repoDirs { + m.ScanRepo(repo) + } + m.rmut.RUnlock() +} - m.pmut.RLock() - for _, node := range m.protoConn { - node := node - if debugNet { - dlog.Printf("IDX(out/loop): %s: %d files", node.ID(), len(idx)) - } - go func() { - node.Index("default", idx) - indexWg.Done() - }() - } - m.pmut.RUnlock() +func (m *Model) ScanRepo(repo string) { + sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} + w := &scanner.Walker{ + Dir: m.repoDirs[repo], + IgnoreFile: ".stignore", + FollowSymlinks: cfg.Options.FollowSymlinks, + BlockSize: BlockSize, + TempNamer: defTempNamer, + Suppressor: sup, + CurrentFiler: cFiler{m, repo}, + } + fs, _ := w.Walk() + m.ReplaceLocal(repo, fs) +} + +func (m *Model) SaveIndexes(dir string) { + m.rmut.RLock() + for repo := range m.repoDirs { + fs := m.ProtocolIndex(repo) + m.saveIndex(repo, dir, fs) + } + m.rmut.RUnlock() +} + +func (m *Model) LoadIndexes(dir string) { + m.rmut.RLock() + for repo := range m.repoDirs { + fs := m.loadIndex(repo, dir) + m.SeedLocal(repo, fs) + } + m.rmut.RUnlock() +} + +func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) { + id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo]))) + name := id + ".idx.gz" + name = filepath.Join(dir, name) + + idxf, err := os.Create(name + ".tmp") + if err != nil { + return + } - indexWg.Wait() + gzw := gzip.NewWriter(idxf) + + protocol.IndexMessage{ + Repository: repo, + Files: fs, + }.EncodeXDR(gzw) + gzw.Close() + idxf.Close() + + Rename(name+".tmp", name) +} + +func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo { + id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo]))) + name := id + ".idx.gz" + name = filepath.Join(dir, name) + + idxf, err := os.Open(name) + if err != nil { + return nil } + defer idxf.Close() + + gzr, err := gzip.NewReader(idxf) + if err != nil { + return nil + } + defer gzr.Close() + + var im protocol.IndexMessage + err = im.DecodeXDR(gzr) + if err != nil || im.Repository != repo { + return nil + } + + return im.Files } func fileFromFileInfo(f protocol.FileInfo) scanner.File { diff --git a/cmd/syncthing/model_test.go b/cmd/syncthing/model_test.go index 186ff01bb6..0dbf096bf4 100644 --- a/cmd/syncthing/model_test.go +++ b/cmd/syncthing/model_test.go @@ -92,7 +92,7 @@ func BenchmarkIndex10000(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - m.Index("42", files) + m.Index("42", "default", files) } } @@ -105,7 +105,7 @@ func BenchmarkIndex00100(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - m.Index("42", files) + m.Index("42", "default", files) } } @@ -115,11 +115,11 @@ func BenchmarkIndexUpdate10000f10000(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", files) + m.IndexUpdate("42", "default", files) } } @@ -129,12 +129,12 @@ func BenchmarkIndexUpdate10000f00100(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) ufiles := genFiles(100) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", ufiles) + m.IndexUpdate("42", "default", ufiles) } } @@ -144,12 +144,12 @@ func BenchmarkIndexUpdate10000f00001(b *testing.B) { fs, _ := w.Walk() m.ReplaceLocal(fs) files := genFiles(10000) - m.Index("42", files) + m.Index("42", "default", files) ufiles := genFiles(1) b.ResetTimer() for i := 0; i < b.N; i++ { - m.IndexUpdate("42", ufiles) + m.IndexUpdate("42", "default", ufiles) } } @@ -206,7 +206,7 @@ func BenchmarkRequest(b *testing.B) { requestData: []byte("some data to return"), } m.AddConnection(fc, fc) - m.Index("42", files) + m.Index("42", "default", files) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/cmd/syncthing/puller.go b/cmd/syncthing/puller.go index 6f30cf59f2..cad660e68c 100644 --- a/cmd/syncthing/puller.go +++ b/cmd/syncthing/puller.go @@ -111,7 +111,7 @@ func (p *puller) run() { <-p.requestSlots b := p.bq.get() if debugPull { - dlog.Printf("filler: queueing %q offset %d copy %d", b.file.Name, b.block.Offset, len(b.copy)) + dlog.Printf("filler: queueing %q / %q offset %d copy %d", p.repo, b.file.Name, b.block.Offset, len(b.copy)) } p.blocks <- b } @@ -120,17 +120,6 @@ func (p *puller) run() { walkTicker := time.Tick(time.Duration(cfg.Options.RescanIntervalS) * time.Second) timeout := time.Tick(5 * time.Second) - sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} - w := &scanner.Walker{ - Dir: p.dir, < |