diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-25 20:37:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-25 20:37:15 +0200 |
commit | 2e874e79163771856e4e756b176b729f7d8b0f0f (patch) | |
tree | eeb1ea10af039001e3290090d5a2d365f99f63c7 /streaming | |
parent | 870acd61123ece7c074242e1b02d47cb7c667e38 (diff) |
replication fixes #6 (#14046)
use the faster monotonic clock in workers and replication; avoid unnecessary statistics function on every request on replication - gather them all together once every second; check the chart flags on all mirrored hosts, not only the ones that have a sender; cleanup and unify replication logs; added child world time to REND; fix first BEGIN being transmitted when replication starts;
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 460 | ||||
-rw-r--r-- | streaming/replication.h | 2 | ||||
-rw-r--r-- | streaming/rrdpush.c | 60 |
3 files changed, 320 insertions, 202 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 6e3b145507..d88095761e 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -6,7 +6,10 @@ #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 -static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { +// ---------------------------------------------------------------------------- +// sending replication replies + +static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) { size_t dimensions = rrdset_number_of_dimensions(st); struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; @@ -23,7 +26,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti memset(data, 0, sizeof(data)); if(enable_streaming && st->last_updated.tv_sec > before) { - internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu", + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)before, (unsigned long long)st->last_updated.tv_sec @@ -35,8 +38,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti { RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (rd_dfe.counter >= dimensions) + if (rd_dfe.counter >= dimensions) { + internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); break; + } if(rd->exposed) { data[rd_dfe.counter].dict = rd_dfe.dict; @@ -65,7 +71,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti data[i].sp = ops->next_metric(&data[i].handle); internal_error(max_skip <= 0, - "REPLAY: 
'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now); if(data[i].sp.end_time < now) @@ -81,10 +87,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti } } - time_t wall_clock_time = now_realtime_sec(); - if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) { + if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) { internal_error(true, - "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)min_start_time, (unsigned long long)min_end_time, @@ -95,7 +100,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti if(min_end_time < now) { #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, - "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); #endif // NETDATA_LOG_REPLICATION_REQUESTS break; @@ -138,14 +143,14 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, - "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", 
rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, - "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS @@ -195,7 +200,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t first_entry_local = rrdset_first_entry_t(st); if(first_entry_local > now + tolerance) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now); first_entry_local = now; @@ -208,14 +213,20 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t time_t last_entry_local = st->last_updated.tv_sec; if(!last_entry_local) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.", + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.", rrdhost_hostname(st->rrdhost), rrdset_id(st)); last_entry_local = rrdset_last_entry_t(st); + if(!last_entry_local) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + last_entry_local = now; + } } if(last_entry_local > now + tolerance) { internal_error(true, - "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", + "STREAM_SENDER REPLAY ERROR: 
'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now); last_entry_local = now; @@ -240,28 +251,49 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t // and copying the result to the host's buffer in order to avoid // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); - { - // pass the original after/before so that the parent knows about - // which time range we responded - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - - if(after != 0 && before != 0) - before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming); - else { - after = 0; - before = 0; - enable_streaming = true; - } - if(enable_streaming) - replicate_chart_collection_state(wb, st); + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - // end with first/last entries we have, and the first start time and - // last end time of the data we sent - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n", - (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local, - enable_streaming ? 
"true" : "false", (unsigned long long)after, (unsigned long long)before); + if(after != 0 && before != 0) + before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now); + else { + after = 0; + before = 0; + enable_streaming = true; } + + // get again the world clock time + time_t world_clock_time = now_realtime_sec(); + if(enable_streaming) { + if(now < world_clock_time) { + // we needed time to execute this request + // so, the parent will need to replicate more data + enable_streaming = false; + } + else + replicate_chart_collection_state(wb, st); + } + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n", + + // current chart update every + (int)st->update_every + + // child first db time, child end db time + , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local + + // start streaming boolean + , enable_streaming ? 
"true" : "false" + + // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true) + , (unsigned long long)after, (unsigned long long)before + + // child world clock time + , (unsigned long long)world_clock_time + ); + sender_commit(host->sender, wb); return enable_streaming; @@ -282,12 +314,14 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has + time_t world_time_t; // the current time of the child } child_db; struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed + time_t now; // the current local world clock time } local_db; struct { @@ -305,8 +339,6 @@ struct replication_request_details { time_t before; // the end time of this replication request bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' } wanted; - - time_t now; // the current wall clock time }; static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { @@ -316,6 +348,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c st->rrdhost->receiver->replication_first_time_t = r->wanted.after; #ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.log_next_data_collection = true; + char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; if(r->wanted.after) @@ -326,7 +360,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " - "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s" + "last[%ld - %ld] child[%ld - 
%ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf @@ -334,7 +368,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , msg , r->last_request.after, r->last_request.before , r->child_db.first_entry_t, r->child_db.last_entry_t - , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW" + , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" + , r->local_db.first_entry_t, r->local_db.last_entry_t + , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" @@ -352,7 +388,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c int ret = r->caller.callback(buffer, r->caller.data); if (ret < 0) { - error("REPLICATION: failed to send replication request to child (error %d)", ret); + error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)", + rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } @@ -360,7 +397,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c } bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, - time_t first_entry_child, time_t last_entry_child, + time_t first_entry_child, time_t last_entry_child, time_t child_world_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { @@ -375,6 +412,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .child_db = { .first_entry_t = first_entry_child, .last_entry_t = last_entry_child, + 
.world_time_t = child_world_time, + }, + + .local_db = { + .first_entry_t = rrdset_first_entry_t(st), + .last_entry_t = rrdset_last_entry_t(st), + .last_entry_t_adjusted_to_now = false, + .now = now_realtime_sec(), }, .last_request = { @@ -387,15 +432,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .before = 0, .start_streaming = true, }, - - .now = now_realtime_sec(), }; - // get our local database retention - r.local_db.first_entry_t = rrdset_first_entry_t(st); - r.local_db.last_entry_t = rrdset_last_entry_t(st); - if(r.local_db.last_entry_t > r.now) { - r.local_db.last_entry_t = r.now; + // check our local database retention + if(r.local_db.last_entry_t > r.local_db.now) { + r.local_db.last_entry_t = r.local_db.now; r.local_db.last_entry_t_adjusted_to_now = true; } @@ -408,7 +449,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate - r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate; + r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; } else { @@ -419,7 +460,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // we want all the data up to now - r.gap.to = r.now; + r.gap.to = r.local_db.now; // The gap is now r.gap.from -> r.gap.to @@ -461,8 +502,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.before > r.child_db.last_entry_t) r.wanted.before = r.child_db.last_entry_t; - // the child should start streaming immediately if the wanted duration is small - r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t); + if(r.wanted.after > r.wanted.before) + r.wanted.after = r.wanted.before; + + // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child + r.wanted.start_streaming = 
(r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); // the wanted timeframe is now r.wanted.after -> r.wanted.before // send it @@ -499,11 +543,12 @@ struct replication_sort_entry { static struct replication_thread { netdata_mutex_t mutex; + size_t pending; size_t added; size_t executed; size_t removed; + size_t last_executed; time_t first_time_t; - size_t requests_count; Word_t next_unique_id; struct replication_request *requests; @@ -516,12 +561,13 @@ static struct replication_thread { size_t waits; Pvoid_t JudyL_array; -} rep = { +} replication_globals = { .mutex = NETDATA_MUTEX_INITIALIZER, + .pending = 0, .added = 0, .executed = 0, + .last_executed = 0, .first_time_t = 0, - .requests_count = 0, .next_unique_id = 1, .skipped_no_room = 0, .skipped_not_connected = 0, @@ -535,7 +581,7 @@ static __thread int replication_recursive_mutex_recursions = 0; static void replication_recursive_lock() { if(++replication_recursive_mutex_recursions == 1) - netdata_mutex_lock(&rep.mutex); + netdata_mutex_lock(&replication_globals.mutex); #ifdef NETDATA_INTERNAL_CHECKS if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2) @@ -545,7 +591,7 @@ static void replication_recursive_lock() { static void replication_recursive_unlock() { if(--replication_recursive_mutex_recursions == 0) - netdata_mutex_unlock(&rep.mutex); + netdata_mutex_unlock(&replication_globals.mutex); #ifdef NETDATA_INTERNAL_CHECKS if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2) @@ -563,7 +609,7 @@ static struct replication_sort_entry *replication_sort_entry_create(struct repli // copy the request rse->rq = rq; - rse->unique_id = rep.next_unique_id++; + rse->unique_id = replication_globals.next_unique_id++; // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; @@ -580,29 +626,29 @@ static struct replication_sort_entry 
*replication_sort_entry_add(struct replicat struct replication_sort_entry *rse = replication_sort_entry_create(rq); - if(rq->after < (time_t)rep.last_after) { + if(rq->after < (time_t)replication_globals.last_after) { // make it find this request first - rep.last_after = rq->after; - rep.last_unique_id = rq->unique_id; + replication_globals.last_after = rq->after; + replication_globals.last_unique_id = rq->unique_id; } - rep.added++; - rep.requests_count++; + replication_globals.added++; + replication_globals.pending++; Pvoid_t *inner_judy_ptr; // find the outer judy entry, using after as key - inner_judy_ptr = JudyLGet(rep.JudyL_array, (Word_t) rq->after, PJE0); + inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0); if(!inner_judy_ptr) - inner_judy_ptr = JudyLIns(&rep.JudyL_array, (Word_t) rq->after, PJE0); + inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0); // add it to the inner judy, using unique_id as key Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; rq->indexed_in_judy = true; - if(!rep.first_time_t || rq->after < rep.first_time_t) - rep.first_time_t = rq->after; + if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t) + replication_globals.first_time_t = rq->after; replication_recursive_unlock(); @@ -612,8 +658,8 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { bool inner_judy_deleted = false; - rep.removed++; - rep.requests_count--; + replication_globals.removed++; + replication_globals.pending--; rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); @@ -624,7 +670,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor // if no items left, delete it from the outer judy if(**inner_judy_ppptr == NULL) { - 
JudyLDel(&rep.JudyL_array, rse->rq->after, PJE0); + JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0); inner_judy_deleted = true; } @@ -641,7 +687,7 @@ static void replication_sort_entry_del(struct replication_request *rq) { replication_recursive_lock(); if(rq->indexed_in_judy) { - inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0); + inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0); if (inner_judy_pptr) { Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); if (our_item_pptr) { @@ -651,7 +697,7 @@ static void replication_sort_entry_del(struct replication_request *rq) { } if (!rse_to_delete) - fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.", + fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); } @@ -674,16 +720,16 @@ static struct replication_request replication_request_get_first_available() { struct replication_request rq = (struct replication_request){ .found = false }; - if(unlikely(!rep.last_after || !rep.last_unique_id)) { - rep.last_after = 0; - rep.last_unique_id = 0; + if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) { + replication_globals.last_after = 0; + replication_globals.last_unique_id = 0; } bool find_same_after = true; - while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) { + while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) { Pvoid_t *our_item_pptr; - while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) { + while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) { struct replication_sort_entry *rse = *our_item_pptr; struct sender_state *s = rse->rq->sender; @@ -697,7 
+743,7 @@ static struct replication_request replication_request_get_first_available() { s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { - rep.skipped_not_connected++; + replication_globals.skipped_not_connected++; if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) break; } @@ -714,14 +760,14 @@ static struct replication_request replication_request_get_first_available() { break; } else - rep.skipped_no_room++; + replication_globals.skipped_no_room++; } // call JudyLNext from now on find_same_after = false; // prepare for the next iteration on the outer loop - rep.last_unique_id = 0; + replication_globals.last_unique_id = 0; } replication_recursive_unlock(); @@ -756,64 +802,28 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ struct replication_request *rq = old_value; (void)rq; struct replication_request *rq_new = new_value; - replication_recursive_lock(); - - if(!rq->indexed_in_judy) { - replication_sort_entry_add(rq); - internal_error( - true, - "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", - rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), - (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", - (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? 
"true" : "false"); - } - else - internal_error( - true, - "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", - rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), - (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", - (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + replication_recursive_lock(); - replication_recursive_unlock(); + if(!rq->indexed_in_judy) { + replication_sort_entry_add(rq); + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + else { + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), + dictionary_acquired_item_name(item), + (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? 
"true" : "false"); + } -// bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false; -// -// if(rq_new->after < rq->after && rq_new->after != 0) -// updated_after = true; -// -// if(rq_new->before > rq->before) -// updated_before = true; -// -// if(rq_new->start_streaming != rq->start_streaming) -// updated_start_streaming = true; -// -// if(updated_after || updated_before || updated_start_streaming) { -// replication_recursive_lock(); -// -// if(rq->indexed_in_judy) -// replication_sort_entry_del(rq); -// -// if(rq_new->after < rq->after && rq_new->after != 0) -// rq->after = rq_new->after; -// -// if(rq->after == 0) -// rq->before = 0; -// else if(rq_new->before > rq->before) -// rq->before = rq_new->before; -// -// rq->start_streaming = rq->start_streaming; -// replication_sort_entry_add(rq); -// -// replication_recursive_unlock(); -// updated = true; -// -// internal_error( -// true, -// "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])", -// rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), -// (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? 
"true" : "false"); -// } + replication_recursive_unlock(); string_freez(rq_new->chart_id); return false; @@ -880,9 +890,9 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { s->replication_reached_max = false; replication_recursive_lock(); - rep.last_after = 0; - rep.last_unique_id = 0; - rep.sender_resets++; + replication_globals.last_after = 0; + replication_globals.last_unique_id = 0; + replication_globals.sender_resets++; replication_recursive_unlock(); } @@ -916,6 +926,79 @@ static void replication_main_cleanup(void *ptr) { #define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 #define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13 #define WORKER_JOB_CUSTOM_METRIC_WAITS 14 +#define WORKER_JOB_CHECK_CONSISTENCY 15 + +#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10 + +static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { + if(host->sender) { + size_t pending_requests = host->sender->replication_pending_requests; + size_t dict_entries = dictionary_entries(host->sender->replication_requests); + + internal_error( + !pending_requests && dict_entries, + "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication", + rrdhost_hostname(host), pending_requests, dict_entries); + } + + size_t ok = 0; + size_t errors = 0; + + RRDSET *st; + rrdset_foreach_read(st, host) { + RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + + bool is_error = false; + + if(!flags) { + internal_error( + true, + "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + internal_error( + true, + "REPLICATION 
SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(is_error) + errors++; + else + ok++; + } + rrdset_foreach_done(st); + + internal_error(errors, + "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", + rrdhost_hostname(host), ok, errors); + + return errors; +} + +static void verify_all_hosts_charts_are_streaming_now(void) { +#ifdef NETDATA_INTERNAL_CHECKS + worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY); + + size_t errors = 0; + RRDHOST *host; + dfe_start_reentrant(rrdhost_root_index, host) + errors += verify_host_charts_are_streaming_now(host); + dfe_done(host); + + size_t executed = replication_globals.executed; + internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors); + replication_globals.last_executed = executed; +#else + ; +#endif +} void *replication_thread_main(void *ptr __maybe_unused) { netdata_thread_cleanup_push(replication_main_cleanup, ptr); @@ -927,6 +1010,8 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming"); + worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); + worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); @@ -937,62 +1022,75 @@ void *replication_thread_main(void *ptr __maybe_unused) { 
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL); + // start from 100% completed + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + time_t latest_first_time_t = 0; + long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place + usec_t last_now_mono_ut = now_monotonic_usec(); while(!netdata_exit) { - worker_is_busy(WORKER_JOB_FIND_NEXT); - struct replication_request rq = replication_request_get_first_available(); - worker_is_busy(WORKER_JOB_STATISTICS); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.executed); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)rep.skipped_not_connected); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)rep.skipped_no_room); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)rep.sender_resets); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)rep.waits); - - if(latest_first_time_t) { - time_t now = now_realtime_sec(); - time_t total = now - rep.first_time_t; - time_t done = latest_first_time_t - rep.first_time_t; - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total); - } + // statistics + usec_t now_mono_ut = now_monotonic_usec(); + if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { + last_now_mono_ut = now_mono_ut; - if(unlikely(!rq.found)) { - worker_is_idle(); + if(!replication_globals.pending && run_verification_countdown-- == 0) { + 
replication_globals.first_time_t = 0; // reset the statistics about completion percentage + verify_all_hosts_charts_are_streaming_now(); + } - if(!rep.requests_count) + worker_is_busy(WORKER_JOB_STATISTICS); + + if(latest_first_time_t && replication_globals.pending) { + // completion percentage statistics + time_t now = now_realtime_sec(); + time_t total = now - replication_globals.first_time_t; + time_t done = latest_first_time_t - replication_globals.first_time_t; + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, + (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total); + } + else worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); - // make it start from the beginning - rep.last_after = 0; - |