diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go | 101 |
1 files changed, 73 insertions, 28 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go index fc21b84cc1..75fa4274b6 100644 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -47,12 +47,16 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) { slog.String("discoverer", shortName), ), cfgSource: cfg.Source, - interval: time.Second * 60, ll: &localListenersExec{ binPath: filepath.Join(dir, "local-listeners"), timeout: time.Second * 5, }, + interval: time.Minute * 2, + expiryTime: time.Minute * 10, + cache: make(map[uint64]*cacheItem), + started: make(chan struct{}), } + d.Tags().Merge(tags) return d, nil @@ -72,6 +76,15 @@ type ( interval time.Duration ll localListeners + + expiryTime time.Duration + cache map[uint64]*cacheItem // [target.Hash] + + started chan struct{} + } + cacheItem struct { + lastSeenTime time.Time + tgt model.Target } localListeners interface { discover(ctx context.Context) ([]byte, error) @@ -83,25 +96,30 @@ func (d *Discoverer) String() string { } func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + close(d.started) + if err := d.discoverLocalListeners(ctx, in); err != nil { d.Error(err) return } - //tk := time.NewTicker(d.interval) - //defer tk.Stop() - // - //for { - // select { - // case <-ctx.Done(): - // return - // case <-tk.C: - // if err := d.discoverLocalListeners(ctx, in); err != nil { - // d.Error(err) - // return - // } - // } - //} + tk := time.NewTicker(d.interval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Warning(err) + return + } + } + } } func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { @@ -113,11 +131,13 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod return err } - tggs, err := d.parseLocalListeners(bs) + tgts, err := d.parseLocalListeners(bs) if err != nil { return err } + tggs := d.processTargets(tgts) + select { case <-ctx.Done(): case in <- tggs: @@ -126,7 +146,42 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod return nil } -func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) { +func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { + tgg := &targetGroup{ + provider: fullName, + source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + if d.expiryTime.Milliseconds() == 0 { + tgg.targets = tgts + return []model.TargetGroup{tgg} + } + + now := time.Now() + + for _, tgt := range tgts { + hash := tgt.Hash() + if _, ok := d.cache[hash]; !ok { + d.cache[hash] = &cacheItem{tgt: tgt} + } + d.cache[hash].lastSeenTime = now + } + + for k, v := range d.cache { + if now.Sub(v.lastSeenTime) > d.expiryTime { + delete(d.cache, k) + continue + } + tgg.targets = append(tgg.targets, v.tgt) + } + + return []model.TargetGroup{tgg} +} + +func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { var tgts []model.Target sc := bufio.NewScanner(bytes.NewReader(bs)) @@ -161,17 +216,7 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) tgts = append(tgts, &tgt) } - tgg := &targetGroup{ - provider: fullName, - source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), - targets: tgts, - } - - if d.cfgSource != "" { - tgg.source += fmt.Sprintf(",%s", d.cfgSource) - } - - return []model.TargetGroup{tgg}, nil + return tgts, nil } type localListenersExec struct { |