diff options
author | Jakob Borg <jakob@nym.se> | 2014-03-18 17:51:55 +0100 |
---|---|---|
committer | Jakob Borg <jakob@nym.se> | 2014-03-19 13:57:19 +0100 |
commit | 8db1bf9732a1853ab3c8415417bfe96ee8eb6a4f (patch) | |
tree | 039fc6a25febfcc72f28fbd78b66b6f334dffb58 | |
parent | 58fd379e350fbfcc9bfd6aeb7647379018e51940 (diff) |
Fix local announce (IPv6 multicast, include all listen addresses)
-rw-r--r-- | cmd/syncthing/main.go | 17 | ||||
-rw-r--r-- | discover/discover.go | 198 | ||||
-rw-r--r-- | integration/h1/config.xml | 8 | ||||
-rw-r--r-- | integration/h2/config.xml | 10 | ||||
-rw-r--r-- | integration/h3/config.xml | 8 |
5 files changed, 123 insertions, 118 deletions
diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 30fdc473df..6a85ddc69e 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -14,7 +14,6 @@ import ( "path" "runtime" "runtime/debug" - "strconv" "strings" "time" @@ -263,7 +262,7 @@ func main() { if verbose { infoln("Attempting to connect to other nodes") } - disc := discovery(cfg.Options.ListenAddress[0]) + disc := discovery() go connect(myID, disc, m, tlsCfg, connOpts) // Routine to pull blocks from other nodes to synchronize the local @@ -452,24 +451,20 @@ listen: } } -func discovery(addr string) *discover.Discoverer { - _, portstr, err := net.SplitHostPort(addr) - fatalErr(err) - port, _ := strconv.Atoi(portstr) - +func discovery() *discover.Discoverer { if !cfg.Options.LocalAnnEnabled { - port = -1 - } else if verbose { - infoln("Sending local discovery announcements") + return nil } + infoln("Sending local discovery announcements") + if !cfg.Options.GlobalAnnEnabled { cfg.Options.GlobalAnnServer = "" } else if verbose { infoln("Sending external discovery announcements") } - disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer) + disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.GlobalAnnServer) if err != nil { warnf("No discovery possible (%v)", err) diff --git a/discover/discover.go b/discover/discover.go index f8f60eb996..c17e02f0da 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "time" + "code.google.com/p/go.net/ipv6" "github.com/calmh/syncthing/buffers" ) @@ -19,14 +20,16 @@ const ( type Discoverer struct { MyID string - ListenPort int + ListenAddresses []string BroadcastIntv time.Duration ExtBroadcastIntv time.Duration - conn *net.UDPConn + conn *ipv6.PacketConn + intfs []*net.Interface registry map[string][]string registryLock sync.RWMutex extServer string + group *net.UDPAddr localBroadcastTick <-chan time.Time forcedBroadcastTick chan time.Time @@ -41,113 +44,114 @@ var ( // When we hit this many errors in succession, we stop. const maxErrors = 30 -func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) { - local := &net.UDPAddr{IP: nil, Port: AnnouncementPort} - conn, err := net.ListenUDP("udp", local) - if err != nil { - return nil, err - } - +func NewDiscoverer(id string, addresses []string, extServer string) (*Discoverer, error) { disc := &Discoverer{ MyID: id, - ListenPort: port, + ListenAddresses: addresses, BroadcastIntv: 30 * time.Second, ExtBroadcastIntv: 1800 * time.Second, - - conn: conn, - registry: make(map[string][]string), - extServer: extServer, + registry: make(map[string][]string), + extServer: extServer, + group: &net.UDPAddr{IP: net.ParseIP("ff02::2012:1025"), Port: AnnouncementPort}, } - go disc.recvAnnouncements() + // Listen on a multicast socket. This enables sharing the socket, i.e. + // other instances of syncting on the same box can listen on the same + // group/port. - if disc.ListenPort > 0 { - disc.localBroadcastTick = time.Tick(disc.BroadcastIntv) - disc.forcedBroadcastTick = make(chan time.Time) - go disc.sendAnnouncements() - } - if len(disc.extServer) > 0 { - go disc.sendExtAnnouncements() + conn, err := net.ListenPacket("udp6", fmt.Sprintf("[ff02::]:%d", AnnouncementPort)) + if err != nil { + return nil, err } + disc.conn = ipv6.NewPacketConn(conn) - return disc, nil -} - -func (d *Discoverer) sendAnnouncements() { - var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} - var buf = pkt.MarshalXDR() - var errCounter = 0 - var err error + // Join the multicast group on as many interfaces as possible. Remember + // which those were. - remote := &net.UDPAddr{ - IP: net.IP{255, 255, 255, 255}, - Port: AnnouncementPort, + intfs, err := net.Interfaces() + if err != nil { + log.Printf("discover/interfaces: %v; no local announcements", err) + conn.Close() + return nil, err } - for errCounter < maxErrors { - intfs, err := net.Interfaces() - if err != nil { - log.Printf("discover/listInterfaces: %v; no local announcements", err) - return + for _, intf := range intfs { + intf := intf + addrs, err := intf.Addrs() + if err == nil && len(addrs) > 0 && intf.Flags&net.FlagMulticast != 0 && intf.Flags&net.FlagUp != 0 { + if err := disc.conn.JoinGroup(&intf, disc.group); err != nil { + if debug { + dlog.Printf("%v; not joining on %s", err, intf.Name) + } + } else { + disc.intfs = append(disc.intfs, &intf) + } } + } - for _, intf := range intfs { - if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast { - addrs, err := intf.Addrs() - if err != nil { - log.Println("discover/listAddrs: warning:", err) - errCounter++ - continue - } + // Receive announcements sent to the local multicast group. - var srcAddr string - for _, addr := range addrs { - if strings.Contains(addr.String(), ".") { - // Found an IPv4 adress - parts := strings.Split(addr.String(), "/") - srcAddr = parts[0] - break - } - } - if len(srcAddr) == 0 { - if debug { - dlog.Println("no source address found on interface", intf.Name) - } - continue - } + go disc.recvAnnouncements() - iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0") - if err != nil { - log.Println("discover/resolve: warning:", err) - errCounter++ - continue - } + // If we got a list of addresses that we listen on, announce those + // locally. - conn, err := net.ListenUDP("udp4", iaddr) - if err != nil { - log.Println("discover/listen: warning:", err) - errCounter++ - continue - } + if len(disc.ListenAddresses) > 0 { + disc.localBroadcastTick = time.Tick(disc.BroadcastIntv) + disc.forcedBroadcastTick = make(chan time.Time) + go disc.sendLocalAnnouncements() - if debug { - dlog.Println("send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name) - } + // If we have an external server address, also announce to that + // server. - _, err = conn.WriteTo(buf, remote) - if err != nil { - // Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet - conn.Close() + if len(disc.extServer) > 0 { + go disc.sendExternalAnnouncements() + } + } - if debug { - log.Println(err) - } + return disc, nil +} - errCounter++ - continue - } +func (d *Discoverer) announcementPkt() []byte { + var addrs []Address + for _, astr := range d.ListenAddresses { + addr, err := net.ResolveTCPAddr("tcp", astr) + if err != nil { + log.Printf("discover/announcement: %v: not announcing %s", err, astr) + continue + } else if debug { + dlog.Printf("announcing %s: %#v", astr, addr) + } + if len(addr.IP) == 0 || addr.IP.IsUnspecified() { + addrs = append(addrs, Address{Port: uint16(addr.Port)}) + } else if bs := addr.IP.To4(); bs != nil { + addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)}) + } else if bs := addr.IP.To16(); bs != nil { + addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)}) + } + } + var pkt = AnnounceV2{ + Magic: AnnouncementMagicV2, + NodeID: d.MyID, + Addresses: addrs, + } + return pkt.MarshalXDR() +} + +func (d *Discoverer) sendLocalAnnouncements() { + var buf = d.announcementPkt() + var errCounter = 0 + var err error - conn.Close() + wcm := ipv6.ControlMessage{HopLimit: 1} + for errCounter < maxErrors { + for _, intf := range d.intfs { + wcm.IfIndex = intf.Index + if _, err = d.conn.WriteTo(buf, &wcm, d.group); err != nil { + log.Printf("discover/sendLocalAnnouncements: on %s: %v; no local announcement", intf.Name, err) + errCounter++ + continue + } else { errCounter = 0 } } @@ -157,25 +161,29 @@ func (d *Discoverer) sendAnnouncements() { case <-d.forcedBroadcastTick: } } - log.Println("discover/write: local: stopping due to too many errors:", err) } -func (d *Discoverer) sendExtAnnouncements() { +func (d *Discoverer) sendExternalAnnouncements() { remote, err := net.ResolveUDPAddr("udp", d.extServer) if err != nil { log.Printf("discover/external: %v; no external announcements", err) return } - var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} - var buf = pkt.MarshalXDR() + conn, err := net.ListenUDP("udp", nil) + if err != nil { + log.Printf("discover/external: %v; no external announcements", err) + return + } + + var buf = d.announcementPkt() var errCounter = 0 for errCounter < maxErrors { if debug { dlog.Println("send announcement -> ", remote) } - _, err = d.conn.WriteTo(buf, remote) + _, err = conn.WriteTo(buf, remote) if err != nil { log.Println("discover/write: warning:", err) errCounter++ @@ -192,7 +200,7 @@ func (d *Discoverer) recvAnnouncements() { var errCounter = 0 var err error for errCounter < maxErrors { - n, addr, err := d.conn.ReadFromUDP(buf) + n, _, addr, err := d.conn.ReadFrom(buf) if err != nil { errCounter++ time.Sleep(time.Second) @@ -224,7 +232,9 @@ func (d *Discoverer) recvAnnouncements() { if len(a.IP) > 0 { nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port) } else { - nodeAddr = fmt.Sprintf("%s:%d", addr.IP.String(), a.Port) + ua := addr.(*net.UDPAddr) + ua.Port = int(a.Port) + nodeAddr = ua.String() } addrs = append(addrs, nodeAddr) } diff --git a/integration/h1/config.xml b/integration/h1/config.xml index c62208277c..f5840342fb 100644 --- a/integration/h1/config.xml +++ b/integration/h1/config.xml @@ -1,13 +1,13 @@ <configuration version="1"> <repository directory="s1"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> - <address>127.0.0.1:22001</address> + <address>dynamic</address> </node> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> - <address>127.0.0.1:22002</address> + <address>dynamic</address> </node> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> - <address>127.0.0.1:22003</address> + <address>dynamic</address> </node> </repository> <options> @@ -19,7 +19,7 @@ <guiAddress>127.0.0.1:8081</guiAddress> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceEnabled>false</globalAnnounceEnabled> - <localAnnounceEnabled>false</localAnnounceEnabled> + <localAnnounceEnabled>true</localAnnounceEnabled> <parallelRequests>16</parallelRequests> <maxSendKbps>0</maxSendKbps> <rescanIntervalS>60</rescanIntervalS> diff --git a/integration/h2/config.xml b/integration/h2/config.xml index 6896e58c20..c18c147ec3 100644 --- a/integration/h2/config.xml +++ b/integration/h2/config.xml @@ -1,17 +1,17 @@ <configuration version="1"> <repository directory="s2"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> - <address>127.0.0.1:22001</address> + <address>dynamic</address> </node> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> - <address>127.0.0.1:22002</address> + <address>dynamic</address> </node> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> - <address>127.0.0.1:22003</address> + <address>dynamic</address> </node> </repository> <options> - <listenAddress>127.0.0.2:22002</listenAddress> + <listenAddress>127.0.0.1:22002</listenAddress> <readOnly>false</readOnly> <allowDelete>true</allowDelete> <followSymlinks>true</followSymlinks> @@ -19,7 +19,7 @@ <guiAddress>127.0.0.1:8082</guiAddress> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceEnabled>false</globalAnnounceEnabled> - <localAnnounceEnabled>false</localAnnounceEnabled> + <localAnnounceEnabled>true</localAnnounceEnabled> <parallelRequests>16</parallelRequests> <maxSendKbps>0</maxSendKbps> <rescanIntervalS>60</rescanIntervalS> diff --git a/integration/h3/config.xml b/integration/h3/config.xml index 6c48cb1507..f990deeba3 100644 --- a/integration/h3/config.xml +++ b/integration/h3/config.xml @@ -1,13 +1,13 @@ <configuration version="1"> <repository directory="s3"> <node id="I6KAH7666SLLL5PFXSOAUFJCDZYAOMLEKCP2GB3BV5RQST3PSROA" name="s1"> - <address>127.0.0.1:22001</address> + <address>dynamic</address> </node> <node id="JMFJCXBGZDE4BOCJE3VF65GYZNAIVJRET3J6HMRAUQIGJOFKNHMQ" name="s2"> - <address>127.0.0.1:22002</address> + <address>dynamic</address> </node> <node id="373HSRPQLPNLIJYKZVQFP4PKZ6R2ZE6K3YD442UJHBGBQGWWXAHA" name="s3"> - <address>127.0.0.1:22003</address> + <address>dynamic</address> </node> </repository> <options> @@ -19,7 +19,7 @@ <guiAddress>127.0.0.1:8083</guiAddress> <globalAnnounceServer>announce.syncthing.net:22025</globalAnnounceServer> <globalAnnounceEnabled>false</globalAnnounceEnabled> - <localAnnounceEnabled>false</localAnnounceEnabled> + <localAnnounceEnabled>true</localAnnounceEnabled> <parallelRequests>16</parallelRequests> <maxSendKbps>0</maxSendKbps> <rescanIntervalS>60</rescanIntervalS> |