summary | refs | log | tree | commit | diff | stats
path: root/streaming
diff options
context:
space:
mode:
author: Costa Tsaousis <costa@netdata.cloud> 2022-11-24 00:24:21 +0200
committer: GitHub <noreply@github.com> 2022-11-24 00:24:21 +0200
commit: 8e1a99ad79a1394cbb0ffcaa24bdde85c7b14d81 (patch)
tree: 61020aea9c8c6e48032d5721aec4b9ddbfe7c35b /streaming
parent: 0fe7b1c8a84b7836b5bb13c37924d9fd851c6233 (diff)
replication fixes #5 (#14038)
* pluginsd cleanup; replication logic cleanup; fix bug in replication begin
* log replication start/stop and change the keyword of NETDATA_LOG_REPLICATION_REQUESTS logs to REPLAY
* dont ask for data the child does not have; log fixes
* more pluginsd cleanup
* count sender dictionary entries
* fix dictionary_flush()
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/replication.c | 351
-rw-r--r-- streaming/rrdpush.c | 22
-rw-r--r-- streaming/sender.c | 10
3 files changed, 243 insertions, 140 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 1e1f7751e9..6e3b145507 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -3,7 +3,7 @@
#include "replication.h"
#include "Judy.h"
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 30
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
@@ -23,8 +23,8 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
memset(data, 0, sizeof(data));
if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
- rrdset_id(st),
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)before,
(unsigned long long)st->last_updated.tv_sec
);
@@ -65,7 +65,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
data[i].sp = ops->next_metric(&data[i].handle);
internal_error(max_skip <= 0,
- "REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
+ "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
if(data[i].sp.end_time < now)
@@ -84,7 +84,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
time_t wall_clock_time = now_realtime_sec();
if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
internal_error(true,
- "REPLAY: host '%s', chart '%s': db provided future start time %llu or end time %llu (now is %llu)",
+ "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)min_start_time,
(unsigned long long)min_end_time,
@@ -93,9 +93,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
}
if(min_end_time < now) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
- "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
+ "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
break;
}
@@ -130,23 +132,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
now = min_end_time + 1;
}
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
if(actual_after) {
char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
- "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
- "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
+ "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
(unsigned long long)after, (unsigned long long)before);
-#endif
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
// release all the dictionary items acquired
// finalize the queries
@@ -193,8 +195,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t first_entry_local = rrdset_first_entry_t(st);
if(first_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: '%s' first time %llu is in the future (now is %llu)",
- rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
+ "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)first_entry_local, (unsigned long long)now);
first_entry_local = now;
}
@@ -205,15 +208,16 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
time_t last_entry_local = st->last_updated.tv_sec;
if(!last_entry_local) {
internal_error(true,
- "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
- rrdset_id(st));
+ "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
last_entry_local = rrdset_last_entry_t(st);
}
if(last_entry_local > now + tolerance) {
internal_error(true,
- "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
- rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+ "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)last_entry_local, (unsigned long long)now);
last_entry_local = now;
}
@@ -263,51 +267,90 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
return enable_streaming;
}
-static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+// ----------------------------------------------------------------------------
+// sending replication requests
- if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || after < st->rrdhost->receiver->replication_first_time_t))
- st->rrdhost->receiver->replication_first_time_t = after;
+struct replication_request_details {
+ struct {
+ send_command callback;
+ void *data;
+ } caller;
-#ifdef NETDATA_INTERNAL_CHECKS
- if(after && before) {
- char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
- log_date(after_buf, LOG_DATE_LENGTH, after);
- log_date(before_buf, LOG_DATE_LENGTH, before);
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
- start_streaming?"true":"false");
- }
- else {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- start_streaming?"true":"false");
- }
-#endif
+ RRDHOST *host;
+ RRDSET *st;
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(
- st->replay.after != 0 || st->replay.before != 0,
- "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight",
- rrdhost_hostname(st->rrdhost), rrdset_id(st)
- );
-
- st->replay.start_streaming = start_streaming;
- st->replay.after = after;
- st->replay.before = before;
-#endif
+ struct {
+ time_t first_entry_t; // the first entry time the child has
+ time_t last_entry_t; // the last entry time the child has
+ } child_db;
+
+ struct {
+ time_t first_entry_t; // the first entry time we have
+ time_t last_entry_t; // the last entry time we have
+ bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed
+ } local_db;
+
+ struct {
+ time_t from; // the starting time of the entire gap we have
+ time_t to; // the ending time of the entire gap we have
+ } gap;
- debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
- rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+ struct {
+ time_t after; // the start time we requested previously from this child
+ time_t before; // the end time we requested previously from this child
+ } last_request;
+
+ struct {
+ time_t after; // the start time of this replication request - the child will add 1 second
+ time_t before; // the end time of this replication request
+ bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
+ } wanted;
+
+ time_t now; // the current wall clock time
+};
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+ RRDSET *st = r->st;
+
+ if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
+ st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
+
+ if(r->wanted.after)
+ log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);
+
+ if(r->wanted.before)
+ log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
+ "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s"
+ , rrdhost_hostname(r->host), rrdset_id(r->st)
+ , r->wanted.after, wanted_after_buf
+ , r->wanted.before, wanted_before_buf
+ , r->wanted.start_streaming ? "YES" : "NO"
+ , msg
+ , r->last_request.after, r->last_request.before
+ , r->child_db.first_entry_t, r->child_db.last_entry_t
+ , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW"
+ , r->gap.from, r->gap.to
+ , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
+ , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
+ );
+
+ st->replay.start_streaming = r->wanted.start_streaming;
+ st->replay.after = r->wanted.after;
+ st->replay.before = r->wanted.before;
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
char buffer[2048 + 1];
snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
- rrdset_id(st), start_streaming ? "true" : "false",
- (unsigned long long)after, (unsigned long long)before);
+ rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
+ (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
- int ret = callback(buffer, callback_data);
+ int ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
error("REPLICATION: failed to send replication request to child (error %d)", ret);
return false;
@@ -320,81 +363,110 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
time_t first_entry_child, time_t last_entry_child,
time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
- time_t now = now_realtime_sec();
-
- // if replication is disabled, send an empty replication request
- // asking no data
- if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
- rrdhost_hostname(host), rrdset_id(st));
+ struct replication_request_details r = {
+ .caller = {
+ .callback = callback,
+ .data = callback_data,
+ },
+
+ .host = host,
+ .st = st,
+
+ .child_db = {
+ .first_entry_t = first_entry_child,
+ .last_entry_t = last_entry_child,
+ },
+
+ .last_request = {
+ .after = prev_first_entry_wanted,
+ .before = prev_last_entry_wanted,
+ },
+
+ .wanted = {
+ .after = 0,
+ .before = 0,
+ .start_streaming = true,
+ },
+
+ .now = now_realtime_sec(),
+ };
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ // get our local database retention
+ r.local_db.first_entry_t = rrdset_first_entry_t(st);
+ r.local_db.last_entry_t = rrdset_last_entry_t(st);
+ if(r.local_db.last_entry_t > r.now) {
+ r.local_db.last_entry_t = r.now;
+ r.local_db.last_entry_t_adjusted_to_now = true;
}
- // Child has no stored data
- if (!last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
- rrdhost_hostname(host), rrdset_id(st));
+ // let's find the GAP we have
+ if(!r.last_request.after || !r.last_request.before) {
+ // there is no previous request
+
+ if(r.local_db.last_entry_t)
+ // we have some data, let's continue from the last point we have
+ r.gap.from = r.local_db.last_entry_t;
+ else
+ // we don't have any data, the gap is the max timeframe we are allowed to replicate
+ r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate;
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+ }
+ else {
+ // we had sent a request - let's continue at the point we left it
+ // for this we don't take into account the actual data in our db
+ // because the child may also have gaps and we need to get over it
+ r.gap.from = r.last_request.before;
}
- // Nothing to get if the chart has not dimensions
- if (!rrdset_number_of_dimensions(st)) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
- rrdhost_hostname(host), rrdset_id(st));
+ // we want all the data up to now
+ r.gap.to = r.now;
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
+ // The gap is now r.gap.from -> r.gap.to
- // if the child's first/last entries are nonsensical, resume streaming
- // without asking for any data
- if (first_entry_child <= 0) {
- error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child);
+ if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
+ return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
+ if (unlikely(!r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
- if (first_entry_child > last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+ if (unlikely(!rrdset_number_of_dimensions(st)))
+ return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
+ if (r.child_db.first_entry_t <= 0)
+ return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
- time_t last_entry_local = rrdset_last_entry_t(st);
- if(last_entry_local > now) {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
- last_entry_local = now;
- }
+ if (r.child_db.first_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
- // should never happen but if it does, start streaming without asking for any data
- if (last_entry_local > last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
+ if (r.local_db.last_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
+ // let's find what the child can provide to fill that gap
- time_t first_entry_wanted;
- if (prev_first_entry_wanted && prev_last_entry_wanted) {
- first_entry_wanted = prev_last_entry_wanted;
- if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate)
- first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
- }
+ if(r.child_db.first_entry_t > r.gap.from)
+ // the child does not have all the data - let's get what it has
+ r.wanted.after = r.child_db.first_entry_t;
+ else
+ // ok, the child can fill the entire gap we have
+ r.wanted.after = r.gap.from;
+
+ if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
+ // the duration is too big for one request - let's take the first step
+ r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
else
- first_entry_wanted = MAX(last_entry_local, first_entry_child);
+ // wow, we can do it in one request
+ r.wanted.before = r.gap.to;
- time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step;
- last_entry_wanted = MIN(last_entry_wanted, last_entry_child);
+ // don't ask from the child more than it has
+ if(r.wanted.before > r.child_db.last_entry_t)
+ r.wanted.before = r.child_db.last_entry_t;
- bool start_streaming = (last_entry_wanted == last_entry_child);
+ // the child should start streaming immediately if the wanted duration is small
+ r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t);
- return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
+ // the wanted timeframe is now r.wanted.after -> r.wanted.before
+ // send it
+ return send_replay_chart_cmd(&r, "OK");
}
// ----------------------------------------------------------------------------
@@ -633,6 +705,7 @@ static struct replication_request replication_request_get_first_available() {
else if(sender_has_room_to_spare) {
// copy the request to return it
rq = *rse->rq;
+ rq.chart_id = string_dup(rq.chart_id);
// set the return result to found
rq.found = true;
@@ -662,14 +735,6 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
struct sender_state *s = sender_state; (void)s;
struct replication_request *rq = value;
- RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
- if(!st) {
- internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
- string2str(rq->chart_id), rrdhost_hostname(rq->sender->host));
- }
- else
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
-
// IMPORTANT:
// We use the react instead of the insert callback
// because we want the item to be atomically visible
@@ -691,12 +756,26 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
struct replication_request *rq = old_value; (void)rq;
struct replication_request *rq_new = new_value;
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY ERROR: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ replication_recursive_lock();
+
+ if(!rq->indexed_in_judy) {
+ replication_sort_entry_add(rq);
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+ else
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+
+ replication_recursive_unlock();
// bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
//
@@ -880,7 +959,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
}
- if(!rq.found) {
+ if(unlikely(!rq.found)) {
worker_is_idle();
if(!rep.requests_count)
@@ -898,7 +977,15 @@ void *replication_thread_main(void *ptr __maybe_unused) {
else {
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);
- dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id));
+ if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)))
+ error("REPLAY: 'host:%s/chart:%s' failed to be deleted from sender dictionary",
+ rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
+
+ if(rq.sender->replication_pending_requests == 0 && dictionary_entries(rq.sender->replication_requests) != 0)
+ error("REPLAY: 'host:%s/chart:%s' sender dictionary has %zu entries, but sender pending requests are %zu",
+ rrdhost_hostname(rq.sender->host), string2str(rq.chart_id),
+ dictionary_entries(rq.sender->replication_requests),
+ rq.sender->replication_pending_requests);
}
worker_is_busy(WORKER_JOB_FIND_CHART);
@@ -910,13 +997,6 @@ void *replication_thread_main(void *ptr __maybe_unused) {
continue;
}
- if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
- }
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
-
worker_is_busy(WORKER_JOB_QUERYING);
latest_first_time_t = rq.after;
@@ -947,11 +1027,18 @@ void *replication_thread_main(void *ptr __maybe_unused) {
rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
}
else
internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating",
string2str(rq.chart_id), rrdhost_hostname(st->rrdhost));
}
+
+ string_freez(rq.chart_id);
}
netdata_thread_cleanup_pop(1);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index d19335256e..e471c97ead 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -298,21 +298,32 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
if(!last_entry_local) {
internal_error(true,
- "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
- rrdset_id(st));
+ "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
last_entry_local = rrdset_last_entry_t(st);
time_t now = now_realtime_sec();
+
if(last_entry_local > now) {
internal_error(true,
- "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
- rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+ "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)last_entry_local, (unsigned long long)now);
last_entry_local = now;
}
}
buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu\n",
(unsigned long long)first_entry_local, (unsigned long long)last_entry_local);
+
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
}
st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
@@ -344,7 +355,8 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "\n", 1);
}
else {
- internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+ internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
// we will include it in the next iteration
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
}
diff --git a/streaming/sender.c b/streaming/sender.c
index 85aad3a3e5..f4624f5998 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -23,9 +23,10 @@
#define WORKER_SENDER_JOB_BYTES_SENT 17
#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 20
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 20
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
#endif
extern struct config stream_config;
@@ -225,7 +226,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->upstream_resync_time = 0;
@@ -1099,6 +1100,7 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
struct sender_state *s = ptr;
s->tid = gettid();
@@ -1342,6 +1344,8 @@ void *rrdpush_sender_thread(void *ptr) {
rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
+
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication_requests));
}
netdata_thread_cleanup_pop(1);