summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntony Male <antony.male@gmail.com>2016-06-27 21:18:58 +0000
committerJakob Borg <jakob@nym.se>2016-06-27 21:18:58 +0000
commit7ef2743964a3ab399810912b3aa9fa4c018239a7 (patch)
tree15b0413732c0100a8c8a3f25382076d63758797d
parenta165838cbddcc1445619202fa4d01f7b9b996e56 (diff)
lib/events: Introduce per-subscription event IDs (fixes #3335)
Events API consumers rely on being able to detect that events were skipped by the fact that the event ID has increased by more than 1. This is documented, and is absolutely necessary when trying to maintain a local model of Syncthing's state. With the introduction of LocalChangeDetected, which is not exposed to the Events API, this contract was broken. This commit introduces separate concepts of a "Global ID" and a "Subscription ID". The Global ID of an event is unique across all subscriptions. The Subscription ID is local to a particular subscription, and always increments by 1. They are both exposed over the Events API, but the Subscription ID uses the key "id" for backwards compatibility, and the "?since=xx" parameter refers to the Subscription ID (making the Global ID for information only). GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3351 LGTM: calmh
-rw-r--r--cmd/syncthing/main.go2
-rw-r--r--lib/events/events.go52
-rw-r--r--lib/events/events_test.go83
3 files changed, 108 insertions, 29 deletions
diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go
index 40152558b7..794fa23990 100644
--- a/cmd/syncthing/main.go
+++ b/cmd/syncthing/main.go
@@ -539,7 +539,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
- // Event subscription for the API; must start early to catch the early events. The LocalDiskUpdated
+ // Event subscription for the API; must start early to catch the early events. The LocalChangeDetected
// event might overwhelm the event reciever in some situations so we will not subscribe to it here.
apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected), 1000)
diff --git a/lib/events/events.go b/lib/events/events.go
index 5d44bb4306..f0187b9b4d 100644
--- a/lib/events/events.go
+++ b/lib/events/events.go
@@ -111,16 +111,20 @@ func (t EventType) MarshalText() ([]byte, error) {
const BufferSize = 64
type Logger struct {
- subs []*Subscription
- nextID int
- mutex sync.Mutex
+ subs []*Subscription
+ nextSubscriptionIDs []int
+ nextGlobalID int
+ mutex sync.Mutex
}
type Event struct {
- ID int `json:"id"`
- Time time.Time `json:"time"`
- Type EventType `json:"type"`
- Data interface{} `json:"data"`
+ // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
+ SubscriptionID int `json:"id"`
+ // Global ID of the event across all subscriptions
+ GlobalID int `json:"globalID"`
+ Time time.Time `json:"time"`
+ Type EventType `json:"type"`
+ Data interface{} `json:"data"`
}
type Subscription struct {
@@ -144,16 +148,21 @@ func NewLogger() *Logger {
func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
- dl.Debugln("log", l.nextID, t, data)
- l.nextID++
+ dl.Debugln("log", l.nextGlobalID, t, data)
+ l.nextGlobalID++
+
e := Event{
- ID: l.nextID,
- Time: time.Now(),
- Type: t,
- Data: data,
+ GlobalID: l.nextGlobalID,
+ Time: time.Now(),
+ Type: t,
+ Data: data,
}
- for _, s := range l.subs {
+
+ for i, s := range l.subs {
if s.mask&t != 0 {
+ e.SubscriptionID = l.nextSubscriptionIDs[i]
+ l.nextSubscriptionIDs[i]++
+
select {
case s.events <- e:
default:
@@ -182,6 +191,7 @@ func (l *Logger) Subscribe(mask EventType) *Subscription {
}
l.subs = append(l.subs, s)
+ l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
l.mutex.Unlock()
return s
}
@@ -192,9 +202,15 @@ func (l *Logger) Unsubscribe(s *Subscription) {
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
+
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
+
+ l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
+ l.nextSubscriptionIDs[last] = 0
+ l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
+
break
}
}
@@ -234,7 +250,7 @@ type bufferedSubscription struct {
sub *Subscription
buf []Event
next int
- cur int
+ cur int // Current SubscriptionID
mut sync.Mutex
cond *stdsync.Cond
}
@@ -270,7 +286,7 @@ func (s *bufferedSubscription) pollingLoop() {
s.mut.Lock()
s.buf[s.next] = ev
s.next = (s.next + 1) % len(s.buf)
- s.cur = ev.ID
+ s.cur = ev.SubscriptionID
s.cond.Broadcast()
s.mut.Unlock()
}
@@ -285,12 +301,12 @@ func (s *bufferedSubscription) Since(id int, into []Event) []Event {
}
for i := s.next; i < len(s.buf); i++ {
- if s.buf[i].ID > id {
+ if s.buf[i].SubscriptionID > id {
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
- if s.buf[i].ID > id {
+ if s.buf[i].SubscriptionID > id {
into = append(into, s.buf[i])
}
}
diff --git a/lib/events/events_test.go b/lib/events/events_test.go
index e67e0df5ef..deac7d91b2 100644
--- a/lib/events/events_test.go
+++ b/lib/events/events_test.go
@@ -128,7 +128,7 @@ func TestUnsubscribe(t *testing.T) {
}
}
-func TestIDs(t *testing.T) {
+func TestGlobalIDs(t *testing.T) {
l := events.NewLogger()
s := l.Subscribe(events.AllEvents)
@@ -144,7 +144,7 @@ func TestIDs(t *testing.T) {
if ev.Data.(string) != "foo" {
t.Fatal("Incorrect event:", ev)
}
- id := ev.ID
+ id := ev.GlobalID
ev, err = s.Poll(timeout)
if err != nil {
@@ -153,8 +153,48 @@ func TestIDs(t *testing.T) {
if ev.Data.(string) != "bar" {
t.Fatal("Incorrect event:", ev)
}
- if ev.ID != id+1 {
- t.Fatalf("ID not incremented (%d != %d)", ev.ID, id+1)
+ if ev.GlobalID != id+1 {
+ t.Fatalf("ID not incremented (%d != %d)", ev.GlobalID, id+1)
+ }
+}
+
+func TestSubscriptionIDs(t *testing.T) {
+ l := events.NewLogger()
+
+ s := l.Subscribe(events.DeviceConnected)
+ defer l.Unsubscribe(s)
+
+ l.Log(events.DeviceDisconnected, "a")
+ l.Log(events.DeviceConnected, "b")
+ l.Log(events.DeviceConnected, "c")
+ l.Log(events.DeviceDisconnected, "d")
+
+ ev, err := s.Poll(timeout)
+ if err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ if ev.GlobalID != 2 {
+ t.Fatal("Incorrect GlobalID:", ev.GlobalID)
+ }
+ if ev.SubscriptionID != 1 {
+ t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
+ }
+
+ ev, err = s.Poll(timeout)
+ if err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+ if ev.GlobalID != 3 {
+ t.Fatal("Incorrect GlobalID:", ev.GlobalID)
+ }
+ if ev.SubscriptionID != 2 {
+ t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
+ }
+
+ ev, err = s.Poll(timeout)
+ if err != events.ErrTimeout {
+ t.Fatal("Unexpected error:", err)
}
}
@@ -179,10 +219,10 @@ func TestBufferedSub(t *testing.T) {
for recv < 10*events.BufferSize {
evs := bs.Since(recv, nil)
for _, ev := range evs {
- if ev.ID != recv+1 {
- t.Fatalf("Incorrect ID; %d != %d", ev.ID, recv+1)
+ if ev.GlobalID != recv+1 {
+ t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1)
}
- recv = ev.ID
+ recv = ev.GlobalID
}
}
}
@@ -213,10 +253,10 @@ func BenchmarkBufferedSub(b *testing.B) {
for i := 0; i < b.N; {
evs = bs.Since(recv, evs[:0])
for _, ev := range evs {
- if ev.ID != recv+1 {
- b.Fatal("skipped event", ev.ID, recv)
+ if ev.GlobalID != recv+1 {
+ b.Fatal("skipped event", ev.GlobalID, recv)
}
- recv = ev.ID
+ recv = ev.GlobalID
coord <- struct{}{}
}
i += len(evs)
@@ -237,3 +277,26 @@ func BenchmarkBufferedSub(b *testing.B) {
<-done
b.ReportAllocs()
}
+
+func TestSinceUsesSubscriptionId(t *testing.T) {
+ l := events.NewLogger()
+
+ s := l.Subscribe(events.DeviceConnected)
+ defer l.Unsubscribe(s)
+ bs := events.NewBufferedSubscription(s, 10*events.BufferSize)
+
+ l.Log(events.DeviceConnected, "a") // SubscriptionID = 1
+ l.Log(events.DeviceDisconnected, "b")
+ l.Log(events.DeviceDisconnected, "c")
+ l.Log(events.DeviceConnected, "d") // SubscriptionID = 2
+
+ events := bs.Since(0, nil)
+ if len(events) != 2 {
+ t.Fatal("Incorrect number of events:", len(events))
+ }
+
+ events = bs.Since(1, nil)
+ if len(events) != 1 {
+ t.Fatal("Incorrect number of events:", len(events))
+ }
+}