summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/pulsar
diff options
context:
space:
mode:
authorIlya Mashchenko <ilya@netdata.cloud>2024-07-02 15:32:34 +0300
committerGitHub <noreply@github.com>2024-07-02 15:32:34 +0300
commit7fee1e522262104b75a5f8d7e38b9299275d87bc (patch)
treef2999f9ca7d5017182d82d3e99d19bb6ce429f4f /src/go/collectors/go.d.plugin/modules/pulsar
parente99da8b64b1588a4ccf19f89953ee83c49584bb7 (diff)
restructure go.d (#18058)
* restruture go.d * update gitignore * update ci files * update gen_docs_integrations.py * update link in go.d conf files * update go.d modules metadata files * update metadata files * update packaging * add log files * integrations commit * update get-go-version.py * go fmt * fix packaging * update go.d readme --------- Co-authored-by: Fotis Voutsas <fotis@netdata.cloud>
Diffstat (limited to 'src/go/collectors/go.d.plugin/modules/pulsar')
l---------src/go/collectors/go.d.plugin/modules/pulsar/README.md1
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/cache.go19
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/charts.go664
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/collect.go138
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/config_schema.json177
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/init.go34
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/integrations/apache_pulsar.md312
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/metadata.yaml519
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/metrics.go116
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/pulsar.go137
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/pulsar_test.go1024
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/config.json28
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/config.yaml22
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/non-pulsar.txt27
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/standalone-v2.5.0-namespaces.txt500
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/standalone-v2.5.0-topics-2.txt748
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/testdata/standalone-v2.5.0-topics.txt748
17 files changed, 0 insertions, 5214 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/README.md b/src/go/collectors/go.d.plugin/modules/pulsar/README.md
deleted file mode 120000
index dfa55301c8..0000000000
--- a/src/go/collectors/go.d.plugin/modules/pulsar/README.md
+++ /dev/null
@@ -1 +0,0 @@
-integrations/apache_pulsar.md \ No newline at end of file
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/cache.go b/src/go/collectors/go.d.plugin/modules/pulsar/cache.go
deleted file mode 100644
index 7f113bf86f..0000000000
--- a/src/go/collectors/go.d.plugin/modules/pulsar/cache.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-package pulsar
-
-func newCache() *cache {
- return &cache{
- namespaces: make(map[namespace]bool),
- topics: make(map[topic]bool),
- }
-}
-
-type (
- namespace struct{ name string }
- topic struct{ namespace, name string }
- cache struct {
- namespaces map[namespace]bool
- topics map[topic]bool
- }
-)
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/charts.go b/src/go/collectors/go.d.plugin/modules/pulsar/charts.go
deleted file mode 100644
index 3ddff66f6d..0000000000
--- a/src/go/collectors/go.d.plugin/modules/pulsar/charts.go
+++ /dev/null
@@ -1,664 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-package pulsar
-
-import (
- "fmt"
- "strings"
-
- "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
-
- "github.com/netdata/netdata/go/go.d.plugin/agent/module"
-)
-
-type (
- Charts = module.Charts
- Chart = module.Chart
- Dims = module.Dims
- Dim = module.Dim
- Opts = module.Opts
-)
-
-var summaryCharts = Charts{
- sumBrokerComponentsChart.Copy(),
-
- sumMessagesRateChart.Copy(),
- sumThroughputRateChart.Copy(),
-
- sumStorageSizeChart.Copy(),
- sumStorageOperationsRateChart.Copy(), // optional
- sumMsgBacklogSizeChart.Copy(),
- sumStorageWriteLatencyChart.Copy(),
- sumEntrySizeChart.Copy(),
-
- sumSubsDelayedChart.Copy(),
- sumSubsMsgRateRedeliverChart.Copy(), // optional
- sumSubsBlockedOnUnackedMsgChart.Copy(), // optional
-
- sumReplicationRateChart.Copy(), // optional
- sumReplicationThroughputRateChart.Copy(), // optional
- sumReplicationBacklogChart.Copy(), // optional
-}
-
-var (
- sumBrokerComponentsChart = Chart{
- ID: "broker_components",
- Title: "Broker Components",
- Units: "components",
- Fam: "ns summary",
- Ctx: "pulsar.broker_components",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: "pulsar_namespaces_count", Name: "namespaces"},
- {ID: metricPulsarTopicsCount, Name: "topics"},
- {ID: metricPulsarSubscriptionsCount, Name: "subscriptions"},
- {ID: metricPulsarProducersCount, Name: "producers"},
- {ID: metricPulsarConsumersCount, Name: "consumers"},
- },
- }
- sumMessagesRateChart = Chart{
- ID: "messages_rate",
- Title: "Messages Rate",
- Units: "messages/s",
- Fam: "ns summary",
- Ctx: "pulsar.messages_rate",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarRateIn, Name: "publish", Div: 1000},
- {ID: metricPulsarRateOut, Name: "dispatch", Mul: -1, Div: 1000},
- },
- }
- sumThroughputRateChart = Chart{
- ID: "throughput_rate",
- Title: "Throughput Rate",
- Units: "KiB/s",
- Fam: "ns summary",
- Ctx: "pulsar.throughput_rate",
- Type: module.Area,
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarThroughputIn, Name: "publish", Div: 1024 * 1000},
- {ID: metricPulsarThroughputOut, Name: "dispatch", Mul: -1, Div: 1024 * 1000},
- },
- }
- sumStorageSizeChart = Chart{
- ID: "storage_size",
- Title: "Storage Size",
- Units: "KiB",
- Fam: "ns summary",
- Ctx: "pulsar.storage_size",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarStorageSize, Name: "used", Div: 1024},
- },
- }
- sumStorageOperationsRateChart = Chart{
- ID: "storage_operations_rate",
- Title: "Storage Read/Write Operations Rate",
- Units: "message batches/s",
- Fam: "ns summary",
- Ctx: "pulsar.storage_operations_rate",
- Type: module.Area,
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarStorageReadRate, Name: "read", Div: 1000},
- {ID: metricPulsarStorageWriteRate, Name: "write", Mul: -1, Div: 1000},
- },
- }
- sumMsgBacklogSizeChart = Chart{
- ID: "msg_backlog",
- Title: "Messages Backlog Size",
- Units: "messages",
- Fam: "ns summary",
- Ctx: "pulsar.msg_backlog",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarMsgBacklog, Name: "backlog"},
- },
- }
- sumStorageWriteLatencyChart = Chart{
- ID: "storage_write_latency",
- Title: "Storage Write Latency",
- Units: "entries/s",
- Fam: "ns summary",
- Ctx: "pulsar.storage_write_latency",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: "pulsar_storage_write_latency_le_0_5", Name: "<=0.5ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_1", Name: "<=1ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_5", Name: "<=5ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_10", Name: "<=10ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_20", Name: "<=20ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_50", Name: "<=50ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_100", Name: "<=100ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_200", Name: "<=200ms", Div: 60},
- {ID: "pulsar_storage_write_latency_le_1000", Name: "<=1s", Div: 60},
- {ID: "pulsar_storage_write_latency_overflow", Name: ">1s", Div: 60},
- },
- }
- sumEntrySizeChart = Chart{
- ID: "entry_size",
- Title: "Entry Size",
- Units: "entries/s",
- Fam: "ns summary",
- Ctx: "pulsar.entry_size",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: "pulsar_entry_size_le_128", Name: "<=128B", Div: 60},
- {ID: "pulsar_entry_size_le_512", Name: "<=512B", Div: 60},
- {ID: "pulsar_entry_size_le_1_kb", Name: "<=1KB", Div: 60},
- {ID: "pulsar_entry_size_le_2_kb", Name: "<=2KB", Div: 60},
- {ID: "pulsar_entry_size_le_4_kb", Name: "<=4KB", Div: 60},
- {ID: "pulsar_entry_size_le_16_kb", Name: "<=16KB", Div: 60},
- {ID: "pulsar_entry_size_le_100_kb", Name: "<=100KB", Div: 60},
- {ID: "pulsar_entry_size_le_1_mb", Name: "<=1MB", Div: 60},
- {ID: "pulsar_entry_size_le_overflow", Name: ">1MB", Div: 60},
- },
- }
- sumSubsDelayedChart = Chart{
- ID: "subscription_delayed",
- Title: "Subscriptions Delayed for Dispatching",
- Units: "message batches",
- Fam: "ns summary",
- Ctx: "pulsar.subscription_delayed",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarSubscriptionDelayed, Name: "delayed"},
- },
- }
- sumSubsMsgRateRedeliverChart = Chart{
- ID: "subscription_msg_rate_redeliver",
- Title: "Subscriptions Redelivered Message Rate",
- Units: "messages/s",
- Fam: "ns summary",
- Ctx: "pulsar.subscription_msg_rate_redeliver",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarSubscriptionMsgRateRedeliver, Name: "redelivered", Div: 1000},
- },
- }
- sumSubsBlockedOnUnackedMsgChart = Chart{
- ID: "subscription_blocked_on_unacked_messages",
- Title: "Subscriptions Blocked On Unacked Messages",
- Units: "subscriptions",
- Fam: "ns summary",
- Ctx: "pulsar.subscription_blocked_on_unacked_messages",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarSubscriptionBlockedOnUnackedMessages, Name: "blocked"},
- },
- }
- sumReplicationRateChart = Chart{
- ID: "replication_rate",
- Title: "Replication Rate",
- Units: "messages/s",
- Fam: "ns summary",
- Ctx: "pulsar.replication_rate",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarReplicationRateIn, Name: "in", Div: 1000},
- {ID: metricPulsarReplicationRateOut, Name: "out", Mul: -1, Div: 1000},
- },
- }
- sumReplicationThroughputRateChart = Chart{
- ID: "replication_throughput_rate",
- Title: "Replication Throughput Rate",
- Units: "KiB/s",
- Fam: "ns summary",
- Ctx: "pulsar.replication_throughput_rate",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarReplicationThroughputIn, Name: "in", Div: 1024 * 1000},
- {ID: metricPulsarReplicationThroughputOut, Name: "out", Mul: -1, Div: 1024 * 1000},
- },
- }
- sumReplicationBacklogChart = Chart{
- ID: "replication_backlog",
- Title: "Replication Backlog",
- Units: "messages",
- Fam: "ns summary",
- Ctx: "pulsar.replication_backlog",
- Opts: Opts{StoreFirst: true},
- Dims: Dims{
- {ID: metricPulsarReplicationBacklog, Name: "backlog"},
- },
- }
-)
-
-var namespaceCharts = Charts{
- nsBrokerComponentsChart.Copy(),
- topicProducersChart.Copy(),
- topicSubscriptionsChart.Copy(),
- topicConsumersChart.Copy(),
-
- nsMessagesRateChart.Copy(),
- topicMessagesRateInChart.Copy(),
- topicMessagesRateOutChart.Copy(),
- nsThroughputRateCharts.Copy(),
- topicThroughputRateInChart.Copy(),
- topicThroughputRateOutChart.Copy(),
-
- nsStorageSizeChart.Copy(),
- topicStorageSizeChart.Copy(),
- nsStorageOperationsChart.Copy(), // optional
- topicStorageReadRateChart.Copy(), // optional
- topicStorageWriteRateChart.Copy(), // optional
- nsMsgBacklogSizeChart.Copy(),
- topicMsgBacklogSizeChart.Copy(),
- nsStorageWriteLatencyChart.Copy(),
- nsEntrySizeChart.Copy(),
-
- nsSubsDelayedChart.Copy(),
- topicSubsDelayedChart.Copy(),
- nsSubsMsgRateRedeliverChart.Copy(), // optional
- topicSubsMsgRateRedeliverChart.Copy(), // optional
- nsSubsBlockedOnUnackedMsgChart.Copy(), // optional
- topicSubsBlockedOnUnackedMsgChart.Copy(), // optional
-
- nsReplicationRateChart.Copy(), // optional
- topicReplicationRateInChart.Copy(), // optional
- topicReplicationRateOutChart.Copy(), // optional
- nsReplicationThroughputChart.Copy(), // optional
- topicReplicationThroughputRateInChart.Copy(), // optional
- topicReplicationThroughputRateOutChart.Copy(), // optional
- nsReplicationBacklogChart.Copy(), // optional
- topicReplicationBacklogChart.Copy(), // optional
-}
-
-func toNamespaceChart(chart Chart) Chart {
- chart = *chart.Copy()
- if chart.ID == sumBrokerComponentsChart.ID {
- _ = chart.RemoveDim("pulsar_namespaces_count")
- }
- chart.ID += "_namespace_%s"
- chart.Fam = "ns %s"
- if idx := strings.IndexByte(chart.Ctx, '.'); idx > 0 {
- // pulsar.messages_rate => pulsar.namespace_messages_rate
- chart.Ctx = chart.Ctx[:idx+1] + "namespace_" + chart.Ctx[idx+1:]
- }
- for _, dim := range chart.Dims {
- dim.ID += "_%s"
- }
- return chart
-}
-
-var (
- nsBrokerComponentsChart = toNamespaceChart(sumBrokerComponentsChart)
- nsMessagesRateChart = toNamespaceChart(sumMessagesRateChart)
- nsThroughputRateCharts = toNamespaceChart(sumThroughputRateChart)
- nsStorageSizeChart = toNamespaceChart(sumStorageSizeChart)
- nsStorageOperationsChart = toNamespaceChart(sumStorageOperationsRateChart)
- nsMsgBacklogSizeChart = toNamespaceChart(sumMsgBacklogSizeChart)
- nsStorageWriteLatencyChart = toNamespaceChart(sumStorageWriteLatencyChart)
- nsEntrySizeChart = toNamespaceChart(sumEntrySizeChart)
- nsSubsDelayedChart = toNamespaceChart(sumSubsDelayedChart)
- nsSubsMsgRateRedeliverChart = toNamespaceChart(sumSubsMsgRateRedeliverChart)
- nsSubsBlockedOnUnackedMsgChart = toNamespaceChart(sumSubsBlockedOnUnackedMsgChart)
- nsReplicationRateChart = toNamespaceChart(sumReplicationRateChart)
- nsReplicationThroughputChart = toNamespaceChart(sumReplicationThroughputRateChart)
- nsReplicationBacklogChart = toNamespaceChart(sumReplicationBacklogChart)
-
- topicProducersChart = Chart{
- ID: "topic_producers_namespace_%s",
- Title: "Topic Producers",
- Units: "producers",
- Fam: "ns %s",
- Ctx: "pulsar.topic_producers",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicSubscriptionsChart = Chart{
- ID: "topic_subscriptions_namespace_%s",
- Title: "Topic Subscriptions",
- Units: "subscriptions",
- Fam: "ns %s",
- Ctx: "pulsar.topic_subscriptions",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicConsumersChart = Chart{
- ID: "topic_consumers_namespace_%s",
- Title: "Topic Consumers",
- Units: "consumers",
- Fam: "ns %s",
- Ctx: "pulsar.topic_consumers",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicMessagesRateInChart = Chart{
- ID: "topic_messages_rate_in_namespace_%s",
- Title: "Topic Publish Messages Rate",
- Units: "publishes/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_messages_rate_in",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicMessagesRateOutChart = Chart{
- ID: "topic_messages_rate_out_namespace_%s",
- Title: "Topic Dispatch Messages Rate",
- Units: "dispatches/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_messages_rate_out",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicThroughputRateInChart = Chart{
- ID: "topic_throughput_rate_in_namespace_%s",
- Title: "Topic Publish Throughput Rate",
- Units: "KiB/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_throughput_rate_in",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicThroughputRateOutChart = Chart{
- ID: "topic_throughput_rate_out_namespace_%s",
- Title: "Topic Dispatch Throughput Rate",
- Units: "KiB/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_throughput_rate_out",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicStorageSizeChart = Chart{
- ID: "topic_storage_size_namespace_%s",
- Title: "Topic Storage Size",
- Units: "KiB",
- Fam: "ns %s",
- Ctx: "pulsar.topic_storage_size",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicStorageReadRateChart = Chart{
- ID: "topic_storage_read_rate_namespace_%s",
- Title: "Topic Storage Read Rate",
- Units: "message batches/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_storage_read_rate",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicStorageWriteRateChart = Chart{
- ID: "topic_storage_write_rate_namespace_%s",
- Title: "Topic Storage Write Rate",
- Units: "message batches/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_storage_write_rate",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicMsgBacklogSizeChart = Chart{
- ID: "topic_msg_backlog_namespace_%s",
- Title: "Topic Messages Backlog Size",
- Units: "messages",
- Fam: "ns %s",
- Ctx: "pulsar.topic_msg_backlog",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicSubsDelayedChart = Chart{
- ID: "topic_subscription_delayed_namespace_%s",
- Title: "Topic Subscriptions Delayed for Dispatching",
- Units: "message batches",
- Fam: "ns %s",
- Ctx: "pulsar.topic_subscription_delayed",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicSubsMsgRateRedeliverChart = Chart{
- ID: "topic_subscription_msg_rate_redeliver_namespace_%s",
- Title: "Topic Subscriptions Redelivered Message Rate",
- Units: "messages/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_subscription_msg_rate_redeliver",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicSubsBlockedOnUnackedMsgChart = Chart{
- ID: "topic_subscription_blocked_on_unacked_messages_namespace_%s",
- Title: "Topic Subscriptions Blocked On Unacked Messages",
- Units: "blocked subscriptions",
- Fam: "ns %s",
- Ctx: "pulsar.topic_subscription_blocked_on_unacked_messages",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicReplicationRateInChart = Chart{
- ID: "topic_replication_rate_in_namespace_%s",
- Title: "Topic Replication Rate From Remote Cluster",
- Units: "messages/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_replication_rate_in",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicReplicationRateOutChart = Chart{
- ID: "replication_rate_out_namespace_%s",
- Title: "Topic Replication Rate To Remote Cluster",
- Units: "messages/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_replication_rate_out",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicReplicationThroughputRateInChart = Chart{
- ID: "topic_replication_throughput_rate_in_namespace_%s",
- Title: "Topic Replication Throughput Rate From Remote Cluster",
- Units: "KiB/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_replication_throughput_rate_in",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicReplicationThroughputRateOutChart = Chart{
- ID: "topic_replication_throughput_rate_out_namespace_%s",
- Title: "Topic Replication Throughput Rate To Remote Cluster",
- Units: "KiB/s",
- Fam: "ns %s",
- Ctx: "pulsar.topic_replication_throughput_rate_out",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
- topicReplicationBacklogChart = Chart{
- ID: "topic_replication_backlog_namespace_%s",
- Title: "Topic Replication Backlog",
- Units: "messages",
- Fam: "ns %s",
- Ctx: "pulsar.topic_replication_backlog",
- Type: module.Stacked,
- Opts: Opts{StoreFirst: true},
- }
-)
-
-func (p *Pulsar) adjustCharts(pms prometheus.Series) {
- if pms := pms.FindByName(metricPulsarStorageReadRate); pms.Len() == 0 || pms[0].Labels.Get("namespace") == "" {
- p.removeSummaryChart(sumStorageOperationsRateChart.ID)
- p.removeNamespaceChart(nsStorageOperationsChart.ID)
- p.removeNamespaceChart(topicStorageReadRateChart.ID)
- p.removeNamespaceChart(topicStorageWriteRateChart.ID)
- delete(p.topicChartsMapping, topicStorageReadRateChart.ID)
- delete(p.topicChartsMapping, topicStorageWriteRateChart.ID)
- }
- if pms.FindByName(metricPulsarSubscriptionMsgRateRedeliver).Len() == 0 {
- p.removeSummaryChart(sumSubsMsgRateRedeliverChart.ID)
- p.removeSummaryChart(sumSubsBlockedOnUnackedMsgChart.ID)
- p.removeNamespaceChart(nsSubsMsgRateRedeliverChart.ID)
- p.removeNamespaceChart(nsSubsBlockedOnUnackedMsgChart.ID)
- p.removeNamespaceChart(topicSubsMsgRateRedeliverChart.ID)
- p.removeNamespaceChart(topicSubsBlockedOnUnackedMsgChart.ID)
- delete(p.topicChartsMapping, topicSubsMsgRateRedeliverChart.ID)
- delete(p.topicChartsMapping, topicSubsBlockedOnUnackedMsgChart.ID)
- }
- if pms.FindByName(metricPulsarReplicationBacklog).Len() == 0 {
- p.removeSummaryChart(sumReplicationRateChart.ID)
- p.removeSummaryChart(sumReplicationThroughputRateChart.ID)
- p.removeSummaryChart(sumReplicationBacklogChart.ID)
- p.removeNamespaceChart(nsReplicationRateChart.ID)
- p.removeNamespaceChart(nsReplicationThroughputChart.ID)
- p.removeNamespaceChart(nsReplicationBacklogChart.ID)
- p.removeNamespaceChart(topicReplicationRateInChart.ID)
- p.removeNamespaceChart(topicReplicationRateOutChart.ID)
- p.removeNamespaceChart(topicReplicationThroughputRateInChart.ID)
- p.removeNamespaceChart(topicReplicationThroughputRateOutChart.ID)
- p.removeNamespaceChart(topicReplicationBacklogChart.ID)
- delete(p.topicChartsMapping, topicReplicationRateInChart.ID)
- delete(p.topicChartsMapping, topicReplicationRateOutChart.ID)
- delete(p.topicChartsMapping, topicReplicationThroughputRateInChart.ID)
- delete(p.topicChartsMapping, topicReplicationThroughputRateOutChart.ID)
- delete(p.topicChartsMapping, topicReplicationBacklogChart.ID)
- }
-}
-
-func (p *Pulsar) removeSummaryChart(chartID string) {
- if err := p.Charts().Remove(chartID); err != nil {
- p.Warning(err)
- }
-}
-
-func (p *Pulsar) removeNamespaceChart(chartID string) {
- if err := p.nsCharts.Remove(chartID); err != nil {
- p.Warning(err)
- }
-}
-
-func (p *Pulsar) updateCharts() {
- // NOTE: order is important
- for ns := range p.curCache.namespaces {
- if !p.cache.namespaces[ns] {
- p.cache.namespaces[ns] = true
- p.addNamespaceCharts(ns)
- }
- }
- for top := range p.curCache.topics {
- if !p.cache.topics[top] {
- p.cache.topics[top] = true
- p.addTopicToCharts(top)
- }
- }
- for top := range p.cache.topics {
- if p.curCache.topics[top] {
- continue
- }
- delete(p.cache.topics, top)
- p.removeTopicFromCharts(top)
- }
- for ns := range p.cache.namespaces {
- if p.curCache.namespaces[ns] {
- continue
- }
- delete(p.cache.namespaces, ns)
- p.removeNamespaceFromCharts(ns)
- }
-}
-
-func (p *Pulsar) addNamespaceCharts(ns namespace) {
- charts := p.nsCharts.Copy()
- for _, chart := range *charts {
- chart.ID = fmt.Sprintf(chart.ID, ns.name)
- chart.Fam = fmt.Sprintf(chart.Fam, ns.name)
- for _, dim := range chart.Dims {
- dim.ID = fmt.Sprintf(dim.ID, ns.name)
- }
- }
- if err := p.Charts().Add(*charts...); err != nil {
- p.Warning(err)
- }
-}
-
-func (p *Pulsar) removeNamespaceFromCharts(ns namespace) {
- for _, chart := range *p.nsCharts {
- id := fmt.Sprintf(chart.ID, ns.name)
- if chart = p.Charts().Get(id); chart != nil {
- chart.MarkRemove()
- } else {
- p.Warningf("could not remove namespace chart '%s'", id)
- }
- }
-}
-
-func (p *Pulsar) addTopicToCharts(top topic) {
- for id, metric := range p.topicChartsMapping {
- id = fmt.Sprintf(id, top.namespace)
- chart := p.Charts().Get(id)
- if chart == nil {
- p.Warningf("could not add topic '%s' to chart '%s': chart not found", top.name, id)
- continue
- }
-
- dim := Dim{ID: metric + "_" + top.name, Name: extractTopicName(top)}
- switch metric {
- case metricPulsarThroughputIn,
- metricPulsarThroughputOut,
- metricPulsarReplicationThroughputIn,
- metricPulsarReplicationThroughputOut:
- dim.Div = 1024 * 1000
- case metricPulsarRateIn,
- metricPulsarRateOut,
- metricPulsarStorageWriteRate,
- metricPulsarStorageReadRate,
- metricPulsarSubscriptionMsgRateRedeliver,
- metricPulsarReplicationRateIn,
- metricPulsarReplicationRateOut:
- dim.Div = 1000
- case metricPulsarStorageSize:
- dim.Div = 1024
- }
-
- if err := chart.AddDim(&dim); err != nil {
- p.Warning(err)
- }
- chart.MarkNotCreated()
- }
-}
-
-func (p *Pulsar) removeTopicFromCharts(top topic) {
- for id, metric := range p.topicChartsMapping {
- id = fmt.Sprintf(id, top.namespace)
- chart := p.Charts().Get(id)
- if chart == nil {
- p.Warningf("could not remove topic '%s' from chart '%s': chart not found", top.name, id)
- continue
- }
-
- if err := chart.MarkDimRemove(metric+"_"+top.name, true); err != nil {
- p.Warning(err)
- }
- chart.MarkNotCreated()
- }
-}
-
-func topicChartsMapping() map[string]string {
- return map[string]string{
- topicSubscriptionsChart.ID: metricPulsarSubscriptionsCount,
- topicProducersChart.ID: metricPulsarProducersCount,
- topicConsumersChart.ID: metricPulsarConsumersCount,
- topicMessagesRateInChart.ID: metricPulsarRateIn,
- topicMessagesRateOutChart.ID: metricPulsarRateOut,
- topicThroughputRateInChart.ID: metricPulsarThroughputIn,
- topicThroughputRateOutChart.ID: metricPulsarThroughputOut,
- topicStorageSizeChart.ID: metricPulsarStorageSize,
- topicStorageReadRateChart.ID: metricPulsarStorageReadRate,
- topicStorageWriteRateChart.ID: metricPulsarStorageWriteRate,
- topicMsgBacklogSizeChart.ID: metricPulsarMsgBacklog,
- topicSubsDelayedChart.ID: metricPulsarSubscriptionDelayed,
- topicSubsMsgRateRedeliverChart.ID: metricPulsarSubscriptionMsgRateRedeliver,
- topicSubsBlockedOnUnackedMsgChart.ID: metricPulsarSubscriptionBlockedOnUnackedMessages,
- topicReplicationRateInChart.ID: metricPulsarReplicationRateIn,
- topicReplicationRateOutChart.ID: metricPulsarReplicationRateOut,
- topicReplicationThroughputRateInChart.ID: metricPulsarReplicationThroughputIn,
- topicReplicationThroughputRateOutChart.ID: metricPulsarReplicationThroughputOut,
- topicReplicationBacklogChart.ID: metricPulsarReplicationBacklog,
- }
-}
-
-func extractTopicName(top topic) string {
- // persistent://sample/ns1/demo-1 => p:demo-1
- if idx := strings.LastIndexByte(top.name, '/'); idx > 0 {
- return top.name[:1] + ":" + top.name[idx+1:]
- }
- return top.name
-}
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/collect.go b/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
deleted file mode 100644
index f28e6cb2c4..0000000000
--- a/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
+++ /dev/null
@@ -1,138 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-package pulsar
-
-import (
- "errors"
- "strings"
-
- "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
- "github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
-)
-
-func isValidPulsarMetrics(pms prometheus.Series) bool {
- return pms.FindByName(metricPulsarTopicsCount).Len() > 0
-}
-
-func (p *Pulsar) resetCurCache() {
- for ns := range p.curCache.namespaces {
- delete(p.curCache.namespaces, ns)
- }
- for top := range p.curCache.topics {
- delete(p.curCache.topics, top)
- }
-}
-
-func (p *Pulsar) collect() (map[string]int64, error) {
- pms, err := p.prom.ScrapeSeries()
- if err != nil {
- return nil, err
- }
-
- if !isValidPulsarMetrics(pms) {
- return nil, errors.New("returned metrics aren't Apache Pulsar metrics")
- }
-
- p.once.Do(func() {
- p.adjustCharts(pms)
- })
-
- mx := p.collectMetrics(pms)
- p.updateCharts()
- p.resetCurCache()
-
- return stm.ToMap(mx), nil
-}
-
-func (p *Pulsar) collectMetrics(pms prometheus.Series) map[string]float64 {
- mx := make(map[string]float64)
- p.collectBroker(mx, pms)
- return mx
-}
-
-func (p *Pulsar) collectBroker(mx map[string]float64, pms prometheus.Series) {
- pms = findPulsarMetrics(pms)
- for _, pm := range pms {
- ns, top := newNamespace(pm), newTopic(pm)
- if ns.name == "" {
- continue
- }
-
- p.curCache.namespaces[ns] = true
-
- value := pm.Value * precision(pm.Name())
- mx[pm.Name()] += value
- mx[pm.Name()+"_"+ns.name] += value
-
- if top.name == "" || !p.topicFilter.MatchString(top.name) {
- continue
- }
-
- p.curCache.topics[top] = true
- mx[pm.Name()+"_"+top.name] += value
- }
- mx["pulsar_namespaces_count"] = float64(len(p.curCache.namespaces))
-}
-
-func newNamespace(pm prometheus.SeriesSample) namespace {
- return namespace{
- name: pm.Labels.Get("namespace"),
- }
-}
-
-func newTopic(pm prometheus.SeriesSample) topic {
- return topic{
- namespace: pm.Labels.Get("namespace"),
- name: pm.Labels.Get("topic"),
- }
-}
-
-func findPulsarMetrics(pms prometheus.Series) prometheus.Series {
- var ms prometheus.Series
- for _, pm := range pms {
- if isPulsarHistogram(pm) {
- ms = append(ms, pm)
- }
- }
- pms = pms.FindByNames(
- metricPulsarTopicsCount,
- metricPulsarSubscriptionDelayed,
- metricPulsarSubscriptionsCount,
- metricPulsarProducersCount,
- metricPulsarConsumersCount,
- metricPulsarRateIn,
- metricPulsarRateOut,
- metricPulsarThroughputIn,
- metricPulsarThroughputOut,
- metricPulsarStorageSize,
- metricPulsarStorageWriteRate,
- metricPulsarStorageReadRate,
- metricPulsarMsgBacklog,
- metricPulsarSubscriptionMsgRateRedeliver,
- metricPulsarSubscriptionBlockedOnUnackedMessages,
- )
- return append(ms, pms...)
-}
-
-func isPulsarHistogram(pm prometheus.SeriesSample) bool {
- s := pm.Name()
- return strings.HasPrefix(s, "pulsar_storage_write_latency") || strings.HasPrefix(s, "pulsar_entry_size")
-}
-
-func precision(metric string) float64 {
- switch metric {
- case metricPulsarRateIn,
- metricPulsarRateOut,
- metricPulsarThroughputIn,
- metricPulsarThroughputOut,
- metricPulsarStorageWriteRate,
- metricPulsarStorageReadRate,
- metricPulsarSubscriptionMsgRateRedeliver,
- metricPulsarReplicationRateIn,
- metricPulsarReplicationRateOut,
- metricPulsarReplicationThroughputIn,
- metricPulsarReplicationThroughputOut:
- return 1000
- }
- return 1
-}
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/config_schema.json b/src/go/collectors/go.d.plugin/modules/pulsar/config_schema.json
de