diff options
| field | value | date |
|---|---|---|
| author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-15 23:49:42 +0200 |
| committer | GitHub <noreply@github.com> | 2022-11-15 23:49:42 +0200 |
| commit | becd97a3660af34104c557ba6c2877f624143c2e (patch) | |
| tree | 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /streaming | |
| parent | 1789d07c43182152437459a7a4f81267bbdd752c (diff) | |
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"
This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'streaming')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | streaming/compression.c | 1 |
| -rw-r--r-- | streaming/receiver.c | 25 |
| -rw-r--r-- | streaming/replication.c | 16 |
| -rw-r--r-- | streaming/rrdpush.h | 2 |
| -rw-r--r-- | streaming/sender.c | 184 |
5 files changed, 62 insertions, 166 deletions
diff --git a/streaming/compression.c b/streaming/compression.c index 9fa69c5cd2..1fddc02b91 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -5,7 +5,6 @@ #define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" -// signature MUST end with a newline #define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) #define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) #define SIGNATURE_SIZE 4 diff --git a/streaming/receiver.c b/streaming/receiver.c index a872642a44..40673f05b4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { return 0; } - // for compressed streams, the compression signature header ends with a new line - // so, here we read a single line from the stream. - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) { + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) { internal_error(true, "read_stream() failed (1)."); return 1; } @@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { } // Fill read buffer with decompressed data - r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(++localhost->senders_count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_connected(rpt->host); + rrdhost_flag_clear(rpt->host, 
RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, @@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_disconnected(rpt->host); #ifdef ENABLE_ACLK diff --git a/streaming/replication.c b/streaming/replication.c index 39a115d44b..ef384f4e8f 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -42,7 +42,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti rrddim_foreach_done(rd); } - time_t now = after + 1, actual_after = 0, actual_before = 0; + time_t now = after, actual_after = 0, actual_before = 0; while(now <= before) { time_t min_start_time = 0, min_end_time = 0; for (size_t i = 0; i < dimensions && data[i].rd; i++) { @@ -252,18 +252,6 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR } #endif -#ifdef NETDATA_INTERNAL_CHECKS - internal_error( - st->replay.after != 0 || st->replay.before != 0, - "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight", - rrdhost_hostname(st->rrdhost), rrdset_id(st) - ); - - st->replay.start_streaming = start_streaming; - st->replay.after = after; - st->replay.before = before; -#endif - debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", rrdset_id(st), start_streaming ? 
"true" : "false", (unsigned long long)after, (unsigned long long)before); @@ -289,7 +277,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST // if replication is disabled, send an empty replication request // asking no data - if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) { + if (!host->rrdpush_enable_replication) { internal_error(true, "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled", rrdhost_hostname(host), rrdset_id(st)); diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 5b277cf7ee..819a94cd27 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -170,8 +170,6 @@ struct sender_state { #endif DICTIONARY *replication_requests; - time_t replication_first_time; - time_t replication_min_time; }; struct receiver_state { diff --git a/streaming/sender.c b/streaming/sender.c index b34132b75c..e0964f9e3a 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -21,14 +21,9 @@ #define WORKER_SENDER_JOB_BUFFER_RATIO 15 #define WORKER_SENDER_JOB_BYTES_RECEIVED 16 #define WORKER_SENDER_JOB_BYTES_SENT 17 -#define WORKER_SENDER_JOB_REPLAY_REQUEST 18 -#define WORKER_SENDER_JOB_REPLAY_RESPONSE 19 -#define WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE 20 -#define WORKER_SENDER_JOB_REPLAY_COMPLETION 21 -#define WORKER_SENDER_JOB_FUNCTION 22 - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 23 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 23 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18 #endif extern struct config stream_config; @@ -86,8 +81,6 @@ static inline void deactivate_compression(struct sender_state *s) { } #endif -#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 - // Collector thread finishing a transmission void sender_commit(struct sender_state *s, BUFFER *wb) { @@ -107,15 +100,15 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { netdata_mutex_lock(&s->mutex); - 
if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { - info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", - rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); + if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) { + error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.", + rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1); - s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; + s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2; } #ifdef ENABLE_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) { + if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) { while(src_len) { size_t size_to_compress = src_len; @@ -475,7 +468,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p #ifdef ENABLE_COMPRESSION // If we don't want compression, remove it from our capabilities - if(!(s->flags & SENDER_FLAG_COMPRESSION)) + if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION)) s->capabilities &= ~STREAM_CAP_COMPRESSION; #endif // ENABLE_COMPRESSION @@ -671,12 +664,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; #ifdef ENABLE_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) { - if(!s->compressor) - s->compressor = create_compressor(); - else + // if the stream does not have compression capability, + // shut it down for us too. 
+ // FIXME - this means that if there are multiple parents and one of them does not support compression + // we are going to shut it down for all of them eventually... + if(!stream_has_capability(s, STREAM_CAP_COMPRESSION)) + s->flags &= ~SENDER_FLAG_COMPRESSION; + + if(s->flags & SENDER_FLAG_COMPRESSION) { + if(s->compressor) s->compressor->reset(s->compressor); } + else + info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to); + #endif //ENABLE_COMPRESSION log_sender_capabilities(s); @@ -880,8 +881,6 @@ void execute_commands(struct sender_state *s) { const char *keyword = get_word(words, num_words, 0); if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - worker_is_busy(WORKER_SENDER_JOB_FUNCTION); - char *transaction = get_word(words, num_words, 1); char *timeout_s = get_word(words, num_words, 2); char *function = get_word(words, num_words, 3); @@ -910,10 +909,7 @@ void execute_commands(struct sender_state *s) { stream_execute_function_callback(wb, code, tmp); } } - } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { - worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); - + } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { const char *chart_id = get_word(words, num_words, 1); const char *start_streaming = get_word(words, num_words, 2); const char *after = get_word(words, num_words, 3); @@ -936,8 +932,7 @@ void execute_commands(struct sender_state *s) { }; dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request)); } - } - else { + } else { error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); } @@ -1050,12 +1045,10 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, v struct replication_request *rr = old_value; struct replication_request *rr_new = new_value; - internal_error( - 
true, - "STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", - rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), - (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false", - (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false"); + error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), + (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false", + (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false"); bool updated = false; @@ -1099,8 +1092,6 @@ void sender_init(RRDHOST *host) host->sender->flags |= SENDER_FLAG_COMPRESSION; host->sender->compressor = create_compressor(); } - else - host->sender->flags &= ~SENDER_FLAG_COMPRESSION; #endif netdata_mutex_init(&host->sender->mutex); @@ -1119,98 +1110,45 @@ static size_t sender_buffer_used_percent(struct sender_state *s) { return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size; } -int replication_request_compar(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2) { - struct replication_request *rr1 = dictionary_acquired_item_value(*item1); - struct replication_request *rr2 = dictionary_acquired_item_value(*item2); - - time_t after1 = rr1->after; - time_t after2 = rr2->after; - - if(after1 < after2) - return -1; - if(after1 > after2) - return 1; - - return 0; -} - -int process_one_replication_request(const DICTIONARY_ITEM *item, void *value, void *data) { - struct sender_state *s = data; - - size_t used_percent = sender_buffer_used_percent(s); - if(used_percent >= 50) return -1; // signal 
the traversal to stop - - worker_is_busy(WORKER_SENDER_JOB_REPLAY_RESPONSE); - - struct replication_request *rr = value; - const char *name = dictionary_acquired_item_name(item); - - // delete it from the dictionary - // the current item is referenced - it will not go away until the next iteration of the dfe loop - dictionary_del(s->replication_requests, name); - - // find the chart - RRDSET *st = rrdset_find(s->host, name); - if(unlikely(!st)) { - internal_error(true, - "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command." - , rrdhost_hostname(s->host), s->connected_to, name); - return 0; - } - - if(rr->after < s->replication_first_time || !s->replication_first_time) - s->replication_first_time = rr->after; - - if(rr->before < s->replication_min_time || !s->replication_min_time) - s->replication_min_time = rr->before; - - netdata_thread_disable_cancelability(); - - // send the replication data - bool start_streaming = replicate_chart_response(st->rrdhost, st, - rr->start_streaming, rr->after, rr->before); - - netdata_thread_enable_cancelability(); - - // enable normal streaming if we have to - if (start_streaming) { - debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s", - rrdhost_hostname(s->host), rrdset_id(st)); - - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - } - - return 1; -} - static void process_replication_requests(struct sender_state *s) { - size_t entries = dictionary_entries(s->replication_requests); + if(dictionary_entries(s->replication_requests) == 0) + return; - worker_set_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, (NETDATA_DOUBLE)entries); + struct replication_request *rr; + dfe_start_write(s->replication_requests, rr) { + size_t used_percent = sender_buffer_used_percent(s); + if(used_percent > 50) break; + + // delete it from the dictionary + // the current item is referenced - it will not go away until the next iteration of the dfe loop + dictionary_del(s->replication_requests, 
rr_dfe.name); + + // find the chart + RRDSET *st = rrdset_find(s->host, rr_dfe.name); + if(unlikely(!st)) { + internal_error(true, + "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command." + , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name); + continue; + } - if(!entries) { - worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0); - return; - } + netdata_thread_disable_cancelability(); - s->replication_min_time = 0; + // send the replication data + bool start_streaming = replicate_chart_response(st->rrdhost, st, + rr->start_streaming, rr->after, rr->before); - int count = dictionary_sorted_walkthrough_rw(s->replication_requests, DICTIONARY_LOCK_WRITE, - process_one_replication_request, s, - replication_request_compar); + netdata_thread_enable_cancelability(); - if(count != 0 && s->replication_min_time && s->replication_first_time) { - time_t now = now_realtime_sec(); - if(now > s->replication_first_time && now >= s->replication_min_time) { - time_t completed = s->replication_min_time - s->replication_first_time; - time_t all_duration = now - s->replication_first_time; + // enable normal streaming if we have to + if (start_streaming) { + debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s", + rrdhost_hostname(s->host), rrdset_id(st)); - NETDATA_DOUBLE percent = (NETDATA_DOUBLE) completed * 100.0 / (NETDATA_DOUBLE) all_duration; - worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, percent); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); } } - - worker_is_idle(); + dfe_done(rr); } void *rrdpush_sender_thread(void *ptr) { @@ -1233,17 +1171,9 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); - worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request"); - 
worker_register_job_name(WORKER_SENDER_JOB_REPLAY_RESPONSE, "replay response"); - worker_register_job_name(WORKER_SENDER_JOB_FUNCTION, "function"); - worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL); - worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); - worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, "replications pending", "commands", WORKER_METRIC_ABSOLUTE); - - worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0); struct sender_state *s = ptr; s->tid = gettid(); |