summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go101
1 files changed, 73 insertions, 28 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
index fc21b84cc1..75fa4274b6 100644
--- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
@@ -47,12 +47,16 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) {
slog.String("discoverer", shortName),
),
cfgSource: cfg.Source,
- interval: time.Second * 60,
ll: &localListenersExec{
binPath: filepath.Join(dir, "local-listeners"),
timeout: time.Second * 5,
},
+ interval: time.Minute * 2,
+ expiryTime: time.Minute * 10,
+ cache: make(map[uint64]*cacheItem),
+ started: make(chan struct{}),
}
+
d.Tags().Merge(tags)
return d, nil
@@ -72,6 +76,15 @@ type (
interval time.Duration
ll localListeners
+
+ expiryTime time.Duration
+ cache map[uint64]*cacheItem // [target.Hash]
+
+ started chan struct{}
+ }
+ cacheItem struct {
+ lastSeenTime time.Time
+ tgt model.Target
}
localListeners interface {
discover(ctx context.Context) ([]byte, error)
@@ -83,25 +96,30 @@ func (d *Discoverer) String() string {
}
func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ close(d.started)
+
if err := d.discoverLocalListeners(ctx, in); err != nil {
d.Error(err)
return
}
- //tk := time.NewTicker(d.interval)
- //defer tk.Stop()
- //
- //for {
- // select {
- // case <-ctx.Done():
- // return
- // case <-tk.C:
- // if err := d.discoverLocalListeners(ctx, in); err != nil {
- // d.Error(err)
- // return
- // }
- // }
- //}
+ tk := time.NewTicker(d.interval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Warning(err)
+ return
+ }
+ }
+ }
}
func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
@@ -113,11 +131,13 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return err
}
- tggs, err := d.parseLocalListeners(bs)
+ tgts, err := d.parseLocalListeners(bs)
if err != nil {
return err
}
+ tggs := d.processTargets(tgts)
+
select {
case <-ctx.Done():
case in <- tggs:
@@ -126,7 +146,42 @@ func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []mod
return nil
}
-func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error) {
+func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
+ tgg := &targetGroup{
+ provider: fullName,
+ source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ if d.expiryTime.Milliseconds() == 0 {
+ tgg.targets = tgts
+ return []model.TargetGroup{tgg}
+ }
+
+ now := time.Now()
+
+ for _, tgt := range tgts {
+ hash := tgt.Hash()
+ if _, ok := d.cache[hash]; !ok {
+ d.cache[hash] = &cacheItem{tgt: tgt}
+ }
+ d.cache[hash].lastSeenTime = now
+ }
+
+ for k, v := range d.cache {
+ if now.Sub(v.lastSeenTime) > d.expiryTime {
+ delete(d.cache, k)
+ continue
+ }
+ tgg.targets = append(tgg.targets, v.tgt)
+ }
+
+ return []model.TargetGroup{tgg}
+}
+
+func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
var tgts []model.Target
sc := bufio.NewScanner(bytes.NewReader(bs))
@@ -161,17 +216,7 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.TargetGroup, error)
tgts = append(tgts, &tgt)
}
- tgg := &targetGroup{
- provider: fullName,
- source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
- targets: tgts,
- }
-
- if d.cfgSource != "" {
- tgg.source += fmt.Sprintf(",%s", d.cfgSource)
- }
-
- return []model.TargetGroup{tgg}, nil
+ return tgts, nil
}
type localListenersExec struct {