summary | refs | log | tree | commit | diff | stats
path: root/streaming
diff options
context:
space:
mode:
author: Costa Tsaousis <costa@netdata.cloud> 2022-11-15 23:49:42 +0200
committer: GitHub <noreply@github.com> 2022-11-15 23:49:42 +0200
commit: becd97a3660af34104c557ba6c2877f624143c2e (patch)
tree: 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /streaming
parent: 1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"

This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/compression.c   1
-rw-r--r-- streaming/receiver.c     25
-rw-r--r-- streaming/replication.c  16
-rw-r--r-- streaming/rrdpush.h       2
-rw-r--r-- streaming/sender.c      184
5 files changed, 62 insertions, 166 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index 9fa69c5cd2..1fddc02b91 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,7 +5,6 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-// signature MUST end with a newline
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a872642a44..40673f05b4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
return 0;
}
- // for compressed streams, the compression signature header ends with a new line
- // so, here we read a single line from the stream.
-
int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) {
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
internal_error(true, "read_stream() failed (1).");
return 1;
}
@@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
}
// Fill read buffer with decompressed data
- r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len);
+ r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
return 0;
}
@@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(++localhost->senders_count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
rrdcontext_host_child_connected(rpt->host);
+
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
size_t count = streaming_parser(rpt, &cd, fp_in, fp_out,
@@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
rpt->hostname, rpt->client_ip, rpt->client_port, count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
rrdcontext_host_child_disconnected(rpt->host);
#ifdef ENABLE_ACLK
diff --git a/streaming/replication.c b/streaming/replication.c
index 39a115d44b..ef384f4e8f 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -42,7 +42,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
rrddim_foreach_done(rd);
}
- time_t now = after + 1, actual_after = 0, actual_before = 0;
+ time_t now = after, actual_after = 0, actual_before = 0;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
@@ -252,18 +252,6 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
}
#endif
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(
- st->replay.after != 0 || st->replay.before != 0,
- "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight",
- rrdhost_hostname(st->rrdhost), rrdset_id(st)
- );
-
- st->replay.start_streaming = start_streaming;
- st->replay.after = after;
- st->replay.before = before;
-#endif
-
debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
@@ -289,7 +277,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
// if replication is disabled, send an empty replication request
// asking no data
- if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) {
+ if (!host->rrdpush_enable_replication) {
internal_error(true,
"REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
rrdhost_hostname(host), rrdset_id(st));
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 5b277cf7ee..819a94cd27 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -170,8 +170,6 @@ struct sender_state {
#endif
DICTIONARY *replication_requests;
- time_t replication_first_time;
- time_t replication_min_time;
};
struct receiver_state {
diff --git a/streaming/sender.c b/streaming/sender.c
index b34132b75c..e0964f9e3a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -21,14 +21,9 @@
#define WORKER_SENDER_JOB_BUFFER_RATIO 15
#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
#define WORKER_SENDER_JOB_BYTES_SENT 17
-#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
-#define WORKER_SENDER_JOB_REPLAY_RESPONSE 19
-#define WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE 20
-#define WORKER_SENDER_JOB_REPLAY_COMPLETION 21
-#define WORKER_SENDER_JOB_FUNCTION 22
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 23
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 23
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18
#endif
extern struct config stream_config;
@@ -86,8 +81,6 @@ static inline void deactivate_compression(struct sender_state *s) {
}
#endif
-#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
-
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s, BUFFER *wb) {
@@ -107,15 +100,15 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
- info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+ if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) {
+ error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1);
- s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2;
}
#ifdef ENABLE_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+ if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -475,7 +468,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION))
+ if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
#endif // ENABLE_COMPRESSION
@@ -671,12 +664,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
#ifdef ENABLE_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
- if(!s->compressor)
- s->compressor = create_compressor();
- else
+ // if the stream does not have compression capability,
+ // shut it down for us too.
+ // FIXME - this means that if there are multiple parents and one of them does not support compression
+ // we are going to shut it down for all of them eventually...
+ if(!stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
+
+ if(s->flags & SENDER_FLAG_COMPRESSION) {
+ if(s->compressor)
s->compressor->reset(s->compressor);
}
+ else
+ info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to);
+
#endif //ENABLE_COMPRESSION
log_sender_capabilities(s);
@@ -880,8 +881,6 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- worker_is_busy(WORKER_SENDER_JOB_FUNCTION);
-
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -910,10 +909,7 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
- }
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
- worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
-
+ } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
const char *chart_id = get_word(words, num_words, 1);
const char *start_streaming = get_word(words, num_words, 2);
const char *after = get_word(words, num_words, 3);
@@ -936,8 +932,7 @@ void execute_commands(struct sender_state *s) {
};
dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
}
- }
- else {
+ } else {
error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
@@ -1050,12 +1045,10 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, v
struct replication_request *rr = old_value;
struct replication_request *rr_new = new_value;
- internal_error(
- true,
- "STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
- (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
+ error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+ (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
+ (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
bool updated = false;
@@ -1099,8 +1092,6 @@ void sender_init(RRDHOST *host)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
host->sender->compressor = create_compressor();
}
- else
- host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
netdata_mutex_init(&host->sender->mutex);
@@ -1119,98 +1110,45 @@ static size_t sender_buffer_used_percent(struct sender_state *s) {
return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
}
-int replication_request_compar(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2) {
- struct replication_request *rr1 = dictionary_acquired_item_value(*item1);
- struct replication_request *rr2 = dictionary_acquired_item_value(*item2);
-
- time_t after1 = rr1->after;
- time_t after2 = rr2->after;
-
- if(after1 < after2)
- return -1;
- if(after1 > after2)
- return 1;
-
- return 0;
-}
-
-int process_one_replication_request(const DICTIONARY_ITEM *item, void *value, void *data) {
- struct sender_state *s = data;
-
- size_t used_percent = sender_buffer_used_percent(s);
- if(used_percent >= 50) return -1; // signal the traversal to stop
-
- worker_is_busy(WORKER_SENDER_JOB_REPLAY_RESPONSE);
-
- struct replication_request *rr = value;
- const char *name = dictionary_acquired_item_name(item);
-
- // delete it from the dictionary
- // the current item is referenced - it will not go away until the next iteration of the dfe loop
- dictionary_del(s->replication_requests, name);
-
- // find the chart
- RRDSET *st = rrdset_find(s->host, name);
- if(unlikely(!st)) {
- internal_error(true,
- "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
- , rrdhost_hostname(s->host), s->connected_to, name);
- return 0;
- }
-
- if(rr->after < s->replication_first_time || !s->replication_first_time)
- s->replication_first_time = rr->after;
-
- if(rr->before < s->replication_min_time || !s->replication_min_time)
- s->replication_min_time = rr->before;
-
- netdata_thread_disable_cancelability();
-
- // send the replication data
- bool start_streaming = replicate_chart_response(st->rrdhost, st,
- rr->start_streaming, rr->after, rr->before);
-
- netdata_thread_enable_cancelability();
-
- // enable normal streaming if we have to
- if (start_streaming) {
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(s->host), rrdset_id(st));
-
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- }
-
- return 1;
-}
-
static void process_replication_requests(struct sender_state *s) {
- size_t entries = dictionary_entries(s->replication_requests);
+ if(dictionary_entries(s->replication_requests) == 0)
+ return;
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, (NETDATA_DOUBLE)entries);
+ struct replication_request *rr;
+ dfe_start_write(s->replication_requests, rr) {
+ size_t used_percent = sender_buffer_used_percent(s);
+ if(used_percent > 50) break;
+
+ // delete it from the dictionary
+ // the current item is referenced - it will not go away until the next iteration of the dfe loop
+ dictionary_del(s->replication_requests, rr_dfe.name);
+
+ // find the chart
+ RRDSET *st = rrdset_find(s->host, rr_dfe.name);
+ if(unlikely(!st)) {
+ internal_error(true,
+ "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
+ , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name);
+ continue;
+ }
- if(!entries) {
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
- return;
- }
+ netdata_thread_disable_cancelability();
- s->replication_min_time = 0;
+ // send the replication data
+ bool start_streaming = replicate_chart_response(st->rrdhost, st,
+ rr->start_streaming, rr->after, rr->before);
- int count = dictionary_sorted_walkthrough_rw(s->replication_requests, DICTIONARY_LOCK_WRITE,
- process_one_replication_request, s,
- replication_request_compar);
+ netdata_thread_enable_cancelability();
- if(count != 0 && s->replication_min_time && s->replication_first_time) {
- time_t now = now_realtime_sec();
- if(now > s->replication_first_time && now >= s->replication_min_time) {
- time_t completed = s->replication_min_time - s->replication_first_time;
- time_t all_duration = now - s->replication_first_time;
+ // enable normal streaming if we have to
+ if (start_streaming) {
+ debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
+ rrdhost_hostname(s->host), rrdset_id(st));
- NETDATA_DOUBLE percent = (NETDATA_DOUBLE) completed * 100.0 / (NETDATA_DOUBLE) all_duration;
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, percent);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
}
}
-
- worker_is_idle();
+ dfe_done(rr);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -1233,17 +1171,9 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
- worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
- worker_register_job_name(WORKER_SENDER_JOB_REPLAY_RESPONSE, "replay response");
- worker_register_job_name(WORKER_SENDER_JOB_FUNCTION, "function");
-
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, "replications pending", "commands", WORKER_METRIC_ABSOLUTE);
-
- worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
struct sender_state *s = ptr;
s->tid = gettid();