summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2024-06-04 07:18:35 -0400
committerJakob Borg <jakob@kastelo.net>2024-06-04 07:18:35 -0400
commit21e0f98fe266964765fcfee4b1b109b47a810410 (patch)
tree6b7561f41dc11afa218fa2359590c986410a50ae
parent495809ac9ecb0bffd4045d164021f2b4ed235ecc (diff)
parent2bb5b2244ba38c279d1bac6afaea463f67fce2b0 (diff)
Merge branch 'infrastructure'
* infrastructure: cmd/stupgrades: Basic process metrics cmd/stcrashreceiver: Ignore patterns, improve metrics cmd/strelaypoolsrv: More compact response, improved metrics cmd/stdiscosrv: Add AMQP replication
-rw-r--r--cmd/stcrashreceiver/main.go90
-rw-r--r--cmd/stcrashreceiver/sentry.go117
-rw-r--r--cmd/stcrashreceiver/sentry_test.go60
-rw-r--r--cmd/stcrashreceiver/stcrashreceiver.go31
-rw-r--r--cmd/stdiscosrv/amqp.go246
-rw-r--r--cmd/stdiscosrv/apisrv.go36
-rw-r--r--cmd/stdiscosrv/main.go23
-rw-r--r--cmd/stdiscosrv/replication.go3
-rw-r--r--cmd/stdiscosrv/stats.go14
-rw-r--r--cmd/strelaypoolsrv/gui/index.html6
-rw-r--r--cmd/strelaypoolsrv/main.go126
-rw-r--r--cmd/strelaypoolsrv/main_test.go17
-rw-r--r--cmd/strelaypoolsrv/stats.go15
-rw-r--r--cmd/stupgrades/main.go36
-rw-r--r--go.mod1
-rw-r--r--go.sum4
-rw-r--r--lib/build/parse.go93
-rw-r--r--lib/build/parse_test.go72
18 files changed, 722 insertions, 268 deletions
diff --git a/cmd/stcrashreceiver/main.go b/cmd/stcrashreceiver/main.go
index 657d19e91d..1a22313abf 100644
--- a/cmd/stcrashreceiver/main.go
+++ b/cmd/stcrashreceiver/main.go
@@ -21,11 +21,14 @@ import (
"net/http"
"os"
"path/filepath"
+ "regexp"
+ "strings"
"github.com/alecthomas/kong"
raven "github.com/getsentry/raven-go"
"github.com/prometheus/client_golang/prometheus/promhttp"
_ "github.com/syncthing/syncthing/lib/automaxprocs"
+ "github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/ur"
)
@@ -33,13 +36,15 @@ import (
const maxRequestSize = 1 << 20 // 1 MiB
type cli struct {
- Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."`
- DSN string `help:"Sentry DSN" env:"SENTRY_DSN"`
- Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"`
- MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"`
- MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"`
- SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"`
- DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"`
+ Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."`
+ DSN string `help:"Sentry DSN" env:"SENTRY_DSN"`
+ Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"`
+ MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"`
+ MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"`
+ SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"`
+ DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"`
+ MetricsListen string `help:"HTTP listen address for metrics" default:":8081" env:"METRICS_LISTEN_ADDRESS"`
+ IngorePatterns string `help:"File containing ignore patterns (regexp)" env:"IGNORE_PATTERNS" type:"existingfile"`
}
func main() {
@@ -62,19 +67,38 @@ func main() {
}
go ss.Serve(context.Background())
+ var ip *ignorePatterns
+ if params.IngorePatterns != "" {
+ var err error
+ ip, err = loadIgnorePatterns(params.IngorePatterns)
+ if err != nil {
+ log.Fatalf("Failed to load ignore patterns: %v", err)
+ }
+ }
+
cr := &crashReceiver{
store: ds,
sentry: ss,
+ ignore: ip,
}
mux.Handle("/", cr)
mux.HandleFunc("/ping", func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("OK"))
})
- mux.Handle("/metrics", promhttp.Handler())
+
+ if params.MetricsListen != "" {
+ mmux := http.NewServeMux()
+ mmux.Handle("/metrics", promhttp.Handler())
+ go func() {
+ if err := http.ListenAndServe(params.MetricsListen, mmux); err != nil {
+ log.Fatalln("HTTP serve metrics:", err)
+ }
+ }()
+ }
if params.DSN != "" {
- mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports")))
+ mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports"), ip))
}
log.SetOutput(os.Stdout)
@@ -83,7 +107,7 @@ func main() {
}
}
-func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *http.Request) {
+func handleFailureFn(dsn, failureDir string, ignore *ignorePatterns) func(w http.ResponseWriter, req *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
result := "failure"
defer func() {
@@ -98,6 +122,11 @@ func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *ht
return
}
+ if ignore.match(bs) {
+ result = "ignored"
+ return
+ }
+
var reports []ur.FailureReport
err = json.Unmarshal(bs, &reports)
if err != nil {
@@ -110,7 +139,7 @@ func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *ht
return
}
- version, err := parseVersion(reports[0].Version)
+ version, err := build.ParseVersion(reports[0].Version)
if err != nil {
http.Error(w, err.Error(), 400)
return
@@ -158,3 +187,42 @@ func saveFailureWithGoroutines(data ur.FailureData, failureDir string) (string,
}
return reportServer + path, nil
}
+
+type ignorePatterns struct {
+ patterns []*regexp.Regexp
+}
+
+func loadIgnorePatterns(path string) (*ignorePatterns, error) {
+ bs, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+
+ var patterns []*regexp.Regexp
+ for _, line := range strings.Split(string(bs), "\n") {
+ line = strings.TrimSpace(line)
+ if line == "" {
+ continue
+ }
+ re, err := regexp.Compile(line)
+ if err != nil {
+ return nil, err
+ }
+ patterns = append(patterns, re)
+ }
+
+ log.Printf("Loaded %d ignore patterns", len(patterns))
+ return &ignorePatterns{patterns: patterns}, nil
+}
+
+func (i *ignorePatterns) match(report []byte) bool {
+ if i == nil {
+ return false
+ }
+ for _, re := range i.patterns {
+ if re.Match(report) {
+ return true
+ }
+ }
+ return false
+}
diff --git a/cmd/stcrashreceiver/sentry.go b/cmd/stcrashreceiver/sentry.go
index dadd9f3337..f8bc227414 100644
--- a/cmd/stcrashreceiver/sentry.go
+++ b/cmd/stcrashreceiver/sentry.go
@@ -18,6 +18,7 @@ import (
raven "github.com/getsentry/raven-go"
"github.com/maruel/panicparse/v2/stack"
+ "github.com/syncthing/syncthing/lib/build"
)
const reportServer = "https://crash.syncthing.net/report/"
@@ -105,7 +106,7 @@ func parseCrashReport(path string, report []byte) (*raven.Packet, error) {
return nil, errors.New("no first line")
}
- version, err := parseVersion(string(parts[0]))
+ version, err := build.ParseVersion(string(parts[0]))
if err != nil {
return nil, err
}
@@ -143,12 +144,12 @@ func parseCrashReport(path string, report []byte) (*raven.Packet, error) {
}
// Lock the source code loader to the version we are processing here.
- if version.commit != "" {
+ if version.Commit != "" {
// We have a commit hash, so we know exactly which source to use
- loader.LockWithVersion(version.commit)
- } else if strings.HasPrefix(version.tag, "v") {
+ loader.LockWithVersion(version.Commit)
+ } else if strings.HasPrefix(version.Tag, "v") {
// Lets hope the tag is close enough
- loader.LockWithVersion(version.tag)
+ loader.LockWithVersion(version.Tag)
} else {
// Last resort
loader.LockWithVersion("main")
@@ -215,106 +216,26 @@ func crashReportFingerprint(message string) []string {
return []string{"{{ default }}", message}
}
-// syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC [foo, bar]
-// or, somewhere along the way the "+" in the version tag disappeared:
-// syncthing v1.23.7-dev.26.gdf7b56ae.dirty-stversionextra "Fermium Flea" (go1.20.5 darwin-arm64) jb@ok.kastelo.net 2023-07-12 06:55:26 UTC [Some Wrapper, purego, stnoupgrade]
-var (
- longVersionRE = regexp.MustCompile(`syncthing\s+(v[^\s]+)\s+"([^"]+)"\s\(([^\s]+)\s+([^-]+)-([^)]+)\)\s+([^\s]+)[^\[]*(?:\[(.+)\])?$`)
- gitExtraRE = regexp.MustCompile(`\.\d+\.g[0-9a-f]+`) // ".1.g6aaae618"
- gitExtraSepRE = regexp.MustCompile(`[.-]`) // dot or dash
-)
-
-type version struct {
- version string // "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep"
- tag string // "v1.1.4-rc.1"
- commit string // "6aaae618", blank when absent
- codename string // "Erbium Earthworm"
- runtime string // "go1.12.5"
- goos string // "darwin"
- goarch string // "amd64"
- builder string // "jb@kvin.kastelo.net"
- extra []string // "foo", "bar"
-}
-
-func (v version) environment() string {
- if v.commit != "" {
- return "Development"
- }
- if strings.Contains(v.tag, "-rc.") {
- return "Candidate"
- }
- if strings.Contains(v.tag, "-") {
- return "Beta"
- }
- return "Stable"
-}
-
-func parseVersion(line string) (version, error) {
- m := longVersionRE.FindStringSubmatch(line)
- if len(m) == 0 {
- return version{}, errors.New("unintelligeble version string")
- }
-
- v := version{
- version: m[1],
- codename: m[2],
- runtime: m[3],
- goos: m[4],
- goarch: m[5],
- builder: m[6],
- }
-
- // Split the version tag into tag and commit. This is old style
- // v1.2.3-something.4+11-g12345678 or newer with just dots
- // v1.2.3-something.4.11.g12345678 or v1.2.3-dev.11.g12345678.
- parts := []string{v.version}
- if strings.Contains(v.version, "+") {
- parts = strings.Split(v.version, "+")
- } else {
- idxs := gitExtraRE.FindStringIndex(v.version)
- if len(idxs) > 0 {
- parts = []string{v.version[:idxs[0]], v.version[idxs[0]+1:]}
- }
- }
- v.tag = parts[0]
- if len(parts) > 1 {
- fields := gitExtraSepRE.Split(parts[1], -1)
- if len(fields) >= 2 && strings.HasPrefix(fields[1], "g") {
- v.commit = fields[1][1:]
- }
- }
-
- if len(m) >= 8 && m[7] != "" {
- tags := strings.Split(m[7], ",")
- for i := range tags {
- tags[i] = strings.TrimSpace(tags[i])
- }
- v.extra = tags
- }
-
- return v, nil
-}
-
-func packet(version version, reportType string) *raven.Packet {
+func packet(version build.VersionParts, reportType string) *raven.Packet {
pkt := &raven.Packet{
Platform: "go",
- Release: version.tag,
- Environment: version.environment(),
+ Release: version.Tag,
+ Environment: version.Environment(),
Tags: raven.Tags{
- raven.Tag{Key: "version", Value: version.version},
- raven.Tag{Key: "tag", Value: version.tag},
- raven.Tag{Key: "codename", Value: version.codename},
- raven.Tag{Key: "runtime", Value: version.runtime},
- raven.Tag{Key: "goos", Value: version.goos},
- raven.Tag{Key: "goarch", Value: version.goarch},
- raven.Tag{Key: "builder", Value: version.builder},
+ raven.Tag{Key: "version", Value: version.Version},
+ raven.Tag{Key: "tag", Value: version.Tag},
+ raven.Tag{Key: "codename", Value: version.Codename},
+ raven.Tag{Key: "runtime", Value: version.Runtime},
+ raven.Tag{Key: "goos", Value: version.GOOS},
+ raven.Tag{Key: "goarch", Value: version.GOARCH},
+ raven.Tag{Key: "builder", Value: version.Builder},
raven.Tag{Key: "report_type", Value: reportType},
},
}
- if version.commit != "" {
- pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.commit})
+ if version.Commit != "" {
+ pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.Commit})
}
- for _, tag := range version.extra {
+ for _, tag := range version.Extra {
pkt.Tags = append(pkt.Tags, raven.Tag{Key: tag, Value: "1"})
}
return pkt
diff --git a/cmd/stcrashreceiver/sentry_test.go b/cmd/stcrashreceiver/sentry_test.go
index 9fa30f262c..f087641e4c 100644
--- a/cmd/stcrashreceiver/sentry_test.go
+++ b/cmd/stcrashreceiver/sentry_test.go
@@ -12,66 +12,6 @@ import (
"testing"
)
-func TestParseVersion(t *testing.T) {
- cases := []struct {
- longVersion string
- parsed version
- }{
- {
- longVersion: `syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC`,
- parsed: version{
- version: "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep",
- tag: "v1.1.4-rc.1",
- commit: "6aaae618",
- codename: "Erbium Earthworm",
- runtime: "go1.12.5",
- goos: "darwin",
- goarch: "amd64",
- builder: "jb@kvin.kastelo.net",
- },
- },
- {
- longVersion: `syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC [foo, bar]`,
- parsed: version{
- version: "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep",
- tag: "v1.1.4-rc.1",
- commit: "6aaae618",
- codename: "Erbium Earthworm",
- runtime: "go1.12.5",
- goos: "darwin",
- goarch: "amd64",
- builder: "jb@kvin.kastelo.net",
- extra: []string{"foo", "bar"},
- },
- },
- {
- longVersion: `syncthing v1.23.7-dev.26.gdf7b56ae-stversionextra "Fermium Flea" (go1.20.5 darwin-arm64) jb@ok.kastelo.net 2023-07-12 06:55:26 UTC [Some Wrapper, purego, stnoupgrade]`,
- parsed: version{
- version: "v1.23.7-dev.26.gdf7b56ae-stversionextra",
- tag: "v1.23.7-dev",
- commit: "df7b56ae",
- codename: "Fermium Flea",
- runtime: "go1.20.5",
- goos: "darwin",
- goarch: "arm64",
- builder: "jb@ok.kastelo.net",
- extra: []string{"Some Wrapper", "purego", "stnoupgrade"},
- },
- },
- }
-
- for _, tc := range cases {
- v, err := parseVersion(tc.longVersion)
- if err != nil {
- t.Errorf("%s\nerror: %v\n", tc.longVersion, err)
- continue
- }
- if fmt.Sprint(v) != fmt.Sprint(tc.parsed) {
- t.Errorf("%s\nA: %v\nE: %v\n", tc.longVersion, v, tc.parsed)
- }
- }
-}
-
func TestParseReport(t *testing.T) {
bs, err := os.ReadFile("_testdata/panic.log")
if err != nil {
diff --git a/cmd/stcrashreceiver/stcrashreceiver.go b/cmd/stcrashreceiver/stcrashreceiver.go
index d8c0d29655..d45e057ed8 100644
--- a/cmd/stcrashreceiver/stcrashreceiver.go
+++ b/cmd/stcrashreceiver/stcrashreceiver.go
@@ -12,11 +12,16 @@ import (
"net/http"
"path"
"strings"
+ "sync"
)
type crashReceiver struct {
store *diskStore
sentry *sentryService
+ ignore *ignorePatterns
+
+ ignoredMut sync.RWMutex
+ ignored map[string]struct{}
}
func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -64,6 +69,12 @@ func (r *crashReceiver) serveGet(reportID string, w http.ResponseWriter, _ *http
// serveHead responds to HEAD requests by checking if the named report
// already exists in the system.
func (r *crashReceiver) serveHead(reportID string, w http.ResponseWriter, _ *http.Request) {
+ r.ignoredMut.RLock()
+ _, ignored := r.ignored[reportID]
+ r.ignoredMut.RUnlock()
+ if ignored {
+ return // found
+ }
if !r.store.Exists(reportID) {
http.Error(w, "Not found", http.StatusNotFound)
}
@@ -76,6 +87,15 @@ func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *ht
metricCrashReportsTotal.WithLabelValues(result).Inc()
}()
+ r.ignoredMut.RLock()
+ _, ignored := r.ignored[reportID]
+ r.ignoredMut.RUnlock()
+ if ignored {
+ result = "ignored_cached"
+ io.Copy(io.Discard, req.Body)
+ return // found
+ }
+
// Read at most maxRequestSize of report data.
log.Println("Receiving report", reportID)
lr := io.LimitReader(req.Body, maxRequestSize)
@@ -86,6 +106,17 @@ func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *ht
return
}
+ if r.ignore.match(bs) {
+ r.ignoredMut.Lock()
+ if r.ignored == nil {
+ r.ignored = make(map[string]struct{})
+ }
+ r.ignored[reportID] = struct{}{}
+ r.ignoredMut.Unlock()
+ result = "ignored"
+ return
+ }
+
result = "success"
// Store the report
diff --git a/cmd/stdiscosrv/amqp.go b/cmd/stdiscosrv/amqp.go
new file mode 100644
index 0000000000..e09919e27f
--- /dev/null
+++ b/cmd/stdiscosrv/amqp.go
@@ -0,0 +1,246 @@
+// Copyright (C) 2024 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/thejerf/suture/v4"
+)
+
+type amqpReplicator struct {
+ suture.Service
+ broker string
+ sender *amqpSender
+ receiver *amqpReceiver
+ outbox chan ReplicationRecord
+}
+
+func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
+ svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true})
+
+ sender := &amqpSender{
+ broker: broker,
+ clientID: clientID,
+ outbox: make(chan ReplicationRecord, replicationOutboxSize),
+ }
+ svc.Add(sender)
+
+ receiver := &amqpReceiver{
+ broker: broker,
+ clientID: clientID,
+ db: db,
+ }
+ svc.Add(receiver)
+
+ return &amqpReplicator{
+ Service: svc,
+ broker: broker,
+ sender: sender,
+ receiver: receiver,
+ outbox: make(chan ReplicationRecord, replicationOutboxSize),
+ }
+}
+
+func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) {
+ s.sender.send(key, ps, seen)
+}
+
+type amqpSender struct {
+ broker string
+ clientID string
+ outbox chan ReplicationRecord
+}
+
+func (s *amqpSender) Serve(ctx context.Context) error {
+ conn, ch, err := amqpChannel(s.broker)
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ defer conn.Close()
+
+ buf := make([]byte, 1024)
+ for {
+ select {
+ case rec := <-s.outbox:
+ size := rec.Size()
+ if len(buf) < size {
+ buf = make([]byte, size)
+ }
+
+ n, err := rec.MarshalTo(buf)
+ if err != nil {
+ replicationSendsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication marshal: %w", err)
+ }
+
+ err = ch.PublishWithContext(ctx,
+ "discovery", // exchange
+ "", // routing key
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ ContentType: "application/protobuf",
+ Body: buf[:n],
+ AppId: s.clientID,
+ })
+ if err != nil {
+ replicationSendsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication publish: %w", err)
+ }
+
+ replicationSendsTotal.WithLabelValues("success").Inc()
+
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (s *amqpSender) String() string {
+ return fmt.Sprintf("amqpSender(%q)", s.broker)
+}
+
+func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) {
+ item := ReplicationRecord{
+ Key: key,
+ Addresses: ps,
+ Seen: seen,
+ }
+
+ // The send should never block. The inbox is suitably buffered for at
+ // least a few seconds of stalls, which shouldn't happen in practice.
+ select {
+ case s.outbox <- item:
+ default:
+ replicationSendsTotal.WithLabelValues("drop").Inc()
+ }
+}
+
+type amqpReceiver struct {
+ broker string
+ clientID string
+ db database
+}
+
+func (s *amqpReceiver) Serve(ctx context.Context) error {
+ conn, ch, err := amqpChannel(s.broker)
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ defer conn.Close()
+
+ msgs, err := amqpConsume(ch)
+ if err != nil {
+ return err
+ }
+
+ for {
+ select {
+ case msg, ok := <-msgs:
+ if !ok {
+ return fmt.Errorf("subscription closed: %w", io.EOF)
+ }
+
+ // ignore messages from ourself
+ if msg.AppId == s.clientID {
+ continue
+ }
+
+ var rec ReplicationRecord
+ if err := rec.Unmarshal(msg.Body); err != nil {
+ replicationRecvsTotal.WithLabelValues("error").Inc()
+ return fmt.Errorf("replication unmarshal: %w", err)
+ }
+
+ if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil {
+ return fmt.Errorf("replication database merge: %w", err)
+ }
+
+ replicationRecvsTotal.WithLabelValues("success").Inc()
+
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (s *amqpReceiver) String() string {
+ return fmt.Sprintf("amqpReceiver(%q)", s.broker)
+}
+
+func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
+ conn, err := amqp.Dial(dst)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP dial: %w", err)
+ }
+
+ ch, err := conn.Channel()
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP channel: %w", err)
+ }
+
+ err = ch.ExchangeDeclare(
+ "discovery", // name
+ "fanout", // type
+ false, // durable
+ false, // auto-deleted
+ false, // internal
+ false, // no-wait
+ nil, // arguments
+ )
+ if err != nil {
+ return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
+ }
+
+ return conn, ch, nil
+}
+
+func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
+ q, err := ch.QueueDeclare(
+ "", // name
+ false, // durable
+ false, // delete when unused
+ true, // exclusive
+ false, // no-wait
+ nil, // arguments
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP declare queue: %w", err)
+ }
+
+ err = ch.QueueBind(
+ q.Name, // queue name
+ "", // routing key
+ "discovery", // exchange
+ false,
+ nil,
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP bind queue: %w", err)
+ }
+
+ msgs, err := ch.Consume(
+ q.Name, // queue
+ "", // consumer
+ true, // auto-ack
+ false, // exclusive
+ false, // no-local
+ false, // no-wait
+ nil, // args
+ )
+ if err != nil {
+ return nil, fmt.Errorf("AMQP consume: %w", err)
+ }
+
+ return msgs, nil
+}
diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go
index 3da8ee0f19..b8721ae680 100644
--- a/cmd/stdiscosrv/apisrv.go
+++ b/cmd/stdiscosrv/apisrv.go
@@ -39,12 +39,13 @@ type announcement struct {
}
type apiSrv struct {
- addr string
- cert tls.Certificate
- db database
- listener net.Listener
- repl replicator // optional
- useHTTP bool
+ addr string
+ cert tls.Certificate
+ db database
+ listener net.Listener
+ repl replicator // optional
+ useHTTP bool
+ missesIncrease int
mapsMut sync.Mutex
misses map[string]int32
@@ -60,14 +61,15 @@ type contextKey int
const idKey contextKey = iota
-func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool) *apiSrv {
+func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool, missesIncrease int) *apiSrv {
return &apiSrv{
- addr: addr,
- cert: cert,
- db: db,
- repl: repl,
- useHTTP: useHTTP,
- misses: make(map[string]int32),
+ addr: addr,
+ cert: cert,
+ db: db,
+ repl: repl,
+ useHTTP: useHTTP,
+ misses: make(map[string]int32),
+ missesIncrease: missesIncrease,
}
}
@@ -197,14 +199,13 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
s.mapsMut.Lock()
misses := s.misses[key]
if misses < rec.Misses {
- misses = rec.Misses + 1
- } else {
- misses++
+ misses = rec.Misses
}
+ misses += int32(s.missesIncrease)
s.misses[key] = misses
s.mapsMut.Unlock()
- if misses%notFoundMissesWriteInterval == 0 {
+ if misses >= notFoundMissesWriteInterval {
rec.Misses = misses
rec.Missed = time.Now().UnixNano()
rec.Addresses = nil
@@ -444,7 +445,6 @@ func fixupAddresses(remote *net.TCPAddr, addresses []string) []string {
// remote is nil, unable to determine host IP
continue
}
-
}
// If zero port was specified, use remote port.
diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go
index 4890f3bda9..0b8c907f5e 100644
--- a/cmd/stdiscosrv/main.go
+++ b/cmd/stdiscosrv/main.go
@@ -22,6 +22,7 @@ import (
_ "github.com/syncthing/syncthing/lib/automaxprocs"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/protocol"
+ "github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture/v4"
@@ -80,6 +81,8 @@ func main() {
var replKeyFile string
var useHTTP bool
var largeDB bool
+ var amqpAddress string
+ missesIncrease := 1
log.SetOutput(os.Stdout)
log.SetFlags(0)
@@ -96,6 +99,8 @@ func main() {
flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication")
flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication")
flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings")
+ flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker")
+ flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss")
showVersion := flag.Bool("version", false, "Show version")
flag.Parse()
@@ -203,8 +208,24 @@ func main() {
main.Add(rl)
}
+ // If we have an AMQP broker, start that
+ if amqpAddress != "" {
+ clientID := rand.String(10)
+ kr := newAMQPReplicator(amqpAddress, clientID, db)
+ repl = append(repl, kr)
+ main.Add(kr)
+ }
+
+ go func() {
+ for range time.NewTicker(time.Second).C {
+ for _, r := range repl {
+ r.send("<heartbeat>", nil, time.Now().UnixNano())
+ }
+ }
+ }()
+
// Start the main API server.
- qs := newAPISrv(listen, cert, db, repl, useHTTP)
+ qs := newAPISrv(listen, cert, db, repl, useHTTP, missesIncrease)
main.Add(qs)
// If we have a metrics port configured, start a metrics handler.
diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go
index 8d0db3f7b8..e7aa2894ad 100644
--- a/cmd/stdiscosrv/replication.go
+++ b/cmd/stdiscosrv/replication.go
@@ -144,10 +144,11 @@ func (s *replicationSender) String() string {<