diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-24 00:24:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-24 00:24:21 +0200 |
commit | 8e1a99ad79a1394cbb0ffcaa24bdde85c7b14d81 (patch) | |
tree | 61020aea9c8c6e48032d5721aec4b9ddbfe7c35b /streaming | |
parent | 0fe7b1c8a84b7836b5bb13c37924d9fd851c6233 (diff) |
replication fixes #5 (#14038)
* pluginsd cleanup; replication logic cleanup; fix bug in replication begin
* log replication start/stop and change the keyword of NETDATA_LOG_REPLICATION_REQUESTS logs to REPLAY
* dont ask for data the child does not have; log fixes
* more pluginsd cleanup
* count sender dictionary entries
* fix dictionary_flush()
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 351 | ||||
-rw-r--r-- | streaming/rrdpush.c | 22 | ||||
-rw-r--r-- | streaming/sender.c | 10 |
3 files changed, 243 insertions, 140 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 1e1f7751e9..6e3b145507 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,7 +3,7 @@ #include "replication.h" #include "Judy.h" -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 30 +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { @@ -23,8 +23,8 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti memset(data, 0, sizeof(data)); if(enable_streaming && st->last_updated.tv_sec > before) { - internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu", - rrdset_id(st), + internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)before, (unsigned long long)st->last_updated.tv_sec ); @@ -65,7 +65,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti data[i].sp = ops->next_metric(&data[i].handle); internal_error(max_skip <= 0, - "REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu", + "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now); if(data[i].sp.end_time < now) @@ -84,7 +84,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti time_t wall_clock_time = now_realtime_sec(); if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) { internal_error(true, - "REPLAY: host '%s', chart '%s': db provided future start time %llu or end time %llu (now is %llu)", + "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)min_start_time, (unsigned long long)min_end_time, @@ -93,9 +93,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti } if(min_end_time < now) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, - "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu", + "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); +#endif // NETDATA_LOG_REPLICATION_REQUESTS break; } @@ -130,23 +132,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti now = min_end_time + 1; } -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_LOG_REPLICATION_REQUESTS if(actual_after) { char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, - "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, - "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)", + "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)after, (unsigned long long)before); -#endif +#endif // NETDATA_LOG_REPLICATION_REQUESTS // release all the dictionary items acquired // finalize the queries @@ -193,8 +195,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t first_entry_local = rrdset_first_entry_t(st); if(first_entry_local > now + tolerance) { internal_error(true, - "RRDSET: '%s' first time %llu is in the future (now is %llu)", - rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now); + "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)first_entry_local, (unsigned long long)now); first_entry_local = now; } @@ -205,15 +208,16 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t last_entry_local = st->last_updated.tv_sec; if(!last_entry_local) { internal_error(true, - "RRDSET: '%s' last updated time zero. Querying db for last updated time.", - rrdset_id(st)); + "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); last_entry_local = rrdset_last_entry_t(st); } if(last_entry_local > now + tolerance) { internal_error(true, - "RRDSET: '%s' last updated time %llu is in the future (now is %llu)", - rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)last_entry_local, (unsigned long long)now); last_entry_local = now; } @@ -263,51 +267,90 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t return enable_streaming; } -static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) { +// ---------------------------------------------------------------------------- +// sending replication requests - if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || after < st->rrdhost->receiver->replication_first_time_t)) - st->rrdhost->receiver->replication_first_time_t = after; +struct replication_request_details { + struct { + send_command callback; + void *data; + } caller; -#ifdef NETDATA_INTERNAL_CHECKS - if(after && before) { - char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1]; - log_date(after_buf, LOG_DATE_LENGTH, after); - log_date(before_buf, LOG_DATE_LENGTH, before); - internal_error(true, - "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)after, after_buf, (unsigned long long)before, before_buf, - start_streaming?"true":"false"); - } - else { - internal_error(true, - "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - start_streaming?"true":"false"); - } -#endif + RRDHOST *host; + RRDSET *st; -#ifdef NETDATA_INTERNAL_CHECKS - internal_error( - st->replay.after != 0 || st->replay.before != 0, - "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight", - rrdhost_hostname(st->rrdhost), rrdset_id(st) - ); - - st->replay.start_streaming = start_streaming; - st->replay.after = after; - st->replay.before = before; -#endif + struct { + time_t first_entry_t; // the first entry time the child has + time_t last_entry_t; // the last entry time the child has + } child_db; + + struct { + time_t first_entry_t; // the first entry time we have + time_t last_entry_t; // the last entry time we have + bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed + } local_db; + + struct { + time_t from; // the starting time of the entire gap we have + time_t to; // the ending time of the entire gap we have + } gap; - debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", - rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before); + struct { + time_t after; // the start time we requested previously from this child + time_t before; // the end time we requested previously from this child + } last_request; + + struct { + time_t after; // the start time of this replication request - the child will add 1 second + time_t before; // the end time of this replication request + bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' + } wanted; + + time_t now; // the current wall clock time +}; + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { + RRDSET *st = r->st; + + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) + st->rrdhost->receiver->replication_first_time_t = r->wanted.after; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; + + if(r->wanted.after) + log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after); + + if(r->wanted.before) + log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before); + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " + "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s" + , rrdhost_hostname(r->host), rrdset_id(r->st) + , r->wanted.after, wanted_after_buf + , r->wanted.before, wanted_before_buf + , r->wanted.start_streaming ? "YES" : "NO" + , msg + , r->last_request.after, r->last_request.before + , r->child_db.first_entry_t, r->child_db.last_entry_t + , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW" + , r->gap.from, r->gap.to + , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" + , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" + ); + + st->replay.start_streaming = r->wanted.start_streaming; + st->replay.after = r->wanted.after; + st->replay.before = r->wanted.before; +#endif // NETDATA_LOG_REPLICATION_REQUESTS char buffer[2048 + 1]; snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", - rrdset_id(st), start_streaming ? "true" : "false", - (unsigned long long)after, (unsigned long long)before); + rrdset_id(st), r->wanted.start_streaming ? "true" : "false", + (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); - int ret = callback(buffer, callback_data); + int ret = r->caller.callback(buffer, r->caller.data); if (ret < 0) { error("REPLICATION: failed to send replication request to child (error %d)", ret); return false; @@ -320,81 +363,110 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST time_t first_entry_child, time_t last_entry_child, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { - time_t now = now_realtime_sec(); - - // if replication is disabled, send an empty replication request - // asking no data - if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) { - internal_error(true, - "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled", - rrdhost_hostname(host), rrdset_id(st)); + struct replication_request_details r = { + .caller = { + .callback = callback, + .data = callback_data, + }, + + .host = host, + .st = st, + + .child_db = { + .first_entry_t = first_entry_child, + .last_entry_t = last_entry_child, + }, + + .last_request = { + .after = prev_first_entry_wanted, + .before = prev_last_entry_wanted, + }, + + .wanted = { + .after = 0, + .before = 0, + .start_streaming = true, + }, + + .now = now_realtime_sec(), + }; - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + // get our local database retention + r.local_db.first_entry_t = rrdset_first_entry_t(st); + r.local_db.last_entry_t = rrdset_last_entry_t(st); + if(r.local_db.last_entry_t > r.now) { + r.local_db.last_entry_t = r.now; + r.local_db.last_entry_t_adjusted_to_now = true; } - // Child has no stored data - if (!last_entry_child) { - error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data", - rrdhost_hostname(host), rrdset_id(st)); + // let's find the GAP we have + if(!r.last_request.after || !r.last_request.before) { + // there is no previous request + + if(r.local_db.last_entry_t) + // we have some data, let's continue from the last point we have + r.gap.from = r.local_db.last_entry_t; + else + // we don't have any data, the gap is the max timeframe we are allowed to replicate + r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate; - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); + } + else { + // we had sent a request - let's continue at the point we left it + // for this we don't take into account the actual data in our db + // because the child may also have gaps and we need to get over it + r.gap.from = r.last_request.before; } - // Nothing to get if the chart has not dimensions - if (!rrdset_number_of_dimensions(st)) { - error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions", - rrdhost_hostname(host), rrdset_id(st)); + // we want all the data up to now + r.gap.to = r.now; - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); - } + // The gap is now r.gap.from -> r.gap.to - // if the child's first/last entries are nonsensical, resume streaming - // without asking for any data - if (first_entry_child <= 0) { - error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)", - rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child); + if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled"); - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); - } + if (unlikely(!r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data"); - if (first_entry_child > last_entry_child) { - error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)", - rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); + if (unlikely(!rrdset_number_of_dimensions(st))) + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions"); - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); - } + if (r.child_db.first_entry_t <= 0) + return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid"); - time_t last_entry_local = rrdset_last_entry_t(st); - if(last_entry_local > now) { - internal_error(true, - "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.", - rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); - last_entry_local = now; - } + if (r.child_db.first_entry_t > r.child_db.last_entry_t) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)"); - // should never happen but if it does, start streaming without asking for any data - if (last_entry_local > last_entry_child) { - error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)", - rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child); + if (r.local_db.last_entry_t > r.child_db.last_entry_t) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one"); - return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0); - } + // let's find what the child can provide to fill that gap - time_t first_entry_wanted; - if (prev_first_entry_wanted && prev_last_entry_wanted) { - first_entry_wanted = prev_last_entry_wanted; - if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate) - first_entry_wanted = now - host->rrdpush_seconds_to_replicate; - } + if(r.child_db.first_entry_t > r.gap.from) + // the child does not have all the data - let's get what it has + r.wanted.after = r.child_db.first_entry_t; + else + // ok, the child can fill the entire gap we have + r.wanted.after = r.gap.from; + + if(r.gap.to - r.wanted.after > host->rrdpush_replication_step) + // the duration is too big for one request - let's take the first step + r.wanted.before = r.wanted.after + host->rrdpush_replication_step; else - first_entry_wanted = MAX(last_entry_local, first_entry_child); + // wow, we can do it in one request + r.wanted.before = r.gap.to; - time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step; - last_entry_wanted = MIN(last_entry_wanted, last_entry_child); + // don't ask from the child more than it has + if(r.wanted.before > r.child_db.last_entry_t) + r.wanted.before = r.child_db.last_entry_t; - bool start_streaming = (last_entry_wanted == last_entry_child); + // the child should start streaming immediately if the wanted duration is small + r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t); - return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted); + // the wanted timeframe is now r.wanted.after -> r.wanted.before + // send it + return send_replay_chart_cmd(&r, "OK"); } // ---------------------------------------------------------------------------- @@ -633,6 +705,7 @@ static struct replication_request replication_request_get_first_available() { else if(sender_has_room_to_spare) { // copy the request to return it rq = *rse->rq; + rq.chart_id = string_dup(rq.chart_id); // set the return result to found rq.found = true; @@ -662,14 +735,6 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may struct sender_state *s = sender_state; (void)s; struct replication_request *rq = value; - RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); - if(!st) { - internal_error(true, "REPLAY: chart '%s' not found on host '%s'", - string2str(rq->chart_id), rrdhost_hostname(rq->sender->host)); - } - else - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); - // IMPORTANT: // We use the react instead of the insert callback // because we want the item to be atomically visible @@ -691,12 +756,26 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ struct replication_request *rq = old_value; (void)rq; struct replication_request *rq_new = new_value; - internal_error( - true, - "STREAM %s [send to %s]: REPLAY ERROR: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", - rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), - (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", - (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + replication_recursive_lock(); + + if(!rq->indexed_in_judy) { + replication_sort_entry_add(rq); + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + else + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + + replication_recursive_unlock(); // bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false; // @@ -880,7 +959,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total); } - if(!rq.found) { + if(unlikely(!rq.found)) { worker_is_idle(); if(!rep.requests_count) @@ -898,7 +977,15 @@ void *replication_thread_main(void *ptr __maybe_unused) { else { // delete the request from the dictionary worker_is_busy(WORKER_JOB_DELETE_ENTRY); - dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)); + if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id))) + error("REPLAY: 'host:%s/chart:%s' failed to be deleted from sender dictionary", + rrdhost_hostname(rq.sender->host), string2str(rq.chart_id)); + + if(rq.sender->replication_pending_requests == 0 && dictionary_entries(rq.sender->replication_requests) != 0) + error("REPLAY: 'host:%s/chart:%s' sender dictionary has %zu entries, but sender pending requests are %zu", + rrdhost_hostname(rq.sender->host), string2str(rq.chart_id), + dictionary_entries(rq.sender->replication_requests), + rq.sender->replication_pending_requests); } worker_is_busy(WORKER_JOB_FIND_CHART); @@ -910,13 +997,6 @@ void *replication_thread_main(void *ptr __maybe_unused) { continue; } - if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - rrdhost_sender_replicating_charts_plus_one(st->rrdhost); - } - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); - worker_is_busy(WORKER_JOB_QUERYING); latest_first_time_t = rq.after; @@ -947,11 +1027,18 @@ void *replication_thread_main(void *ptr __maybe_unused) { rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif } else internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating", string2str(rq.chart_id), rrdhost_hostname(st->rrdhost)); } + + string_freez(rq.chart_id); } netdata_thread_cleanup_pop(1); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index d19335256e..e471c97ead 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -298,21 +298,32 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { if(!last_entry_local) { internal_error(true, - "RRDSET: '%s' last updated time zero. Querying db for last updated time.", - rrdset_id(st)); + "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); last_entry_local = rrdset_last_entry_t(st); time_t now = now_realtime_sec(); + if(last_entry_local > now) { internal_error(true, - "RRDSET: '%s' last updated time %llu is in the future (now is %llu)", - rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); + "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long)last_entry_local, (unsigned long long)now); last_entry_local = now; } } buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu\n", (unsigned long long)first_entry_local, (unsigned long long)last_entry_local); + + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif } st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); @@ -344,7 +355,8 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, "\n", 1); } else { - internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); + internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); } diff --git a/streaming/sender.c b/streaming/sender.c index 85aad3a3e5..f4624f5998 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -23,9 +23,10 @@ #define WORKER_SENDER_JOB_BYTES_SENT 17 #define WORKER_SENDER_JOB_REPLAY_REQUEST 18 #define WORKER_SENDER_JOB_FUNCTION_REQUEST 19 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20 -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 20 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 20 +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 #endif extern struct config stream_config; @@ -225,7 +226,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDSET *st; rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); st->upstream_resync_time = 0; @@ -1099,6 +1100,7 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); struct sender_state *s = ptr; s->tid = gettid(); @@ -1342,6 +1344,8 @@ void *rrdpush_sender_thread(void *ptr) { rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } + + worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication_requests)); } netdata_thread_cleanup_pop(1); |