summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2023-01-23 08:38:55 +0100
committerJakob Borg <jakob@kastelo.net>2023-01-31 11:17:52 +0100
commit65cfefaa3c2989db25fb847b44847a683e9881c4 (patch)
tree6d696b9fc55ec55958687dac3cf0d6fecdd1821d
parenta6c2a5a0cef4c38a44d70a05b518a9d97a735b33 (diff)
cmd, docker: Updates for infrastructure
These are some changes to the relay pool server, upgrade server, and crash receiver to run under Kubernetes. It's been in production for a while.
-rw-r--r--Dockerfile.stcrashreceiver18
-rw-r--r--Dockerfile.strelaypoolsrv26
-rw-r--r--Dockerfile.stupgrades23
-rw-r--r--build.go12
-rw-r--r--cmd/stcrashreceiver/diskstore.go181
-rw-r--r--cmd/stcrashreceiver/main.go46
-rw-r--r--cmd/stcrashreceiver/sentry.go40
-rw-r--r--cmd/stcrashreceiver/stcrashreceiver.go80
-rw-r--r--cmd/strelaypoolsrv/main.go151
-rw-r--r--cmd/stupgrades/main.go99
-rw-r--r--lib/httpcache/httpcache.go124
-rwxr-xr-xscript/strelaypoolsrv-entrypoint.sh10
12 files changed, 651 insertions, 159 deletions
diff --git a/Dockerfile.stcrashreceiver b/Dockerfile.stcrashreceiver
new file mode 100644
index 0000000000..4f6e609861
--- /dev/null
+++ b/Dockerfile.stcrashreceiver
@@ -0,0 +1,18 @@
+ARG GOVERSION=latest
+FROM golang:$GOVERSION AS builder
+
+WORKDIR /src
+COPY . .
+
+ENV CGO_ENABLED=0
+ENV BUILD_HOST=syncthing.net
+ENV BUILD_USER=docker
+RUN rm -f stcrashreceiver && go run build.go build stcrashreceiver
+
+FROM alpine
+
+EXPOSE 8080
+
+COPY --from=builder /src/stcrashreceiver /bin/stcrashreceiver
+
+ENTRYPOINT [ "/bin/stcrashreceiver" ]
diff --git a/Dockerfile.strelaypoolsrv b/Dockerfile.strelaypoolsrv
new file mode 100644
index 0000000000..814948b252
--- /dev/null
+++ b/Dockerfile.strelaypoolsrv
@@ -0,0 +1,26 @@
+ARG GOVERSION=latest
+FROM golang:$GOVERSION AS builder
+
+WORKDIR /src
+COPY . .
+
+ENV CGO_ENABLED=0
+ENV BUILD_HOST=syncthing.net
+ENV BUILD_USER=docker
+RUN rm -f strelaysrv && go run build.go -no-upgrade build strelaypoolsrv
+
+FROM alpine
+
+EXPOSE 8080
+
+RUN apk add --no-cache ca-certificates su-exec curl
+ENV PUID=1000 PGID=1000 MAXMIND_KEY=
+
+RUN mkdir /var/strelaypoolsrv && chown 1000 /var/strelaypoolsrv
+USER 1000
+
+COPY --from=builder /src/strelaypoolsrv /bin/strelaypoolsrv
+COPY --from=builder /src/script/strelaypoolsrv-entrypoint.sh /bin/entrypoint.sh
+
+WORKDIR /var/strelaypoolsrv
+ENTRYPOINT ["/bin/entrypoint.sh", "/bin/strelaypoolsrv", "-listen", ":8080"]
diff --git a/Dockerfile.stupgrades b/Dockerfile.stupgrades
new file mode 100644
index 0000000000..b5caf44a96
--- /dev/null
+++ b/Dockerfile.stupgrades
@@ -0,0 +1,23 @@
+ARG GOVERSION=latest
+FROM golang:$GOVERSION AS builder
+
+WORKDIR /src
+COPY . .
+
+ENV CGO_ENABLED=0
+ENV BUILD_HOST=syncthing.net
+ENV BUILD_USER=docker
+RUN rm -f stupgrades && go run build.go build stupgrades
+
+FROM alpine
+
+EXPOSE 8080
+
+COPY --from=builder /src/stupgrades /bin/stupgrades
+
+ENTRYPOINT [ \
+ "/bin/stupgrades", \
+ "-f", "/nightly.json->https://build.syncthing.net/guestAuth/repository/download/Release_Nightly/.lastSuccessful/nightly.json", \
+ "-f", "/syncthing-macos/appcast.xml->https://build.syncthing.net/guestAuth/repository/download/SyncthingMacOS_CreateAppcastXml/.lastSuccessful/appcast.xml" \
+ ]
+
diff --git a/build.go b/build.go
index 14b9e18e65..a96eb91efb 100644
--- a/build.go
+++ b/build.go
@@ -207,6 +207,18 @@ var targets = map[string]target{
{src: "AUTHORS", dst: "deb/usr/share/doc/syncthing-relaypoolsrv/AUTHORS.txt", perm: 0644},
},
},
+ "stupgrades": {
+ name: "stupgrades",
+ description: "Syncthing Upgrade Check Server",
+ buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stupgrades"},
+ binaryName: "stupgrades",
+ },
+ "stcrashreceiver": {
+ name: "stupgrastcrashreceiverdes",
+ description: "Syncthing Crash Server",
+ buildPkgs: []string{"github.com/syncthing/syncthing/cmd/stcrashreceiver"},
+ binaryName: "stcrashreceiver",
+ },
}
func initTargets() {
diff --git a/cmd/stcrashreceiver/diskstore.go b/cmd/stcrashreceiver/diskstore.go
new file mode 100644
index 0000000000..a5f459ad01
--- /dev/null
+++ b/cmd/stcrashreceiver/diskstore.go
@@ -0,0 +1,181 @@
+package main
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+ "sort"
+ "time"
+)
+
+type diskStore struct {
+ dir string
+ inbox chan diskEntry
+ maxBytes int64
+ maxFiles int
+
+ currentFiles []currentFile
+ currentSize int64
+}
+
+type diskEntry struct {
+ path string
+ data []byte
+}
+
+type currentFile struct {
+ path string
+ size int64
+ mtime int64
+}
+
+func (d *diskStore) Serve(ctx context.Context) {
+ if err := os.MkdirAll(d.dir, 0750); err != nil {
+ log.Println("Creating directory:", err)
+ return
+ }
+
+ if err := d.inventory(); err != nil {
+ log.Println("Failed to inventory disk store:", err)
+ }
+ d.clean()
+
+ cleanTimer := time.NewTicker(time.Minute)
+ inventoryTimer := time.NewTicker(24 * time.Hour)
+
+ buf := new(bytes.Buffer)
+ gw := gzip.NewWriter(buf)
+ for {
+ select {
+ case entry := <-d.inbox:
+ path := d.fullPath(entry.path)
+
+ if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
+ log.Println("Creating directory:", err)
+ continue
+ }
+
+ buf.Reset()
+ gw.Reset(buf)
+ if _, err := gw.Write(entry.data); err != nil {
+ log.Println("Failed to compress crash report:", err)
+ continue
+ }
+ if err := gw.Close(); err != nil {
+ log.Println("Failed to compress crash report:", err)
+ continue
+ }
+ if err := os.WriteFile(path, buf.Bytes(), 0644); err != nil {
+ log.Printf("Failed to write %s: %v", entry.path, err)
+ _ = os.Remove(path)
+ continue
+ }
+
+ d.currentSize += int64(buf.Len())
+ d.currentFiles = append(d.currentFiles, currentFile{
+ size: int64(len(entry.data)),
+ path: path,
+ })
+
+ case <-cleanTimer.C:
+ d.clean()
+
+ case <-inventoryTimer.C:
+ if err := d.inventory(); err != nil {
+ log.Println("Failed to inventory disk store:", err)
+ }
+
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (d *diskStore) Put(path string, data []byte) bool {
+ select {
+ case d.inbox <- diskEntry{
+ path: path,
+ data: data,
+ }:
+ return true
+ default:
+ return false
+ }
+}
+
+func (d *diskStore) Get(path string) ([]byte, error) {
+ path = d.fullPath(path)
+ bs, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ gr, err := gzip.NewReader(bytes.NewReader(bs))
+ if err != nil {
+ return nil, err
+ }
+ defer gr.Close()
+ return io.ReadAll(gr)
+}
+
+func (d *diskStore) Exists(path string) bool {
+ path = d.fullPath(path)
+ _, err := os.Lstat(path)
+ return err == nil
+}
+
+func (d *diskStore) clean() {
+ for len(d.currentFiles) > 0 && (len(d.currentFiles) > d.maxFiles || d.currentSize > d.maxBytes) {
+ f := d.currentFiles[0]
+ log.Println("Removing", f.path)
+ if err := os.Remove(f.path); err != nil {
+ log.Println("Failed to remove file:", err)
+ }
+ d.currentFiles = d.currentFiles[1:]
+ d.currentSize -= f.size
+ }
+ var oldest time.Duration
+ if len(d.currentFiles) > 0 {
+ oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
+ }
+ log.Printf("Clean complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
+}
+
+func (d *diskStore) inventory() error {
+ d.currentFiles = nil
+ d.currentSize = 0
+ err := filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if info.IsDir() {
+ return nil
+ }
+ if filepath.Ext(path) != ".gz" {
+ return nil
+ }
+ d.currentSize += info.Size()
+ d.currentFiles = append(d.currentFiles, currentFile{
+ path: path,
+ size: info.Size(),
+ mtime: info.ModTime().Unix(),
+ })
+ return nil
+ })
+ sort.Slice(d.currentFiles, func(i, j int) bool {
+ return d.currentFiles[i].mtime < d.currentFiles[j].mtime
+ })
+ var oldest time.Duration
+ if len(d.currentFiles) > 0 {
+ oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
+ }
+ log.Printf("Inventory complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
+ return err
+}
+
+func (d *diskStore) fullPath(path string) string {
+ return filepath.Join(d.dir, path[0:2], path[2:]) + ".gz"
+}
diff --git a/cmd/stcrashreceiver/main.go b/cmd/stcrashreceiver/main.go
index 62b70d5c21..15f48c17b3 100644
--- a/cmd/stcrashreceiver/main.go
+++ b/cmd/stcrashreceiver/main.go
@@ -13,15 +13,17 @@
package main
import (
+ "context"
"encoding/json"
- "flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
+ "time"
+ "github.com/alecthomas/kong"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/ur"
@@ -30,26 +32,50 @@ import (
const maxRequestSize = 1 << 20 // 1 MiB
+type cli struct {
+ Dir string `help:"Parent directory to store crash and failure reports in" env:"REPORTS_DIR" default:"."`
+ DSN string `help:"Sentry DSN" env:"SENTRY_DSN"`
+ Listen string `help:"HTTP listen address" default:":8080" env:"LISTEN_ADDRESS"`
+ MaxDiskFiles int `help:"Maximum number of reports on disk" default:"100000" env:"MAX_DISK_FILES"`
+ MaxDiskSizeMB int64 `help:"Maximum disk space to use for reports" default:"1024" env:"MAX_DISK_SIZE_MB"`
+ CleanInterval time.Duration `help:"Interval between cleaning up old reports" default:"12h" env:"CLEAN_INTERVAL"`
+ SentryQueue int `help:"Maximum number of reports to queue for sending to Sentry" default:"64" env:"SENTRY_QUEUE"`
+ DiskQueue int `help:"Maximum number of reports to queue for writing to disk" default:"64" env:"DISK_QUEUE"`
+}
+
func main() {
- dir := flag.String("dir", ".", "Parent directory to store crash and failure reports in")
- dsn := flag.String("dsn", "", "Sentry DSN")
- listen := flag.String("listen", ":22039", "HTTP listen address")
- flag.Parse()
+ var params cli
+ kong.Parse(&params)
mux := http.NewServeMux()
+ ds := &diskStore{
+ dir: filepath.Join(params.Dir, "crash_reports"),
+ inbox: make(chan diskEntry, params.DiskQueue),
+ maxFiles: params.MaxDiskFiles,
+ maxBytes: params.MaxDiskSizeMB << 20,
+ }
+ go ds.Serve(context.Background())
+
+ ss := &sentryService{
+ dsn: params.DSN,
+ inbox: make(chan sentryRequest, params.SentryQueue),
+ }
+ go ss.Serve(context.Background())
+
cr := &crashReceiver{
- dir: filepath.Join(*dir, "crash_reports"),
- dsn: *dsn,
+ store: ds,
+ sentry: ss,
}
+
mux.Handle("/", cr)
- if *dsn != "" {
- mux.HandleFunc("/newcrash/failure", handleFailureFn(*dsn, filepath.Join(*dir, "failure_reports")))
+ if params.DSN != "" {
+ mux.HandleFunc("/newcrash/failure", handleFailureFn(params.DSN, filepath.Join(params.Dir, "failure_reports")))
}
log.SetOutput(os.Stdout)
- if err := http.ListenAndServe(*listen, mux); err != nil {
+ if err := http.ListenAndServe(params.Listen, mux); err != nil {
log.Fatalln("HTTP serve:", err)
}
}
diff --git a/cmd/stcrashreceiver/sentry.go b/cmd/stcrashreceiver/sentry.go
index e7796fe29c..de8d91e57d 100644
--- a/cmd/stcrashreceiver/sentry.go
+++ b/cmd/stcrashreceiver/sentry.go
@@ -8,8 +8,10 @@ package main
import (
"bytes"
+ "context"
"errors"
"io"
+ "log"
"regexp"
"strings"
"sync"
@@ -31,6 +33,44 @@ var (
clientsMut sync.Mutex
)
+type sentryService struct {
+ dsn string
+ inbox chan sentryRequest
+}
+
+type sentryRequest struct {
+ reportID string
+ data []byte
+}
+
+func (s *sentryService) Serve(ctx context.Context) {
+ for {
+ select {
+ case req := <-s.inbox:
+ pkt, err := parseCrashReport(req.reportID, req.data)
+ if err != nil {
+ log.Println("Failed to parse crash report:", err)
+ continue
+ }
+ if err := sendReport(s.dsn, pkt, req.reportID); err != nil {
+ log.Println("Failed to send crash report:", err)
+ }
+
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (s *sentryService) Send(reportID string, data []byte) bool {
+ select {
+ case s.inbox <- sentryRequest{reportID, data}:
+ return true
+ default:
+ return false
+ }
+}
+
func sendReport(dsn string, pkt *raven.Packet, userID string) error {
pkt.Interfaces = append(pkt.Interfaces, &raven.User{ID: userID})
diff --git a/cmd/stcrashreceiver/stcrashreceiver.go b/cmd/stcrashreceiver/stcrashreceiver.go
index 57b98a5d1b..0c1dcd4e25 100644
--- a/cmd/stcrashreceiver/stcrashreceiver.go
+++ b/cmd/stcrashreceiver/stcrashreceiver.go
@@ -7,20 +7,16 @@
package main
import (
- "bytes"
- "compress/gzip"
"io"
"log"
"net/http"
- "os"
"path"
- "path/filepath"
"strings"
)
type crashReceiver struct {
- dir string
- dsn string
+ store *diskStore
+ sentry *sentryService
}
func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -43,55 +39,38 @@ func (r *crashReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
- // The location of the report on disk, compressed
- fullPath := filepath.Join(r.dir, r.dirFor(reportID), reportID) + ".gz"
-
switch req.Method {
case http.MethodGet:
- r.serveGet(fullPath, w, req)
+ r.serveGet(reportID, w, req)
case http.MethodHead:
- r.serveHead(fullPath, w, req)
+ r.serveHead(reportID, w, req)
case http.MethodPut:
- r.servePut(reportID, fullPath, w, req)
+ r.servePut(reportID, w, req)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
// serveGet responds to GET requests by serving the uncompressed report.
-func (*crashReceiver) serveGet(fullPath string, w http.ResponseWriter, _ *http.Request) {
- fd, err := os.Open(fullPath)
+func (r *crashReceiver) serveGet(reportID string, w http.ResponseWriter, _ *http.Request) {
+ bs, err := r.store.Get(reportID)
if err != nil {
http.Error(w, "Not found", http.StatusNotFound)
return
}
-
- defer fd.Close()
- gr, err := gzip.NewReader(fd)
- if err != nil {
- http.Error(w, "Internal server error", http.StatusInternalServerError)
- return
- }
- _, _ = io.Copy(w, gr) // best effort
+ w.Write(bs)
}
// serveHead responds to HEAD requests by checking if the named report
// already exists in the system.
-func (*crashReceiver) serveHead(fullPath string, w http.ResponseWriter, _ *http.Request) {
- if _, err := os.Lstat(fullPath); err != nil {
+func (r *crashReceiver) serveHead(reportID string, w http.ResponseWriter, _ *http.Request) {
+ if !r.store.Exists(reportID) {
http.Error(w, "Not found", http.StatusNotFound)
}
}
// servePut accepts and stores the given report.
-func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWriter, req *http.Request) {
- // Ensure the destination directory exists
- if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
- log.Println("Creating directory:", err)
- http.Error(w, "Internal server error", http.StatusInternalServerError)
- return
- }
-
+func (r *crashReceiver) servePut(reportID string, w http.ResponseWriter, req *http.Request) {
// Read at most maxRequestSize of report data.
log.Println("Receiving report", reportID)
lr := io.LimitReader(req.Body, maxRequestSize)
@@ -102,40 +81,13 @@ func (r *crashReceiver) servePut(reportID, fullPath string, w http.ResponseWrite
return
}
- // Compress the report for storage
- buf := new(bytes.Buffer)
- gw := gzip.NewWriter(buf)
- _, _ = gw.Write(bs) // can't fail
- gw.Close()
-
- // Create an output file with the compressed report
- err = os.WriteFile(fullPath, buf.Bytes(), 0644)
- if err != nil {
- log.Println("Saving report:", err)
- http.Error(w, "Internal server error", http.StatusInternalServerError)
- return
+ // Store the report
+ if !r.store.Put(reportID, bs) {
+ log.Println("Failed to store report (queue full):", reportID)
}
// Send the report to Sentry
- if r.dsn != "" {
- // Remote ID
- user := userIDFor(req)
-
- go func() {
- // There's no need for the client to have to wait for this part.
- pkt, err := parseCrashReport(reportID, bs)
- if err != nil {
- log.Println("Failed to parse crash report:", err)
- return
- }
- if err := sendReport(r.dsn, pkt, user); err != nil {
- log.Println("Failed to send crash report:", err)
- }
- }()
+ if !r.sentry.Send(reportID, bs) {
+ log.Println("Failed to send report to sentry (queue full):", reportID)
}
}
-
-// 01234567890abcdef... => 01/23
-func (*crashReceiver) dirFor(base string) string {
- return filepath.Join(base[0:2], base[2:4])
-}
diff --git a/cmd/strelaypoolsrv/main.go b/cmd/strelaypoolsrv/main.go
index 063097017f..81bc116933 100644
--- a/cmd/strelaypoolsrv/main.go
+++ b/cmd/strelaypoolsrv/main.go
@@ -3,14 +3,12 @@
package main
import (
- "compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
- "io"
"log"
"net"
"net/http"
@@ -19,11 +17,13 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync/atomic"
"time"
+ lru "github.com/hashicorp/golang-lru/v2"
+ "github.com/syncthing/syncthing/lib/httpcache"
"github.com/syncthing/syncthing/lib/protocol"
- "github.com/golang/groupcache/lru"
"github.com/oschwald/geoip2-golang"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -33,7 +33,6 @@ import (
"github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/tlsutil"
- "golang.org/x/time/rate"
)
type location struct {
@@ -99,27 +98,13 @@ var (
dir string
evictionTime = time.Hour
debug bool
- getLRUSize = 10 << 10
- getLimitBurst = 10
- getLimitAvg = 2
- postLRUSize = 1 << 10
- postLimitBurst = 2
- postLimitAvg = 2
- getLimit time.Duration
- postLimit time.Duration
permRelaysFile string
ipHeader string
geoipPath string
proto string
- statsRefresh = time.Minute / 2
- requestQueueLen = 10
- requestProcessors = 1
-
- getMut = sync.NewMutex()
- getLRUCache *lru.Cache
-
- postMut = sync.NewMutex()
- postLRUCache *lru.Cache
+ statsRefresh = time.Minute
+ requestQueueLen = 64
+ requestProcessors = 8
requests chan request
@@ -127,6 +112,7 @@ var (
knownRelays = make([]*relay, 0)
permanentRelays = make([]*relay, 0)
evictionTimers = make(map[string]*time.Timer)
+ globalBlocklist = newErrorTracker(1000)
)
const (
@@ -141,13 +127,8 @@ func main() {
flag.StringVar(&dir, "keys", dir, "Directory where http-cert.pem and http-key.pem is stored for TLS listening")
flag.BoolVar(&debug, "debug", debug, "Enable debug output")
flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted")
- flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size")
- flag.IntVar(&getLimitAvg, "get-limit-avg", getLimitAvg, "Allowed average get request rate, per 10 s")
- flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests")
- flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size")
- flag.IntVar(&postLimitAvg, "post-limit-avg", postLimitAvg, "Allowed average post request rate, per minute")
- flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests")
flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays")
+ flag.StringVar(&knownRelaysFile, "known-relays", knownRelaysFile, "Path to list of current relays")
flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.")
flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database")
flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6")
@@ -159,12 +140,6 @@ func main() {
requests = make(chan request, requestQueueLen)
- getLimit = 10 * time.Second / time.Duration(getLimitAvg)
- postLimit = time.Minute / time.Duration(postLimitAvg)
-
- getLRUCache = lru.New(getLRUSize)
- postLRUCache = lru.New(postLRUSize)
-
var listener net.Listener
var err error
@@ -240,7 +215,7 @@ func main() {
handler := http.NewServeMux()
handler.HandleFunc("/", handleAssets)
- handler.HandleFunc("/endpoint", handleRequest)
+ handler.Handle("/endpoint", httpcache.SinglePath(http.HandlerFunc(handleRequest), 15*time.Second))
handler.HandleFunc("/metrics", handleMetrics)
srv := http.Server{
@@ -291,21 +266,17 @@ func handleRequest(w http.ResponseWriter, r *http.Request) {
}()
if ipHeader != "" {
- r.RemoteAddr = r.Header.Get(ipHeader)
+ hdr := r.Header.Get(ipHeader)
+ fields := strings.Split(hdr, ",")
+ if len(fields) > 0 {
+ r.RemoteAddr = strings.TrimSpace(fields[len(fields)-1])
+ }
}
w.Header().Set("Access-Control-Allow-Origin", "*")
switch r.Method {
case "GET":
- if limit(r.RemoteAddr, getLRUCache, getMut, getLimit, getLimitBurst) {
- w.WriteHeader(httpStatusEnhanceYourCalm)
- return
- }
handleGetRequest(w, r)
case "POST":
- if limit(r.RemoteAddr, postLRUCache, postMut, postLimit, postLimitBurst) {
- w.WriteHeader(httpStatusEnhanceYourCalm)
- return
- }
handlePostRequest(w, r)
default:
if debug {
@@ -327,20 +298,28 @@ func handleGetRequest(rw http.ResponseWriter, r *http.Request) {
// Shuffle
rand.Shuffle(relays)
- w := io.Writer(rw)
- if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- rw.Header().Set("Content-Encoding", "gzip")
- gw := gzip.NewWriter(rw)
- defer gw.Close()
- w = gw
- }
-
- _ = json.NewEncoder(w).Encode(map[string][]*relay{
+ _ = json.NewEncoder(rw).Encode(map[string][]*relay{
"relays": relays,
})
}
func handlePostRequest(w http.ResponseWriter, r *http.Request) {
+ // Get the IP address of the client
+ rhost := r.RemoteAddr
+ if host, _, err := net.SplitHostPort(rhost); err == nil {
+ rhost = host
+ }
+
+ // Check the black list. A client is blacklisted if their last 10
+ // attempts to join have all failed. The "Unauthorized" status return
+ // causes strelaysrv to cease attempting to join.
+ if globalBlocklist.IsBlocked(rhost) {
+ log.Println("Rejected blocked client", rhost)
+ http.Error(w, "Too many errors", http.StatusUnauthorized)
+ globalBlocklist.ClearErrors(rhost)
+ return
+ }
+
var relayCert *x509.Certificate
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
relayCert = r.TLS.PeerCertificates[0]
@@ -392,12 +371,6 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
return
}
- // Get the IP address of the client
- rhost := r.RemoteAddr
- if host, _, err := net.SplitHostPort(rhost); err == nil {
- rhost = host
- }
-
ip := net.ParseIP(host)
// The client did not provide an IP address, use the IP address of the client.
if ip == nil || ip.IsUnspecified() {
@@ -429,10 +402,14 @@ func handlePostRequest(w http.ResponseWriter, r *http.Request) {
case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}:
result := <-reschan
if result.err != nil {
+ log.Println("Join from", r.RemoteAddr, "failed:", result.err)
+ globalBlocklist.AddError(rhost)
relayTestsTotal.WithLabelValues("failed").Inc()
http.Error(w, result.err.Error(), http.StatusBadRequest)
return
}
+ log.Println("Join from", r.RemoteAddr, "succeeded")
+ globalBlocklist.ClearErrors(rhost)
relayTestsTotal.WithLabelValues("success").Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(map[string]time.Duration{
@@ -546,23 +523,6 @@ func evict(relay *relay) func() {
}
}
-func limit(addr string, cache *lru.Cache, lock sync.Mutex, intv time.Duration, burst int) bool {
- if host, _, err := net.SplitHostPort(addr); err == nil {
- addr = host
- }
-
- lock.Lock()
- v, _ := cache.Get(addr)
- bkt, ok := v.(*rate.Limiter)
- if !ok {
- bkt = rate.NewLimiter(rate.Every(intv), burst)
- cache.Add(addr, bkt)
- }
- lock.Unlock()
-
- return !bkt.Allow()
-}
-
func loadRelays(file string) []*relay {
content, err := os.ReadFile(file)
if err != nil {
@@ -602,7 +562,7 @@ func saveRelays(file string, relays []*relay) error {
for _, relay := range relays {
content += relay.uri.String() + "\n"
}
- return os.WriteFile(file, []byte(content), 0777)
+ return os.WriteFile(file, []byte(content), 0o777)
}
func createTestCertificate() tls.Certificate {
@@ -661,3 +621,42 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
+
+type errorTracker struct {
+ errors *lru.TwoQueueCache[string, *errorCounter]
+}
+
+type errorCounter struct {
+ count atomic.Int32
+}
+
+func newErrorTracker(size int) *errorTracker {
+ cache, err := lru.New2Q[string, *errorCounter](size)
+ if err != nil {
+ panic(err)
+ }
+ return &errorTracker{
+ errors: cache,
+ }
+}
+
+func (b *errorTracker) AddError(host string) {
+ entry, ok := b.errors.Get(host)
+ if !ok {
+ entry = &errorCounter{}
+ b.errors.Add(host, entry)
+ }
+ c := entry.count.Add(1)
+ log.Printf("Error count for %s is now %d", host, c)
+}
+
+func (b *errorTracker) ClearErrors(host string) {
+ b.errors.Remove(host)
+}
+
+func (b *errorTracker) IsBlocked(host string) bool {
+ if be, ok := b.errors.Get(host); ok {
+ return be.count.Load() > 10
+ }
+ return false
+}
diff --git a/cmd/stupgrades/main.go b/cmd/stupgrades/main.go
index 429f026e32..3b2c79ff29 100644
--- a/cmd/stupgrades/main.go
+++ b/cmd/stupgrades/main.go
@@ -7,31 +7,112 @@
package main
import (
+ "bytes"
"encoding/json"
- "flag"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
"os"
"sort"
+ "strings"
+ "time"
+ "github.com/alecthomas/kong"
+ "github.com/syncthing/syncthing/lib/httpcache"
"github.com/syncthing/syncthing/lib/upgrade"
)
-const defaultURL = "https://api.github.com/repos/syncthing/syncthing/releases?per_page=25"
+type cli struct {
+ Listen string `default:":8080" help:"Listen address"`
+ URL string `short:"u" default:"https://api.github.com/repos/syncthing/syncthing/releases?per_page=25" help:"GitHub releases url"`
+ Forward []string `short:"f" help:"Forwarded pages, format: /path->https://example/com/url"`
+ CacheTime time.Duration `default:"15m" help:"Cache time"`
+}
func main() {
- url := flag.String("u", defaultURL, "GitHub releases url")
- flag.Parse()
+ var params cli
+ kong.Parse(&params)
+ if err := server(&params); err != nil {
+ fmt.Printf("Error: %v\n", err)
+ os.Exit(1)
+ }
+}
+
+func server(params *cli) error {
+ http.Handle("/meta.json", httpcache.SinglePath(&githubReleases{url: params.URL}, params.CacheTime))
+
+ for _, fwd := range params.Forward {
+ path, url, ok := strings.Cut(fwd, "->")
+ if !ok {
+ return fmt.Errorf("invalid forward: %q", fwd)
+ }
+ http.Handle(path, httpcache.SinglePath(&proxy{url: url}, params.CacheTime))
+ }
+
+ return http.ListenAndServe(params.Listen, nil)
+}
+
+type githubReleases struct {
+ url string
+}
- rels := upgrade.FetchLatestReleases(*url, "")
+func (p *githubReleases) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ log.Println("Fetching", p.url)
+ rels := upgrade.FetchLatestReleases(p.url, "")
if rels == nil {
- // An error was already logged
- os.Exit(1)
+ http.Error(w, "no releases", http.StatusInternalServerError)
+ return
}
sort.Sort(upgrade.SortByRelease(rels))
rels = filterForLatest(rels)
- if err := json.NewEncoder(os.Stdout).Encode(rels); err != nil {
- os.Exit(1)
+ buf := new(bytes.Buffer)
+ _ = json.NewEncoder(buf).Encode(rels)
+
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Methods", "GET")
+ w.Write(buf.Bytes())
+}
+
+type proxy struct {
+ url string
+}
+
+func (p *proxy) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ log.Println("Fetching", p.url)
+ req, err := http.NewRequestWithContext(req.Context(), http.MethodGet, p.url, nil)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ http.Error(w, err.Error(), http