summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlya Mashchenko <ilya@netdata.cloud>2024-03-14 16:47:50 +0200
committerGitHub <noreply@github.com>2024-03-14 16:47:50 +0200
commitddb61900a313b1c17be6a039b939be0ae720c7fc (patch)
tree4880763e8ff9fbac228cb7942f6034bfa76da393
parent9a07f50ec8373b2ff1b9506e7768e2a36aa5259a (diff)
go.d.plugin: execute local-listeners periodically (#17160)
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go101
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go132
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go161
3 files changed, 283 insertions, 111 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
index fc21b84cc1..75fa4274b6 100644
--- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
@@ -47,12 +47,16 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) {
slog.String("discoverer", shortName),
),
cfgSource: cfg.Source,
- interval: time.Second * 60,
ll: &localListenersExec{
binPath: filepath.Join(dir, "local-listeners"),
timeout: time.Second * 5,
},
+ interval: time.Minute * 2,
+ expiryTime: time.Minute * 10,
+ cache: make(map[uint64]*cacheItem),
+ started: make(chan struct{}),
}
+
d.Tags().Merge(tags)
return d, nil
@@ -72,6 +76,15 @@ type (
interval time.Duration
ll localListeners
+
+ expiryTime time.Duration
+ cache map[uint64]*cacheItem // [target.Hash]
+
+ started chan struct{}
+ }
+ cacheItem struct {
+ lastSeenTime time.Time
+ tgt model.Target
}
localListeners interface {
discover(ctx context.Context) ([]byte, error)
@@ -83,25 +96,30 @@ func (d *Discoverer) String() string {
}
func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ close(d.started)
+
if err := d.discoverLocalListeners(ctx, in); err != nil {
d.Error(err)
return
}
- //tk := time.NewTicker(d.interval)
- //defer tk.Stop()
- //
- //for {
- // select {
- // case <-ctx.Done():
- // return
- // case <-tk.C:
- // if err := d.discoverLocalListeners(ctx, in); err != nil {
- // d.Error(err)
- // return
- // }
- // }
- //}
+ tk := time.NewTicker(d.interval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Warning(err)
+ return
+ }
+ }
+ }
}
func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
@@ -113,11 +131,13 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return err
}
- tggs, err := d.parseLocalListeners(bs)
+ tgts, err := d.parseLocalListeners(bs)
if err != nil {
return err
}
+ tggs := d.processTargets(tgts)
+
select {
case <-ctx.Done():
case in <- tggs:
@@ -126,7 +146,42 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return nil
}
-func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) {
+func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
+ tgg := &targetGroup{
+ provider: fullName,
+ source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ if d.expiryTime.Milliseconds() == 0 {
+ tgg.targets = tgts
+ return []model.TargetGroup{tgg}
+ }
+
+ now := time.Now()
+
+ for _, tgt := range tgts {
+ hash := tgt.Hash()
+ if _, ok := d.cache[hash]; !ok {
+ d.cache[hash] = &cacheItem{tgt: tgt}
+ }
+ d.cache[hash].lastSeenTime = now
+ }
+
+ for k, v := range d.cache {
+ if now.Sub(v.lastSeenTime) > d.expiryTime {
+ delete(d.cache, k)
+ continue
+ }
+ tgg.targets = append(tgg.targets, v.tgt)
+ }
+
+ return []model.TargetGroup{tgg}
+}
+
+func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
var tgts []model.Target
sc := bufio.NewScanner(bytes.NewReader(bs))
@@ -161,17 +216,7 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error)
tgts = append(tgts, &tgt)
}
- tgg := &targetGroup{
- provider: fullName,
- source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
- targets: tgts,
- }
-
- if d.cfgSource != "" {
- tgg.source += fmt.Sprintf(",%s", d.cfgSource)
- }
-
- return []model.TargetGroup{tgg}, nil
+ return tgts, nil
}
type localListenersExec struct {
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go
index 1a14b48d39..b724517a09 100644
--- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go
@@ -3,28 +3,23 @@
package netlisteners
import (
- "context"
- "errors"
"testing"
+ "time"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
)
-var (
- localListenersOutputSample = []byte(`
-UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
-TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
-TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D
-UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1
-`)
-)
-
func TestDiscoverer_Discover(t *testing.T) {
tests := map[string]discoverySim{
- "valid response": {
- mock: &mockLocalListenersExec{},
- wantDoneBeforeCancel: false,
- wantTargetGroups: []model.TargetGroup{&targetGroup{
+ "add listeners": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
provider: "sd:net_listeners",
source: "discoverer=net_listeners,host=localhost",
targets: []model.Target{
@@ -59,23 +54,83 @@ func TestDiscoverer_Discover(t *testing.T) {
},
}},
},
- "empty response": {
- mock: &mockLocalListenersExec{emptyResponse: true},
- wantDoneBeforeCancel: false,
- wantTargetGroups: []model.TargetGroup{&targetGroup{
+ "remove listeners; not expired": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
provider: "sd:net_listeners",
source: "discoverer=net_listeners,host=localhost",
+ targets: []model.Target{
+ withHash(&target{
+ Protocol: "UDP6",
+ Address: "::1",
+ Port: "8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "TCP6",
+ Address: "::1",
+ Port: "8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "TCP",
+ Address: "127.0.0.1",
+ Port: "8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "UDP",
+ Address: "127.0.0.1",
+ Port: "53768",
+ Comm: "go.d.plugin",
+ Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1",
+ }),
+ },
}},
},
- "error on exec": {
- mock: &mockLocalListenersExec{err: true},
- wantDoneBeforeCancel: true,
- wantTargetGroups: nil,
- },
- "invalid data": {
- mock: &mockLocalListenersExec{invalidResponse: true},
- wantDoneBeforeCancel: true,
- wantTargetGroups: nil,
+ "remove listeners; expired": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(expiry * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
+ provider: "sd:net_listeners",
+ source: "discoverer=net_listeners,host=localhost",
+ targets: []model.Target{
+ withHash(&target{
+ Protocol: "TCP6",
+ Address: "::1",
+ Port: "8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "TCP",
+ Address: "127.0.0.1",
+ Port: "8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ },
+ }},
},
}
@@ -88,26 +143,7 @@ func TestDiscoverer_Discover(t *testing.T) {
func withHash(l *target) *target {
l.hash, _ = calcHash(l)
- tags, _ := model.ParseTags("hostnetsocket")
+ tags, _ := model.ParseTags("netlisteners")
l.Tags().Merge(tags)
return l
}
-
-type mockLocalListenersExec struct {
- err bool
- emptyResponse bool
- invalidResponse bool
-}
-
-func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) {
- if m.err {
- return nil, errors.New("mock discover() error")
- }
- if m.emptyResponse {
- return nil, nil
- }
- if m.invalidResponse {
- return []byte("this is very incorrect data"), nil
- }
- return localListenersOutputSample, nil
-}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
index 201b2cad7d..f13d01c691 100644
--- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
@@ -4,6 +4,10 @@ package netlisteners
import (
"context"
+ "errors"
+ "sort"
+ "strings"
+ "sync"
"testing"
"time"
@@ -13,63 +17,150 @@ import (
"github.com/stretchr/testify/require"
)
+type listenersCli interface {
+ addListener(s string)
+ removeListener(s string)
+}
+
type discoverySim struct {
- mock *mockLocalListenersExec
- wantDoneBeforeCancel bool
- wantTargetGroups []model.TargetGroup
+ listenersCli func(cli listenersCli, interval, expiry time.Duration)
+ wantGroups []model.TargetGroup
}
func (sim *discoverySim) run(t *testing.T) {
- d, err := NewDiscoverer(Config{Tags: "hostnetsocket"})
+ d, err := NewDiscoverer(Config{
+ Source: "",
+ Tags: "netlisteners",
+ })
require.NoError(t, err)
- d.ll = sim.mock
+ mock := newMockLocalListenersExec()
+
+ d.ll = mock
+
+ d.interval = time.Millisecond * 100
+ d.expiryTime = time.Second * 1
+ seen := make(map[string]model.TargetGroup)
ctx, cancel := context.WithCancel(context.Background())
- tggs, done := sim.collectTargetGroups(t, ctx, d)
+ in := make(chan []model.TargetGroup)
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ d.Discover(ctx, in)
+ }()
- if sim.wantDoneBeforeCancel {
- select {
- case <-done:
- default:
- assert.Fail(t, "discovery hasn't finished before cancel")
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case tggs := <-in:
+ for _, tgg := range tggs {
+ seen[tgg.Source()] = tgg
+ }
+ }
}
+ }()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ wg.Wait()
+ }()
+
+ select {
+ case <-d.started:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery failed to start")
}
- assert.Equal(t, sim.wantTargetGroups, tggs)
+
+ sim.listenersCli(mock, d.interval, d.expiryTime)
cancel()
+
select {
case <-done:
case <-time.After(time.Second * 3):
- assert.Fail(t, "discovery hasn't finished after cancel")
+ require.Fail(t, "discovery hasn't finished after cancel")
+ }
+
+ var tggs []model.TargetGroup
+ for _, tgg := range seen {
+ tggs = append(tggs, tgg)
+ }
+
+ sortTargetGroups(tggs)
+ sortTargetGroups(sim.wantGroups)
+
+ wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs)
+ assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen)
+ assert.Equal(t, sim.wantGroups, tggs)
+}
+
+func newMockLocalListenersExec() *mockLocalListenersExec {
+ return &mockLocalListenersExec{
+ listeners: make(map[string]bool),
}
}
-func (sim *discoverySim) collectTargetGroups(t *testing.T, ctx context.Context, d *Discoverer) ([]model.TargetGroup, chan struct{}) {
+type mockLocalListenersExec struct {
+ errResponse bool
+ mux sync.Mutex
+ listeners map[string]bool
+}
- in := make(chan []model.TargetGroup)
- done := make(chan struct{})
+func (m *mockLocalListenersExec) addListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
- go func() { defer close(done); d.Discover(ctx, in) }()
+ m.listeners[s] = true
+}
- timeout := time.Second * 5
- var tggs []model.TargetGroup
+func (m *mockLocalListenersExec) removeListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
- func() {
- for {
- select {
- case groups := <-in:
- if tggs = append(tggs, groups...); len(tggs) == len(sim.wantTargetGroups) {
- return
- }
- case <-done:
- return
- case <-time.After(timeout):
- t.Logf("discovery timed out after %s", timeout)
- return
- }
- }
- }()
+ delete(m.listeners, s)
+}
+
+func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) {
+ if m.errResponse {
+ return nil, errors.New("mock discover() error")
+ }
+
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ var buf strings.Builder
+ for s := range m.listeners {
+ buf.WriteString(s)
+ buf.WriteByte('\n')
+ }
+
+ return []byte(buf.String()), nil
+}
+
+func calcTargets(tggs []model.TargetGroup) int {
+ var n int
+ for _, tgg := range tggs {
+ n += len(tgg.Targets())
+ }
+ return n
+}
- return tggs, done
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+
+ for idx := range tggs {
+ tgts := tggs[idx].Targets()
+ sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() })
+ }
}