diff options
author | vkalintiris <vasilis@netdata.cloud> | 2022-10-31 19:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-31 19:53:20 +0200 |
commit | 282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch) | |
tree | b23e108b35adc8ed322e8167d0f1fe607c2cfa4c /streaming/replication.c | |
parent | df87a538cfaba5014a752937714756b7c5d30c93 (diff) |
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"
This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.
* Profile plugin
* Fix macos static thread
* Add support for replication
- Add a new capability for replication, when not supported the agent
should behave as previously.
- When replication is supported, the text protocol supports the
following new commands:
- CHART_DEFINITION_END: send the first/last entry of the child
- REPLAY_RRDSET_BEGIN: sends the name of the chart we are
replicating
- REPLAY_RRDSET_HEADER: sends a line describing the columns of the
following command (ie. start-time, end-time, dim1-name, ...)
- REPLAY_RRDSET_DONE: sends values to push for a specific start/end
time
- REPLAY_RRDSET_END: send the (a) update every of the chart, (b)
first/last entries in DB, (c) whether the child's been told to
start streaming, (d) original after/before period to replicate.
- REPLAY_CHART: Sent from a parent to a child, specifying (a)
the chart name we want data for, (b) whether the child should
start streaming once it has fullfilled the request with the
aforementioned commands, (c) after/before of the data the parent
wants
- As a consequence of the new protocol, streaming is disabled for all
charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
- "enable replication = yes|no"
- "seconds to replicate = 3600"
- "replication step = 600" (ie. how many seconds to fill per
roundtrip request.
* Minor fixes
- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts
* Do not send chart definitions on connection.
* Track replication status through rrdset flags.
* Add debug flag for noisy log messages.
* Add license notice.
* Iterate charts with reentrant loop
* Set replication finished flag when streaming is disabled.
* Revert "Profile plugin"
This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.
Used only for testing purposes.
* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""
This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.
Reapply commit that I had to revert in order to be able to build the
agent on MacOS.
* Build replication source files with CMake.
* Pass number of words in plugind functions.
* Use get_word instead of indexing words.
* Use size_t instead of int.
* Pay only what we use when splitting words.
* no need to redefine PLUGINSD_MAX_WORDS
* fix formatting warning
* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart
* keep a sender dictionary with all the replication commands received and remove replication commands from charts
* do not replicate future data
* use last_updated to find the end of the db
* uniformity of replication logs
* rewrite of the query logic
* replication.c in C; debug info in human readable dates
* update the chart on every replication row
* update all chart members so that rrdset_done() can continue
* update the protocol to push one dimension per line and transfer data collection state to parent
* fix formatting
* remove replication object from pluginsd
* shorter communication
* fix typo
* support for replication proxies
* proper use of flags
* set receiver replication finished flag on charts created after the sender has been connected
* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts
* log storing of nulls
* log first store
* log update every switches
* test ignoring timestamps but sending a point just after replication end
* replication should work on end_time
* use replicated timestamps
* at the final replication step, replicate all the remaining points
* cleanup code from tests
* print timestamps as unsigned long long
* more formating changes; fix conflicting type of replicate_chart_response()
* updated stream.conf
* always respond to replication requests
* in non-dbengine db modes, do not replicate more than the database size
* advance the db pointer of legacy db modes
* should be multiplied by update_every
* fix buggy label parsing - identified by codacy
* dont log error on history mismatches for db mode dbengine
* allow SSL requests to streaming children
* dont use ssl variable
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 344 |
1 files changed, 344 insertions, 0 deletions
diff --git a/streaming/replication.c b/streaming/replication.c new file mode 100644 index 0000000000..ebfd79031f --- /dev/null +++ b/streaming/replication.c @@ -0,0 +1,344 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "replication.h" + +static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { + size_t dimensions = rrdset_number_of_dimensions(st); + + struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; + + struct { + DICTIONARY *dict; + const DICTIONARY_ITEM *rda; + RRDDIM *rd; + struct storage_engine_query_handle handle; + STORAGE_POINT sp; + } data[dimensions]; + + memset(data, 0, sizeof(data)); + + if(enable_streaming && st->last_updated.tv_sec > before) { + internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu", + rrdset_id(st), + (unsigned long long)before, + (unsigned long long)st->last_updated.tv_sec + ); + before = st->last_updated.tv_sec; + } + + // prepare our array of dimensions + { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (rd_dfe.counter >= dimensions) + break; + + data[rd_dfe.counter].dict = rd_dfe.dict; + data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + data[rd_dfe.counter].rd = rd; + + ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before); + } + rrddim_foreach_done(rd); + } + + time_t now = after, actual_after = 0, actual_before = 0; + while(now <= before) { + time_t min_start_time = 0, min_end_time = 0; + for (size_t i = 0; i < dimensions && data[i].rd; i++) { + // fetch the first valid point for the dimension + int max_skip = 100; + while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0) + data[i].sp = ops->next_metric(&data[i].handle); + + if(max_skip <= 0) + error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now); + + if(data[i].sp.end_time < now) + continue; + + if(!min_start_time) { + min_start_time = data[i].sp.start_time; + min_end_time = data[i].sp.end_time; + } + else { + min_start_time = MIN(min_start_time, data[i].sp.start_time); + min_end_time = MIN(min_end_time, data[i].sp.end_time); + } + } + + if(min_end_time < now) { + internal_error(true, + "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); + break; + } + + if(min_end_time <= min_start_time) + min_start_time = min_end_time - st->update_every; + + if(!actual_after) { + actual_after = min_end_time; + actual_before = min_end_time; + } + else + actual_before = min_end_time; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n" + , (unsigned long long)min_start_time + , (unsigned long long)min_end_time); + + // output the replay values for this time + for (size_t i = 0; i < dimensions && data[i].rd; i++) { + if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time) + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n", + rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : ""); + else + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", + rrddim_id(data[i].rd)); + } + + now = min_end_time + 1; + } + +#ifdef NETDATA_INTERNAL_CHECKS + if(actual_after) { + char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; + log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); + log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); + internal_error(true, + "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, + (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); + } + else + internal_error(true, + "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)after, (unsigned long long)before); +#endif + + // release all the dictionary items acquired + // finalize the queries + for(size_t i = 0; i < dimensions && data[i].rda ;i++) { + ops->finalize(&data[i].handle); + dictionary_acquired_item_release(data[i].dict, data[i].rda); + } + + return before; +} + +static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu " TOTAL_NUMBER_FORMAT " " TOTAL_NUMBER_FORMAT "\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec, + st->last_collected_total, + st->collected_total + ); +} + +bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { + time_t query_after = after; + time_t query_before = before; + time_t now = now_realtime_sec(); + + // find the first entry we have + time_t first_entry_local = rrdset_first_entry_t(st); + if(first_entry_local > now) { + internal_error(true, + "RRDSET: '%s' first time %llu is in the future (now is %llu)", + rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now); + first_entry_local = now; + } + + if (query_after < first_entry_local) + query_after = first_entry_local; + + // find the latest entry we have + time_t last_entry_local = st->last_updated.tv_sec; + if(last_entry_local > now) { + internal_error(true, + "RRDSET: '%s' last updated time %llu is in the future (now is %llu)", + rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + last_entry_local = now; + } + + if (query_before > last_entry_local) + query_before = last_entry_local; + + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (start_streaming) + query_before = last_entry_local; + + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; + } + + bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; + + // we might want to optimize this by filling a temporary buffer + // and copying the result to the host's buffer in order to avoid + // holding the host's buffer lock for too long + BUFFER *wb = sender_start(host->sender); + { + // pass the original after/before so that the parent knows about + // which time range we responded + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); + + if(after != 0 && before != 0) + before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming); + else { + after = 0; + before = 0; + enable_streaming = true; + } + + if(enable_streaming) + replicate_chart_collection_state(wb, st); + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n", + (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local, + enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before); + } + sender_commit(host->sender, wb); + + return enable_streaming; +} + +static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) { + +#ifdef NETDATA_INTERNAL_CHECKS + if(after && before) { + char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1]; + log_date(after_buf, LOG_DATE_LENGTH, after); + log_date(before_buf, LOG_DATE_LENGTH, before); + internal_error(true, + "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)after, after_buf, (unsigned long long)before, before_buf, + start_streaming?"true":"false"); + } + else { + internal_error(true, + "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + start_streaming?"true":"false"); + } +#endif + + debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before); + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), start_streaming ? "true" : "false", + (unsigned long long)after, (unsigned long long)before); + + int ret = callback(buffer, callback_data); + if (ret < 0) { + error("failed to send replay request to child (ret=%d)", ret); + return false; + } + + return true; +} + +bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, + time_t first_entry_child, time_t last_entry_child, + time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) +{ + time_t now = now_realtime_sec(); + + // if replication is disabled, send an empty replication request + // asking no data + if (!host->rrdpush_enable_replication) { + internal_error(true, + "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // Child has no stored data + if (!last_entry_child) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // Nothing to get if the chart has not dimensions + if (!rrdset_number_of_dimensions(st)) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions", + rrdhost_hostname(host), rrdset_id(st)); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + // if the child's first/last entries are nonsensical, resume streaming + // without asking for any data + if (first_entry_child <= 0) { + error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + if (first_entry_child > last_entry_child) { + error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + time_t last_entry_local = rrdset_last_entry_t(st); + if(last_entry_local > now) { + internal_error(true, + "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + last_entry_local = now; + } + + // should never happen but it if does, start streaming without asking + // for any data + if (last_entry_local > last_entry_child) { + error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)", + rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + + time_t first_entry_wanted; + if (prev_first_entry_wanted && prev_last_entry_wanted) { + first_entry_wanted = prev_last_entry_wanted; + if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate) + first_entry_wanted = now - host->rrdpush_seconds_to_replicate; + } + else + first_entry_wanted = MAX(last_entry_local, first_entry_child); + + time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step; + last_entry_wanted = MIN(last_entry_wanted, last_entry_child); + + bool start_streaming = (last_entry_wanted == last_entry_child); + + return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted); +} |