summaryrefslogtreecommitdiffstats
path: root/discover
diff options
context:
space:
mode:
authorJakob Borg <jakob@nym.se>2014-03-18 17:51:55 +0100
committerJakob Borg <jakob@nym.se>2014-03-19 13:57:19 +0100
commit8db1bf9732a1853ab3c8415417bfe96ee8eb6a4f (patch)
tree039fc6a25febfcc72f28fbd78b66b6f334dffb58 /discover
parent58fd379e350fbfcc9bfd6aeb7647379018e51940 (diff)
Fix local announce (IPv6 multicast, include all listen addresses)
Diffstat (limited to 'discover')
-rw-r--r--discover/discover.go198
1 files changed, 104 insertions, 94 deletions
diff --git a/discover/discover.go b/discover/discover.go
index f8f60eb996..c17e02f0da 100644
--- a/discover/discover.go
+++ b/discover/discover.go
@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"time"
+ "code.google.com/p/go.net/ipv6"
"github.com/calmh/syncthing/buffers"
)
@@ -19,14 +20,16 @@ const (
type Discoverer struct {
MyID string
- ListenPort int
+ ListenAddresses []string
BroadcastIntv time.Duration
ExtBroadcastIntv time.Duration
- conn *net.UDPConn
+ conn *ipv6.PacketConn
+ intfs []*net.Interface
registry map[string][]string
registryLock sync.RWMutex
extServer string
+ group *net.UDPAddr
localBroadcastTick <-chan time.Time
forcedBroadcastTick chan time.Time
@@ -41,113 +44,114 @@ var (
// When we hit this many errors in succession, we stop.
const maxErrors = 30
-func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) {
- local := &net.UDPAddr{IP: nil, Port: AnnouncementPort}
- conn, err := net.ListenUDP("udp", local)
- if err != nil {
- return nil, err
- }
-
+func NewDiscoverer(id string, addresses []string, extServer string) (*Discoverer, error) {
disc := &Discoverer{
MyID: id,
- ListenPort: port,
+ ListenAddresses: addresses,
BroadcastIntv: 30 * time.Second,
ExtBroadcastIntv: 1800 * time.Second,
-
- conn: conn,
- registry: make(map[string][]string),
- extServer: extServer,
+ registry: make(map[string][]string),
+ extServer: extServer,
+ group: &net.UDPAddr{IP: net.ParseIP("ff02::2012:1025"), Port: AnnouncementPort},
}
- go disc.recvAnnouncements()
+ // Listen on a multicast socket. This enables sharing the socket, i.e.
+ // other instances of syncting on the same box can listen on the same
+ // group/port.
- if disc.ListenPort > 0 {
- disc.localBroadcastTick = time.Tick(disc.BroadcastIntv)
- disc.forcedBroadcastTick = make(chan time.Time)
- go disc.sendAnnouncements()
- }
- if len(disc.extServer) > 0 {
- go disc.sendExtAnnouncements()
+ conn, err := net.ListenPacket("udp6", fmt.Sprintf("[ff02::]:%d", AnnouncementPort))
+ if err != nil {
+ return nil, err
}
+ disc.conn = ipv6.NewPacketConn(conn)
- return disc, nil
-}
-
-func (d *Discoverer) sendAnnouncements() {
- var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}}
- var buf = pkt.MarshalXDR()
- var errCounter = 0
- var err error
+ // Join the multicast group on as many interfaces as possible. Remember
+ // which those were.
- remote := &net.UDPAddr{
- IP: net.IP{255, 255, 255, 255},
- Port: AnnouncementPort,
+ intfs, err := net.Interfaces()
+ if err != nil {
+ log.Printf("discover/interfaces: %v; no local announcements", err)
+ conn.Close()
+ return nil, err
}
- for errCounter < maxErrors {
- intfs, err := net.Interfaces()
- if err != nil {
- log.Printf("discover/listInterfaces: %v; no local announcements", err)
- return
+ for _, intf := range intfs {
+ intf := intf
+ addrs, err := intf.Addrs()
+ if err == nil && len(addrs) > 0 && intf.Flags&net.FlagMulticast != 0 && intf.Flags&net.FlagUp != 0 {
+ if err := disc.conn.JoinGroup(&intf, disc.group); err != nil {
+ if debug {
+ dlog.Printf("%v; not joining on %s", err, intf.Name)
+ }
+ } else {
+ disc.intfs = append(disc.intfs, &intf)
+ }
}
+ }
- for _, intf := range intfs {
- if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast {
- addrs, err := intf.Addrs()
- if err != nil {
- log.Println("discover/listAddrs: warning:", err)
- errCounter++
- continue
- }
+ // Receive announcements sent to the local multicast group.
- var srcAddr string
- for _, addr := range addrs {
- if strings.Contains(addr.String(), ".") {
- // Found an IPv4 adress
- parts := strings.Split(addr.String(), "/")
- srcAddr = parts[0]
- break
- }
- }
- if len(srcAddr) == 0 {
- if debug {
- dlog.Println("no source address found on interface", intf.Name)
- }
- continue
- }
+ go disc.recvAnnouncements()
- iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0")
- if err != nil {
- log.Println("discover/resolve: warning:", err)
- errCounter++
- continue
- }
+ // If we got a list of addresses that we listen on, announce those
+ // locally.
- conn, err := net.ListenUDP("udp4", iaddr)
- if err != nil {
- log.Println("discover/listen: warning:", err)
- errCounter++
- continue
- }
+ if len(disc.ListenAddresses) > 0 {
+ disc.localBroadcastTick = time.Tick(disc.BroadcastIntv)
+ disc.forcedBroadcastTick = make(chan time.Time)
+ go disc.sendLocalAnnouncements()
- if debug {
- dlog.Println("send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name)
- }
+ // If we have an external server address, also announce to that
+ // server.
- _, err = conn.WriteTo(buf, remote)
- if err != nil {
- // Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet
- conn.Close()
+ if len(disc.extServer) > 0 {
+ go disc.sendExternalAnnouncements()
+ }
+ }
- if debug {
- log.Println(err)
- }
+ return disc, nil
+}
- errCounter++
- continue
- }
+func (d *Discoverer) announcementPkt() []byte {
+ var addrs []Address
+ for _, astr := range d.ListenAddresses {
+ addr, err := net.ResolveTCPAddr("tcp", astr)
+ if err != nil {
+ log.Printf("discover/announcement: %v: not announcing %s", err, astr)
+ continue
+ } else if debug {
+ dlog.Printf("announcing %s: %#v", astr, addr)
+ }
+ if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
+ addrs = append(addrs, Address{Port: uint16(addr.Port)})
+ } else if bs := addr.IP.To4(); bs != nil {
+ addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
+ } else if bs := addr.IP.To16(); bs != nil {
+ addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
+ }
+ }
+ var pkt = AnnounceV2{
+ Magic: AnnouncementMagicV2,
+ NodeID: d.MyID,
+ Addresses: addrs,
+ }
+ return pkt.MarshalXDR()
+}
+
+func (d *Discoverer) sendLocalAnnouncements() {
+ var buf = d.announcementPkt()
+ var errCounter = 0
+ var err error
- conn.Close()
+ wcm := ipv6.ControlMessage{HopLimit: 1}
+ for errCounter < maxErrors {
+ for _, intf := range d.intfs {
+ wcm.IfIndex = intf.Index
+ if _, err = d.conn.WriteTo(buf, &wcm, d.group); err != nil {
+ log.Printf("discover/sendLocalAnnouncements: on %s: %v; no local announcement", intf.Name, err)
+ errCounter++
+ continue
+ } else {
errCounter = 0
}
}
@@ -157,25 +161,29 @@ func (d *Discoverer) sendAnnouncements() {
case <-d.forcedBroadcastTick:
}
}
- log.Println("discover/write: local: stopping due to too many errors:", err)
}
-func (d *Discoverer) sendExtAnnouncements() {
+func (d *Discoverer) sendExternalAnnouncements() {
remote, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external announcements", err)
return
}
- var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}}
- var buf = pkt.MarshalXDR()
+ conn, err := net.ListenUDP("udp", nil)
+ if err != nil {
+ log.Printf("discover/external: %v; no external announcements", err)
+ return
+ }
+
+ var buf = d.announcementPkt()
var errCounter = 0
for errCounter < maxErrors {
if debug {
dlog.Println("send announcement -> ", remote)
}
- _, err = d.conn.WriteTo(buf, remote)
+ _, err = conn.WriteTo(buf, remote)
if err != nil {
log.Println("discover/write: warning:", err)
errCounter++
@@ -192,7 +200,7 @@ func (d *Discoverer) recvAnnouncements() {
var errCounter = 0
var err error
for errCounter < maxErrors {
- n, addr, err := d.conn.ReadFromUDP(buf)
+ n, _, addr, err := d.conn.ReadFrom(buf)
if err != nil {
errCounter++
time.Sleep(time.Second)
@@ -224,7 +232,9 @@ func (d *Discoverer) recvAnnouncements() {
if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port)
} else {
- nodeAddr = fmt.Sprintf("%s:%d", addr.IP.String(), a.Port)
+ ua := addr.(*net.UDPAddr)
+ ua.Port = int(a.Port)
+ nodeAddr = ua.String()
}
addrs = append(addrs, nodeAddr)
}