summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-10-31 19:53:20 +0200
committerGitHub <noreply@github.com>2022-10-31 19:53:20 +0200
commit282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch)
treeb23e108b35adc8ed322e8167d0f1fe607c2cfa4c /streaming/replication.c
parentdf87a538cfaba5014a752937714756b7c5d30c93 (diff)
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)" This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7. * Profile plugin * Fix macos static thread * Add support for replication - Add a new capability for replication, when not supported the agent should behave as previously. - When replication is supported, the text protocol supports the following new commands: - CHART_DEFINITION_END: send the first/last entry of the child - REPLAY_RRDSET_BEGIN: sends the name of the chart we are replicating - REPLAY_RRDSET_HEADER: sends a line describing the columns of the following command (ie. start-time, end-time, dim1-name, ...) - REPLAY_RRDSET_DONE: sends values to push for a specific start/end time - REPLAY_RRDSET_END: send the (a) update every of the chart, (b) first/last entries in DB, (c) whether the child's been told to start streaming, (d) original after/before period to replicate. - REPLAY_CHART: Sent from a parent to a child, specifying (a) the chart name we want data for, (b) whether the child should start streaming once it has fullfilled the request with the aforementioned commands, (c) after/before of the data the parent wants - As a consequence of the new protocol, streaming is disabled for all charts on a new connection. It's enabled once replication is finished. - The configuration parameters are specified from within stream.conf: - "enable replication = yes|no" - "seconds to replicate = 3600" - "replication step = 600" (ie. how many seconds to fill per roundtrip request. * Minor fixes - quote set and dim ids - start streaming after writing replicated data to the buffer - write replicated data only when buffer is less than 50% full. - use reentrant iteration for charts * Do not send chart definitions on connection. * Track replication status through rrdset flags. * Add debug flag for noisy log messages. * Add license notice. * Iterate charts with reentrant loop * Set replication finished flag when streaming is disabled. 
* Revert "Profile plugin" This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d. Used only for testing purposes. * Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"" This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d. Reapply commit that I had to revert in order to be able to build the agent on MacOS. * Build replication source files with CMake. * Pass number of words in plugind functions. * Use get_word instead of indexing words. * Use size_t instead of int. * Pay only what we use when splitting words. * no need to redefine PLUGINSD_MAX_WORDS * fix formatting warning * all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart * keep a sender dictionary with all the replication commands received and remove replication commands from charts * do not replicate future data * use last_updated to find the end of the db * uniformity of replication logs * rewrite of the query logic * replication.c in C; debug info in human readable dates * update the chart on every replication row * update all chart members so that rrdset_done() can continue * update the protocol to push one dimension per line and transfer data collection state to parent * fix formatting * remove replication object from pluginsd * shorter communication * fix typo * support for replication proxies * proper use of flags * set receiver replication finished flag on charts created after the sender has been connected * clear RRDSET_FLAG_SYNC_CLOCK on replicated charts * log storing of nulls * log first store * log update every switches * test ignoring timestamps but sending a point just after replication end * replication should work on end_time * use replicated timestamps * at the final replication step, replicate all the remaining points * cleanup code from tests * print timestamps as unsigned long long * more formating changes; fix conflicting type of replicate_chart_response() * 
updated stream.conf * always respond to replication requests * in non-dbengine db modes, do not replicate more than the database size * advance the db pointer of legacy db modes * should be multiplied by update_every * fix buggy label parsing - identified by codacy * dont log error on history mismatches for db mode dbengine * allow SSL requests to streaming children * dont use ssl variable Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c344
1 files changed, 344 insertions, 0 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
new file mode 100644
index 0000000000..ebfd79031f
--- /dev/null
+++ b/streaming/replication.c
@@ -0,0 +1,344 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "replication.h"
+
+// Append to 'wb' the replay rows of chart 'st' for the time-frame 'after' to 'before'.
+//
+// For every row found, one PLUGINSD_KEYWORD_REPLAY_BEGIN line is written with
+// the start and end time of the row, followed by one PLUGINSD_KEYWORD_REPLAY_SET
+// line per dimension: the stored value with flag "R" when the point carries
+// SN_FLAG_RESET, or NAN with flag "E" when the dimension has no point covering
+// the row.
+//
+// The data are read with the query ops of db[0] of the host and the metric
+// handle of tiers[0] of each dimension.
+//
+// When 'enable_streaming' is set and the chart has been updated after 'before',
+// 'before' is extended to the last update time of the chart.
+//
+// Returns the (possibly extended) 'before' that has been queried.
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
+    size_t dimensions = rrdset_number_of_dimensions(st);
+
+    struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
+
+    // a variable length array must have at least one element (a zero sized
+    // one is undefined behavior), so reserve one slot even when the chart
+    // has no dimensions - all the loops below are still bounded by 'dimensions'
+    struct {
+        DICTIONARY *dict;
+        const DICTIONARY_ITEM *rda;
+        RRDDIM *rd;
+        struct storage_engine_query_handle handle;
+        STORAGE_POINT sp;
+    } data[dimensions ? dimensions : 1];
+
+    memset(data, 0, sizeof(data));
+
+    if(enable_streaming && st->last_updated.tv_sec > before) {
+        internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
+                       rrdset_id(st),
+                       (unsigned long long)before,
+                       (unsigned long long)st->last_updated.tv_sec
+        );
+        before = st->last_updated.tv_sec;
+    }
+
+    // prepare our array of dimensions
+    {
+        RRDDIM *rd;
+        rrddim_foreach_read(rd, st) {
+            // dimensions added after we counted them do not fit in the array
+            if (rd_dfe.counter >= dimensions)
+                break;
+
+            // keep the dictionary item acquired for as long as we query it
+            data[rd_dfe.counter].dict = rd_dfe.dict;
+            data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+            data[rd_dfe.counter].rd = rd;
+
+            ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
+        }
+        rrddim_foreach_done(rd);
+    }
+
+    time_t now = after, actual_after = 0, actual_before = 0;
+    while(now <= before) {
+        time_t min_start_time = 0, min_end_time = 0;
+        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+            // fetch the first valid point for the dimension
+            int max_skip = 100;
+            while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
+                data[i].sp = ops->next_metric(&data[i].handle);
+
+            // max_skip becomes negative only when all the skips have been
+            // consumed while the point is still behind 'now' and the query
+            // is not finished; when it stops at zero the loop ended because
+            // the query advanced or finished, which is not an error
+            if(max_skip < 0)
+                error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
+                      rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now);
+
+            // this dimension has nothing to offer for this row
+            if(data[i].sp.end_time < now)
+                continue;
+
+            if(!min_start_time) {
+                min_start_time = data[i].sp.start_time;
+                min_end_time = data[i].sp.end_time;
+            }
+            else {
+                min_start_time = MIN(min_start_time, data[i].sp.start_time);
+                min_end_time = MIN(min_end_time, data[i].sp.end_time);
+            }
+        }
+
+        if(min_end_time < now) {
+            internal_error(true,
+                           "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+            break;
+        }
+
+        // make sure the row has a positive duration
+        if(min_end_time <= min_start_time)
+            min_start_time = min_end_time - st->update_every;
+
+        if(!actual_after) {
+            actual_after = min_end_time;
+            actual_before = min_end_time;
+        }
+        else
+            actual_before = min_end_time;
+
+        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n"
+                       , (unsigned long long)min_start_time
+                       , (unsigned long long)min_end_time);
+
+        // output the replay values for this time
+        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+            if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
+                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n",
+                               rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
+            else
+                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
+                               rrddim_id(data[i].rd));
+        }
+
+        // continue with the first second after the row we just sent
+        now = min_end_time + 1;
+    }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(actual_after) {
+        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
+        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
+        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
+                       (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
+    }
+    else
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       (unsigned long long)after, (unsigned long long)before);
+#endif
+
+    // release all the dictionary items acquired
+    // finalize the queries
+    for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
+        ops->finalize(&data[i].handle);
+        dictionary_acquired_item_release(data[i].dict, data[i].rda);
+    }
+
+    return before;
+}
+
+// Append to 'wb' the data collection state of chart 'st':
+//
+// - one PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE line per dimension, with its id,
+//   its last collected time in microseconds and its last collected,
+//   calculated and stored values,
+//
+// - one PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE line for the chart, with its last
+//   collected and last updated times in microseconds and its last collected
+//   and collected totals.
+static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
+    RRDDIM *rd;
+    rrddim_foreach_read(rd, st) {
+        unsigned long long rd_collected_ut =
+                (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+
+        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n",
+                       rrddim_id(rd), rd_collected_ut,
+                       rd->last_collected_value, rd->last_calculated_value, rd->last_stored_value);
+    }
+    rrddim_foreach_done(rd);
+
+    unsigned long long st_collected_ut =
+            (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
+
+    unsigned long long st_updated_ut =
+            (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
+
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu " TOTAL_NUMBER_FORMAT " " TOTAL_NUMBER_FORMAT "\n",
+                   st_collected_ut, st_updated_ut,
+                   st->last_collected_total, st->collected_total);
+}
+
+// Respond to a replication request for chart 'st', for the time-frame 'after'
+// to 'before', writing the response to the sender buffer of 'host'.
+//
+// The requested time-frame is clamped to the entries we have (which are never
+// allowed to be in the future). When 'start_streaming' is set, the response
+// extends up to the last entry we have.
+//
+// The response starts with PLUGINSD_KEYWORD_REPLAY_BEGIN and the chart id,
+// continues with the data of the time-frame (unless 'after' or 'before' is
+// zero), the collection state of the chart (only when streaming gets enabled)
+// and ends with PLUGINSD_KEYWORD_REPLAY_END.
+//
+// Returns true when streaming is enabled for the chart with this response.
+bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+    time_t now = now_realtime_sec();
+
+    // the first entry we have - never in the future
+    time_t first_entry_local = rrdset_first_entry_t(st);
+    if(first_entry_local > now) {
+        internal_error(true,
+                       "RRDSET: '%s' first time %llu is in the future (now is %llu)",
+                       rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
+        first_entry_local = now;
+    }
+
+    // the latest entry we have - never in the future
+    time_t last_entry_local = st->last_updated.tv_sec;
+    if(last_entry_local > now) {
+        internal_error(true,
+                       "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
+                       rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+        last_entry_local = now;
+    }
+
+    // we cannot start the query before the first entry we have
+    time_t query_after = (after < first_entry_local) ? first_entry_local : after;
+
+    // we cannot end the query after the latest entry we have, and when the
+    // parent asked us to start streaming we send everything up to it
+    time_t query_before = (start_streaming || before > last_entry_local) ? last_entry_local : before;
+
+    // keep the query time-frame ordered
+    if (query_before < query_after) {
+        time_t swap = query_after;
+        query_after = query_before;
+        query_before = swap;
+    }
+
+    bool enable_streaming = start_streaming || !after || !before || query_before == last_entry_local;
+
+    // we might want to optimize this by filling a temporary buffer
+    // and copying the result to the host's buffer in order to avoid
+    // holding the host's buffer lock for too long
+    BUFFER *wb = sender_start(host->sender);
+
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+
+    if(!after || !before) {
+        // an empty request: no data to send, just enable streaming
+        after = 0;
+        before = 0;
+        enable_streaming = true;
+    }
+    else
+        before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
+
+    if(enable_streaming)
+        replicate_chart_collection_state(wb, st);
+
+    // end with the update every of the chart, the first/last entries we have,
+    // whether streaming is enabled, and the after/before we responded to,
+    // so that the parent knows which time range this response is about
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
+                   (int)st->update_every,
+                   (unsigned long long)first_entry_local,
+                   (unsigned long long)last_entry_local,
+                   enable_streaming ? "true" : "false",
+                   (unsigned long long)after,
+                   (unsigned long long)before);
+
+    sender_commit(host->sender, wb);
+
+    return enable_streaming;
+}
+
+// Format a PLUGINSD_KEYWORD_REPLAY_CHART command for chart 'st' (chart id,
+// start streaming flag, after, before) and pass it to 'callback' together
+// with 'callback_data'.
+//
+// Returns false when the callback returns a negative value, true otherwise.
+static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+    const char *streaming_txt = start_streaming ? "true" : "false";
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(!after || !before) {
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       streaming_txt);
+    }
+    else {
+        char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
+        log_date(after_buf, LOG_DATE_LENGTH, after);
+        log_date(before_buf, LOG_DATE_LENGTH, before);
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
+                       streaming_txt);
+    }
+#endif
+
+    debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+          rrdset_id(st), streaming_txt, (unsigned long long)after, (unsigned long long)before);
+
+    // the command line to send, including its trailing newline
+    char cmd[2048 + 1];
+    snprintfz(cmd, sizeof(cmd) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+              rrdset_id(st), streaming_txt,
+              (unsigned long long)after, (unsigned long long)before);
+
+    int rc = callback(cmd, callback_data);
+    if (rc >= 0)
+        return true;
+
+    error("failed to send replay request to child (ret=%d)", rc);
+    return false;
+}
+
+// Decide which time-frame of chart 'st' to request from the child and send
+// the request through 'callback'.
+//
+// An empty request (after = 0, before = 0, start streaming) is sent when
+// replication is disabled on the host, the child has no stored data, the
+// chart has no dimensions, the first/last entries of the child are invalid,
+// or our last entry is later than the last entry of the child.
+//
+// Otherwise the request starts where the previous one ended (but not more
+// than host->rrdpush_seconds_to_replicate in the past), or, for the first
+// request, at the later of our last entry and the first entry of the child.
+// It spans host->rrdpush_replication_step seconds, capped at the last entry
+// of the child; when it reaches the last entry of the child, the child is
+// also asked to start streaming.
+//
+// Returns what send_replay_chart_cmd() returns for the request sent.
+bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
+                             time_t first_entry_child, time_t last_entry_child,
+                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
+{
+    time_t now = now_realtime_sec();
+    const char *hostname = rrdhost_hostname(host);
+    const char *chart = rrdset_id(st);
+
+    // if replication is disabled, send an empty replication request
+    // asking no data
+    if (!host->rrdpush_enable_replication) {
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
+                       hostname, chart);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    // the child has no stored data
+    if (!last_entry_child) {
+        error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
+              hostname, chart);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    // nothing to get if the chart has no dimensions
+    if (!rrdset_number_of_dimensions(st)) {
+        error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
+              hostname, chart);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    // if the child's first/last entries are nonsensical, resume streaming
+    // without asking for any data
+    if (first_entry_child <= 0) {
+        error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
+              hostname, chart, (unsigned long long)first_entry_child);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    if (first_entry_child > last_entry_child) {
+        error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
+              hostname, chart, (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    // our last entry - never in the future
+    time_t last_entry_local = rrdset_last_entry_t(st);
+    if(last_entry_local > now) {
+        internal_error(true,
+                       "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
+                       hostname, chart, (unsigned long long)last_entry_local, (unsigned long long)now);
+        last_entry_local = now;
+    }
+
+    // should never happen, but if it does, start streaming without asking
+    // for any data
+    if (last_entry_local > last_entry_child) {
+        error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
+              hostname, chart, (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
+
+        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+
+    // where the time-frame we want begins
+    time_t first_entry_wanted;
+    if (!prev_first_entry_wanted || !prev_last_entry_wanted)
+        // first request: continue from what we have, or from the beginning of the child
+        first_entry_wanted = MAX(last_entry_local, first_entry_child);
+    else if ((now - prev_last_entry_wanted) > host->rrdpush_seconds_to_replicate)
+        // the previous request ended too far in the past
+        first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
+    else
+        // continue from where the previous request ended
+        first_entry_wanted = prev_last_entry_wanted;
+
+    // where the time-frame we want ends: one step later,
+    // but not beyond the last entry of the child
+    time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step;
+    if (last_entry_wanted >= last_entry_child)
+        last_entry_wanted = last_entry_child;
+
+    // when this request reaches the end of the child's data,
+    // the child should start streaming after fulfilling it
+    return send_replay_chart_cmd(callback, callback_data, st,
+                                 last_entry_wanted == last_entry_child,
+                                 first_entry_wanted, last_entry_wanted);
+}