diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go | 161 |
1 files changed, 126 insertions, 35 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go index 201b2cad7d..f13d01c691 100644 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go @@ -4,6 +4,10 @@ package netlisteners import ( "context" + "errors" + "sort" + "strings" + "sync" "testing" "time" @@ -13,63 +17,150 @@ import ( "github.com/stretchr/testify/require" ) +type listenersCli interface { + addListener(s string) + removeListener(s string) +} + type discoverySim struct { - mock *mockLocalListenersExec - wantDoneBeforeCancel bool - wantTargetGroups []model.TargetGroup + listenersCli func(cli listenersCli, interval, expiry time.Duration) + wantGroups []model.TargetGroup } func (sim *discoverySim) run(t *testing.T) { - d, err := NewDiscoverer(Config{Tags: "hostnetsocket"}) + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "netlisteners", + }) require.NoError(t, err) - d.ll = sim.mock + mock := newMockLocalListenersExec() + + d.ll = mock + + d.interval = time.Millisecond * 100 + d.expiryTime = time.Second * 1 + seen := make(map[string]model.TargetGroup) ctx, cancel := context.WithCancel(context.Background()) - tggs, done := sim.collectTargetGroups(t, ctx, d) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() - if sim.wantDoneBeforeCancel { - select { - case <-done: - default: - assert.Fail(t, "discovery hasn't finished before cancel") + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") } - assert.Equal(t, sim.wantTargetGroups, tggs) + + sim.listenersCli(mock, d.interval, d.expiryTime) cancel() + select { case <-done: case <-time.After(time.Second * 3): - assert.Fail(t, "discovery hasn't finished after cancel") + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) +} + +func newMockLocalListenersExec() *mockLocalListenersExec { + return &mockLocalListenersExec{ + listeners: make(map[string]bool), } } -func (sim *discoverySim) collectTargetGroups(t *testing.T, ctx context.Context, d *Discoverer) ([]model.TargetGroup, chan struct{}) { +type mockLocalListenersExec struct { + errResponse bool + mux sync.Mutex + listeners map[string]bool +} - in := make(chan []model.TargetGroup) - done := make(chan struct{}) +func (m *mockLocalListenersExec) addListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() - go func() { defer close(done); d.Discover(ctx, in) }() + m.listeners[s] = true +} - timeout := time.Second * 5 - var tggs []model.TargetGroup +func (m *mockLocalListenersExec) removeListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() - func() { - for { - select { - case groups := <-in: - if tggs = append(tggs, groups...); len(tggs) == len(sim.wantTargetGroups) { - return - } - case <-done: - return - case <-time.After(timeout): - t.Logf("discovery timed out after %s", timeout) - return - } - } - }() + delete(m.listeners, s) +} + +func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) { + if m.errResponse { + return nil, errors.New("mock discover() error") + } + + m.mux.Lock() + defer m.mux.Unlock() + + var buf strings.Builder + for s := range m.listeners { + buf.WriteString(s) + buf.WriteByte('\n') + } + + return []byte(buf.String()), nil +} + +func calcTargets(tggs []model.TargetGroup) int { + var n int + for _, tgg := range tggs { + n += len(tgg.Targets()) + } + return n +} - return tggs, done +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) + + for idx := range tggs { + tgts := tggs[idx].Targets() + sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() }) + } } |