summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-15 23:49:42 +0200
committerGitHub <noreply@github.com>2022-11-15 23:49:42 +0200
commitbecd97a3660af34104c557ba6c2877f624143c2e (patch)
tree14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /streaming/sender.c
parent1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)". This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c184
1 file changed, 57 insertions, 127 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index b34132b75c..e0964f9e3a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -21,14 +21,9 @@
#define WORKER_SENDER_JOB_BUFFER_RATIO 15
#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
#define WORKER_SENDER_JOB_BYTES_SENT 17
-#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
-#define WORKER_SENDER_JOB_REPLAY_RESPONSE 19
-#define WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE 20
-#define WORKER_SENDER_JOB_REPLAY_COMPLETION 21
-#define WORKER_SENDER_JOB_FUNCTION 22
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 23
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 23
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18
#endif
extern struct config stream_config;
@@ -86,8 +81,6 @@ static inline void deactivate_compression(struct sender_state *s) {
}
#endif
-#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
-
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s, BUFFER *wb) {
@@ -107,15 +100,15 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
- info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+ if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) {
+ error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1);
- s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2;
}
#ifdef ENABLE_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+ if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -475,7 +468,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION))
+ if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
#endif // ENABLE_COMPRESSION
@@ -671,12 +664,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
#ifdef ENABLE_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
- if(!s->compressor)
- s->compressor = create_compressor();
- else
+ // if the stream does not have compression capability,
+ // shut it down for us too.
+ // FIXME - this means that if there are multiple parents and one of them does not support compression
+ // we are going to shut it down for all of them eventually...
+ if(!stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
+
+ if(s->flags & SENDER_FLAG_COMPRESSION) {
+ if(s->compressor)
s->compressor->reset(s->compressor);
}
+ else
+ info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to);
+
#endif //ENABLE_COMPRESSION
log_sender_capabilities(s);
@@ -880,8 +881,6 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- worker_is_busy(WORKER_SENDER_JOB_FUNCTION);
-
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -910,10 +909,7 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
- }
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
- worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
-
+ } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
const char *chart_id = get_word(words, num_words, 1);
const char *start_streaming = get_word(words, num_words, 2);
const char *after = get_word(words, num_words, 3);
@@ -936,8 +932,7 @@ void execute_commands(struct sender_state *s) {
};
dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
}
- }
- else {
+ } else {
error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
@@ -1050,12 +1045,10 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, v
struct replication_request *rr = old_value;
struct replication_request *rr_new = new_value;
- internal_error(
- true,
- "STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
- (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
+ error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+ (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
+ (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
bool updated = false;
@@ -1099,8 +1092,6 @@ void sender_init(RRDHOST *host)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
host->sender->compressor = create_compressor();
}
- else
- host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
netdata_mutex_init(&host->sender->mutex);
@@ -1119,98 +1110,45 @@ static size_t sender_buffer_used_percent(struct sender_state *s) {
return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
}
-int replication_request_compar(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2) {
- struct replication_request *rr1 = dictionary_acquired_item_value(*item1);
- struct replication_request *rr2 = dictionary_acquired_item_value(*item2);
-
- time_t after1 = rr1->after;
- time_t after2 = rr2->after;
-
- if(after1 < after2)
- return -1;
- if(after1 > after2)
- return 1;
-
- return 0;
-}
-
-int process_one_replication_request(const DICTIONARY_ITEM *item, void *value, void *data) {
- struct sender_state *s = data;
-
- size_t used_percent = sender_buffer_used_percent(s);
- if(used_percent >= 50) return -1; // signal the traversal to stop
-
- worker_is_busy(WORKER_SENDER_JOB_REPLAY_RESPONSE);
-
- struct replication_request *rr = value;
- const char *name = dictionary_acquired_item_name(item);
-
- // delete it from the dictionary
- // the current item is referenced - it will not go away until the next iteration of the dfe loop
- dictionary_del(s->replication_requests, name);
-
- // find the chart
- RRDSET *st = rrdset_find(s->host, name);
- if(unlikely(!st)) {
- internal_error(true,
- "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
- , rrdhost_hostname(s->host), s->connected_to, name);
- return 0;
- }
-
- if(rr->after < s->replication_first_time || !s->replication_first_time)
- s->replication_first_time = rr->after;
-
- if(rr->before < s->replication_min_time || !s->replication_min_time)
- s->replication_min_time = rr->before;
-
- netdata_thread_disable_cancelability();
-
- // send the replication data
- bool start_streaming = replicate_chart_response(st->rrdhost, st,
- rr->start_streaming, rr->after, rr->before);
-
- netdata_thread_enable_cancelability();
-
- // enable normal streaming if we have to
- if (start_streaming) {
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(s->host), rrdset_id(st));
-
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- }
-
- return 1;
-}
-
static void process_replication_requests(struct sender_state *s) {
- size_t entries = dictionary_entries(s->replication_requests);
+ if(dictionary_entries(s->replication_requests) == 0)
+ return;
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, (NETDATA_DOUBLE)entries);
+ struct replication_request *rr;
+ dfe_start_write(s->replication_requests, rr) {
+ size_t used_percent = sender_buffer_used_percent(s);
+ if(used_percent > 50) break;
+
+ // delete it from the dictionary
+ // the current item is referenced - it will not go away until the next iteration of the dfe loop
+ dictionary_del(s->replication_requests, rr_dfe.name);
+
+ // find the chart
+ RRDSET *st = rrdset_find(s->host, rr_dfe.name);
+ if(unlikely(!st)) {
+ internal_error(true,
+ "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
+ , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name);
+ continue;
+ }
- if(!entries) {
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
- return;
- }
+ netdata_thread_disable_cancelability();
- s->replication_min_time = 0;
+ // send the replication data
+ bool start_streaming = replicate_chart_response(st->rrdhost, st,
+ rr->start_streaming, rr->after, rr->before);
- int count = dictionary_sorted_walkthrough_rw(s->replication_requests, DICTIONARY_LOCK_WRITE,
- process_one_replication_request, s,
- replication_request_compar);
+ netdata_thread_enable_cancelability();
- if(count != 0 && s->replication_min_time && s->replication_first_time) {
- time_t now = now_realtime_sec();
- if(now > s->replication_first_time && now >= s->replication_min_time) {
- time_t completed = s->replication_min_time - s->replication_first_time;
- time_t all_duration = now - s->replication_first_time;
+ // enable normal streaming if we have to
+ if (start_streaming) {
+ debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
+ rrdhost_hostname(s->host), rrdset_id(st));
- NETDATA_DOUBLE percent = (NETDATA_DOUBLE) completed * 100.0 / (NETDATA_DOUBLE) all_duration;
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, percent);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
}
}
-
- worker_is_idle();
+ dfe_done(rr);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -1233,17 +1171,9 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
- worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
- worker_register_job_name(WORKER_SENDER_JOB_REPLAY_RESPONSE, "replay response");
- worker_register_job_name(WORKER_SENDER_JOB_FUNCTION, "function");
-
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, "replications pending", "commands", WORKER_METRIC_ABSOLUTE);
-
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
struct sender_state *s = ptr;
s->tid = gettid();