summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go161
1 files changed, 126 insertions, 35 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
index 201b2cad7d..f13d01c691 100644
--- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
@@ -4,6 +4,10 @@ package netlisteners
import (
"context"
+ "errors"
+ "sort"
+ "strings"
+ "sync"
"testing"
"time"
@@ -13,63 +17,150 @@ import (
"github.com/stretchr/testify/require"
)
+type listenersCli interface {
+ addListener(s string)
+ removeListener(s string)
+}
+
type discoverySim struct {
- mock *mockLocalListenersExec
- wantDoneBeforeCancel bool
- wantTargetGroups []model.TargetGroup
+ listenersCli func(cli listenersCli, interval, expiry time.Duration)
+ wantGroups []model.TargetGroup
}
func (sim *discoverySim) run(t *testing.T) {
- d, err := NewDiscoverer(Config{Tags: "hostnetsocket"})
+ d, err := NewDiscoverer(Config{
+ Source: "",
+ Tags: "netlisteners",
+ })
require.NoError(t, err)
- d.ll = sim.mock
+ mock := newMockLocalListenersExec()
+
+ d.ll = mock
+
+ d.interval = time.Millisecond * 100
+ d.expiryTime = time.Second * 1
+ seen := make(map[string]model.TargetGroup)
ctx, cancel := context.WithCancel(context.Background())
- tggs, done := sim.collectTargetGroups(t, ctx, d)
+ in := make(chan []model.TargetGroup)
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ d.Discover(ctx, in)
+ }()
- if sim.wantDoneBeforeCancel {
- select {
- case <-done:
- default:
- assert.Fail(t, "discovery hasn't finished before cancel")
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case tggs := <-in:
+ for _, tgg := range tggs {
+ seen[tgg.Source()] = tgg
+ }
+ }
}
+ }()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ wg.Wait()
+ }()
+
+ select {
+ case <-d.started:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery failed to start")
}
- assert.Equal(t, sim.wantTargetGroups, tggs)
+
+ sim.listenersCli(mock, d.interval, d.expiryTime)
cancel()
+
select {
case <-done:
case <-time.After(time.Second * 3):
- assert.Fail(t, "discovery hasn't finished after cancel")
+ require.Fail(t, "discovery hasn't finished after cancel")
+ }
+
+ var tggs []model.TargetGroup
+ for _, tgg := range seen {
+ tggs = append(tggs, tgg)
+ }
+
+ sortTargetGroups(tggs)
+ sortTargetGroups(sim.wantGroups)
+
+ wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs)
+ assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen)
+ assert.Equal(t, sim.wantGroups, tggs)
+}
+
+func newMockLocalListenersExec() *mockLocalListenersExec {
+ return &mockLocalListenersExec{
+ listeners: make(map[string]bool),
}
}
-func (sim *discoverySim) collectTargetGroups(t *testing.T, ctx context.Context, d *Discoverer) ([]model.TargetGroup, chan struct{}) {
+type mockLocalListenersExec struct {
+ errResponse bool
+ mux sync.Mutex
+ listeners map[string]bool
+}
- in := make(chan []model.TargetGroup)
- done := make(chan struct{})
+func (m *mockLocalListenersExec) addListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
- go func() { defer close(done); d.Discover(ctx, in) }()
+ m.listeners[s] = true
+}
- timeout := time.Second * 5
- var tggs []model.TargetGroup
+func (m *mockLocalListenersExec) removeListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
- func() {
- for {
- select {
- case groups := <-in:
- if tggs = append(tggs, groups...); len(tggs) == len(sim.wantTargetGroups) {
- return
- }
- case <-done:
- return
- case <-time.After(timeout):
- t.Logf("discovery timed out after %s", timeout)
- return
- }
- }
- }()
+ delete(m.listeners, s)
+}
+
+func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) {
+ if m.errResponse {
+ return nil, errors.New("mock discover() error")
+ }
+
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ var buf strings.Builder
+ for s := range m.listeners {
+ buf.WriteString(s)
+ buf.WriteByte('\n')
+ }
+
+ return []byte(buf.String()), nil
+}
+
+func calcTargets(tggs []model.TargetGroup) int {
+ var n int
+ for _, tgg := range tggs {
+ n += len(tgg.Targets())
+ }
+ return n
+}
- return tggs, done
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+
+ for idx := range tggs {
+ tgts := tggs[idx].Targets()
+ sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() })
+ }
}