summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-10-31 19:53:20 +0200
committerGitHub <noreply@github.com>2022-10-31 19:53:20 +0200
commit282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch)
treeb23e108b35adc8ed322e8167d0f1fe607c2cfa4c /streaming
parentdf87a538cfaba5014a752937714756b7c5d30c93 (diff)
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)" This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7. * Profile plugin * Fix macos static thread * Add support for replication - Add a new capability for replication, when not supported the agent should behave as previously. - When replication is supported, the text protocol supports the following new commands: - CHART_DEFINITION_END: send the first/last entry of the child - REPLAY_RRDSET_BEGIN: sends the name of the chart we are replicating - REPLAY_RRDSET_HEADER: sends a line describing the columns of the following command (i.e. start-time, end-time, dim1-name, ...) - REPLAY_RRDSET_DONE: sends values to push for a specific start/end time - REPLAY_RRDSET_END: send the (a) update every of the chart, (b) first/last entries in DB, (c) whether the child's been told to start streaming, (d) original after/before period to replicate. - REPLAY_CHART: Sent from a parent to a child, specifying (a) the chart name we want data for, (b) whether the child should start streaming once it has fulfilled the request with the aforementioned commands, (c) after/before of the data the parent wants - As a consequence of the new protocol, streaming is disabled for all charts on a new connection. It's enabled once replication is finished. - The configuration parameters are specified from within stream.conf: - "enable replication = yes|no" - "seconds to replicate = 3600" - "replication step = 600" (i.e. how many seconds to fill per roundtrip request). * Minor fixes - quote set and dim ids - start streaming after writing replicated data to the buffer - write replicated data only when buffer is less than 50% full. - use reentrant iteration for charts * Do not send chart definitions on connection. * Track replication status through rrdset flags. * Add debug flag for noisy log messages. * Add license notice. * Iterate charts with reentrant loop * Set replication finished flag when streaming is disabled. 
* Revert "Profile plugin" This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d. Used only for testing purposes. * Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"" This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d. Reapply commit that I had to revert in order to be able to build the agent on MacOS. * Build replication source files with CMake. * Pass number of words in plugind functions. * Use get_word instead of indexing words. * Use size_t instead of int. * Pay only what we use when splitting words. * no need to redefine PLUGINSD_MAX_WORDS * fix formatting warning * all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart * keep a sender dictionary with all the replication commands received and remove replication commands from charts * do not replicate future data * use last_updated to find the end of the db * uniformity of replication logs * rewrite of the query logic * replication.c in C; debug info in human readable dates * update the chart on every replication row * update all chart members so that rrdset_done() can continue * update the protocol to push one dimension per line and transfer data collection state to parent * fix formatting * remove replication object from pluginsd * shorter communication * fix typo * support for replication proxies * proper use of flags * set receiver replication finished flag on charts created after the sender has been connected * clear RRDSET_FLAG_SYNC_CLOCK on replicated charts * log storing of nulls * log first store * log update every switches * test ignoring timestamps but sending a point just after replication end * replication should work on end_time * use replicated timestamps * at the final replication step, replicate all the remaining points * cleanup code from tests * print timestamps as unsigned long long * more formatting changes; fix conflicting type of replicate_chart_response() * 
updated stream.conf * always respond to replication requests * in non-dbengine db modes, do not replicate more than the database size * advance the db pointer of legacy db modes * should be multiplied by update_every * fix buggy label parsing - identified by codacy * don't log error on history mismatches for db mode dbengine * allow SSL requests to streaming children * don't use ssl variable Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c136
-rw-r--r--streaming/replication.c344
-rw-r--r--streaming/replication.h17
-rw-r--r--streaming/rrdpush.c88
-rw-r--r--streaming/rrdpush.h18
-rw-r--r--streaming/sender.c236
-rw-r--r--streaming/stream.conf53
7 files changed, 661 insertions, 231 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 36c085062d..9ffada2417 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -65,99 +65,43 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
#include "collectors/plugins.d/pluginsd_parser.h"
-PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(plugins_action);
- char *remote_time_txt = words[1];
- time_t remote_time = 0;
- RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
- struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
- if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities);
- return PARSER_RC_OK; // Ignore error and continue stream
- }
- if (remote_time_txt && *remote_time_txt) {
- remote_time = str2ull(remote_time_txt);
- time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
- time_t gap = 0;
- if (prev == 0)
- info(
- "STREAM %s from %s: Initial connection (no gap to check), "
- "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
- rrdhost_hostname(host),
- cd->cmd,
- (int64_t)remote_time,
- (int64_t)now,
- (int64_t)now - remote_time);
- else {
- gap = now - prev;
- info(
- "STREAM %s from %s: Checking for gaps... "
- "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
- rrdhost_hostname(host),
- cd->cmd,
- (int64_t)remote_time,
- (int64_t)prev,
- (int64_t)now,
- (int64_t)(remote_time - now),
- (int64_t)gap);
- }
- char message[128];
- sprintf(
- message,
- "REPLICATE %"PRId64" %"PRId64"\n",
- (int64_t)(remote_time - gap),
- (int64_t)remote_time);
- int ret;
-#ifdef ENABLE_HTTPS
- SSL *conn = host->receiver->ssl.conn ;
- if(conn && !host->receiver->ssl.flags) {
- ret = SSL_write(conn, message, strlen(message));
- } else {
- ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
- }
-#else
- ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
-#endif
- if (ret != (int)strlen(message))
- error("Failed to send initial timestamp - gaps may appear in charts");
- return PARSER_RC_OK;
- }
- return PARSER_RC_ERROR;
-}
-PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
-{
- UNUSED(plugins_action);
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
- uuid_t uuid;
- RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
-
- if (!words[1] || !words[2]) {
- error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", words[1]?words[1]:"[unset]", words[2]?words[2]:"[unset]");
+ if (!host_uuid_str || !claim_id_str) {
+ error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
return PARSER_RC_ERROR;
}
+ uuid_t uuid;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
// We don't need the parsed UUID
// just do it to check the format
- if(uuid_parse(words[1], uuid)) {
- error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]);
+ if(uuid_parse(host_uuid_str, uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
return PARSER_RC_ERROR;
}
- if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) {
- error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]);
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
return PARSER_RC_ERROR;
}
- if(strcmp(words[1], host->machine_guid)) {
- error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid);
+ if(strcmp(host_uuid_str, host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
return PARSER_RC_OK; //the message is OK problem must be somewhere else
}
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
- host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
@@ -400,7 +344,7 @@ static void streaming_parser_thread_cleanup(void *ptr) {
parser_destroy(parser);
}
-size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) {
+static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out, void *ssl) {
size_t result;
PARSER_USER_OBJECT user = {
@@ -411,7 +355,7 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT);
+ PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT, ssl);
rrd_collector_started();
@@ -419,7 +363,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
- parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
user.parser = parser;
@@ -472,6 +415,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ bool rrdpush_enable_replication = default_rrdpush_enable_replication;
+ time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
+ time_t rrdpush_replication_step = default_rrdpush_replication_step;
time_t alarms_delay = 60;
rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
@@ -507,6 +453,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication);
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication);
+
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate);
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate);
+
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step);
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step);
+
#ifdef ENABLE_COMPRESSION
unsigned int rrdpush_compression = default_compression_enabled;
rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
@@ -556,6 +511,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rrdpush_destination
, rrdpush_api_key
, rrdpush_send_charts_matching
+ , rrdpush_enable_replication
+ , rrdpush_seconds_to_replicate
+ , rrdpush_replication_step
, rpt->system_info
, 0
);
@@ -601,6 +559,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_destination,
rrdpush_api_key,
rrdpush_send_charts_matching,
+ rrdpush_enable_replication,
+ rrdpush_seconds_to_replicate,
+ rrdpush_replication_step,
rpt->system_info);
rrd_unlock();
}
@@ -748,8 +709,11 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_unlock(rpt->host);
// call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
cd.capabilities = rpt->capabilities;
@@ -764,12 +728,20 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdcontext_host_child_connected(rpt->host);
- size_t count = streaming_parser(rpt, &cd, fp_in, fp_out);
+ size_t count = streaming_parser(rpt, &cd, fp_in, fp_out,
+#ifdef ENABLE_HTTPS
+ (rpt->ssl.conn) ? &rpt->ssl : NULL
+#else
+ NULL
+#endif
+ );
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rpt->hostname,
"DISCONNECTED");
- error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
- rpt->client_port, count);
+
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
+ rpt->hostname, rpt->client_ip, rpt->client_port, count);
rrdcontext_host_child_disconnected(rpt->host);
diff --git a/streaming/replication.c b/streaming/replication.c
new file mode 100644
index 0000000000..ebfd79031f
--- /dev/null
+++ b/streaming/replication.c
@@ -0,0 +1,344 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "replication.h"
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { // Appends replay rows (one BEGIN line plus one SET line per dimension) for chart 'st' between 'after' and 'before' to 'wb'; returns 'before', possibly extended below
+ size_t dimensions = rrdset_number_of_dimensions(st); // sizes the per-dimension state array below
+
+ struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; // query API of the host's db[0] storage engine
+
+ struct {
+ DICTIONARY *dict; // dictionary the dimension item was acquired from
+ const DICTIONARY_ITEM *rda; // duplicated acquired item; released in the cleanup loop at the end
+ RRDDIM *rd; // the dimension itself; NULL marks the first unused slot
+ struct storage_engine_query_handle handle; // open query on the dimension's tiers[0] metric handle
+ STORAGE_POINT sp; // last point fetched from the query
+ } data[dimensions]; // NOTE(review): variable-length array; if 'dimensions' is 0 this has zero length - confirm callers never get here for a chart without dimensions
+
+ memset(data, 0, sizeof(data)); // zeroed so that unused slots have rd == NULL / rda == NULL, which the loops below use as terminators
+
+ if(enable_streaming && st->last_updated.tv_sec > before) { // when streaming is about to be enabled, extend the range up to the chart's last update
+ internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
+ rrdset_id(st),
+ (unsigned long long)before,
+ (unsigned long long)st->last_updated.tv_sec
+ );
+ before = st->last_updated.tv_sec;
+ }
+
+ // prepare our array of dimensions
+ {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if (rd_dfe.counter >= dimensions) // more dimensions than the array was sized for; ignore the extra ones
+ break;
+
+ data[rd_dfe.counter].dict = rd_dfe.dict;
+ data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); // extra reference, released in the cleanup loop at the end of this function
+ data[rd_dfe.counter].rd = rd;
+
+ ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before); // open the query for [after, before] on tiers[0]
+ }
+ rrddim_foreach_done(rd);
+ }
+
+ time_t now = after, actual_after = 0, actual_before = 0; // 'now' is the cursor; actual_after/actual_before record the first/last end time emitted (0 = nothing emitted yet)
+ while(now <= before) {
+ time_t min_start_time = 0, min_end_time = 0; // 0 doubles as "not set yet" for this pass
+ for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+ // fetch the first valid point for the dimension
+ int max_skip = 100; // upper bound on points skipped per dimension per pass
+ while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
+ data[i].sp = ops->next_metric(&data[i].handle);
+
+ if(max_skip <= 0) // NOTE(review): max_skip is also exactly 0 when the 100th skip did reach 'now', so this can log although the query advanced - confirm whether '< 0' was intended
+ error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now);
+
+ if(data[i].sp.end_time < now) // this dimension has no point reaching 'now'; it does not contribute to the minimums
+ continue;
+
+ if(!min_start_time) {
+ min_start_time = data[i].sp.start_time;
+ min_end_time = data[i].sp.end_time;
+ }
+ else {
+ min_start_time = MIN(min_start_time, data[i].sp.start_time);
+ min_end_time = MIN(min_end_time, data[i].sp.end_time);
+ }
+ }
+
+ if(min_end_time < now) { // also true when no dimension contributed, since min_end_time is then still 0
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+ break;
+ }
+
+ if(min_end_time <= min_start_time) // avoid an empty or inverted interval: derive the start from the chart's update_every
+ min_start_time = min_end_time - st->update_every;
+
+ if(!actual_after) {
+ actual_after = min_end_time;
+ actual_before = min_end_time;
+ }
+ else
+ actual_before = min_end_time;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n" // row header: empty id ('') followed by the row's start and end time
+ , (unsigned long long)min_start_time
+ , (unsigned long long)min_end_time);
+
+ // output the replay values for this time
+ for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+ if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time) // the dimension's current point covers this row's end time
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n",
+ rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : ""); // "R" is sent when the point carries SN_FLAG_RESET
+ else
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", // no covering point: send NAN flagged "E"
+ rrddim_id(data[i].rd));
+ }
+
+ now = min_end_time + 1; // move the cursor just past the row that was emitted
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(actual_after) {
+ char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
+ log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
+ log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
+ (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
+ }
+ else
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)after, (unsigned long long)before);
+#endif
+
+ // release all the dictionary items acquired
+ // finalize the queries
+ for(size_t i = 0; i < dimensions && data[i].rda ;i++) { // stops at the first slot that never acquired an item
+ ops->finalize(&data[i].handle);
+ dictionary_acquired_item_release(data[i].dict, data[i].rda);
+ }
+
+ return before; // the requested 'before', or the chart's last update time if it was extended above
+}
+
+static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { // Appends the data-collection state of 'st' to 'wb': one RRDDIM_STATE line per dimension, then one RRDSET_STATE line for the chart
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n",
+ rrddim_id(rd),
+ (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, // last collection time of the dimension, in microseconds
+ rd->last_collected_value,
+ rd->last_calculated_value,
+ rd->last_stored_value
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu " TOTAL_NUMBER_FORMAT " " TOTAL_NUMBER_FORMAT "\n",
+ (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, // last collection time of the chart, in microseconds
+ (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec, // last update time of the chart, in microseconds
+ st->last_collected_total,
+ st->collected_total
+ );
+}
+
+bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { // Writes the reply to a replication request for chart 'st' into the host's sender buffer; returns true when the reply tells the peer that streaming is enabled
+ time_t query_after = after; // the range actually queried; clamped below to what this chart has stored
+ time_t query_before = before;
+ time_t now = now_realtime_sec();
+
+ // find the first entry we have
+ time_t first_entry_local = rrdset_first_entry_t(st);
+ if(first_entry_local > now) { // never report a first entry in the future
+ internal_error(true,
+ "RRDSET: '%s' first time %llu is in the future (now is %llu)",
+ rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
+ first_entry_local = now;
+ }
+
+ if (query_after < first_entry_local)
+ query_after = first_entry_local;
+
+ // find the latest entry we have
+ time_t last_entry_local = st->last_updated.tv_sec;
+ if(last_entry_local > now) { // never report a last entry in the future
+ internal_error(true,
+ "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
+ rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+ last_entry_local = now;
+ }
+
+ if (query_before > last_entry_local)
+ query_before = last_entry_local;
+
+ // if the parent asked us to start streaming, then fill the rest with the data that we have
+ if (start_streaming)
+ query_before = last_entry_local;
+
+ if (query_after > query_before) { // the clamping above can invert the range; swap so that after <= before
+ time_t tmp = query_before;
+ query_before = query_after;
+ query_after = tmp;
+ }
+
+ bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; // stream when asked to, when the query reaches our last entry, or when the request was empty (after or before is 0)
+
+ // we might want to optimize this by filling a temporary buffer
+ // and copying the result to the host's buffer in order to avoid
+ // holding the host's buffer lock for too long
+ BUFFER *wb = sender_start(host->sender);
+ {
+ // pass the original after/before so that the parent knows about
+ // which time range we responded
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+
+ if(after != 0 && before != 0)
+ before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming); // NOTE(review): 'before' is replaced by the helper's return value here, so the END line below does not always carry the originally requested 'before'
+ else {
+ after = 0; // empty request: no data rows are sent and both times are reported as 0
+ before = 0;
+ enable_streaming = true;
+ }
+
+ if(enable_streaming)
+ replicate_chart_collection_state(wb, st); // collection state is sent only with a reply that enables streaming
+
+ // end with first/last entries we have, and the first start time and
+ // last end time of the data we sent
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
+ (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
+ enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+ }
+ sender_commit(host->sender, wb);
+
+ return enable_streaming;
+}
+
+static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) { // Formats a REPLAY_CHART command for chart 'st' and hands it to 'callback'; returns false when the callback returns a negative value
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(after && before) {
+ char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
+ log_date(after_buf, LOG_DATE_LENGTH, after);
+ log_date(before_buf, LOG_DATE_LENGTH, before);
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
+ start_streaming?"true":"false");
+ }
+ else {
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ start_streaming?"true":"false");
+ }
+#endif
+
+ debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", // debug-log the same text that is sent below
+ rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+
+ char buffer[2048 + 1]; // one byte larger than the size passed to snprintfz() below
+ snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", // command arguments: chart id, start-streaming flag, after, before
+ rrdset_id(st), start_streaming ? "true" : "false",
+ (unsigned long long)after, (unsigned long long)before);
+
+ int ret = callback(buffer, callback_data);
+ if (ret < 0) { // a negative return from the callback is treated as a send failure
+ error("failed to send replay request to child (ret=%d)", ret);
+ return false;
+ }
+
+ return true;
+}
+
+bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, // Decides which time window to request next for chart 'st' and sends it through 'callback'; returns what send_replay_chart_cmd() returns
+ time_t first_entry_child, time_t last_entry_child, // first/last entry times reported by the child for this chart
+ time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) // window of the previous request; both 0 when there was none
+{
+ time_t now = now_realtime_sec();
+
+ // if replication is disabled, send an empty replication request
+ // asking no data
+ if (!host->rrdpush_enable_replication) {
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
+ rrdhost_hostname(host), rrdset_id(st));
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); // empty request: start streaming, after = before = 0
+ }
+
+ // Child has no stored data
+ if (!last_entry_child) {
+ error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
+ rrdhost_hostname(host), rrdset_id(st));
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+
+ // Nothing to get if the chart has no dimensions
+ if (!rrdset_number_of_dimensions(st)) {
+ error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
+ rrdhost_hostname(host), rrdset_id(st));
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+
+ // if the child's first/last entries are nonsensical, resume streaming
+ // without asking for any data
+ if (first_entry_child <= 0) {
+ error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
+ rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child);
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+
+ if (first_entry_child > last_entry_child) {
+ error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
+ rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+
+ time_t last_entry_local = rrdset_last_entry_t(st);
+ if(last_entry_local > now) { // a local last entry in the future is clamped to 'now'
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
+ rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+ last_entry_local = now;
+ }
+
+ // should never happen, but if it does, start streaming without asking
+ // for any data
+ if (last_entry_local > last_entry_child) {
+ error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
+ rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
+
+ return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+
+ time_t first_entry_wanted;
+ if (prev_first_entry_wanted && prev_last_entry_wanted) { // continuing: resume from where the previous request ended
+ first_entry_wanted = prev_last_entry_wanted;
+ if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate) // but never go further back than the configured number of seconds
+ first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
+ }
+ else
+ first_entry_wanted = MAX(last_entry_local, first_entry_child); // first request: start at the later of our last entry and the child's first entry. NOTE(review): the rrdpush_seconds_to_replicate limit is not applied on this branch - confirm this is intended
+
+ time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step; // ask for at most one replication step per request
+ last_entry_wanted = MIN(last_entry_wanted, last_entry_child); // and never beyond what the child has
+
+ bool start_streaming = (last_entry_wanted == last_entry_child); // this window reaches the child's last entry, so ask the child to start streaming
+
+ return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
+}
diff --git a/streaming/replication.h b/streaming/replication.h
new file mode 100644
index 0000000000..11fec65d2e
--- /dev/null
+++ b/streaming/replication.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#include "daemon/common.h"
+
+bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before); // writes the reply to a replication request for chart 'rs' into the sender buffer of 'rh'; returns true when streaming is enabled by the reply
+
+typedef int (*send_command)(const char *txt, void *data); // callback that transmits one command line; replication.c treats a negative return as failure
+
+bool replicate_chart_request(send_command callback, void *callback_data, // builds the next REPLAY_CHART request for chart 'rs' and sends it via 'callback'
+ RRDHOST *rh, RRDSET *rs,
+ time_t first_entry_child, time_t last_entry_child, // first/last entry times reported by the child
+ time_t response_first_start_time, time_t response_last_end_time); // named prev_first_entry_wanted / prev_last_entry_wanted in the definition in replication.c
+
+#endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c332b3bcd3..8dc0c26312 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -46,6 +46,9 @@ unsigned int default_compression_enabled = 1;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
+bool default_rrdpush_enable_replication = true;
+time_t default_rrdpush_seconds_to_replicate = 86400;
+time_t default_rrdpush_replication_step = 600;
#ifdef ENABLE_HTTPS
int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
char *netdata_ssl_ca_path = NULL;
@@ -100,6 +103,11 @@ int rrdpush_init() {
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
+
+ default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
+ default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
+ default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
+
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
#ifdef ENABLE_COMPRESSION
@@ -153,16 +161,20 @@ int rrdpush_init() {
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
-
static inline bool should_send_chart_matching(RRDSET *st) {
// get all the flags we need to check, with one atomic operation
RRDSET_FLAGS flags = rrdset_flag_check(st,
- RRDSET_FLAG_UPSTREAM_SEND
- |RRDSET_FLAG_UPSTREAM_IGNORE
- |RRDSET_FLAG_ANOMALY_RATE_CHART
- |RRDSET_FLAG_ANOMALY_DETECTION);
+ RRDSET_FLAG_UPSTREAM_SEND
+ | RRDSET_FLAG_UPSTREAM_IGNORE
+ | RRDSET_FLAG_ANOMALY_RATE_CHART
+ | RRDSET_FLAG_ANOMALY_DETECTION
+ | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED
+ );
+
+ if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+ return false;
- if(unlikely(!flags)) {
+ if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
RRDHOST *host = st->rrdhost;
// Do not stream anomaly rates charts.
@@ -295,20 +307,25 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart local custom variables
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+ if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
+ time_t first_entry_local = rrdset_first_entry_t(st);
+ time_t last_entry_local = st->last_updated.tv_sec;
+ buffer_sprintf(wb, "CHART_DEFINITION_END %ld %ld\n", first_entry_local, last_entry_local);
+ }
+
st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
}
// sends the current chart dimensions
-static inline void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) {
+void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
- if (stream_has_capability(s, STREAM_CAP_GAP_FILLING)) {
- buffer_fast_strcat(wb, " ", 1);
- buffer_print_ll(wb, st->last_collected_time.tv_sec);
- }
+ if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time)
+ buffer_print_llu(wb, st->usec_since_last_update);
+ else
+ buffer_fast_strcat(wb, "0", 1);
buffer_fast_strcat(wb, "\n", 1);
@@ -350,62 +367,30 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
return true;
}
-bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop) {
- if(stop || restart)
- dictionary_foreach_done(dictfe);
-
- if(stop)
- return false;
-