diff options
author | Ilya Mashchenko <ilya@netdata.cloud> | 2024-03-14 16:47:50 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-14 16:47:50 +0200 |
commit | ddb61900a313b1c17be6a039b939be0ae720c7fc (patch) | |
tree | 4880763e8ff9fbac228cb7942f6034bfa76da393 /src | |
parent | 9a07f50ec8373b2ff1b9506e7768e2a36aa5259a (diff) |
go.d.plugin: execute local-listeners periodically (#17160)
Diffstat (limited to 'src')
3 files changed, 283 insertions, 111 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go index fc21b84cc1..75fa4274b6 100644 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -47,12 +47,16 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) { slog.String("discoverer", shortName), ), cfgSource: cfg.Source, - interval: time.Second * 60, ll: &localListenersExec{ binPath: filepath.Join(dir, "local-listeners"), timeout: time.Second * 5, }, + interval: time.Minute * 2, + expiryTime: time.Minute * 10, + cache: make(map[uint64]*cacheItem), + started: make(chan struct{}), } + d.Tags().Merge(tags) return d, nil @@ -72,6 +76,15 @@ type ( interval time.Duration ll localListeners + + expiryTime time.Duration + cache map[uint64]*cacheItem // [target.Hash] + + started chan struct{} + } + cacheItem struct { + lastSeenTime time.Time + tgt model.Target } localListeners interface { discover(ctx context.Context) ([]byte, error) @@ -83,25 +96,30 @@ func (d *Discoverer) String() string { } func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + close(d.started) + if err := d.discoverLocalListeners(ctx, in); err != nil { d.Error(err) return } - //tk := time.NewTicker(d.interval) - //defer tk.Stop() - // - //for { - // select { - // case <-ctx.Done(): - // return - // case <-tk.C: - // if err := d.discoverLocalListeners(ctx, in); err != nil { - // d.Error(err) - // return - // } - // } - //} + tk := time.NewTicker(d.interval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Warning(err) + return + } + } + } } func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { @@ -113,11 +131,13 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod return err } - tggs, err := d.parseLocalListeners(bs) + tgts, err := d.parseLocalListeners(bs) if err != nil { return err } + tggs := d.processTargets(tgts) + select { case <-ctx.Done(): case in <- tggs: @@ -126,7 +146,42 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod return nil } -func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) { +func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { + tgg := &targetGroup{ + provider: fullName, + source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + if d.expiryTime.Milliseconds() == 0 { + tgg.targets = tgts + return []model.TargetGroup{tgg} + } + + now := time.Now() + + for _, tgt := range tgts { + hash := tgt.Hash() + if _, ok := d.cache[hash]; !ok { + d.cache[hash] = &cacheItem{tgt: tgt} + } + d.cache[hash].lastSeenTime = now + } + + for k, v := range d.cache { + if now.Sub(v.lastSeenTime) > d.expiryTime { + delete(d.cache, k) + continue + } + tgg.targets = append(tgg.targets, v.tgt) + } + + return []model.TargetGroup{tgg} +} + +func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { var tgts []model.Target sc := bufio.NewScanner(bytes.NewReader(bs)) @@ -161,17 +216,7 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) tgts = append(tgts, &tgt) } - tgg := &targetGroup{ - provider: fullName, - source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), - targets: tgts, - } - - if d.cfgSource != "" { - tgg.source += fmt.Sprintf(",%s", d.cfgSource) - } - - return []model.TargetGroup{tgg}, nil + return tgts, nil } type localListenersExec struct { diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go index 1a14b48d39..b724517a09 100644 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go @@ -3,28 +3,23 @@ package netlisteners import ( - "context" - "errors" "testing" + "time" "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" ) -var ( - localListenersOutputSample = []byte(` -UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D -TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D -TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D -UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1 -`) -) - func TestDiscoverer_Discover(t *testing.T) { tests := map[string]discoverySim{ - "valid response": { - mock: &mockLocalListenersExec{}, - wantDoneBeforeCancel: false, - wantTargetGroups: []model.TargetGroup{&targetGroup{ + "add listeners": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ provider: "sd:net_listeners", source: "discoverer=net_listeners,host=localhost", targets: []model.Target{ @@ -59,23 +54,83 @@ func TestDiscoverer_Discover(t *testing.T) { }, }}, }, - "empty response": { - mock: &mockLocalListenersExec{emptyResponse: true}, - wantDoneBeforeCancel: false, - wantTargetGroups: []model.TargetGroup{&targetGroup{ + "remove listeners; not expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ provider: "sd:net_listeners", source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "UDP6", + Address: "::1", + Port: "8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "TCP6", + Address: "::1", + Port: "8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "TCP", + Address: "127.0.0.1", + Port: "8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "UDP", + Address: "127.0.0.1", + Port: "53768", + Comm: "go.d.plugin", + Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1", + }), + }, }}, }, - "error on exec": { - mock: &mockLocalListenersExec{err: true}, - wantDoneBeforeCancel: true, - wantTargetGroups: nil, - }, - "invalid data": { - mock: &mockLocalListenersExec{invalidResponse: true}, - wantDoneBeforeCancel: true, - wantTargetGroups: nil, + "remove listeners; expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(expiry * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "TCP6", + Address: "::1", + Port: "8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "TCP", + Address: "127.0.0.1", + Port: "8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + }, + }}, }, } @@ -88,26 +143,7 @@ func TestDiscoverer_Discover(t *testing.T) { func withHash(l *target) *target { l.hash, _ = calcHash(l) - tags, _ := model.ParseTags("hostnetsocket") + tags, _ := model.ParseTags("netlisteners") l.Tags().Merge(tags) return l } - -type mockLocalListenersExec struct { - err bool - emptyResponse bool - invalidResponse bool -} - -func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) { - if m.err { - return nil, errors.New("mock discover() error") - } - if m.emptyResponse { - return nil, nil - } - if m.invalidResponse { - return []byte("this is very incorrect data"), nil - } - return localListenersOutputSample, nil -} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go index 201b2cad7d..f13d01c691 100644 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go @@ -4,6 +4,10 @@ package netlisteners import ( "context" + "errors" + "sort" + "strings" + "sync" "testing" "time" @@ -13,63 +17,150 @@ import ( "github.com/stretchr/testify/require" ) +type listenersCli interface { + addListener(s string) + removeListener(s string) +} + type discoverySim struct { - mock *mockLocalListenersExec - wantDoneBeforeCancel bool - wantTargetGroups []model.TargetGroup + listenersCli func(cli listenersCli, interval, expiry time.Duration) + wantGroups []model.TargetGroup } func (sim *discoverySim) run(t *testing.T) { - d, err := NewDiscoverer(Config{Tags: "hostnetsocket"}) + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "netlisteners", + }) require.NoError(t, err) - d.ll = sim.mock + mock := newMockLocalListenersExec() + + d.ll = mock + + d.interval = time.Millisecond * 100 + d.expiryTime = time.Second * 1 + seen := make(map[string]model.TargetGroup) ctx, cancel := context.WithCancel(context.Background()) - tggs, done := sim.collectTargetGroups(t, ctx, d) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() - if sim.wantDoneBeforeCancel { - select { - case <-done: - default: - assert.Fail(t, "discovery hasn't finished before cancel") + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") } - assert.Equal(t, sim.wantTargetGroups, tggs) + + sim.listenersCli(mock, d.interval, d.expiryTime) cancel() + select { case <-done: case <-time.After(time.Second * 3): - assert.Fail(t, "discovery hasn't finished after cancel") + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) +} + +func newMockLocalListenersExec() *mockLocalListenersExec { + return &mockLocalListenersExec{ + listeners: make(map[string]bool), } } -func (sim *discoverySim) collectTargetGroups(t *testing.T, ctx context.Context, d *Discoverer) ([]model.TargetGroup, chan struct{}) { +type mockLocalListenersExec struct { + errResponse bool + mux sync.Mutex + listeners map[string]bool +} - in := make(chan []model.TargetGroup) - done := make(chan struct{}) +func (m *mockLocalListenersExec) addListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() - go func() { defer close(done); d.Discover(ctx, in) }() + m.listeners[s] = true +} - timeout := time.Second * 5 - var tggs []model.TargetGroup +func (m *mockLocalListenersExec) removeListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() - func() { - for { - select { - case groups := <-in: - if tggs = append(tggs, groups...); len(tggs) == len(sim.wantTargetGroups) { - return - } - case <-done: - return - case <-time.After(timeout): - t.Logf("discovery timed out after %s", timeout) - return - } - } - }() + delete(m.listeners, s) +} + +func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) { + if m.errResponse { + return nil, errors.New("mock discover() error") + } + + m.mux.Lock() + defer m.mux.Unlock() + + var buf strings.Builder + for s := range m.listeners { + buf.WriteString(s) + buf.WriteByte('\n') + } + + return []byte(buf.String()), nil +} + +func calcTargets(tggs []model.TargetGroup) int { + var n int + for _, tgg := range tggs { + n += len(tgg.Targets()) + } + return n +} - return tggs, done +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) + + for idx := range tggs { + tgts := tggs[idx].Targets() + sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() }) + } } |