summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-24 00:24:21 +0200
committerGitHub <noreply@github.com>2022-11-24 00:24:21 +0200
commit8e1a99ad79a1394cbb0ffcaa24bdde85c7b14d81 (patch)
tree61020aea9c8c6e48032d5721aec4b9ddbfe7c35b /streaming/sender.c
parent0fe7b1c8a84b7836b5bb13c37924d9fd851c6233 (diff)
replication fixes #5 (#14038)
* pluginsd cleanup; replication logic cleanup; fix bug in replication begin * log replication start/stop and change the keyword of NETDATA_LOG_REPLICATION_REQUESTS logs to REPLAY * dont ask for data the child does not have; log fixes * more pluginsd cleanup * count sender dictionary entries * fix dictionary_flush()
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c10
1 files changed, 7 insertions, 3 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 85aad3a3e5..f4624f5998 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -23,9 +23,10 @@
#define WORKER_SENDER_JOB_BYTES_SENT 17
#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 20
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 20
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
#endif
extern struct config stream_config;
@@ -225,7 +226,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->upstream_resync_time = 0;
@@ -1099,6 +1100,7 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
struct sender_state *s = ptr;
s->tid = gettid();
@@ -1342,6 +1344,8 @@ void *rrdpush_sender_thread(void *ptr) {
rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
+
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication_requests));
}
netdata_thread_cleanup_pop(1);