author | vkalintiris <vasilis@netdata.cloud> | 2022-10-31 19:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-31 19:53:20 +0200 |
commit | 282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch) | |
tree | b23e108b35adc8ed322e8167d0f1fe607c2cfa4c /streaming | |
parent | df87a538cfaba5014a752937714756b7c5d30c93 (diff) |
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"
This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.
* Profile plugin
* Fix macos static thread
* Add support for replication
- Add a new capability for replication, when not supported the agent
should behave as previously.
- When replication is supported, the text protocol supports the
following new commands:
- CHART_DEFINITION_END: send the first/last entry of the child
- REPLAY_RRDSET_BEGIN: sends the name of the chart we are
replicating
- REPLAY_RRDSET_HEADER: sends a line describing the columns of the
following command (ie. start-time, end-time, dim1-name, ...)
- REPLAY_RRDSET_DONE: sends values to push for a specific start/end
time
- REPLAY_RRDSET_END: send the (a) update every of the chart, (b)
first/last entries in DB, (c) whether the child's been told to
start streaming, (d) original after/before period to replicate.
- REPLAY_CHART: Sent from a parent to a child, specifying (a)
the chart name we want data for, (b) whether the child should
start streaming once it has fulfilled the request with the
aforementioned commands, (c) after/before of the data the parent
wants
- As a consequence of the new protocol, streaming is disabled for all
charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
- "enable replication = yes|no"
- "seconds to replicate = 3600"
- "replication step = 600" (i.e. how many seconds to fill per
roundtrip request).
* Minor fixes
- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts
* Do not send chart definitions on connection.
* Track replication status through rrdset flags.
* Add debug flag for noisy log messages.
* Add license notice.
* Iterate charts with reentrant loop
* Set replication finished flag when streaming is disabled.
* Revert "Profile plugin"
This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.
Used only for testing purposes.
* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""
This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.
Reapply commit that I had to revert in order to be able to build the
agent on MacOS.
* Build replication source files with CMake.
* Pass number of words in plugind functions.
* Use get_word instead of indexing words.
* Use size_t instead of int.
* Pay only what we use when splitting words.
* no need to redefine PLUGINSD_MAX_WORDS
* fix formatting warning
* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart
* keep a sender dictionary with all the replication commands received and remove replication commands from charts
* do not replicate future data
* use last_updated to find the end of the db
* uniformity of replication logs
* rewrite of the query logic
* replication.c in C; debug info in human readable dates
* update the chart on every replication row
* update all chart members so that rrdset_done() can continue
* update the protocol to push one dimension per line and transfer data collection state to parent
* fix formatting
* remove replication object from pluginsd
* shorter communication
* fix typo
* support for replication proxies
* proper use of flags
* set receiver replication finished flag on charts created after the sender has been connected
* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts
* log storing of nulls
* log first store
* log update every switches
* test ignoring timestamps but sending a point just after replication end
* replication should work on end_time
* use replicated timestamps
* at the final replication step, replicate all the remaining points
* cleanup code from tests
* print timestamps as unsigned long long
* more formatting changes; fix conflicting type of replicate_chart_response()
* updated stream.conf
* always respond to replication requests
* in non-dbengine db modes, do not replicate more than the database size
* advance the db pointer of legacy db modes
* should be multiplied by update_every
* fix buggy label parsing - identified by codacy
* dont log error on history mismatches for db mode dbengine
* allow SSL requests to streaming children
* dont use ssl variable
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
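The parent-side choice of the next REPLAY_CHART window can be sketched roughly as follows. This is a simplified, standalone illustration modelled on replicate_chart_request() in this commit, not the agent's code: struct replication_config, struct replay_window and next_replay_window() are hypothetical names, and the early exits that send an empty request (replication disabled, child has no data, chart has no dimensions, invalid child timings) are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical holder for the two stream.conf settings used here. */
struct replication_config {
    time_t seconds_to_replicate; /* how far back the parent is willing to go */
    time_t replication_step;     /* seconds requested per round trip */
};

/* Hypothetical result: the after/before pair and the start-streaming flag
 * that would be placed in a REPLAY_CHART command. */
struct replay_window {
    time_t after;
    time_t before;
    bool start_streaming;
};

static time_t min_time(time_t a, time_t b) { return a < b ? a : b; }
static time_t max_time(time_t a, time_t b) { return a > b ? a : b; }

/* Pick the next window to request from the child.
 * prev_after/prev_before are the bounds of the previous response,
 * or 0/0 when this is the first request for the chart. */
static struct replay_window next_replay_window(
    const struct replication_config *cfg, time_t now,
    time_t first_entry_child, time_t last_entry_child,
    time_t last_entry_local,
    time_t prev_after, time_t prev_before)
{
    /* The commit sends an empty request for invalid child timings;
     * this sketch simply requires valid input. */
    assert(first_entry_child > 0 && first_entry_child <= last_entry_child);

    struct replay_window w;

    if (prev_after && prev_before) {
        /* Continue where the previous step ended, but never reach
         * further back than the configured retention window. */
        w.after = prev_before;
        if (now - w.after > cfg->seconds_to_replicate)
            w.after = now - cfg->seconds_to_replicate;
    } else {
        /* First request: start from the newer of our own last point
         * and the child's first point. */
        w.after = max_time(last_entry_local, first_entry_child);
    }

    /* Ask for one step of data, capped at what the child actually has. */
    w.before = min_time(w.after + cfg->replication_step, last_entry_child);

    /* Once the window reaches the child's last entry, tell it to stream. */
    w.start_streaming = (w.before == last_entry_child);
    return w;
}
```

With a 600-second step, a parent whose last point is 990 seconds behind the child would issue two requests under this sketch: one full step, then a shorter final one that also asks the child to start streaming.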
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 136 | ||||
-rw-r--r-- | streaming/replication.c | 344 | ||||
-rw-r--r-- | streaming/replication.h | 17 | ||||
-rw-r--r-- | streaming/rrdpush.c | 88 | ||||
-rw-r--r-- | streaming/rrdpush.h | 18 | ||||
-rw-r--r-- | streaming/sender.c | 236 | ||||
-rw-r--r-- | streaming/stream.conf | 53 |
7 files changed, 661 insertions, 231 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 36c085062d..9ffada2417 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -65,99 +65,43 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { UNUSED(plugins_action); - char *remote_time_txt = words[1]; - time_t remote_time = 0; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; - if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities); - return PARSER_RC_OK; // Ignore error and continue stream - } - if (remote_time_txt && *remote_time_txt) { - remote_time = str2ull(remote_time_txt); - time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host); - time_t gap = 0; - if (prev == 0) - info( - "STREAM %s from %s: Initial connection (no gap to check), " - "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", - rrdhost_hostname(host), - cd->cmd, - (int64_t)remote_time, - (int64_t)now, - (int64_t)now - remote_time); - else { - gap = now - prev; - info( - "STREAM %s from %s: Checking for gaps... 
" - "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", - rrdhost_hostname(host), - cd->cmd, - (int64_t)remote_time, - (int64_t)prev, - (int64_t)now, - (int64_t)(remote_time - now), - (int64_t)gap); - } - char message[128]; - sprintf( - message, - "REPLICATE %"PRId64" %"PRId64"\n", - (int64_t)(remote_time - gap), - (int64_t)remote_time); - int ret; -#ifdef ENABLE_HTTPS - SSL *conn = host->receiver->ssl.conn ; - if(conn && !host->receiver->ssl.flags) { - ret = SSL_write(conn, message, strlen(message)); - } else { - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); - } -#else - ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); -#endif - if (ret != (int)strlen(message)) - error("Failed to send initial timestamp - gaps may appear in charts"); - return PARSER_RC_OK; - } - return PARSER_RC_ERROR; -} -PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action) -{ - UNUSED(plugins_action); + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); - uuid_t uuid; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - - if (!words[1] || !words[2]) { - error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", words[1]?words[1]:"[unset]", words[2]?words[2]:"[unset]"); + if (!host_uuid_str || !claim_id_str) { + error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); return PARSER_RC_ERROR; } + uuid_t uuid; + RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + // We don't need the parsed UUID // just do it to check the format - if(uuid_parse(words[1], uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]); + if(uuid_parse(host_uuid_str, uuid)) { + error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. 
Received: \"%s\".", host_uuid_str); return PARSER_RC_ERROR; } - if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]); + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { + error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); return PARSER_RC_ERROR; } - if(strcmp(words[1], host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid); + if(strcmp(host_uuid_str, host->machine_guid)) { + error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); return PARSER_RC_OK; //the message is OK problem must be somewhere else } rrdhost_aclk_state_lock(host); if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? 
&uuid : NULL); @@ -400,7 +344,7 @@ static void streaming_parser_thread_cleanup(void *ptr) { parser_destroy(parser); } -size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) { +static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out, void *ssl) { size_t result; PARSER_USER_OBJECT user = { @@ -411,7 +355,7 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp .trust_durations = 1 }; - PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT); + PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT, ssl); rrd_collector_started(); @@ -419,7 +363,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser); - parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); user.parser = parser; @@ -472,6 +415,9 @@ static int rrdpush_receive(struct receiver_state *rpt) char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + bool rrdpush_enable_replication = default_rrdpush_enable_replication; + time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + time_t rrdpush_replication_step = default_rrdpush_replication_step; time_t alarms_delay = 60; rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); @@ -507,6 +453,15 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, 
"proxy send charts matching", rrdpush_send_charts_matching); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); + rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); + rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); + rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + #ifdef ENABLE_COMPRESSION unsigned int rrdpush_compression = default_compression_enabled; rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); @@ -556,6 +511,9 @@ static int rrdpush_receive(struct receiver_state *rpt) , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , rpt->system_info , 0 ); @@ -601,6 +559,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching, + rrdpush_enable_replication, + rrdpush_seconds_to_replicate, + rrdpush_replication_step, rpt->system_info); rrd_unlock(); } @@ -748,8 +709,11 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, 
rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); cd.capabilities = rpt->capabilities; @@ -764,12 +728,20 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdcontext_host_child_connected(rpt->host); - size_t count = streaming_parser(rpt, &cd, fp_in, fp_out); + size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, +#ifdef ENABLE_HTTPS + (rpt->ssl.conn) ? &rpt->ssl : NULL +#else + NULL +#endif + ); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, + log_stream_connection(rpt->client_ip, rpt->client_port, + rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED"); - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, - rpt->client_port, count); + + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", + rpt->hostname, rpt->client_ip, rpt->client_port, count); rrdcontext_host_child_disconnected(rpt->host); diff --git a/streaming/replication.c b/streaming/replication.c new file mode 100644 index 0000000000..ebfd79031f --- /dev/null +++ b/streaming/replication.c @@ -0,0 +1,344 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "replication.h" + +static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { + size_t dimensions = rrdset_number_of_dimensions(st); + + struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; + + struct { + DICTIONARY *dict; + const DICTIONARY_ITEM *rda; + RRDDIM *rd; + struct storage_engine_query_handle handle; + STORAGE_POINT sp; + } data[dimensions]; + + memset(data, 0, sizeof(data)); + + if(enable_streaming 
&& st->last_updated.tv_sec > before) { + internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu", + rrdset_id(st), + (unsigned long long)before, + (unsigned long long)st->last_updated.tv_sec + ); + before = st->last_updated.tv_sec; + } + + // prepare our array of dimensions + { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (rd_dfe.counter >= dimensions) + break; + + data[rd_dfe.counter].dict = rd_dfe.dict; + data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + data[rd_dfe.counter].rd = rd; + + ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before); + } + rrddim_foreach_done(rd); + } + + time_t now = after, actual_after = 0, actual_before = 0; + while(now <= before) { + time_t min_start_time = 0, min_end_time = 0; + for (size_t i = 0; i < dimensions && data[i].rd; i++) { + // fetch the first valid point for the dimension + int max_skip = 100; + while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0) + data[i].sp = ops->next_metric(&data[i].handle); + + if(max_skip <= 0) + error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now); + + if(data[i].sp.end_time < now) + continue; + + if(!min_start_time) { + min_start_time = data[i].sp.start_time; + min_end_time = data[i].sp.end_time; + } + else { + min_start_time = MIN(min_start_time, data[i].sp.start_time); + min_end_time = MIN(min_end_time, data[i].sp.end_time); + } + } + + if(min_end_time < now) { + internal_error(true, + "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); + break; + } + + if(min_end_time <= min_start_time) + min_start_time = min_end_time - st->update_every; + + if(!actual_after) { + actual_after = min_end_time; + actual_before = 
min_end_time; + } + else + actual_before = min_end_time; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n" + , (unsigned long long)min_start_time + , (unsigned long long)min_end_time); + + // output the replay values for this time + for (size_t i = 0; i < dimensions && data[i].rd; i++) { + if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time) + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n", + rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : ""); + else + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", + rrddim_id(data[i].rd)); + } + + now = min_end_time + 1; + } + +#ifdef NETDATA_INTERNAL_CHECKS + if(actual_after) { + char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; + log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); + log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); + internal_error(true, + "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, + (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); + } + else + internal_error(true, + "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)after, (unsigned long long)before); +#endif + + // release all the dictionary items acquired + // finalize the queries + for(size_t i = 0; i < dimensions && data[i].rda ;i++) { + ops->finalize(&data[i].handle); + dictionary_acquired_item_release(data[i].dict, data[i].rda); + } + + return before; +} + +static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + 
rrddim_foreach_read(rd, st) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu " TOTAL_NUMBER_FORMAT " " TOTAL_NUMBER_FORMAT "\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec, + st->last_collected_total, + st->collected_total + ); +} + +bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { + time_t query_after = after; + time_t query_before = before; + time_t now = now_realtime_sec(); + + // find the first entry we have + time_t first_entry_local = rrdset_first_entry_t(st); + if(first_entry_local > now) { + internal_error(true, + "RRDSET: '%s' first time %llu is in the future (now is %llu)", + rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now); + first_entry_local = now; + } + + if (query_after < first_entry_local) + query_after = first_entry_local; + + // find the latest entry we have + time_t last_entry_local = st->last_updated.tv_sec; + if(last_entry_local > now) { + internal_error(true, + "RRDSET: '%s' last updated time %llu is in the future (now is %llu)", + rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + last_entry_local = now; + } + + if (query_before > last_entry_local) + query_before = last_entry_local; + + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (start_streaming) + query_before = last_entry_local; + + if (query_after > query_before) { + time_t tmp = 
query_before; + query_before = query_after; + query_after = tmp; + } + + bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; + + // we might want to optimize this by filling a temporary buffer + // and copying the result to the host's buffer in order to avoid + // holding the host's buffer lock for too long + BUFFER *wb = sender_start(host->sender); + { + // pass the original after/before so that the parent knows about + // which time range we responded + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); + + if(after != 0 && before != 0) + before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming); + else { + after = 0; + before = 0; + enable_streaming = true; + } + + if(enable_streaming) + replicate_chart_collection_state(wb, st); + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n", + (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local, + enable_streaming ? 
"true" : "false", (unsigned long long)after, (unsigned long long)before); + } + sender_commit(host->sender, wb); + + return enable_streaming; +} + +static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) { + +#ifdef NETDATA_INTERNAL_CHECKS + if(after && before) { + char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1]; + log_date(after_buf, LOG_DATE_LENGTH, after); + log_date(before_buf, LOG_DATE_LENGTH, before); + internal_error(true, + "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)after, after_buf, (unsigned long long)before, before_buf, + start_streaming?"true":"false"); + } + else { + internal_error(true, + "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + start_streaming?"true":"false"); + } +#endif + + debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before); + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), start_streaming ? 
"true" : "false", + (unsigned long long)after, (unsigned long long)before); + + int ret = callback(buffer, callback_data); + if (ret < 0) { + error("failed to send replay request to child (ret=%d)", ret); + return false; + } + + return true; +} + +bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, + time_t first_entry_child, time_t last_entry_child, + time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) +{ + time_t now = now_realtime_sec(); + + // if replication is disabled, send an empty replication request + // asking no data + if (!host->rrdpush_enable_replication) { + internal_error(true, + "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // Child has no stored data + if (!last_entry_child) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // Nothing to get if the chart has not dimensions + if (!rrdset_number_of_dimensions(st)) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // if the child's first/last entries are nonsensical, resume streaming + // without asking for any data + if (first_entry_child <= 0) { + error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + if (first_entry_child > last_entry_child) { + error("REPLAY: host '%s', chart 
'%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + time_t last_entry_local = rrdset_last_entry_t(st); + if(last_entry_local > now) { + internal_error(true, + "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + last_entry_local = now; + } + + // should never happen but it if does, start streaming without asking + // for any data + if (last_entry_local > last_entry_child) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + time_t first_entry_wanted; + if (prev_first_entry_wanted && prev_last_entry_wanted) { + first_entry_wanted = prev_last_entry_wanted; + if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate) + first_entry_wanted = now - host->rrdpush_seconds_to_replicate; + } + else + first_entry_wanted = MAX(last_entry_local, first_entry_child); + + time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step; + last_entry_wanted = MIN(last_entry_wanted, last_entry_child); + + bool start_streaming = (last_entry_wanted == last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted); +} diff --git a/streaming/replication.h b/streaming/replication.h new file mode 100644 index 0000000000..11fec65d2e --- /dev/null +++ b/streaming/replication.h @@ -0,0 +1,17 @@ +// 
SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef REPLICATION_H +#define REPLICATION_H + +#include "daemon/common.h" + +bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before); + +typedef int (*send_command)(const char *txt, void *data); + +bool replicate_chart_request(send_command callback, void *callback_data, + RRDHOST *rh, RRDSET *rs, + time_t first_entry_child, time_t last_entry_child, + time_t response_first_start_time, time_t response_last_end_time); + +#endif /* REPLICATION_H */ diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c332b3bcd3..8dc0c26312 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -46,6 +46,9 @@ unsigned int default_compression_enabled = 1; char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; char *default_rrdpush_send_charts_matching = NULL; +bool default_rrdpush_enable_replication = true; +time_t default_rrdpush_seconds_to_replicate = 86400; +time_t default_rrdpush_replication_step = 600; #ifdef ENABLE_HTTPS int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL; char *netdata_ssl_ca_path = NULL; @@ -100,6 +103,11 @@ int rrdpush_init() { default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); + + default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication); + default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate); + default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step); + rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup 
orphan hosts after secs", rrdhost_free_orphan_time); #ifdef ENABLE_COMPRESSION @@ -153,16 +161,20 @@ int rrdpush_init() { // this is for the first iterations of each chart unsigned int remote_clock_resync_iterations = 60; - static inline bool should_send_chart_matching(RRDSET *st) { // get all the flags we need to check, with one atomic operation RRDSET_FLAGS flags = rrdset_flag_check(st, - RRDSET_FLAG_UPSTREAM_SEND - |RRDSET_FLAG_UPSTREAM_IGNORE - |RRDSET_FLAG_ANOMALY_RATE_CHART - |RRDSET_FLAG_ANOMALY_DETECTION); + RRDSET_FLAG_UPSTREAM_SEND + | RRDSET_FLAG_UPSTREAM_IGNORE + | RRDSET_FLAG_ANOMALY_RATE_CHART + | RRDSET_FLAG_ANOMALY_DETECTION + | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED + ); + + if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) + return false; - if(unlikely(!flags)) { + if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) { RRDHOST *host = st->rrdhost; // Do not stream anomaly rates charts. @@ -295,20 +307,25 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart local custom variables rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { + time_t first_entry_local = rrdset_first_entry_t(st); + time_t last_entry_local = st->last_updated.tv_sec; + buffer_sprintf(wb, "CHART_DEFINITION_END %ld %ld\n", first_entry_local, last_entry_local); + } + st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); } // sends the current chart dimensions -static inline void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) { +void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - buffer_print_llu(wb, (st->last_collected_time.tv_sec > 
st->upstream_resync_time)?st->usec_since_last_update:0); - if (stream_has_capability(s, STREAM_CAP_GAP_FILLING)) { - buffer_fast_strcat(wb, " ", 1); - buffer_print_ll(wb, st->last_collected_time.tv_sec); - } + if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time) + buffer_print_llu(wb, st->usec_since_last_update); + else + buffer_fast_strcat(wb, "0", 1); buffer_fast_strcat(wb, "\n", 1); @@ -350,62 +367,30 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { return true; } -bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop) { - if(stop || restart) - dictionary_foreach_done(dictfe); - - if(stop) |