summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go235
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go161
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go162
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go55
4 files changed, 613 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
new file mode 100644
index 0000000000..d3ff9f333d
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
@@ -0,0 +1,235 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/web"
+
+ "github.com/docker/docker/api/types"
+ typesContainer "github.com/docker/docker/api/types/container"
+ docker "github.com/docker/docker/client"
+ "github.com/ilyam8/hashstructure"
+)
+
+func NewDiscoverer(cfg Config) (*Discoverer, error) {
+ tags, err := model.ParseTags(cfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("parse tags: %v", err)
+ }
+
+ d := &Discoverer{
+ Logger: logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("discoverer", "docker"),
+ ),
+ cfgSource: cfg.Source,
+ newDockerClient: func(addr string) (dockerClient, error) {
+ return docker.NewClientWithOpts(docker.WithHost(addr))
+ },
+ addr: docker.DefaultDockerHost,
+ listInterval: time.Second * 60,
+ timeout: time.Second * 2,
+ seenTggSources: make(map[string]bool),
+ started: make(chan struct{}),
+ }
+
+ d.Tags().Merge(tags)
+
+ if cfg.Timeout.Duration().Seconds() != 0 {
+ d.timeout = cfg.Timeout.Duration()
+ }
+ if cfg.Address != "" {
+ d.addr = cfg.Address
+ }
+
+ return d, nil
+}
+
+type Config struct {
+ Source string
+
+ Tags string `yaml:"tags"`
+ Address string `yaml:"address"`
+ Timeout web.Duration `yaml:"timeout"`
+}
+
+type (
+ Discoverer struct {
+ *logger.Logger
+ model.Base
+
+ dockerClient dockerClient
+ newDockerClient func(addr string) (dockerClient, error)
+ addr string
+
+ cfgSource string
+
+ listInterval time.Duration
+ timeout time.Duration
+ seenTggSources map[string]bool // [targetGroup.Source]
+
+ started chan struct{}
+ }
+ dockerClient interface {
+ NegotiateAPIVersion(context.Context)
+ ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error)
+ Close() error
+ }
+)
+
+func (d *Discoverer) String() string {
+ return "sd:docker"
+}
+
+func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.cleanup(); d.Info("instance is stopped") }()
+
+ close(d.started)
+
+ if d.dockerClient == nil {
+ client, err := d.newDockerClient(d.addr)
+ if err != nil {
+ d.Errorf("error on creating docker client: %v", err)
+ return
+ }
+ d.dockerClient = client
+ }
+
+ d.dockerClient.NegotiateAPIVersion(ctx)
+
+ if err := d.listContainers(ctx, in); err != nil {
+ d.Error(err)
+ return
+ }
+
+ tk := time.NewTicker(d.listInterval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.listContainers(ctx, in); err != nil {
+ d.Warning(err)
+ }
+ }
+ }
+}
+
+func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error {
+ listCtx, cancel := context.WithTimeout(ctx, d.timeout)
+ defer cancel()
+
+ containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{})
+ if err != nil {
+ return err
+ }
+
+ var tggs []model.TargetGroup
+ seen := make(map[string]bool)
+
+ for _, cntr := range containers {
+ if tgg := d.buildTargetGroup(cntr); tgg != nil {
+ tggs = append(tggs, tgg)
+ seen[tgg.Source()] = true
+ }
+ }
+
+ for src := range d.seenTggSources {
+ if !seen[src] {
+ tggs = append(tggs, &targetGroup{source: src})
+ }
+ }
+ d.seenTggSources = seen
+
+ select {
+ case <-ctx.Done():
+ case in <- tggs:
+ }
+
+ return nil
+}
+
+func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup {
+ if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 {
+ return nil
+ }
+
+ tgg := &targetGroup{
+ source: cntrSource(cntr),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ for netDriver, network := range cntr.NetworkSettings.Networks {
+ // container with network mode host will be discovered by local-listeners
+ for _, port := range cntr.Ports {
+ tgt := &target{
+ ID: cntr.ID,
+ Name: strings.TrimPrefix(cntr.Names[0], "/"),
+ Image: cntr.Image,
+ Command: cntr.Command,
+ Labels: mapAny(cntr.Labels),
+ PrivatePort: strconv.Itoa(int(port.PrivatePort)),
+ PublicPort: strconv.Itoa(int(port.PublicPort)),
+ PublicPortIP: port.IP,
+ PortProtocol: port.Type,
+ NetworkMode: cntr.HostConfig.NetworkMode,
+ NetworkDriver: netDriver,
+ IPAddress: network.IPAddress,
+ }
+ tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort)
+
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+
+ tgt.hash = hash
+ tgt.Tags().Merge(d.Tags())
+
+ tgg.targets = append(tgg.targets, tgt)
+ }
+ }
+
+ return tgg
+}
+
+func (d *Discoverer) cleanup() {
+ if d.dockerClient != nil {
+ _ = d.dockerClient.Close()
+ }
+}
+
+func cntrSource(cntr types.Container) string {
+ name := strings.TrimPrefix(cntr.Names[0], "/")
+ return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image)
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}
+
+func mapAny(src map[string]string) map[string]any {
+ if src == nil {
+ return nil
+ }
+ m := make(map[string]any, len(src))
+ for k, v := range src {
+ m[k] = v
+ }
+ return m
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go
new file mode 100644
index 0000000000..14ad5f9200
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go
@@ -0,0 +1,161 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/docker/docker/api/types"
+ typesNetwork "github.com/docker/docker/api/types/network"
+)
+
+func TestDiscoverer_Discover(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *discoverySim
+ }{
+ "add containers": {
+ createSim: func() *discoverySim {
+ nginx1 := prepareNginxContainer("nginx1")
+ nginx2 := prepareNginxContainer("nginx2")
+
+ sim := &discoverySim{
+ dockerCli: func(cli dockerCli, _ time.Duration) {
+ cli.addContainer(nginx1)
+ cli.addContainer(nginx2)
+ },
+ wantGroups: []model.TargetGroup{
+ &targetGroup{
+ source: cntrSource(nginx1),
+ targets: []model.Target{
+ withHash(&target{
+ ID: nginx1.ID,
+ Name: nginx1.Names[0][1:],
+ Image: nginx1.Image,
+ Command: nginx1.Command,
+ Labels: mapAny(nginx1.Labels),
+ PrivatePort: "80",
+ PublicPort: "8080",
+ PublicPortIP: "0.0.0.0",
+ PortProtocol: "tcp",
+ NetworkMode: "default",
+ NetworkDriver: "bridge",
+ IPAddress: "192.0.2.0",
+ Address: "192.0.2.0:80",
+ }),
+ },
+ },
+ &targetGroup{
+ source: cntrSource(nginx2),
+ targets: []model.Target{
+ withHash(&target{
+ ID: nginx2.ID,
+ Name: nginx2.Names[0][1:],
+ Image: nginx2.Image,
+ Command: nginx2.Command,
+ Labels: mapAny(nginx2.Labels),
+ PrivatePort: "80",
+ PublicPort: "8080",
+ PublicPortIP: "0.0.0.0",
+ PortProtocol: "tcp",
+ NetworkMode: "default",
+ NetworkDriver: "bridge",
+ IPAddress: "192.0.2.0",
+ Address: "192.0.2.0:80",
+ }),
+ },
+ },
+ },
+ }
+ return sim
+ },
+ },
+ "remove containers": {
+ createSim: func() *discoverySim {
+ nginx1 := prepareNginxContainer("nginx1")
+ nginx2 := prepareNginxContainer("nginx2")
+
+ sim := &discoverySim{
+ dockerCli: func(cli dockerCli, interval time.Duration) {
+ cli.addContainer(nginx1)
+ cli.addContainer(nginx2)
+ time.Sleep(interval * 2)
+ cli.removeContainer(nginx1.ID)
+ },
+ wantGroups: []model.TargetGroup{
+ &targetGroup{
+ source: cntrSource(nginx1),
+ targets: nil,
+ },
+ &targetGroup{
+ source: cntrSource(nginx2),
+ targets: []model.Target{
+ withHash(&target{
+ ID: nginx2.ID,
+ Name: nginx2.Names[0][1:],
+ Image: nginx2.Image,
+ Command: nginx2.Command,
+ Labels: mapAny(nginx2.Labels),
+ PrivatePort: "80",
+ PublicPort: "8080",
+ PublicPortIP: "0.0.0.0",
+ PortProtocol: "tcp",
+ NetworkMode: "default",
+ NetworkDriver: "bridge",
+ IPAddress: "192.0.2.0",
+ Address: "192.0.2.0:80",
+ }),
+ },
+ },
+ },
+ }
+ return sim
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func prepareNginxContainer(name string) types.Container {
+ return types.Container{
+ ID: "id-" + name,
+ Names: []string{"/" + name},
+ Image: "nginx-image",
+ ImageID: "nginx-image-id",
+ Command: "nginx-command",
+ Ports: []types.Port{
+ {
+ IP: "0.0.0.0",
+ PrivatePort: 80,
+ PublicPort: 8080,
+ Type: "tcp",
+ },
+ },
+ Labels: map[string]string{"key1": "value1"},
+ HostConfig: struct {
+ NetworkMode string `json:",omitempty"`
+ }{
+ NetworkMode: "default",
+ },
+ NetworkSettings: &types.SummaryNetworkSettings{
+ Networks: map[string]*typesNetwork.EndpointSettings{
+ "bridge": {IPAddress: "192.0.2.0"},
+ },
+ },
+ }
+}
+
+func withHash(tgt *target) *target {
+ tgt.hash, _ = calcHash(tgt)
+ tags, _ := model.ParseTags("docker")
+ tgt.Tags().Merge(tags)
+ return tgt
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go
new file mode 100644
index 0000000000..7b0b76abac
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go
@@ -0,0 +1,162 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "context"
+ "sort"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/docker/docker/api/types"
+ typesContainer "github.com/docker/docker/api/types/container"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type dockerCli interface {
+ addContainer(cntr types.Container)
+ removeContainer(id string)
+}
+
+type discoverySim struct {
+ dockerCli func(cli dockerCli, interval time.Duration)
+ wantGroups []model.TargetGroup
+}
+
+func (sim *discoverySim) run(t *testing.T) {
+ d, err := NewDiscoverer(Config{
+ Source: "",
+ Tags: "docker",
+ })
+ require.NoError(t, err)
+
+ mock := newMockDockerd()
+
+ d.newDockerClient = func(addr string) (dockerClient, error) {
+ return mock, nil
+ }
+ d.listInterval = time.Millisecond * 100
+
+ seen := make(map[string]model.TargetGroup)
+ ctx, cancel := context.WithCancel(context.Background())
+ in := make(chan []model.TargetGroup)
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ d.Discover(ctx, in)
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case tggs := <-in:
+ for _, tgg := range tggs {
+ seen[tgg.Source()] = tgg
+ }
+ }
+ }
+ }()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ wg.Wait()
+ }()
+
+ select {
+ case <-d.started:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery failed to start")
+ }
+
+ sim.dockerCli(mock, d.listInterval)
+ time.Sleep(time.Second)
+
+ cancel()
+
+ select {
+ case <-done:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery hasn't finished after cancel")
+ }
+
+ var tggs []model.TargetGroup
+ for _, tgg := range seen {
+ tggs = append(tggs, tgg)
+ }
+
+ sortTargetGroups(tggs)
+ sortTargetGroups(sim.wantGroups)
+
+ wantLen, gotLen := len(sim.wantGroups), len(tggs)
+ assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen)
+ assert.Equal(t, sim.wantGroups, tggs)
+
+ assert.True(t, mock.negApiVerCalled, "NegotiateAPIVersion called")
+ assert.True(t, mock.closeCalled, "Close called")
+}
+
+func newMockDockerd() *mockDockerd {
+ return &mockDockerd{
+ containers: make(map[string]types.Container),
+ }
+}
+
+type mockDockerd struct {
+ negApiVerCalled bool
+ closeCalled bool
+ mux sync.Mutex
+ containers map[string]types.Container
+}
+
+func (m *mockDockerd) addContainer(cntr types.Container) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ m.containers[cntr.ID] = cntr
+}
+
+func (m *mockDockerd) removeContainer(id string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ delete(m.containers, id)
+}
+
+func (m *mockDockerd) ContainerList(_ context.Context, _ typesContainer.ListOptions) ([]types.Container, error) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ var cntrs []types.Container
+ for _, cntr := range m.containers {
+ cntrs = append(cntrs, cntr)
+ }
+
+ return cntrs, nil
+}
+
+func (m *mockDockerd) NegotiateAPIVersion(_ context.Context) {
+ m.negApiVerCalled = true
+}
+
+func (m *mockDockerd) Close() error {
+ m.closeCalled = true
+ return nil
+}
+
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go
new file mode 100644
index 0000000000..2422bc98eb
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go
@@ -0,0 +1,55 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "fmt"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+)
+
+type targetGroup struct {
+ source string
+ targets []model.Target
+}
+
+func (g *targetGroup) Provider() string { return "sd:docker" }
+func (g *targetGroup) Source() string { return g.source }
+func (g *targetGroup) Targets() []model.Target { return g.targets }
+
+type target struct {
+ model.Base
+
+ hash uint64
+
+ ID string
+ Name string
+ Image string
+ Command string
+ Labels map[string]any
+ PrivatePort string // Port on the container
+ PublicPort string // Port exposed on the host
+ PublicPortIP string // Host IP address that the container's port is mapped to
+ PortProtocol string
+ NetworkMode string
+ NetworkDriver string
+ IPAddress string
+
+ Address string // "IPAddress:PrivatePort"
+}
+
+func (t *target) TUID() string {
+ if t.PublicPort != "" {
+ return fmt.Sprintf("%s_%s_%s_%s_%s_%s",
+ t.Name, t.IPAddress, t.PublicPortIP, t.PortProtocol, t.PublicPort, t.PrivatePort)
+ }
+ if t.PrivatePort != "" {
+ return fmt.Sprintf("%s_%s_%s_%s",
+ t.Name, t.IPAddress, t.PortProtocol, t.PrivatePort)
+ }
+ return fmt.Sprintf("%s_%s", t.Name, t.IPAddress)
+}
+
+func (t *target) Hash() uint64 {
+ return t.hash
+}