summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@nym.se>2014-03-23 08:45:05 +0100
committerJakob Borg <jakob@nym.se>2014-03-23 08:53:34 +0100
commit589244f39e80d144f922628fc05088b3ac685d1d (patch)
tree14a10be4d3074e39bf7d4d1c6c1208c9394441b9
parent804cce7ba08b306b2d3a0755f6641a9f0904b2a3 (diff)
Prevent zombie connections due to simultaneous connect
-rw-r--r--cmd/syncthing/main.go165
-rw-r--r--cmd/syncthing/model.go6
2 files changed, 88 insertions, 83 deletions
diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go
index 6a85ddc69e..699c618215 100644
--- a/cmd/syncthing/main.go
+++ b/cmd/syncthing/main.go
@@ -250,20 +250,12 @@ func main() {
"clusterHash": clusterHash(cfg.Repositories[0].Nodes),
}
- // Routine to listen for incoming connections
- if verbose {
- infoln("Listening for incoming connections")
- }
- for _, addr := range cfg.Options.ListenAddress {
- go listen(myID, addr, m, tlsCfg, connOpts)
- }
-
// Routine to connect out to configured nodes
if verbose {
infoln("Attempting to connect to other nodes")
}
disc := discovery()
- go connect(myID, disc, m, tlsCfg, connOpts)
+ go listenConnect(myID, disc, m, tlsCfg, connOpts)
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
@@ -401,50 +393,106 @@ func printStatsLoop(m *Model) {
}
}
-func listen(myID string, addr string, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
- if debugNet {
- dlog.Println("listening on", addr)
+func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
+ var conns = make(chan *tls.Conn)
+
+ // Listen
+ for _, addr := range cfg.Options.ListenAddress {
+ addr := addr
+ go func() {
+ if debugNet {
+ dlog.Println("listening on", addr)
+ }
+ l, err := tls.Listen("tcp", addr, tlsCfg)
+ fatalErr(err)
+
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ warnln(err)
+ continue
+ }
+
+ if debugNet {
+ dlog.Println("connect from", conn.RemoteAddr())
+ }
+
+ tc := conn.(*tls.Conn)
+ err = tc.Handshake()
+ if err != nil {
+ warnln(err)
+ tc.Close()
+ continue
+ }
+
+ conns <- tc
+ }
+ }()
}
- l, err := tls.Listen("tcp", addr, tlsCfg)
- fatalErr(err)
-listen:
- for {
- conn, err := l.Accept()
- if err != nil {
- warnln(err)
- continue
- }
+ // Connect
+ go func() {
+ for {
+ nextNode:
+ for _, nodeCfg := range cfg.Repositories[0].Nodes {
+ if nodeCfg.NodeID == myID {
+ continue
+ }
+ if m.ConnectedTo(nodeCfg.NodeID) {
+ continue
+ }
+ for _, addr := range nodeCfg.Addresses {
+ if addr == "dynamic" {
+ if disc != nil {
+ t := disc.Lookup(nodeCfg.NodeID)
+ if len(t) == 0 {
+ continue
+ }
+ addr = t[0] //XXX: Handle all of them
+ }
+ }
- if debugNet {
- dlog.Println("connect from", conn.RemoteAddr())
- }
+ if debugNet {
+ dlog.Println("dial", nodeCfg.NodeID, addr)
+ }
+ conn, err := tls.Dial("tcp", addr, tlsCfg)
+ if err != nil {
+ if debugNet {
+ dlog.Println(err)
+ }
+ continue
+ }
- tc := conn.(*tls.Conn)
- err = tc.Handshake()
- if err != nil {
- warnln(err)
- tc.Close()
- continue
+ conns <- conn
+ continue nextNode
+ }
+ }
+
+ time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
}
+ }()
- remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw)
+next:
+ for conn := range conns {
+ remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
if remoteID == myID {
- warnf("Connect from myself (%s) - should not happen", remoteID)
+ warnf("Connected to myself (%s) - should not happen", remoteID)
conn.Close()
continue
}
if m.ConnectedTo(remoteID) {
- warnf("Connect from connected node (%s)", remoteID)
+ warnf("Connected to already connected node (%s)", remoteID)
+ conn.Close()
+ continue
}
for _, nodeCfg := range cfg.Repositories[0].Nodes {
if nodeCfg.NodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
- continue listen
+ continue next
}
}
conn.Close()
@@ -473,55 +521,6 @@ func discovery() *discover.Discoverer {
return disc
}
-func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
- for {
- nextNode:
- for _, nodeCfg := range cfg.Repositories[0].Nodes {
- if nodeCfg.NodeID == myID {
- continue
- }
- if m.ConnectedTo(nodeCfg.NodeID) {
- continue
- }
- for _, addr := range nodeCfg.Addresses {
- if addr == "dynamic" {
- if disc != nil {
- t := disc.Lookup(nodeCfg.NodeID)
- if len(t) == 0 {
- continue
- }
- addr = t[0] //XXX: Handle all of them
- }
- }
-
- if debugNet {
- dlog.Println("dial", nodeCfg.NodeID, addr)
- }
- conn, err := tls.Dial("tcp", addr, tlsCfg)
- if err != nil {
- if debugNet {
- dlog.Println(err)
- }
- continue
- }
-
- remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
- if remoteID != nodeCfg.NodeID {
- warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
- conn.Close()
- continue
- }
-
- protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
- m.AddConnection(conn, protoConn)
- continue nextNode
- }
- }
-
- time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
- }
-}
-
func updateLocalModel(m *Model, w *scanner.Walker) {
files, _ := w.Walk()
m.ReplaceLocal(files)
diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go
index db6dd29d8b..93ecbf7247 100644
--- a/cmd/syncthing/model.go
+++ b/cmd/syncthing/model.go
@@ -507,7 +507,13 @@ func (m *Model) RepoID() string {
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID()
m.pmut.Lock()
+ if _, ok := m.protoConn[nodeID]; ok {
+ panic("add existing node")
+ }
m.protoConn[nodeID] = protoConn
+ if _, ok := m.rawConn[nodeID]; ok {
+ panic("add existing node")
+ }
m.rawConn[nodeID] = rawConn
m.pmut.Unlock()