summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-10-31 19:53:20 +0200
committerGitHub <noreply@github.com>2022-10-31 19:53:20 +0200
commit282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch)
treeb23e108b35adc8ed322e8167d0f1fe607c2cfa4c /streaming/sender.c
parentdf87a538cfaba5014a752937714756b7c5d30c93 (diff)
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)" This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7. * Profile plugin * Fix macos static thread * Add support for replication - Add a new capability for replication, when not supported the agent should behave as previously. - When replication is supported, the text protocol supports the following new commands: - CHART_DEFINITION_END: send the first/last entry of the child - REPLAY_RRDSET_BEGIN: sends the name of the chart we are replicating - REPLAY_RRDSET_HEADER: sends a line describing the columns of the following command (i.e. start-time, end-time, dim1-name, ...) - REPLAY_RRDSET_DONE: sends values to push for a specific start/end time - REPLAY_RRDSET_END: send the (a) update every of the chart, (b) first/last entries in DB, (c) whether the child's been told to start streaming, (d) original after/before period to replicate. - REPLAY_CHART: Sent from a parent to a child, specifying (a) the chart name we want data for, (b) whether the child should start streaming once it has fulfilled the request with the aforementioned commands, (c) after/before of the data the parent wants - As a consequence of the new protocol, streaming is disabled for all charts on a new connection. It's enabled once replication is finished. - The configuration parameters are specified from within stream.conf: - "enable replication = yes|no" - "seconds to replicate = 3600" - "replication step = 600" (i.e. how many seconds to fill per roundtrip request). * Minor fixes - quote set and dim ids - start streaming after writing replicated data to the buffer - write replicated data only when buffer is less than 50% full. - use reentrant iteration for charts * Do not send chart definitions on connection. * Track replication status through rrdset flags. * Add debug flag for noisy log messages. * Add license notice. * Iterate charts with reentrant loop * Set replication finished flag when streaming is disabled. 
* Revert "Profile plugin" This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d. Used only for testing purposes. * Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"" This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d. Reapply commit that I had to revert in order to be able to build the agent on MacOS. * Build replication source files with CMake. * Pass number of words in pluginsd functions. * Use get_word instead of indexing words. * Use size_t instead of int. * Pay only what we use when splitting words. * no need to redefine PLUGINSD_MAX_WORDS * fix formatting warning * all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart * keep a sender dictionary with all the replication commands received and remove replication commands from charts * do not replicate future data * use last_updated to find the end of the db * uniformity of replication logs * rewrite of the query logic * replication.c in C; debug info in human readable dates * update the chart on every replication row * update all chart members so that rrdset_done() can continue * update the protocol to push one dimension per line and transfer data collection state to parent * fix formatting * remove replication object from pluginsd * shorter communication * fix typo * support for replication proxies * proper use of flags * set receiver replication finished flag on charts created after the sender has been connected * clear RRDSET_FLAG_SYNC_CLOCK on replicated charts * log storing of nulls * log first store * log update every switches * test ignoring timestamps but sending a point just after replication end * replication should work on end_time * use replicated timestamps * at the final replication step, replicate all the remaining points * cleanup code from tests * print timestamps as unsigned long long * more formatting changes; fix conflicting type of replicate_chart_response() * 
updated stream.conf * always respond to replication requests * in non-dbengine db modes, do not replicate more than the database size * advance the db pointer of legacy db modes * should be multiplied by update_every * fix buggy label parsing - identified by codacy * dont log error on history mismatches for db mode dbengine * allow SSL requests to streaming children * dont use ssl variable Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c236
1 files changed, 163 insertions, 73 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 6759e9c983..20d57b7135 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -31,6 +31,12 @@ extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
+struct replication_request { // one pending REPLAY_CHART request; stored in the sender's replication_requests dictionary under the chart id
+ bool start_streaming; // true only when the request's start_streaming word is exactly "true"
+ time_t after; // 'after' word of the request, parsed with strtoll()
+ time_t before; // 'before' word of the request, parsed with strtoll()
+};
+
static __thread BUFFER *sender_thread_buffer = NULL;
static __thread bool sender_thread_buffer_used = false;
@@ -94,6 +100,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
+ if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) {
+ error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1);
+
+ s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2;
+ }
+
#ifdef ENABLE_COMPRESSION
if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
while(src_len) {
@@ -213,10 +226,24 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
// resets all the chart, so that their definitions
// will be resent to the central netdata
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
+ error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
+
+ bool receive_has_replication = host != localhost && host->receiver && stream_has_capability(host->receiver, STREAM_CAP_REPLICATION);
+ bool send_has_replication = host->sender && stream_has_capability(host->sender, STREAM_CAP_REPLICATION);
+
RRDSET *st;
rrdset_foreach_read(st, host) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ if(!receive_has_replication)
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+
+ if(send_has_replication)
+ // it will be enabled once replication is done on the sending side
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
st->upstream_resync_time = 0;
RRDDIM *rd;
@@ -234,6 +261,7 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
rrdpush_sender_thread_reset_all_charts(host);
rrdpush_sender_thread_send_custom_host_variables(host);
+ dictionary_flush(host->sender->replication_requests);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -848,17 +876,19 @@ void execute_commands(struct sender_state *s) {
internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
- if(words[0] && strcmp(words[0], PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = words[1];
- char *timeout_s = words[2];
- char *function = words[3];
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- words[0],
+ keyword,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -879,9 +909,32 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
- }
- else
+ } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ const char *chart_id = get_word(words, num_words, 1);
+ const char *start_streaming = get_word(words, num_words, 2);
+ const char *after = get_word(words, num_words, 3);
+ const char *before = get_word(words, num_words, 4);
+
+ if (!chart_id || !start_streaming || !after || !before) {
+ error("STREAM %s [send to %s] %s command is incomplete"
+ " (chart=%s, start_streaming=%s, after=%s, before=%s)",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ chart_id ? chart_id : "(unset)",
+ start_streaming ? start_streaming : "(unset)",
+ after ? after : "(unset)",
+ before ? before : "(unset)");
+ } else {
+ struct replication_request tmp = {
+ .start_streaming = !strcmp(start_streaming, "true"),
+ .after = strtoll(after, NULL, 0),
+ .before = strtoll(before, NULL, 0),
+ };
+ dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
+ }
+ } else {
error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ }
start = newline + 1;
}
@@ -898,44 +951,9 @@ void execute_commands(struct sender_state *s) {
struct rrdpush_sender_thread_data {
struct sender_state *sender_state; // set to the sender state 's' when the sender thread starts
RRDHOST *host; // set to s->host when the sender thread starts; read back by the cleanup callback
- DICTFE dictfe;
- enum {
- SENDING_DEFINITIONS_RESTART,
- SENDING_DEFINITIONS_CONTINUE,
- SENDING_DEFINITIONS_DONE,
- } sending_definitions_status;
char *pipe_buffer; // allocated with mallocz(pipe_buffer_size) by the sender thread
};
-static size_t cbuffer_available_bytes_with_lock(struct rrdpush_sender_thread_data *thread_data) {
- netdata_mutex_lock(&thread_data->sender_state->mutex);
- size_t outstanding = cbuffer_available_size_unsafe(thread_data->sender_state->host->sender->buffer);
- netdata_mutex_unlock(&thread_data->sender_state->mutex);
- return outstanding;
-}
-
-static void rrdpush_queue_incremental_definitions(struct rrdpush_sender_thread_data *thread_data) {
-
- while(rrdhost_flag_check(thread_data->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED)
- && thread_data->sending_definitions_status != SENDING_DEFINITIONS_DONE
- && cbuffer_available_bytes_with_lock(thread_data) > (thread_data->sender_state->buffer->max_size / 2)) {
-
- if(thread_data->sending_definitions_status == SENDING_DEFINITIONS_RESTART)
- info("STREAM %s [send to %s]: sending metric definitions...", rrdhost_hostname(thread_data->host), thread_data->sender_state->connected_to);
-
- bool more_defs_available = rrdpush_incremental_transmission_of_chart_definitions(
- thread_data->sender_state->host, &thread_data->dictfe,
- thread_data->sending_definitions_status == SENDING_DEFINITIONS_RESTART, false);
-
- if (unlikely(!more_defs_available)) {
- thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE;
- info("STREAM %s [send to %s]: sending metric definitions finished.", rrdhost_hostname(thread_data->host), thread_data->sender_state->connected_to);
- }
- else
- thread_data->sending_definitions_status = SENDING_DEFINITIONS_CONTINUE;
- }
-}
-
static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
@@ -997,8 +1015,6 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
RRDHOST *host = data->host;
- rrdpush_incremental_transmission_of_chart_definitions(host, &data->dictfe, false, true);
-
netdata_mutex_lock(&host->sender->mutex);
info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host));
@@ -1021,28 +1037,114 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(data);
}
-void sender_init(RRDHOST *parent)
+// Insert hook for the replication_requests dictionary.
+// Intentionally empty: no extra work is done when a request is added.
+static void replication_request_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
+}
+// Conflict hook for the replication_requests dictionary: called when a
+// replication request arrives for a chart that already has one queued.
+//
+// The duplicate is logged, then the existing request is widened so that it
+// covers both periods: 'after' moves to the earlier of the two values and
+// 'before' to the later one. If the two requests disagree on start_streaming,
+// the existing request ends up with start_streaming = true.
+//
+// Returns true when the existing request was modified, false otherwise.
+static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *sender_state) {
+    struct sender_state *sender = sender_state;
+    struct replication_request *existing = old_value;
+    struct replication_request *incoming = new_value;
+
+    error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+          rrdhost_hostname(sender->host), sender->connected_to, dictionary_acquired_item_name(item),
+          (unsigned long long)existing->after, (unsigned long long)existing->before, existing->start_streaming?"true":"false",
+          (unsigned long long)incoming->after, (unsigned long long)incoming->before, incoming->start_streaming?"true":"false");
+
+    // decide first, using the untouched existing request, then apply
+    bool earlier_after = incoming->after < existing->after;
+    bool later_before = incoming->before > existing->before;
+    bool streaming_differs = incoming->start_streaming != existing->start_streaming;
+
+    if(earlier_after)
+        existing->after = incoming->after;
+
+    if(later_before)
+        existing->before = incoming->before;
+
+    if(streaming_differs)
+        existing->start_streaming = true;
+
+    return earlier_after || later_before || streaming_differs;
+}
+// Delete hook for the replication_requests dictionary.
+// Intentionally empty: no extra work is done when a request is removed.
+static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
+}
+
+void sender_init(RRDHOST *host)
{
- if (parent->sender)
+ if (host->sender) // the host already has a sender state: leave it untouched
return;
- parent->sender = callocz(1, sizeof(*parent->sender));
- parent->sender->host = parent;
- parent->sender->buffer = cbuffer_new(1024, 1024*1024);
- parent->sender->capabilities = STREAM_OUR_CAPABILITIES;
+ host->sender = callocz(1, sizeof(*host->sender));
+ host->sender->host = host;
+ host->sender->buffer = cbuffer_new(1024, 1024 * 1024); // NOTE(review): presumably initial and maximum size in bytes - confirm against cbuffer_new()
+ host->sender->capabilities = STREAM_OUR_CAPABILITIES;
- parent->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
- parent->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
- parent->sender->rrdpush_sender_socket = -1;
+ host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
+ host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ host->sender->rrdpush_sender_socket = -1; // -1 is the value the sender thread checks for before attempting to connect
#ifdef ENABLE_COMPRESSION
if(default_compression_enabled) {
- parent->sender->flags |= SENDER_FLAG_COMPRESSION;
- parent->sender->compressor = create_compressor();
+ host->sender->flags |= SENDER_FLAG_COMPRESSION;
+ host->sender->compressor = create_compressor();
}
#endif
- netdata_mutex_init(&parent->sender->mutex);
+ netdata_mutex_init(&host->sender->mutex);
+
+ host->sender->replication_requests = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE); // pending replication requests, stored under the chart id
+ dictionary_register_insert_callback(host->sender->replication_requests, replication_request_insert_callback, host->sender); // all three callbacks receive the sender state as their last argument
+ dictionary_register_conflict_callback(host->sender->replication_requests, replication_request_conflict_callback, host->sender);
+ dictionary_register_delete_callback(host->sender->replication_requests, replication_request_delete_callback, host->sender);
+}
+
+/*
+ * Returns how full the sender's circular buffer is, as an integer percentage
+ * of its maximum size.
+ *
+ * Both the maximum size and the available space are sampled while holding the
+ * sender mutex. sender_commit() may raise buffer->max_size under that same
+ * mutex, so reading it after unlocking would race with the writer and could
+ * pair an old "available" with a new "max_size".
+ *
+ * A zero maximum size is reported as 100% used and an available size at or
+ * above the maximum is reported as 0% used, so the computation can neither
+ * divide by zero nor wrap around.
+ */
+static size_t sender_buffer_used_percent(struct sender_state *s) {
+    netdata_mutex_lock(&s->mutex);
+    size_t max_size = s->host->sender->buffer->max_size;
+    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+    netdata_mutex_unlock(&s->mutex);
+
+    if(unlikely(!max_size))
+        return 100;
+
+    if(unlikely(available >= max_size))
+        return 0;
+
+    return (max_size - available) * 100 / max_size;
+}
+
+static void process_replication_requests(struct sender_state *s) { // answers queued replication requests while the send buffer is at most 50% used
+ if(dictionary_entries(s->replication_requests) == 0)
+ return;
+
+ struct replication_request *rr;
+ dfe_start_write(s->replication_requests, rr) {
+ size_t used_percent = sender_buffer_used_percent(s);
+ if(used_percent > 50) break; // buffer more than half full: the remaining requests stay queued for a later call
+
+ // remove the request from the dictionary before answering it
+ // the current item is referenced - it will not go away until the next iteration of the dfe loop
+ dictionary_del(s->replication_requests, rr_dfe.name);
+
+ // the dictionary item name is the chart id to look up on this host
+ RRDSET *st = rrdset_find(s->host, rr_dfe.name);
+ if(unlikely(!st)) {
+ internal_error(true,
+ "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
+ , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name);
+ continue; // the request was already deleted above, so it is dropped, not retried
+ }
+
+ // send the replication data for the requested after/before period
+ bool start_streaming = replicate_chart_response(st->rrdhost, st,
+ rr->start_streaming, rr->after, rr->before);
+
+ // when the response says so, mark the chart as done replicating on the sending side
+ if (start_streaming) {
+ debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
+ rrdhost_hostname(s->host), rrdset_id(st));
+
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ }
+ }
+ dfe_done(rr);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -1127,7 +1229,6 @@ void *rrdpush_sender_thread(void *ptr) {
thread_data->pipe_buffer = mallocz(pipe_buffer_size);
thread_data->sender_state = s;
thread_data->host = s->host;
- thread_data->sending_definitions_status = SENDING_DEFINITIONS_RESTART;
// reset our cleanup flags
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
@@ -1141,7 +1242,6 @@ void *rrdpush_sender_thread(void *ptr) {
// The connection attempt blocks (after which we use the socket in nonblocking)
if(unlikely(s->rrdpush_sender_socket == -1)) {
worker_is_busy(WORKER_SENDER_JOB_CONNECT);
- thread_data->sending_definitions_status = SENDING_DEFINITIONS_RESTART;
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
s->flags &= ~SENDER_FLAG_OVERFLOW;
s->read_len = 0;
@@ -1151,13 +1251,6 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(!attempt_to_connect(s)))
continue;
- if (stream_has_capability(s, STREAM_CAP_GAP_FILLING)) {
- time_t now = now_realtime_sec();
- BUFFER *wb = sender_start(s);
- buffer_sprintf(wb, "TIMESTAMP %"PRId64"", (int64_t)now);
- sender_commit(s, wb);
- }
-
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
@@ -1178,9 +1271,6 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
- if(unlikely(thread_data->sending_definitions_status != SENDING_DEFINITIONS_DONE))
- rrdpush_queue_incremental_definitions(thread_data);
-
netdata_mutex_lock(&s->mutex);
size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
@@ -1191,10 +1281,8 @@ void *rrdpush_sender_thread(void *ptr) {
if(outstanding)
s->send_attempts++;
else {
- if(unlikely(thread_data->sending_definitions_status == SENDING_DEFINITIONS_DONE
- && rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED)
- && !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS)
- )) {
+ if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) &&
+ !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
// let the data collection threads know we are ready to push metrics
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
@@ -1287,6 +1375,8 @@ void *rrdpush_sender_thread(void *ptr) {
execute_commands(s);
}
+ process_replication_requests(s);
+
if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
char *error = NULL;