author    Costa Tsaousis <costa@netdata.cloud>  2022-11-25 20:37:15 +0200
committer GitHub <noreply@github.com>           2022-11-25 20:37:15 +0200
commit    2e874e79163771856e4e756b176b729f7d8b0f0f (patch)
tree      eeb1ea10af039001e3290090d5a2d365f99f63c7 /streaming
parent    870acd61123ece7c074242e1b02d47cb7c667e38 (diff)
replication fixes #6 (#14046)
use the faster monotonic clock in workers and replication; avoid unnecessary statistics function calls on every replication request - gather them all together once every second; check the chart flags on all mirrored hosts, not only the ones that have a sender; clean up and unify the replication logs; add the child's world clock time to REND; fix the first BEGIN being transmitted when replication starts;
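
Two of the fixes above share one pattern: the replication thread now reads the monotonic clock and publishes all worker statistics together once per second, instead of calling the statistics functions on every request (see the replication_thread_main() hunks below). A minimal, self-contained sketch of that pattern, using plain POSIX calls rather than Netdata's now_monotonic_usec() helper, follows; it is an illustration, not code from this commit:

/* Sketch: throttle per-iteration statistics with the monotonic clock,
 * which is cheap to read and immune to wall-clock jumps. */
#define _POSIX_C_SOURCE 199309L
#include <stdint.h>
#include <stdio.h>
#include <time.h>

#define USEC_PER_SEC 1000000ULL

static uint64_t monotonic_usec(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t)ts.tv_sec * USEC_PER_SEC + (uint64_t)ts.tv_nsec / 1000ULL;
}

int main(void) {
    uint64_t last_stats_ut = monotonic_usec();
    size_t requests_executed = 0;

    for(int i = 0; i < 50; i++) {
        /* ... execute one replication request ... */
        requests_executed++;
        nanosleep(&(struct timespec){ .tv_nsec = 100000000 }, NULL); /* simulate 100ms of work */

        uint64_t now_ut = monotonic_usec();
        if(now_ut - last_stats_ut >= USEC_PER_SEC) {
            last_stats_ut = now_ut;
            /* gather and publish all counters together, once per second */
            printf("stats: executed %zu requests so far\n", requests_executed);
        }
    }
    return 0;
}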
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  460
-rw-r--r--  streaming/replication.h    2
-rw-r--r--  streaming/rrdpush.c       60
3 files changed, 320 insertions, 202 deletions
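
The REND (REPLAY_END) reply emitted by replicate_chart_response() below gains a seventh field, the child's world clock time, so the parent can tell whether the child's clock runs SAME, BEHIND or AHEAD of its own (as send_replay_chart_cmd() now logs). A hypothetical parent-side parser for that line; the field order follows the buffer_sprintf() in the diff, while the literal "REND" keyword and the parsing code itself are assumptions for illustration:

/* Sketch: parse the extended REPLAY_END line. Field order (from the diff):
 * update_every, child first/last db time, start_streaming, after, before,
 * and the new child world clock time. The "REND" keyword is assumed from
 * the commit message, not verified against PLUGINSD_KEYWORD_REPLAY_END. */
#include <stdio.h>
#include <string.h>
#include <time.h>

int main(void) {
    const char *line = "REND 1 1669400000 1669403000 false 1669400000 1669403000 1669403001";

    int update_every;
    unsigned long long first_entry, last_entry, after, before, child_world_time;
    char start_streaming[6];

    if(sscanf(line, "REND %d %llu %llu %5s %llu %llu %llu",
              &update_every, &first_entry, &last_entry, start_streaming,
              &after, &before, &child_world_time) != 7) {
        fprintf(stderr, "malformed REND line\n");
        return 1;
    }

    /* compare the child's clock against our local world clock time */
    time_t now = time(NULL);
    const char *drift = ((time_t)child_world_time == now) ? "SAME"
                      : ((time_t)child_world_time <  now) ? "BEHIND" : "AHEAD";
    printf("child clock is %s (child=%llu, parent=%lld), start_streaming=%s\n",
           drift, child_world_time, (long long)now,
           strcmp(start_streaming, "true") == 0 ? "yes" : "no");
    return 0;
}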
diff --git a/streaming/replication.c b/streaming/replication.c
index 6e3b145507..d88095761e 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -6,7 +6,10 @@
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
+// ----------------------------------------------------------------------------
+// sending replication replies
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
@@ -23,7 +26,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
memset(data, 0, sizeof(data));
if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu",
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)before,
(unsigned long long)st->last_updated.tv_sec
@@ -35,8 +38,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
{
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (rd_dfe.counter >= dimensions)
+ if (rd_dfe.counter >= dimensions) {
+ internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
break;
+ }
if(rd->exposed) {
data[rd_dfe.counter].dict = rd_dfe.dict;
@@ -65,7 +71,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
data[i].sp = ops->next_metric(&data[i].handle);
internal_error(max_skip <= 0,
- "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
if(data[i].sp.end_time < now)
@@ -81,10 +87,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
}
}
- time_t wall_clock_time = now_realtime_sec();
- if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
+ if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)min_start_time,
(unsigned long long)min_end_time,
@@ -95,7 +100,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
if(min_end_time < now) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
break;
@@ -138,14 +143,14 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
- "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
@@ -195,7 +200,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t first_entry_local = rrdset_first_entry_t(st);
if(first_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)first_entry_local, (unsigned long long)now);
first_entry_local = now;
@@ -208,14 +213,20 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t last_entry_local = st->last_updated.tv_sec;
if(!last_entry_local) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
rrdhost_hostname(st->rrdhost), rrdset_id(st));
last_entry_local = rrdset_last_entry_t(st);
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = now;
+ }
}
if(last_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)last_entry_local, (unsigned long long)now);
last_entry_local = now;
@@ -240,28 +251,49 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
// and copying the result to the host's buffer in order to avoid
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- {
- // pass the original after/before so that the parent knows about
- // which time range we responded
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
-
- if(after != 0 && before != 0)
- before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
- else {
- after = 0;
- before = 0;
- enable_streaming = true;
- }
- if(enable_streaming)
- replicate_chart_collection_state(wb, st);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- // end with first/last entries we have, and the first start time and
- // last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
- (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
- enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+ if(after != 0 && before != 0)
+ before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
+ else {
+ after = 0;
+ before = 0;
+ enable_streaming = true;
}
+
+ // get again the world clock time
+ time_t world_clock_time = now_realtime_sec();
+ if(enable_streaming) {
+ if(now < world_clock_time) {
+ // we needed time to execute this request
+ // so, the parent will need to replicate more data
+ enable_streaming = false;
+ }
+ else
+ replicate_chart_collection_state(wb, st);
+ }
+
+ // end with first/last entries we have, and the first start time and
+ // last end time of the data we sent
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+
+ // current chart update every
+ (int)st->update_every
+
+ // child first db time, child end db time
+ , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+
+ // start streaming boolean
+ , enable_streaming ? "true" : "false"
+
+ // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+ , (unsigned long long)after, (unsigned long long)before
+
+ // child world clock time
+ , (unsigned long long)world_clock_time
+ );
+
sender_commit(host->sender, wb);
return enable_streaming;
@@ -282,12 +314,14 @@ struct replication_request_details {
struct {
time_t first_entry_t; // the first entry time the child has
time_t last_entry_t; // the last entry time the child has
+ time_t world_time_t; // the current time of the child
} child_db;
struct {
time_t first_entry_t; // the first entry time we have
time_t last_entry_t; // the last entry time we have
bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed
+ time_t now; // the current local world clock time
} local_db;
struct {
@@ -305,8 +339,6 @@ struct replication_request_details {
time_t before; // the end time of this replication request
bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
} wanted;
-
- time_t now; // the current wall clock time
};
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
@@ -316,6 +348,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.log_next_data_collection = true;
+
char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
if(r->wanted.after)
@@ -326,7 +360,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
internal_error(true,
"REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
- "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s"
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
, rrdhost_hostname(r->host), rrdset_id(r->st)
, r->wanted.after, wanted_after_buf
, r->wanted.before, wanted_before_buf
@@ -334,7 +368,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, msg
, r->last_request.after, r->last_request.before
, r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW"
+ , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->local_db.first_entry_t, r->local_db.last_entry_t
+ , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -352,7 +388,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
int ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
- error("REPLICATION: failed to send replication request to child (error %d)", ret);
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ rrdhost_hostname(r->host), rrdset_id(r->st), ret);
return false;
}
@@ -360,7 +397,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
}
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t first_entry_child, time_t last_entry_child,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
struct replication_request_details r = {
@@ -375,6 +412,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.child_db = {
.first_entry_t = first_entry_child,
.last_entry_t = last_entry_child,
+ .world_time_t = child_world_time,
+ },
+
+ .local_db = {
+ .first_entry_t = rrdset_first_entry_t(st),
+ .last_entry_t = rrdset_last_entry_t(st),
+ .last_entry_t_adjusted_to_now = false,
+ .now = now_realtime_sec(),
},
.last_request = {
@@ -387,15 +432,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.before = 0,
.start_streaming = true,
},
-
- .now = now_realtime_sec(),
};
- // get our local database retention
- r.local_db.first_entry_t = rrdset_first_entry_t(st);
- r.local_db.last_entry_t = rrdset_last_entry_t(st);
- if(r.local_db.last_entry_t > r.now) {
- r.local_db.last_entry_t = r.now;
+ // check our local database retention
+ if(r.local_db.last_entry_t > r.local_db.now) {
+ r.local_db.last_entry_t = r.local_db.now;
r.local_db.last_entry_t_adjusted_to_now = true;
}
@@ -408,7 +449,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
r.gap.from = r.local_db.last_entry_t;
else
// we don't have any data, the gap is the max timeframe we are allowed to replicate
- r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate;
+ r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
}
else {
@@ -419,7 +460,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
}
// we want all the data up to now
- r.gap.to = r.now;
+ r.gap.to = r.local_db.now;
// The gap is now r.gap.from -> r.gap.to
@@ -461,8 +502,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
if(r.wanted.before > r.child_db.last_entry_t)
r.wanted.before = r.child_db.last_entry_t;
- // the child should start streaming immediately if the wanted duration is small
- r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t);
+ if(r.wanted.after > r.wanted.before)
+ r.wanted.after = r.wanted.before;
+
+ // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child
+ r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
// the wanted timeframe is now r.wanted.after -> r.wanted.before
// send it
@@ -499,11 +543,12 @@ struct replication_sort_entry {
static struct replication_thread {
netdata_mutex_t mutex;
+ size_t pending;
size_t added;
size_t executed;
size_t removed;
+ size_t last_executed;
time_t first_time_t;
- size_t requests_count;
Word_t next_unique_id;
struct replication_request *requests;
@@ -516,12 +561,13 @@ static struct replication_thread {
size_t waits;
Pvoid_t JudyL_array;
-} rep = {
+} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
+ .pending = 0,
.added = 0,
.executed = 0,
+ .last_executed = 0,
.first_time_t = 0,
- .requests_count = 0,
.next_unique_id = 1,
.skipped_no_room = 0,
.skipped_not_connected = 0,
@@ -535,7 +581,7 @@ static __thread int replication_recursive_mutex_recursions = 0;
static void replication_recursive_lock() {
if(++replication_recursive_mutex_recursions == 1)
- netdata_mutex_lock(&rep.mutex);
+ netdata_mutex_lock(&replication_globals.mutex);
#ifdef NETDATA_INTERNAL_CHECKS
if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
@@ -545,7 +591,7 @@ static void replication_recursive_lock() {
static void replication_recursive_unlock() {
if(--replication_recursive_mutex_recursions == 0)
- netdata_mutex_unlock(&rep.mutex);
+ netdata_mutex_unlock(&replication_globals.mutex);
#ifdef NETDATA_INTERNAL_CHECKS
if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
@@ -563,7 +609,7 @@ static struct replication_sort_entry *replication_sort_entry_create(struct repli
// copy the request
rse->rq = rq;
- rse->unique_id = rep.next_unique_id++;
+ rse->unique_id = replication_globals.next_unique_id++;
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
@@ -580,29 +626,29 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
struct replication_sort_entry *rse = replication_sort_entry_create(rq);
- if(rq->after < (time_t)rep.last_after) {
+ if(rq->after < (time_t)replication_globals.last_after) {
// make it find this request first
- rep.last_after = rq->after;
- rep.last_unique_id = rq->unique_id;
+ replication_globals.last_after = rq->after;
+ replication_globals.last_unique_id = rq->unique_id;
}
- rep.added++;
- rep.requests_count++;
+ replication_globals.added++;
+ replication_globals.pending++;
Pvoid_t *inner_judy_ptr;
// find the outer judy entry, using after as key
- inner_judy_ptr = JudyLGet(rep.JudyL_array, (Word_t) rq->after, PJE0);
+ inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
if(!inner_judy_ptr)
- inner_judy_ptr = JudyLIns(&rep.JudyL_array, (Word_t) rq->after, PJE0);
+ inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
// add it to the inner judy, using unique_id as key
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
rq->indexed_in_judy = true;
- if(!rep.first_time_t || rq->after < rep.first_time_t)
- rep.first_time_t = rq->after;
+ if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t)
+ replication_globals.first_time_t = rq->after;
replication_recursive_unlock();
@@ -612,8 +658,8 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
bool inner_judy_deleted = false;
- rep.removed++;
- rep.requests_count--;
+ replication_globals.removed++;
+ replication_globals.pending--;
rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
@@ -624,7 +670,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
// if no items left, delete it from the outer judy
if(**inner_judy_ppptr == NULL) {
- JudyLDel(&rep.JudyL_array, rse->rq->after, PJE0);
+ JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0);
inner_judy_deleted = true;
}
@@ -641,7 +687,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
replication_recursive_lock();
if(rq->indexed_in_judy) {
- inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0);
+ inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0);
if (inner_judy_pptr) {
Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
if (our_item_pptr) {
@@ -651,7 +697,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
}
if (!rse_to_delete)
- fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.",
+ fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
}
@@ -674,16 +720,16 @@ static struct replication_request replication_request_get_first_available() {
struct replication_request rq = (struct replication_request){ .found = false };
- if(unlikely(!rep.last_after || !rep.last_unique_id)) {
- rep.last_after = 0;
- rep.last_unique_id = 0;
+ if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
+ replication_globals.last_after = 0;
+ replication_globals.last_unique_id = 0;
}
bool find_same_after = true;
- while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) {
+ while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
Pvoid_t *our_item_pptr;
- while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) {
+ while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
struct sender_state *s = rse->rq->sender;
@@ -697,7 +743,7 @@ static struct replication_request replication_request_get_first_available() {
s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
- rep.skipped_not_connected++;
+ replication_globals.skipped_not_connected++;
if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
break;
}
@@ -714,14 +760,14 @@ static struct replication_request replication_request_get_first_available() {
break;
}
else
- rep.skipped_no_room++;
+ replication_globals.skipped_no_room++;
}
// call JudyLNext from now on
find_same_after = false;
// prepare for the next iteration on the outer loop
- rep.last_unique_id = 0;
+ replication_globals.last_unique_id = 0;
}
replication_recursive_unlock();
@@ -756,64 +802,28 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
struct replication_request *rq = old_value; (void)rq;
struct replication_request *rq_new = new_value;
- replication_recursive_lock();
-
- if(!rq->indexed_in_judy) {
- replication_sort_entry_add(rq);
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
- (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- }
- else
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
- (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ replication_recursive_lock();
- replication_recursive_unlock();
+ if(!rq->indexed_in_judy) {
+ replication_sort_entry_add(rq);
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+ else {
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
+ dictionary_acquired_item_name(item),
+ (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
-// bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
-//
-// if(rq_new->after < rq->after && rq_new->after != 0)
-// updated_after = true;
-//
-// if(rq_new->before > rq->before)
-// updated_before = true;
-//
-// if(rq_new->start_streaming != rq->start_streaming)
-// updated_start_streaming = true;
-//
-// if(updated_after || updated_before || updated_start_streaming) {
-// replication_recursive_lock();
-//
-// if(rq->indexed_in_judy)
-// replication_sort_entry_del(rq);
-//
-// if(rq_new->after < rq->after && rq_new->after != 0)
-// rq->after = rq_new->after;
-//
-// if(rq->after == 0)
-// rq->before = 0;
-// else if(rq_new->before > rq->before)
-// rq->before = rq_new->before;
-//
-// rq->start_streaming = rq->start_streaming;
-// replication_sort_entry_add(rq);
-//
-// replication_recursive_unlock();
-// updated = true;
-//
-// internal_error(
-// true,
-// "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])",
-// rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
-// (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false");
-// }
+ replication_recursive_unlock();
string_freez(rq_new->chart_id);
return false;
@@ -880,9 +890,9 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
s->replication_reached_max = false;
replication_recursive_lock();
- rep.last_after = 0;
- rep.last_unique_id = 0;
- rep.sender_resets++;
+ replication_globals.last_after = 0;
+ replication_globals.last_unique_id = 0;
+ replication_globals.sender_resets++;
replication_recursive_unlock();
}
@@ -916,6 +926,79 @@ static void replication_main_cleanup(void *ptr) {
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13
#define WORKER_JOB_CUSTOM_METRIC_WAITS 14
+#define WORKER_JOB_CHECK_CONSISTENCY 15
+
+#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
+
+static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
+ if(host->sender) {
+ size_t pending_requests = host->sender->replication_pending_requests;
+ size_t dict_entries = dictionary_entries(host->sender->replication_requests);
+
+ internal_error(
+ !pending_requests && dict_entries,
+ "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+ rrdhost_hostname(host), pending_requests, dict_entries);
+ }
+
+ size_t ok = 0;
+ size_t errors = 0;
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ bool is_error = false;
+
+ if(!flags) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(is_error)
+ errors++;
+ else
+ ok++;
+ }
+ rrdset_foreach_done(st);
+
+ internal_error(errors,
+ "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
+ rrdhost_hostname(host), ok, errors);
+
+ return errors;
+}
+
+static void verify_all_hosts_charts_are_streaming_now(void) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
+
+ size_t errors = 0;
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host)
+ errors += verify_host_charts_are_streaming_now(host);
+ dfe_done(host);
+
+ size_t executed = replication_globals.executed;
+ internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
+ replication_globals.last_executed = executed;
+#else
+ ;
+#endif
+}
void *replication_thread_main(void *ptr __maybe_unused) {
netdata_thread_cleanup_push(replication_main_cleanup, ptr);
@@ -927,6 +1010,8 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming");
+ worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
+ worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
@@ -937,62 +1022,75 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ // start from 100% completed
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
time_t latest_first_time_t = 0;
+ long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+ usec_t last_now_mono_ut = now_monotonic_usec();
while(!netdata_exit) {
- worker_is_busy(WORKER_JOB_FIND_NEXT);
- struct replication_request rq = replication_request_get_first_available();
- worker_is_busy(WORKER_JOB_STATISTICS);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.executed);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)rep.skipped_not_connected);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)rep.skipped_no_room);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)rep.sender_resets);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)rep.waits);
-
- if(latest_first_time_t) {
- time_t now = now_realtime_sec();
- time_t total = now - rep.first_time_t;
- time_t done = latest_first_time_t - rep.first_time_t;
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
- }
+ // statistics
+ usec_t now_mono_ut = now_monotonic_usec();
+ if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
+ last_now_mono_ut = now_mono_ut;
- if(unlikely(!rq.found)) {
- worker_is_idle();
+ if(!replication_globals.pending && run_verification_countdown-- == 0) {
+ replication_globals.first_time_t = 0; // reset the statistics about completion percentage
+ verify_all_hosts_charts_are_streaming_now();
+ }
- if(!rep.requests_count)
+ worker_is_busy(WORKER_JOB_STATISTICS);
+
+ if(latest_first_time_t && replication_globals.pending) {
+ // completion percentage statistics
+ time_t now = now_realtime_sec();
+ time_t total = now - replication_globals.first_time_t;
+ time_t done = latest_first_time_t - replication_globals.first_time_t;
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
+ (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
+ }
+ else
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
- // make it start from the beginning
- rep.last_after = 0;
-