summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2024-06-03 07:13:21 +0200
committerJakob Borg <jakob@kastelo.net>2024-06-03 19:50:28 +0200
commitf283215fce1660753fba9e213a5fee8820137c0c (patch)
tree263d8811a368d4c71363f3283484c23d792135e8
parent495809ac9ecb0bffd4045d164021f2b4ed235ecc (diff)
cmd/stdiscosrv: Add AMQP replication
-rw-r--r--cmd/stdiscosrv/amqp.go246
-rw-r--r--cmd/stdiscosrv/apisrv.go36
-rw-r--r--cmd/stdiscosrv/main.go23
-rw-r--r--cmd/stdiscosrv/replication.go3
-rw-r--r--cmd/stdiscosrv/stats.go14
-rw-r--r--go.mod1
-rw-r--r--go.sum4
7 files changed, 293 insertions, 34 deletions
diff --git a/cmd/stdiscosrv/amqp.go b/cmd/stdiscosrv/amqp.go
new file mode 100644
index 0000000000..e09919e27f
--- /dev/null
+++ b/cmd/stdiscosrv/amqp.go
@@ -0,0 +1,246 @@
+// Copyright (C) 2024 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/thejerf/suture/v4"
+)
+
+type amqpReplicator struct {
+ suture.Service
+ broker string
+ sender *amqpSender
+ receiver *amqpReceiver
+ outbox chan ReplicationRecord
+}
+
+func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
+ svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true})
+
+ sender := &amqpSender{
+ broker: broker,
+ clientID: clientID,
+ outbox: make(chan ReplicationRecord, replicationOutboxSize),
+ }
+ svc.Add(sender)
+
+ receiver := &amqpReceiver{
+ broker: broker,
+ clientID: clientID,
+ db: db,
+ }
+ svc.Add(receiver)
+
+ return &amqpReplicator{
+ Service: svc,
+ broker: broker,
+ sender: sender,
+ receiver: receiver,
+ outbox: make(chan ReplicationRecord, replicationOutboxSize),
+ }
+}
+
+func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) {
+ s.sender.send(key, ps, seen)
+}
+
+type amqpSender struct {
+ broker string
+ clientID string
+ outbox chan ReplicationRecord
+}
+
+func (s *amqpSender) Serve(ctx context.Context) error {
+ conn, ch, err := amqpChannel(s.broker)
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ defer conn.Close()
+
+ buf := make([]byte, 1024)
+ for {
+ select {
+ case rec := <-s.outbox:
+ size := rec.Size()
+ if len(buf) < size {
+ buf = make([]byte, size)
+ }
+
+ n, err := rec.MarshalTo(buf)
+ if err != nil {
+ replicationSendsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication marshal: %w", err)
+ }
+
+ err = ch.PublishWithContext(ctx,
+ "discovery", // exchange
+ "", // routing key
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ ContentType: "application/protobuf",
+ Body: buf[:n],
+ AppId: s.clientID,
+ })
+ if err != nil {
+ replicationSendsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication publish: %w", err)
+ }
+
+ replicationSendsTotal.WithLabelValues("success").Inc()
+
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (s *amqpSender) String() string {
+ return fmt.Sprintf("amqpSender(%q)", s.broker)
+}
+
+func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) {
+ item := ReplicationRecord{
+ Key: key,
+ Addresses: ps,
+ Seen: seen,
+ }
+
+ // The send should never block. The inbox is suitably buffered for at
+ // least a few seconds of stalls, which shouldn't happen in practice.
+ select {
+ case s.outbox <- item:
+ default:
+ replicationSendsTotal.WithLabelValues("drop").Inc()
+ }
+}
+
+type amqpReceiver struct {
+ broker string
+ clientID string
+ db database
+}
+
+func (s *amqpReceiver) Serve(ctx context.Context) error {
+ conn, ch, err := amqpChannel(s.broker)
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ defer conn.Close()
+
+ msgs, err := amqpConsume(ch)
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case msg, ok := <-msgs:
+ if !ok {
+ return fmt.Errorf("subscription closed: %w", io.EOF)
+ }
+
+ // ignore messages from ourself
+ if msg.AppId == s.clientID {
+ continue
+ }
+
+ var rec ReplicationRecord
+ if err := rec.Unmarshal(msg.Body); err != nil {
+ replicationRecvsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication unmarshal: %w", err)
+ }
+
+ if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil {
+ return fmt.Errorf("replication database merge: %w", err)
+ }
+
+ replicationRecvsTotal.WithLabelValues("success").Inc()
+
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (s *amqpReceiver) String() string {
+ return fmt.Sprintf("amqpReceiver(%q)", s.broker)
+}
+
+func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
+ conn, err := amqp.Dial(dst)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP dial: %w", err)
+ }
+
+ ch, err := conn.Channel()
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP channel: %w", err)
+ }
+
+ err = ch.ExchangeDeclare(
+ "discovery", // name
+ "fanout", // type
+ false, // durable
+ false, // auto-deleted
+ false, // internal
+ false, // no-wait
+ nil, // arguments
+ )
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
+ }
+
+ return conn, ch, nil
+}
+
+func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
+ q, err := ch.QueueDeclare(
+ "", // name
+ false, // durable
+ false, // delete when unused
+ true, // exclusive
+ false, // no-wait
+ nil, // arguments
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP declare queue: %w", err)
+ }
+
+ err = ch.QueueBind(
+ q.Name, // queue name
+ "", // routing key
+ "discovery", // exchange
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP bind queue: %w", err)
+ }
+
+ msgs, err := ch.Consume(
+ q.Name, // queue
+ "", // consumer
+ true, // auto-ack
+ false, // exclusive
+ false, // no-local
+ false, // no-wait
+ nil, // args
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP consume: %w", err)
+ }
+
+ return msgs, nil
+}
diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go
index 3da8ee0f19..b8721ae680 100644
--- a/cmd/stdiscosrv/apisrv.go
+++ b/cmd/stdiscosrv/apisrv.go
@@ -39,12 +39,13 @@ type announcement struct {
}
type apiSrv struct {
- addr string
- cert tls.Certificate
- db database
- listener net.Listener
- repl replicator // optional
- useHTTP bool
+ addr string
+ cert tls.Certificate
+ db database
+ listener net.Listener
+ repl replicator // optional
+ useHTTP bool
+ missesIncrease int
mapsMut sync.Mutex
misses map[string]int32
@@ -60,14 +61,15 @@ type contextKey int
const idKey contextKey = iota
-func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool) *apiSrv {
+func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool, missesIncrease int) *apiSrv {
return &apiSrv{
- addr: addr,
- cert: cert,
- db: db,
- repl: repl,
- useHTTP: useHTTP,
- misses: make(map[string]int32),
+ addr: addr,
+ cert: cert,
+ db: db,
+ repl: repl,
+ useHTTP: useHTTP,
+ misses: make(map[string]int32),
+ missesIncrease: missesIncrease,
}
}
@@ -197,14 +199,13 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
s.mapsMut.Lock()
misses := s.misses[key]
if misses < rec.Misses {
- misses = rec.Misses + 1
- } else {
- misses++
+ misses = rec.Misses
}
+ misses += int32(s.missesIncrease)
s.misses[key] = misses
s.mapsMut.Unlock()
- if misses%notFoundMissesWriteInterval == 0 {
+ if misses >= notFoundMissesWriteInterval {
rec.Misses = misses
rec.Missed = time.Now().UnixNano()
rec.Addresses = nil
@@ -444,7 +445,6 @@ func fixupAddresses(remote *net.TCPAddr, addresses []string) []string {
// remote is nil, unable to determine host IP
continue
}
-
}
// If zero port was specified, use remote port.
diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go
index 4890f3bda9..0b8c907f5e 100644
--- a/cmd/stdiscosrv/main.go
+++ b/cmd/stdiscosrv/main.go
@@ -22,6 +22,7 @@ import (
_ "github.com/syncthing/syncthing/lib/automaxprocs"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/protocol"
+ "github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture/v4"
@@ -80,6 +81,8 @@ func main() {
var replKeyFile string
var useHTTP bool
var largeDB bool
+ var amqpAddress string
+ missesIncrease := 1
log.SetOutput(os.Stdout)
log.SetFlags(0)
@@ -96,6 +99,8 @@ func main() {
flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication")
flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication")
flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings")
+ flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker")
+ flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss")
showVersion := flag.Bool("version", false, "Show version")
flag.Parse()
@@ -203,8 +208,24 @@ func main() {
main.Add(rl)
}
+ // If we have an AMQP broker, start that
+ if amqpAddress != "" {
+ clientID := rand.String(10)
+ kr := newAMQPReplicator(amqpAddress, clientID, db)
+ repl = append(repl, kr)
+ main.Add(kr)
+ }
+
+ go func() {
+ for range time.NewTicker(time.Second).C {
+ for _, r := range repl {
+ r.send("<heartbeat>", nil, time.Now().UnixNano())
+ }
+ }
+ }()
+
// Start the main API server.
- qs := newAPISrv(listen, cert, db, repl, useHTTP)
+ qs := newAPISrv(listen, cert, db, repl, useHTTP, missesIncrease)
main.Add(qs)
// If we have a metrics port configured, start a metrics handler.
diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go
index 8d0db3f7b8..e7aa2894ad 100644
--- a/cmd/stdiscosrv/replication.go
+++ b/cmd/stdiscosrv/replication.go
@@ -144,10 +144,11 @@ func (s *replicationSender) String() string {
return fmt.Sprintf("replicationSender(%q)", s.dst)
}
-func (s *replicationSender) send(key string, ps []DatabaseAddress, _ int64) {
+func (s *replicationSender) send(key string, ps []DatabaseAddress, seen int64) {
item := ReplicationRecord{
Key: key,
Addresses: ps,
+ Seen: seen,
}
// The send should never block. The inbox is suitably buffered for at
diff --git a/cmd/stdiscosrv/stats.go b/cmd/stdiscosrv/stats.go
index 949a369eb6..ba9ccb40d9 100644
--- a/cmd/stdiscosrv/stats.go
+++ b/cmd/stdiscosrv/stats.go
@@ -7,10 +7,7 @@
package main
import (
- "os"
-
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/collectors"
)
var (
@@ -127,15 +124,4 @@ func init() {
databaseKeys, databaseStatisticsSeconds,
databaseOperations, databaseOperationSeconds,
retryAfterHistogram)
-
- processCollectorOpts := collectors.ProcessCollectorOpts{
- Namespace: "syncthing_discovery",
- PidFn: func() (int, error) {
- return os.Getpid(), nil
- },
- }
-
- prometheus.MustRegister(
- collectors.NewProcessCollector(processCollectorOpts),
- )
}
diff --git a/go.mod b/go.mod
index 3636d1ef81..b23df054ae 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.21
github.com/prometheus/client_golang v1.19.1
github.com/quic-go/quic-go v0.44.0
+ github.com/rabbitmq/amqp091-go v1.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/shirou/gopsutil/v3 v3.24.4
github.com/syncthing/notify v0.0.0-20210616190510-c6b7342338d2
diff --git a/go.sum b/go.sum
index 9955b733a8..4a0e77ede7 100644
--- a/go.sum
+++ b/go.sum
@@ -200,6 +200,8 @@ github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI
github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ0=
github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek=
+github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
+github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/riywo/loginshell v0.0.0-20200815045211-7d26008be1ab h1:ZjX6I48eZSFetPb41dHudEyVr5v953N15TsNZXlkcWY=
@@ -250,6 +252,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=