diff options
author | Jakob Borg <jakob@kastelo.net> | 2024-06-04 07:18:35 -0400 |
---|---|---|
committer | Jakob Borg <jakob@kastelo.net> | 2024-06-04 07:18:35 -0400 |
commit | 21e0f98fe266964765fcfee4b1b109b47a810410 (patch) | |
tree | 6b7561f41dc11afa218fa2359590c986410a50ae | |
parent | 495809ac9ecb0bffd4045d164021f2b4ed235ecc (diff) | |
parent | 2bb5b2244ba38c279d1bac6afaea463f67fce2b0 (diff) |
Merge branch 'infrastructure'
* infrastructure:
cmd/stupgrades: Basic process metrics
cmd/stcrashreceiver: Ignore patterns, improve metrics
cmd/strelaypoolsrv: More compact response, improved metrics
cmd/stdiscosrv: Add AMQP replication
-rw-r--r-- | cmd/stcrashreceiver/main.go | 90 | ||||
-rw-r--r-- | cmd/stcrashreceiver/sentry.go | 117 | ||||
-rw-r--r-- | cmd/stcrashreceiver/sentry_test.go | 60 | ||||
-rw-r--r-- | cmd/stcrashreceiver/stcrashreceiver.go | 31 | ||||
-rw-r--r-- | cmd/stdiscosrv/amqp.go | 246 | ||||
-rw-r--r-- | cmd/stdiscosrv/apisrv.go | 36 | ||||
-rw-r--r-- | cmd/stdiscosrv/main.go | 23 | ||||
-rw-r--r-- | cmd/stdiscosrv/replication.go | 3 | ||||
-rw-r--r-- | cmd/stdiscosrv/stats.go | 14 | ||||
-rw-r--r-- | cmd/strelaypoolsrv/gui/index.html | 6 | ||||
-rw-r--r-- | cmd/strelaypoolsrv/main.go | 126 | ||||
-rw-r--r-- | cmd/strelaypoolsrv/main_test.go | 17 | ||||
-rw-r--r-- | cmd/strelaypoolsrv/stats.go | 15 | ||||
-rw-r--r-- | cmd/stupgrades/main.go | 36 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | lib/build/parse.go | 93 | ||||
-rw-r--r-- | lib/build/parse_test.go | 72 |
18 files changed, 722 insertions, 268 deletions
diff --git a/cmd/stcrashreceiver/main.go b/cmd/stcrashreceiver/main.go index 657d19e91d..1a22313abf 100644 --- a/cmd/stcrashreceiver/main.go +++ b/cmd/stcrashreceiver/main.go @@ -21,11 +21,14 @@ import ( "net/http" "os" "path/filepath" + "regexp" + "strings" "github.com/alecthomas/kong" raven "github.com/getsentry/raven-go" "github.com/prometheus/client_golang/prometheus/promhttp" _ "github.com/syncthing/syncthing/lib/automaxprocs" + "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/ur" ) @@ -33,13 +36,15 @@ import ( const maxRequestSize = 1 << 20 // 1 MiB type cli struct { - Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."` - DSN string `help:"Sentry DSN" env:"SENTRY_DSN"` - Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"` - MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"` - MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"` - SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"` - DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"` + Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."` + DSN string `help:"Sentry DSN" env:"SENTRY_DSN"` + Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"` + MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"` + MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"` + SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"` + DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"` + MetricsListen string `help:"HTTP listen address for metrics" default:":8081" env:"METRICS_LISTEN_ADDRESS"` + IngorePatterns string `help:"File containing ignore patterns (regexp)" env:"IGNORE_PATTERNS" type:"existingfile"` } func main() { @@ -62,19 +67,38 @@ func main() { } go ss.Serve(context.Background()) + var ip *ignorePatterns + if params.IngorePatterns != "" { + var err error + ip, err = loadIgnorePatterns(params.IngorePatterns) + if err != nil { + log.Fatalf("Failed to load ignore patterns: %v", err) + } + } + cr := &crashReceiver{ store: ds, sentry: ss, + ignore: ip, } mux.Handle("/", cr) mux.HandleFunc("/ping", func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("OK")) }) - mux.Handle("/metrics", promhttp.Handler()) + + if params.MetricsListen != "" { + mmux := http.NewServeMux() + mmux.Handle("/metrics", promhttp.Handler()) + go func() { + if err := http.ListenAndServe(params.MetricsListen, mmux); err != nil { + log.Fatalln("HTTP serve metrics:", err) + } + }() + } if params.DSN != "" { - mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports"))) + mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports"), ip)) } log.SetOutput(os.Stdout) @@ -83,7 +107,7 @@ func main() { } } -func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *http.Request) { +func handleFailureFn(dsn, failureDir string, ignore *ignorePatterns) func(w http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) { result := "failure" defer func() { @@ -98,6 +122,11 @@ func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *ht return } + if ignore.match(bs) { + result = "ignored" + return + } + var reports []ur.FailureReport err = json.Unmarshal(bs, &reports) if err != nil { @@ -110,7 +139,7 @@ func handleFailureFn(dsn, failureDir string) func(w http.ResponseWriter, req *ht return } - version, err := parseVersion(reports[0].Version) + version, err := build.ParseVersion(reports[0].Version) if err != nil { http.Error(w, err.Error(), 400) return @@ -158,3 +187,42 @@ func saveFailureWithGoroutines(data ur.FailureData, failureDir string) (string, } return reportServer + path, nil } + +type ignorePatterns struct { + patterns []*regexp.Regexp +} + +func loadIgnorePatterns(path string) (*ignorePatterns, error) { + bs, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var patterns []*regexp.Regexp + for _, line := range strings.Split(string(bs), "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + re, err := regexp.Compile(line) + if err != nil { + return nil, err + } + patterns = append(patterns, re) + } + + log.Printf("Loaded %d ignore patterns", len(patterns)) + return &ignorePatterns{patterns: patterns}, nil +} + +func (i *ignorePatterns) match(report []byte) bool { + if i == nil { + return false + } + for _, re := range i.patterns { + if re.Match(report) { + return true + } + } + return false +} diff --git a/cmd/stcrashreceiver/sentry.go b/cmd/stcrashreceiver/sentry.go index dadd9f3337..f8bc227414 100644 --- a/cmd/stcrashreceiver/sentry.go +++ b/cmd/stcrashreceiver/sentry.go @@ -18,6 +18,7 @@ import ( raven "github.com/getsentry/raven-go" "github.com/maruel/panicparse/v2/stack" + "github.com/syncthing/syncthing/lib/build" ) const reportServer = "https://crash.syncthing.net/report/" @@ -105,7 +106,7 @@ func parseCrashReport(path string, report []byte) (*raven.Packet, error) { return nil, errors.New("no first line") } - version, err := parseVersion(string(parts[0])) + version, err := build.ParseVersion(string(parts[0])) if err != nil { return nil, err } @@ -143,12 +144,12 @@ func parseCrashReport(path string, report []byte) (*raven.Packet, error) { } // Lock the source code loader to the version we are processing here. - if version.commit != "" { + if version.Commit != "" { // We have a commit hash, so we know exactly which source to use - loader.LockWithVersion(version.commit) - } else if strings.HasPrefix(version.tag, "v") { + loader.LockWithVersion(version.Commit) + } else if strings.HasPrefix(version.Tag, "v") { // Lets hope the tag is close enough - loader.LockWithVersion(version.tag) + loader.LockWithVersion(version.Tag) } else { // Last resort loader.LockWithVersion("main") @@ -215,106 +216,26 @@ func crashReportFingerprint(message string) []string { return []string{"{{ default }}", message} } -// syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC [foo, bar] -// or, somewhere along the way the "+" in the version tag disappeared: -// syncthing v1.23.7-dev.26.gdf7b56ae.dirty-stversionextra "Fermium Flea" (go1.20.5 darwin-arm64) jb@ok.kastelo.net 2023-07-12 06:55:26 UTC [Some Wrapper, purego, stnoupgrade] -var ( - longVersionRE = regexp.MustCompile(`syncthing\s+(v[^\s]+)\s+"([^"]+)"\s\(([^\s]+)\s+([^-]+)-([^)]+)\)\s+([^\s]+)[^\[]*(?:\[(.+)\])?$`) - gitExtraRE = regexp.MustCompile(`\.\d+\.g[0-9a-f]+`) // ".1.g6aaae618" - gitExtraSepRE = regexp.MustCompile(`[.-]`) // dot or dash -) - -type version struct { - version string // "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep" - tag string // "v1.1.4-rc.1" - commit string // "6aaae618", blank when absent - codename string // "Erbium Earthworm" - runtime string // "go1.12.5" - goos string // "darwin" - goarch string // "amd64" - builder string // "jb@kvin.kastelo.net" - extra []string // "foo", "bar" -} - -func (v version) environment() string { - if v.commit != "" { - return "Development" - } - if strings.Contains(v.tag, "-rc.") { - return "Candidate" - } - if strings.Contains(v.tag, "-") { - return "Beta" - } - return "Stable" -} - -func parseVersion(line string) (version, error) { - m := longVersionRE.FindStringSubmatch(line) - if len(m) == 0 { - return version{}, errors.New("unintelligeble version string") - } - - v := version{ - version: m[1], - codename: m[2], - runtime: m[3], - goos: m[4], - goarch: m[5], - builder: m[6], - } - - // Split the version tag into tag and commit. This is old style - // v1.2.3-something.4+11-g12345678 or newer with just dots - // v1.2.3-something.4.11.g12345678 or v1.2.3-dev.11.g12345678. - parts := []string{v.version} - if strings.Contains(v.version, "+") { - parts = strings.Split(v.version, "+") - } else { - idxs := gitExtraRE.FindStringIndex(v.version) - if len(idxs) > 0 { - parts = []string{v.version[:idxs[0]], v.version[idxs[0]+1:]} - } - } - v.tag = parts[0] - if len(parts) > 1 { - fields := gitExtraSepRE.Split(parts[1], -1) - if len(fields) >= 2 && strings.HasPrefix(fields[1], "g") { - v.commit = fields[1][1:] - } - } - - if len(m) >= 8 && m[7] != "" { - tags := strings.Split(m[7], ",") - for i := range tags { - tags[i] = strings.TrimSpace(tags[i]) - } - v.extra = tags - } - - return v, nil -} - -func packet(version version, reportType string) *raven.Packet { +func packet(version build.VersionParts, reportType string) *raven.Packet { pkt := &raven.Packet{ Platform: "go", - Release: version.tag, - Environment: version.environment(), + Release: version.Tag, + Environment: version.Environment(), Tags: raven.Tags{ - raven.Tag{Key: "version", Value: version.version}, - raven.Tag{Key: "tag", Value: version.tag}, - raven.Tag{Key: "codename", Value: version.codename}, - raven.Tag{Key: "runtime", Value: version.runtime}, - raven.Tag{Key: "goos", Value: version.goos}, - raven.Tag{Key: "goarch", Value: version.goarch}, - raven.Tag{Key: "builder", Value: version.builder}, + raven.Tag{Key: "version", Value: version.Version}, + raven.Tag{Key: "tag", Value: version.Tag}, + raven.Tag{Key: "codename", Value: version.Codename}, + raven.Tag{Key: "runtime", Value: version.Runtime}, + raven.Tag{Key: "goos", Value: version.GOOS}, + raven.Tag{Key: "goarch", Value: version.GOARCH}, + raven.Tag{Key: "builder", Value: version.Builder}, raven.Tag{Key: "report_type", Value: reportType}, }, } - if version.commit != "" { - pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.commit}) + if version.Commit != "" { + pkt.Tags = append(pkt.Tags, raven.Tag{Key: "commit", Value: version.Commit}) } - for _, tag := range version.extra { + for _, tag := range version.Extra { pkt.Tags = append(pkt.Tags, raven.Tag{Key: tag, Value: "1"}) } return pkt diff --git a/cmd/stcrashreceiver/sentry_test.go b/cmd/stcrashreceiver/sentry_test.go index 9fa30f262c..f087641e4c 100644 --- a/cmd/stcrashreceiver/sentry_test.go +++ b/cmd/stcrashreceiver/sentry_test.go @@ -12,66 +12,6 @@ import ( "testing" ) -func TestParseVersion(t *testing.T) { - cases := []struct { - longVersion string - parsed version - }{ - { - longVersion: `syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC`, - parsed: version{ - version: "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep", - tag: "v1.1.4-rc.1", - commit: "6aaae618", - codename: "Erbium Earthworm", - runtime: "go1.12.5", - goos: "darwin", - goarch: "amd64", - builder: "jb@kvin.kastelo.net", - }, - }, - { - longVersion: `syncthing v1.1.4-rc.1+30-g6aaae618-dirty-crashrep "Erbium Earthworm" (go1.12.5 darwin-amd64) jb@kvin.kastelo.net 2019-05-23 16:08:14 UTC [foo, bar]`, - parsed: version{ - version: "v1.1.4-rc.1+30-g6aaae618-dirty-crashrep", - tag: "v1.1.4-rc.1", - commit: "6aaae618", - codename: "Erbium Earthworm", - runtime: "go1.12.5", - goos: "darwin", - goarch: "amd64", - builder: "jb@kvin.kastelo.net", - extra: []string{"foo", "bar"}, - }, - }, - { - longVersion: `syncthing v1.23.7-dev.26.gdf7b56ae-stversionextra "Fermium Flea" (go1.20.5 darwin-arm64) jb@ok.kastelo.net 2023-07-12 06:55:26 UTC [Some Wrapper, purego, stnoupgrade]`, - parsed: version{ - version: "v1.23.7-dev.26.gdf7b56ae-stversionextra", - tag: "v1.23.7-dev", - commit: "df7b56ae", - codename: "Fermium Flea", - runtime: "go1.20.5", - goos: "darwin", - goarch: "arm64", - builder: "jb@ok.kastelo.net", - extra: []string{"Some Wrapper", "purego", "stnoupgrade"}, - }, - }, - } - - for _, tc := range cases { - v, err := parseVersion(tc.longVersion) - if err != nil { - t.Errorf("%s\nerror: %v\n", tc.longVersion, err) - continue - } - if fmt.Sprint(v) != fmt.Sprint(tc.parsed) { - t.Errorf("%s\nA: %v\nE: %v\n", tc.longVersion, v, tc.parsed) - } - } -} - func TestParseReport(t *testing.T) { bs, err := os.ReadFile("_testdata/panic.log") if err != nil { diff --git a/cmd/stcrashreceiver/stcrashreceiver.go b/cmd/stcrashreceiver/stcrashreceiver.go index d8c0d29655..d45e057ed8 100644 --- a/cmd/stcrashreceiver/stcrashreceiver.go +++ b/cmd/stcrashreceiver/stcrashreceiver.go @@ -12,11 +12,16 @@ import ( "net/http" "path" "strings" + "sync" ) type crashReceiver struct { store *diskStore sentry *sentryService + ignore *ignorePatterns + + ignoredMut sync.RWMutex + ignored map[string]struct{} } func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -64,6 +69,12 @@ func (r *crashReceiver) serveGet(reportID string, w http.ResponseWriter, _ *http // serveHead responds to HEAD requests by checking if the named report // already exists in the system. func (r *crashReceiver) serveHead(reportID string, w http.ResponseWriter, _ *http.Request) { + r.ignoredMut.RLock() + _, ignored := r.ignored[reportID] + r.ignoredMut.RUnlock() + if ignored { + return // found + } if !r.store.Exists(reportID) { http.Error(w, "Not found", http.StatusNotFound) } @@ -76,6 +87,15 @@ func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *ht metricCrashReportsTotal.WithLabelValues(result).Inc() }() + r.ignoredMut.RLock() + _, ignored := r.ignored[reportID] + r.ignoredMut.RUnlock() + if ignored { + result = "ignored_cached" + io.Copy(io.Discard, req.Body) + return // found + } + // Read at most maxRequestSize of report data. log.Println("Receiving report", reportID) lr := io.LimitReader(req.Body, maxRequestSize) @@ -86,6 +106,17 @@ func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *ht return } + if r.ignore.match(bs) { + r.ignoredMut.Lock() + if r.ignored == nil { + r.ignored = make(map[string]struct{}) + } + r.ignored[reportID] = struct{}{} + r.ignoredMut.Unlock() + result = "ignored" + return + } + result = "success" // Store the report diff --git a/cmd/stdiscosrv/amqp.go b/cmd/stdiscosrv/amqp.go new file mode 100644 index 0000000000..e09919e27f --- /dev/null +++ b/cmd/stdiscosrv/amqp.go @@ -0,0 +1,246 @@ +// Copyright (C) 2024 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + "fmt" + "io" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/thejerf/suture/v4" +) + +type amqpReplicator struct { + suture.Service + broker string + sender *amqpSender + receiver *amqpReceiver + outbox chan ReplicationRecord +} + +func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator { + svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true}) + + sender := &amqpSender{ + broker: broker, + clientID: clientID, + outbox: make(chan ReplicationRecord, replicationOutboxSize), + } + svc.Add(sender) + + receiver := &amqpReceiver{ + broker: broker, + clientID: clientID, + db: db, + } + svc.Add(receiver) + + return &amqpReplicator{ + Service: svc, + broker: broker, + sender: sender, + receiver: receiver, + outbox: make(chan ReplicationRecord, replicationOutboxSize), + } +} + +func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) { + s.sender.send(key, ps, seen) +} + +type amqpSender struct { + broker string + clientID string + outbox chan ReplicationRecord +} + +func (s *amqpSender) Serve(ctx context.Context) error { + conn, ch, err := amqpChannel(s.broker) + if err != nil { + return err + } + defer ch.Close() + defer conn.Close() + + buf := make([]byte, 1024) + for { + select { + case rec := <-s.outbox: + size := rec.Size() + if len(buf) < size { + buf = make([]byte, size) + } + + n, err := rec.MarshalTo(buf) + if err != nil { + replicationSendsTotal.WithLabelValues("error").Inc() + return fmt.Errorf("replication marshal: %w", err) + } + + err = ch.PublishWithContext(ctx, + "discovery", // exchange + "", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/protobuf", + Body: buf[:n], + AppId: s.clientID, + }) + if err != nil { + replicationSendsTotal.WithLabelValues("error").Inc() + return fmt.Errorf("replication publish: %w", err) + } + + replicationSendsTotal.WithLabelValues("success").Inc() + + case <-ctx.Done(): + return nil + } + } +} + +func (s *amqpSender) String() string { + return fmt.Sprintf("amqpSender(%q)", s.broker) +} + +func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) { + item := ReplicationRecord{ + Key: key, + Addresses: ps, + Seen: seen, + } + + // The send should never block. The inbox is suitably buffered for at + // least a few seconds of stalls, which shouldn't happen in practice. + select { + case s.outbox <- item: + default: + replicationSendsTotal.WithLabelValues("drop").Inc() + } +} + +type amqpReceiver struct { + broker string + clientID string + db database +} + +func (s *amqpReceiver) Serve(ctx context.Context) error { + conn, ch, err := amqpChannel(s.broker) + if err != nil { + return err + } + defer ch.Close() + defer conn.Close() + + msgs, err := amqpConsume(ch) + if err != nil { + return err + } + + for { + select { + case msg, ok := <-msgs: + if !ok { + return fmt.Errorf("subscription closed: %w", io.EOF) + } + + // ignore messages from ourself + if msg.AppId == s.clientID { + continue + } + + var rec ReplicationRecord + if err := rec.Unmarshal(msg.Body); err != nil { + replicationRecvsTotal.WithLabelValues("error").Inc() + return fmt.Errorf("replication unmarshal: %w", err) + } + + if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil { + return fmt.Errorf("replication database merge: %w", err) + } + + replicationRecvsTotal.WithLabelValues("success").Inc() + + case <-ctx.Done(): + return nil + } + } +} + +func (s *amqpReceiver) String() string { + return fmt.Sprintf("amqpReceiver(%q)", s.broker) +} + +func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) { + conn, err := amqp.Dial(dst) + if err != nil { + return nil, nil, fmt.Errorf("AMQP dial: %w", err) + } + + ch, err := conn.Channel() + if err != nil { + return nil, nil, fmt.Errorf("AMQP channel: %w", err) + } + + err = ch.ExchangeDeclare( + "discovery", // name + "fanout", // type + false, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err) + } + + return conn, ch, nil +} + +func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) { + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, fmt.Errorf("AMQP declare queue: %w", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "discovery", // exchange + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("AMQP bind queue: %w", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, fmt.Errorf("AMQP consume: %w", err) + } + + return msgs, nil +} diff --git a/cmd/stdiscosrv/apisrv.go b/cmd/stdiscosrv/apisrv.go index 3da8ee0f19..b8721ae680 100644 --- a/cmd/stdiscosrv/apisrv.go +++ b/cmd/stdiscosrv/apisrv.go @@ -39,12 +39,13 @@ type announcement struct { } type apiSrv struct { - addr string - cert tls.Certificate - db database - listener net.Listener - repl replicator // optional - useHTTP bool + addr string + cert tls.Certificate + db database + listener net.Listener + repl replicator // optional + useHTTP bool + missesIncrease int mapsMut sync.Mutex misses map[string]int32 @@ -60,14 +61,15 @@ type contextKey int const idKey contextKey = iota -func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool) *apiSrv { +func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator, useHTTP bool, missesIncrease int) *apiSrv { return &apiSrv{ - addr: addr, - cert: cert, - db: db, - repl: repl, - useHTTP: useHTTP, - misses: make(map[string]int32), + addr: addr, + cert: cert, + db: db, + repl: repl, + useHTTP: useHTTP, + misses: make(map[string]int32), + missesIncrease: missesIncrease, } } @@ -197,14 +199,13 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) { s.mapsMut.Lock() misses := s.misses[key] if misses < rec.Misses { - misses = rec.Misses + 1 - } else { - misses++ + misses = rec.Misses } + misses += int32(s.missesIncrease) s.misses[key] = misses s.mapsMut.Unlock() - if misses%notFoundMissesWriteInterval == 0 { + if misses >= notFoundMissesWriteInterval { rec.Misses = misses rec.Missed = time.Now().UnixNano() rec.Addresses = nil @@ -444,7 +445,6 @@ func fixupAddresses(remote *net.TCPAddr, addresses []string) []string { // remote is nil, unable to determine host IP continue } - } // If zero port was specified, use remote port. diff --git a/cmd/stdiscosrv/main.go b/cmd/stdiscosrv/main.go index 4890f3bda9..0b8c907f5e 100644 --- a/cmd/stdiscosrv/main.go +++ b/cmd/stdiscosrv/main.go @@ -22,6 +22,7 @@ import ( _ "github.com/syncthing/syncthing/lib/automaxprocs" "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/thejerf/suture/v4" @@ -80,6 +81,8 @@ func main() { var replKeyFile string var useHTTP bool var largeDB bool + var amqpAddress string + missesIncrease := 1 log.SetOutput(os.Stdout) log.SetFlags(0) @@ -96,6 +99,8 @@ func main() { flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication") flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication") flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings") + flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker") + flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss") showVersion := flag.Bool("version", false, "Show version") flag.Parse() @@ -203,8 +208,24 @@ func main() { main.Add(rl) } + // If we have an AMQP broker, start that + if amqpAddress != "" { + clientID := rand.String(10) + kr := newAMQPReplicator(amqpAddress, clientID, db) + repl = append(repl, kr) + main.Add(kr) + } + + go func() { + for range time.NewTicker(time.Second).C { + for _, r := range repl { + r.send("<heartbeat>", nil, time.Now().UnixNano()) + } + } + }() + // Start the main API server. - qs := newAPISrv(listen, cert, db, repl, useHTTP) + qs := newAPISrv(listen, cert, db, repl, useHTTP, missesIncrease) main.Add(qs) // If we have a metrics port configured, start a metrics handler. diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go index 8d0db3f7b8..e7aa2894ad 100644 --- a/cmd/stdiscosrv/replication.go +++ b/cmd/stdiscosrv/replication.go @@ -144,10 +144,11 @@ func (s *replicationSender) String() string {< |