summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2023-04-28 09:01:54 +0200
committerJakob Borg <jakob@kastelo.net>2023-04-28 09:01:54 +0200
commit020cfe395a23db0cf0aa51a35d2506b788530daa (patch)
tree89df5486fd896f423a47e8fdcac37e393e6ee40c
parent947dd0db0938d8350f567b8ab0bbf3d0aa22f2a7 (diff)
all: Use multiple QUIC streams (fixes #8879)
Work in progress, to be described more fully in time, but in principle: - support multiple streams on a single connection at the protocol level - use multiple streams for concurrent requests - hope for improved greatness
-rw-r--r--cmd/syncthing/perfstats_unix.go5
-rw-r--r--lib/connections/limiter.go115
-rw-r--r--lib/connections/limiter_test.go65
-rw-r--r--lib/connections/quic_dial.go8
-rw-r--r--lib/connections/quic_listen.go20
-rw-r--r--lib/connections/quic_misc.go30
-rw-r--r--lib/connections/relay_dial.go3
-rw-r--r--lib/connections/relay_listen.go3
-rw-r--r--lib/connections/service.go14
-rw-r--r--lib/connections/structs.go4
-rw-r--r--lib/connections/tcp_dial.go3
-rw-r--r--lib/connections/tcp_listen.go3
-rw-r--r--lib/model/model.go14
-rw-r--r--lib/model/model_test.go4
-rw-r--r--lib/netutil/counting.go166
-rw-r--r--lib/netutil/limiting.go137
-rw-r--r--lib/netutil/stream.go106
-rw-r--r--lib/protocol/benchmark_test.go8
-rw-r--r--lib/protocol/bep.pb.go443
-rw-r--r--lib/protocol/counting.go58
-rw-r--r--lib/protocol/protocol.go270
-rw-r--r--lib/protocol/protocol_test.go47
-rw-r--r--proto/lib/protocol/bep.proto7
-rw-r--r--test/h1/config.xml132
-rw-r--r--test/h2/config.xml143
25 files changed, 1298 insertions, 510 deletions
diff --git a/cmd/syncthing/perfstats_unix.go b/cmd/syncthing/perfstats_unix.go
index 6da5a2116b..2ad395119b 100644
--- a/cmd/syncthing/perfstats_unix.go
+++ b/cmd/syncthing/perfstats_unix.go
@@ -16,7 +16,7 @@ import (
"syscall"
"time"
- "github.com/syncthing/syncthing/lib/protocol"
+ "github.com/syncthing/syncthing/lib/netutil"
)
func startPerfStats() {
@@ -48,7 +48,8 @@ func savePerfStats(file string) {
cpuUsagePercent := 100 * float64(usageDiff) / float64(timeDiff)
prevTime = curTime
prevUsage = curUsage
- in, out := protocol.TotalInOut()
+ cnt := netutil.RootCounter()
+ in, out := cnt.BytesRead(), cnt.BytesWritten()
var inRate, outRate float64
if timeDiff > 0 {
inRate = float64(in-prevIn) / (float64(timeDiff) / 1e9) // bytes per second
diff --git a/lib/connections/limiter.go b/lib/connections/limiter.go
index 4fa4578dcd..81855f7fad 100644
--- a/lib/connections/limiter.go
+++ b/lib/connections/limiter.go
@@ -9,7 +9,7 @@ package connections
import (
"context"
"fmt"
- "io"
+ "math"
"sync/atomic"
"github.com/syncthing/syncthing/lib/config"
@@ -34,6 +34,7 @@ type waiter interface {
// This is the rate limiting operation
WaitN(ctx context.Context, n int) error
Limit() rate.Limit
+ Burst() int
}
const (
@@ -177,33 +178,27 @@ func (*limiter) String() string {
return "connections.limiter"
}
-func (lim *limiter) getLimiters(remoteID protocol.DeviceID, rw io.ReadWriter, isLAN bool) (io.Reader, io.Writer) {
+func (lim *limiter) getLimiters(remoteID protocol.DeviceID, isLAN bool) (waiterHolder, waiterHolder) {
lim.mu.Lock()
- wr := lim.newLimitedWriterLocked(remoteID, rw, isLAN)
- rd := lim.newLimitedReaderLocked(remoteID, rw, isLAN)
+ wr := lim.newWriteLimiterLocked(remoteID, isLAN)
+ rd := lim.newReadLimiterLocked(remoteID, isLAN)
lim.mu.Unlock()
return rd, wr
}
-func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
- return &limitedReader{
- reader: r,
- waiterHolder: waiterHolder{
- waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read},
- limitsLAN: &lim.limitsLAN,
- isLAN: isLAN,
- },
+func (lim *limiter) newReadLimiterLocked(remoteID protocol.DeviceID, isLAN bool) waiterHolder {
+ return waiterHolder{
+ waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read},
+ limitsLAN: &lim.limitsLAN,
+ isLAN: isLAN,
}
}
-func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
- return &limitedWriter{
- writer: w,
- waiterHolder: waiterHolder{
- waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write},
- limitsLAN: &lim.limitsLAN,
- isLAN: isLAN,
- },
+func (lim *limiter) newWriteLimiterLocked(remoteID protocol.DeviceID, isLAN bool) waiterHolder {
+ return waiterHolder{
+ waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write},
+ limitsLAN: &lim.limitsLAN,
+ isLAN: isLAN,
}
}
@@ -224,60 +219,6 @@ func getRateLimiter(m map[protocol.DeviceID]*rate.Limiter, deviceID protocol.Dev
return limiter
}
-// limitedReader is a rate limited io.Reader
-type limitedReader struct {
- reader io.Reader
- waiterHolder
-}
-
-func (r *limitedReader) Read(buf []byte) (int, error) {
- n, err := r.reader.Read(buf)
- if !r.unlimited() {
- r.take(n)
- }
- return n, err
-}
-
-// limitedWriter is a rate limited io.Writer
-type limitedWriter struct {
- writer io.Writer
- waiterHolder
-}
-
-func (w *limitedWriter) Write(buf []byte) (int, error) {
- if w.unlimited() {
- return w.writer.Write(buf)
- }
-
- // This does (potentially) multiple smaller writes in order to be less
- // bursty with large writes and slow rates. At the same time we don't
- // want to do hilarious amounts of tiny writes when the rate is high, so
- // try to be a bit adaptable. We range from the minimum write size of 1
- // KiB up to the limiter burst size, aiming for about a write every
- // 10ms.
- singleWriteSize := int(w.waiter.Limit() / 100) // 10ms worth of data
- singleWriteSize = ((singleWriteSize / 1024) + 1) * 1024 // round up to the next kibibyte
- if singleWriteSize > limiterBurstSize {
- singleWriteSize = limiterBurstSize
- }
-
- written := 0
- for written < len(buf) {
- toWrite := singleWriteSize
- if toWrite > len(buf)-written {
- toWrite = len(buf) - written
- }
- w.take(toWrite)
- n, err := w.writer.Write(buf[written : written+toWrite])
- written += n
- if err != nil {
- return written, err
- }
- }
-
- return written, nil
-}
-
// waiterHolder is the common functionality around having and evaluating a
// waiter, valid for both writers and readers
type waiterHolder struct {
@@ -286,17 +227,17 @@ type waiterHolder struct {
isLAN bool
}
-// unlimited returns true if the waiter is not limiting the rate
-func (w waiterHolder) unlimited() bool {
+// Unlimited returns true if the waiter is not limiting the rate
+func (w waiterHolder) Unlimited() bool {
if w.isLAN && !w.limitsLAN.Load() {
return true
}
return w.waiter.Limit() == rate.Inf
}
-// take is a utility function to consume tokens, because no call to WaitN
+// Take is a utility function to consume tokens, because no call to WaitN
// must be larger than the limiter burst size or it will hang.
-func (w waiterHolder) take(tokens int) {
+func (w waiterHolder) Take(tokens int) {
// For writes we already split the buffer into smaller operations so those
// will always end up in the fast path below. For reads, however, we don't
// control the size of the incoming buffer and don't split the calls
@@ -322,6 +263,14 @@ func (w waiterHolder) take(tokens int) {
}
}
+func (w waiterHolder) Limit() int {
+ return int(w.waiter.Limit())
+}
+
+func (w waiterHolder) Burst() int {
+ return w.waiter.Burst()
+}
+
// totalWaiter waits for all of the waiters
type totalWaiter []waiter
@@ -345,3 +294,13 @@ func (tw totalWaiter) Limit() rate.Limit {
}
return min
}
+
+func (tw totalWaiter) Burst() int {
+ min := math.MaxInt
+ for _, w := range tw {
+ if l := w.Burst(); l < min {
+ min = l
+ }
+ }
+ return min
+}
diff --git a/lib/connections/limiter_test.go b/lib/connections/limiter_test.go
index fd7e3547ee..1e907472cf 100644
--- a/lib/connections/limiter_test.go
+++ b/lib/connections/limiter_test.go
@@ -17,12 +17,15 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/protocol"
"golang.org/x/time/rate"
)
-var device1, device2, device3, device4 protocol.DeviceID
-var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
+var (
+ device1, device2, device3, device4 protocol.DeviceID
+ dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
+)
func init() {
device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
@@ -231,14 +234,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// regardless of the rate.
dst := new(bytes.Buffer)
cw := &countingWriter{w: dst}
- lw := &limitedWriter{
- writer: cw,
- waiterHolder: waiterHolder{
- waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
- limitsLAN: new(atomic.Bool),
- isLAN: false, // enables limiting
- },
- }
+ lw := netutil.NewLimitedWriter(cw, waiterHolder{
+ waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
+ limitsLAN: new(atomic.Bool),
+ isLAN: false, // enables limiting
+ })
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
@@ -260,14 +260,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// count the write calls. Now we make sure the fast path is used.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
- lw = &limitedWriter{
- writer: cw,
- waiterHolder: waiterHolder{
- waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
- limitsLAN: new(atomic.Bool),
- isLAN: true, // disables limiting
- },
- }
+ lw = netutil.NewLimitedWriter(cw, waiterHolder{
+ waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
+ limitsLAN: new(atomic.Bool),
+ isLAN: true, // disables limiting
+ })
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
@@ -284,14 +281,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// rate, with multiple unlimited raters even (global and per-device).
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
- lw = &limitedWriter{
- writer: cw,
- waiterHolder: waiterHolder{
- waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
- limitsLAN: new(atomic.Bool),
- isLAN: false, // enables limiting
- },
- }
+ lw = netutil.NewLimitedWriter(cw, waiterHolder{
+ waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
+ limitsLAN: new(atomic.Bool),
+ isLAN: false, // enables limiting
+ })
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
@@ -308,18 +302,15 @@ func TestLimitedWriterWrite(t *testing.T) {
// is a combo of limited and unlimited writers.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
- lw = &limitedWriter{
- writer: cw,
- waiterHolder: waiterHolder{
- waiter: totalWaiter{
- rate.NewLimiter(rate.Inf, limiterBurstSize),
- rate.NewLimiter(rate.Limit(42), limiterBurstSize),
- rate.NewLimiter(rate.Inf, limiterBurstSize),
- },
- limitsLAN: new(atomic.Bool),
- isLAN: false, // enables limiting
+ lw = netutil.NewLimitedWriter(cw, waiterHolder{
+ waiter: totalWaiter{
+ rate.NewLimiter(rate.Inf, limiterBurstSize),
+ rate.NewLimiter(rate.Limit(42), limiterBurstSize),
+ rate.NewLimiter(rate.Inf, limiterBurstSize),
},
- }
+ limitsLAN: new(atomic.Bool),
+ isLAN: false, // enables limiting
+ })
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
diff --git a/lib/connections/quic_dial.go b/lib/connections/quic_dial.go
index 6c9a036700..ebf4d6e010 100644
--- a/lib/connections/quic_dial.go
+++ b/lib/connections/quic_dial.go
@@ -95,7 +95,13 @@ func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL
if isLocal {
priority = d.lanPriority
}
- return newInternalConn(&quicTlsConn{session, stream, createdConn}, connTypeQUICClient, isLocal, priority), nil
+ qtlsc := &quicTlsConn{
+ Connection: session,
+ Stream: stream,
+ createdConn: createdConn,
+ supportsSubstreams: false, // set later based on handshake
+ }
+ return newInternalConn(qtlsc, connTypeQUICClient, isLocal, priority), nil
}
type quicDialerFactory struct{}
diff --git a/lib/connections/quic_listen.go b/lib/connections/quic_listen.go
index 9eab87026d..18723fcd4b 100644
--- a/lib/connections/quic_listen.go
+++ b/lib/connections/quic_listen.go
@@ -97,15 +97,15 @@ func (t *quicListener) serve(ctx context.Context) error {
}
defer func() { _ = udpConn.Close() }()
- svc, conn := stun.New(t.cfg, t, udpConn)
- defer conn.Close()
+ // svc, conn := stun.New(t.cfg, t, udpConn)
+ // defer conn.Close()
- go svc.Serve(ctx)
+ // go svc.Serve(ctx)
- t.registry.Register(t.uri.Scheme, conn)
- defer t.registry.Unregister(t.uri.Scheme, conn)
+ t.registry.Register(t.uri.Scheme, udpConn)
+ defer t.registry.Unregister(t.uri.Scheme, udpConn)
- listener, err := quic.Listen(conn, t.tlsCfg, quicConfig)
+ listener, err := quic.Listen(udpConn, t.tlsCfg, quicConfig)
if err != nil {
l.Infoln("Listen (BEP/quic):", err)
return err
@@ -174,7 +174,13 @@ func (t *quicListener) serve(ctx context.Context) error {
if isLocal {
priority = t.cfg.Options().ConnectionPriorityQUICLAN
}
- t.conns <- newInternalConn(&quicTlsConn{session, stream, nil}, connTypeQUICServer, isLocal, priority)
+ qtlsc := &quicTlsConn{
+ Connection: session,
+ Stream: stream,
+ createdConn: nil,
+ supportsSubstreams: false, // set later based on handshake
+ }
+ t.conns <- newInternalConn(qtlsc, connTypeQUICServer, isLocal, priority)
}
}
diff --git a/lib/connections/quic_misc.go b/lib/connections/quic_misc.go
index d5f0b0dfd7..8ea2a65003 100644
--- a/lib/connections/quic_misc.go
+++ b/lib/connections/quic_misc.go
@@ -10,20 +10,25 @@
package connections
import (
+ "context"
"crypto/tls"
+ "io"
"net"
"net/url"
"time"
"github.com/quic-go/quic-go"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/osutil"
)
var quicConfig = &quic.Config{
- ConnectionIDLength: 4,
- MaxIdleTimeout: 30 * time.Second,
- KeepAlivePeriod: 15 * time.Second,
+ ConnectionIDLength: 4,
+ MaxIdleTimeout: 30 * time.Second,
+ KeepAlivePeriod: 15 * time.Second,
+ MaxStreamReceiveWindow: 32 << 20,
+ MaxConnectionReceiveWindow: 256 << 20,
}
func quicNetwork(uri *url.URL) string {
@@ -42,6 +47,11 @@ type quicTlsConn struct {
quic.Stream
// If we created this connection, we should be the ones closing it.
createdConn net.PacketConn
+
+ // We might support substreams, but we can't try to use them unless the
+ // other side does, too. This boolean controls that, and is set based on
+ // the Hello received from the other side.
+ supportsSubstreams bool
}
func (q *quicTlsConn) Close() error {
@@ -64,6 +74,20 @@ func (q *quicTlsConn) ConnectionState() tls.ConnectionState {
return q.Connection.ConnectionState().TLS.ConnectionState
}
+func (q *quicTlsConn) CreateSubstream(ctx context.Context) (io.ReadWriteCloser, error) {
+ if !q.supportsSubstreams {
+ return nil, netutil.ErrSubstreamsUnsupported
+ }
+ return q.Connection.OpenStreamSync(ctx)
+}
+
+func (q *quicTlsConn) AcceptSubstream(ctx context.Context) (io.ReadWriteCloser, error) {
+ if !q.supportsSubstreams {
+ return nil, netutil.ErrSubstreamsUnsupported
+ }
+ return q.Connection.AcceptStream(ctx)
+}
+
func packetConnUnspecified(conn interface{}) bool {
addr := conn.(net.PacketConn).LocalAddr()
ip, err := osutil.IPFromAddr(addr)
diff --git a/lib/connections/relay_dial.go b/lib/connections/relay_dial.go
index d79719533f..d8db7854e8 100644
--- a/lib/connections/relay_dial.go
+++ b/lib/connections/relay_dial.go
@@ -15,6 +15,7 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/client"
)
@@ -62,7 +63,7 @@ func (d *relayDialer) Dial(ctx context.Context, id protocol.DeviceID, uri *url.U
return internalConn{}, err
}
- return newInternalConn(tc, connTypeRelayClient, false, d.wanPriority), nil
+ return newInternalConn(netutil.NewTLSConnStream(tc), connTypeRelayClient, false, d.wanPriority), nil
}
func (d *relayDialer) Priority(host string) int {
diff --git a/lib/connections/relay_listen.go b/lib/connections/relay_listen.go
index b6e89c8133..88f0985537 100644
--- a/lib/connections/relay_listen.go
+++ b/lib/connections/relay_listen.go
@@ -18,6 +18,7 @@ import (
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/svcutil"
)
@@ -106,7 +107,7 @@ func (t *relayListener) handleInvitations(ctx context.Context, clnt client.Relay
continue
}
- t.conns <- newInternalConn(tc, connTypeRelayServer, false, t.cfg.Options().ConnectionPriorityRelay)
+ t.conns <- newInternalConn(netutil.NewTLSConnStream(tc), connTypeRelayServer, false, t.cfg.Options().ConnectionPriorityRelay)
// Poor mans notifier that informs the connection service that the
// relay URI has changed. This can only happen when we connect to a
diff --git a/lib/connections/service.go b/lib/connections/service.go
index 32859553f4..280900c87d 100644
--- a/lib/connections/service.go
+++ b/lib/connections/service.go
@@ -28,6 +28,7 @@ import (
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/nat"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/svcutil"
@@ -372,6 +373,13 @@ func (s *service) handleHellos(ctx context.Context) error {
}
_ = c.SetDeadline(time.Time{})
+ // If this is a QUIC connection, enable streams support if the other
+ // side supports it.
+ if qc, ok := c.tlsConn.(*quicTlsConn); ok && hello.SupportsMultipleQUICStreams {
+ l.Infof("Connection with %s at %s (%s) supports multiple QUIC streams", remoteID, c.RemoteAddr(), c.Type())
+ qc.supportsSubstreams = true
+ }
+
// The Model will return an error for devices that we don't want to
// have a connection with for whatever reason, for example unknown devices.
if err := s.model.OnHello(remoteID, c.RemoteAddr(), hello); err != nil {
@@ -410,9 +418,9 @@ func (s *service) handleHellos(ctx context.Context) error {
// Wrap the connection in rate limiters. The limiter itself will
// keep up with config changes to the rate and whether or not LAN
// connections are limited.
- rd, wr := s.limiter.getLimiters(remoteID, c, c.IsLocal())
-
- protoConn := protocol.NewConnection(remoteID, rd, wr, c, s.model, c, deviceCfg.Compression, s.cfg.FolderPasswords(remoteID), s.keyGen)
+ rlim, wlim := s.limiter.getLimiters(remoteID, c.IsLocal())
+ limStrm := netutil.NewLimitedStream(c, rlim, wlim)
+ protoConn := protocol.NewConnection(remoteID, limStrm, s.model, c, deviceCfg.Compression, s.cfg.FolderPasswords(remoteID), s.keyGen)
go func() {
<-protoConn.Closed()
s.dialNowDevicesMut.Lock()
diff --git a/lib/connections/structs.go b/lib/connections/structs.go
index 1f5bf376b3..6848957ffd 100644
--- a/lib/connections/structs.go
+++ b/lib/connections/structs.go
@@ -10,7 +10,6 @@ import (
"context"
"crypto/tls"
"fmt"
- "io"
"net"
"net/url"
"time"
@@ -18,6 +17,7 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/nat"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/stats"
@@ -26,7 +26,7 @@ import (
)
type tlsConn interface {
- io.ReadWriteCloser
+ netutil.Stream
ConnectionState() tls.ConnectionState
RemoteAddr() net.Addr
SetDeadline(time.Time) error
diff --git a/lib/connections/tcp_dial.go b/lib/connections/tcp_dial.go
index 04a551e466..cd626ab712 100644
--- a/lib/connections/tcp_dial.go
+++ b/lib/connections/tcp_dial.go
@@ -15,6 +15,7 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/protocol"
)
@@ -62,7 +63,7 @@ func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL)
if isLocal {
priority = d.lanPriority
}
- return newInternalConn(tc, connTypeTCPClient, isLocal, priority), nil
+ return newInternalConn(netutil.NewTLSConnStream(tc), connTypeTCPClient, isLocal, priority), nil
}
type tcpDialerFactory struct{}
diff --git a/lib/connections/tcp_listen.go b/lib/connections/tcp_listen.go
index c932fc13b8..2cd700447a 100644
--- a/lib/connections/tcp_listen.go
+++ b/lib/connections/tcp_listen.go
@@ -18,6 +18,7 @@ import (
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/svcutil"
)
@@ -154,7 +155,7 @@ func (t *tcpListener) serve(ctx context.Context) error {
if isLocal {
priority = t.cfg.Options().ConnectionPriorityTCPLAN
}
- t.conns <- newInternalConn(tc, connTypeTCPServer, isLocal, priority)
+ t.conns <- newInternalConn(netutil.NewTLSConnStream(tc), connTypeTCPServer, isLocal, priority)
}
}
diff --git a/lib/model/model.go b/lib/model/model.go
index 2b4a58a618..fa79c12c14 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -35,6 +35,7 @@ import (
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
@@ -755,11 +756,11 @@ func (m *model) ConnectionStats() map[string]interface{} {
res["connections"] = conns
- in, out := protocol.TotalInOut()
+ cnt := netutil.RootCounter()
res["total"] = map[string]interface{}{
"at": time.Now().Truncate(time.Second),
- "inBytesTotal": in,
- "outBytesTotal": out,
+ "inBytesTotal": cnt.BytesRead(),
+ "outBytesTotal": cnt.BytesWritten(),
}
return res
@@ -2204,9 +2205,10 @@ func (m *model) GetHello(id protocol.DeviceID) protocol.HelloIntf {
}
}
return &protocol.Hello{
- DeviceName: name,
- ClientName: m.clientName,
- ClientVersion: m.clientVersion,
+ DeviceName: name,
+ ClientName: m.clientName,
+ ClientVersion: m.clientVersion,
+ SupportsMultipleQUICStreams: true, // we support multiple streams over QUIC
}
}
diff --git a/lib/model/model_test.go b/lib/model/model_test.go
index 9e04130d40..54af136a3b 100644
--- a/lib/model/model_test.go
+++ b/lib/model/model_test.go
@@ -31,6 +31,7 @@ import (
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
+ "github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks"
@@ -3186,7 +3187,8 @@ func TestConnCloseOnRestart(t *testing.T) {
br := &testutils.BlockingRW{}
nw := &testutils.NoopRW{}
- m.AddConnection(protocol.NewConnection(device1, br, nw, testutils.NoopCloser{}, m, new(protocolmocks.ConnectionInfo), protocol.CompressionNever, nil, m.keyGen), protocol.Hello{})
+ strm := netutil.NewRWStream(br, nw)
+ m.AddConnection(protocol.NewConnection(device1, strm, m, new(protocolmocks.ConnectionInfo), protocol.CompressionNever, nil, m.keyGen), protocol.Hello{})
m.pmut.RLock()
if len(m.closed) != 1 {
t.Fatalf("Expected just one conn (len(m.conn) == %v)", len(m.conn))
diff --git a/lib/netutil/counting.go b/lib/netutil/counting.go
new file mode 100644
index 0000000000..5511376ca0
--- /dev/null
+++ b/lib/netutil/counting.go
@@ -0,0 +1,166 @@
+// Copyright (C) 2023 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package netutil
+
+import (
+ "context"
+ "io"
+ "sync/atomic"
+ "time"
+)
+
+type Counted interface {
+ BytesRead() int64
+ LastRead() time.Time
+ BytesWritten() int64
+ LastWrite() time.Time
+}
+
+var rootCounter Counter
+
+func RootCounter() Counted {
+ return &rootCounter
+}
+
+type CountedStream interface {
+ Counted
+ Stream
+}
+
+type Counter struct {
+ readBytes atomic.Int64
+ lastRead atomic.Int64
+ writeBytes atomic.Int64
+ lastWrite atomic.Int64
+ parent *Counter
+}
+
+func NewCounter() *Counter {
+ return newCounterWithParent(&rootCounter)
+}
+
+func newCounterWithParent(parent *Counter) *Counter {
+ return &Counter{
+ parent: parent,
+ }
+}
+
+func (c *Counter) BytesRead() int64 {
+ return c.readBytes.Load()
+}
+
+func (c *Counter) BytesWritten() int64 {
+ return c.writeBytes.Load()
+}
+
+func (c *Counter) LastRead() time.Time {
+ return time.Unix(0, c.lastRead.Load())
+}
+
+func (c *Counter) LastWrite() time.Time {
+ return time.Unix(0, c.lastWrite.Load())
+}
+
+func (c *Counter) addRead(n int) {
+ c.readBytes.Add(int64(n))
+ c.lastRead.Store(time.Now().UnixNano())
+ if c.parent != nil {
+ c.parent.addRead(n)
+ }
+}
+
+func (c *Counter) addWrite(n int) {
+ c.writeBytes.Add(int64(n))
+ c.lastWrite.Store(time.Now().UnixNano())
+ if c.parent != nil {
+ c.parent.addWr