summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-20 23:47:53 +0200
committerGitHub <noreply@github.com>2022-11-20 23:47:53 +0200
commit284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree97a7d55627ef7477f431c53a20d0e6f1f738a419 /streaming/replication.c
parent2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unnecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c501
1 files changed, 485 insertions, 16 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index ef384f4e8f..851b5b4c7c 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "replication.h"
+#include "Judy.h"
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
size_t dimensions = rrdset_number_of_dimensions(st);
@@ -13,6 +14,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
RRDDIM *rd;
struct storage_engine_query_handle handle;
STORAGE_POINT sp;
+ bool enabled;
} data[dimensions];
memset(data, 0, sizeof(data));
@@ -33,27 +35,35 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
if (rd_dfe.counter >= dimensions)
break;
- data[rd_dfe.counter].dict = rd_dfe.dict;
- data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- data[rd_dfe.counter].rd = rd;
+ if(rd->exposed) {
+ data[rd_dfe.counter].dict = rd_dfe.dict;
+ data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ data[rd_dfe.counter].rd = rd;
- ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
+ ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
+
+ data[rd_dfe.counter].enabled = true;
+ }
+ else
+ data[rd_dfe.counter].enabled = false;
}
rrddim_foreach_done(rd);
}
- time_t now = after, actual_after = 0, actual_before = 0;
+ time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+ if(!data[i].enabled) continue;
+
// fetch the first valid point for the dimension
int max_skip = 100;
while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
data[i].sp = ops->next_metric(&data[i].handle);
- if(max_skip <= 0)
- error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now);
+ internal_error(max_skip <= 0,
+ "REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
if(data[i].sp.end_time < now)
continue;
@@ -68,6 +78,17 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
}
}
+ time_t wall_clock_time = now_realtime_sec();
+ if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
+ internal_error(true,
+ "REPLAY: host '%s', chart '%s': db provided future start time %llu or end time %llu (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)min_start_time,
+ (unsigned long long)min_end_time,
+ (unsigned long long)wall_clock_time);
+ break;
+ }
+
if(min_end_time < now) {
internal_error(true,
"REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
@@ -85,14 +106,18 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
else
actual_before = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n"
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
, (unsigned long long)min_start_time
- , (unsigned long long)min_end_time);
+ , (unsigned long long)min_end_time
+ , (unsigned long long)wall_clock_time
+ );
// output the replay values for this time
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
+ if(!data[i].enabled) continue;
+
if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n",
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
else
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
@@ -123,6 +148,8 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
// release all the dictionary items acquired
// finalize the queries
for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
+ if(!data[i].enabled) continue;
+
ops->finalize(&data[i].handle);
dictionary_acquired_item_release(data[i].dict, data[i].rda);
}
@@ -133,7 +160,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n",
+ if(!rd->exposed) continue;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
rrddim_id(rd),
(usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
rd->last_collected_value,
@@ -233,6 +262,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+ if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || after < st->rrdhost->receiver->replication_first_time_t))
+ st->rrdhost->receiver->replication_first_time_t = after;
+
#ifdef NETDATA_INTERNAL_CHECKS
if(after && before) {
char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
@@ -252,6 +284,18 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
}
#endif
+#ifdef NETDATA_INTERNAL_CHECKS
+ internal_error(
+ st->replay.after != 0 || st->replay.before != 0,
+ "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st)
+ );
+
+ st->replay.start_streaming = start_streaming;
+ st->replay.after = after;
+ st->replay.before = before;
+#endif
+
debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
@@ -262,7 +306,7 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
int ret = callback(buffer, callback_data);
if (ret < 0) {
- error("failed to send replay request to child (ret=%d)", ret);
+ error("REPLICATION: failed to send replication request to child (error %d)", ret);
return false;
}
@@ -277,7 +321,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
// if replication is disabled, send an empty replication request
// asking no data
- if (!host->rrdpush_enable_replication) {
+ if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) {
internal_error(true,
"REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
rrdhost_hostname(host), rrdset_id(st));
@@ -325,8 +369,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
last_entry_local = now;
}
- // should never happen but it if does, start streaming without asking
- // for any data
+ // should never happen but if it does, start streaming without asking for any data
if (last_entry_local > last_entry_child) {
error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
@@ -350,3 +393,429 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
}
+
+// ----------------------------------------------------------------------------
+
+// Return the percentage (0-100) of the sender's circular buffer currently in use.
+// The available size is sampled under the sender mutex; the percentage itself is
+// computed after unlocking, so the returned value is a best-effort snapshot only.
+static size_t sender_buffer_used_percent(struct sender_state *s) {
+    netdata_mutex_lock(&s->mutex);
+    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+    netdata_mutex_unlock(&s->mutex);
+
+    return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
+}
+
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+// replication request in sender DICTIONARY
+// used for de-duplicating the requests
+struct replication_request {
+    struct sender_state *sender;    // the sender this request was received on
+    usec_t sender_last_flush_ut;    // the sender's last_flush_time_ut at the time the request was added;
+                                    // compared later to detect that the sender buffer was flushed in the meantime
+    STRING *chart_id;               // the chart to replicate (owned reference: string_strdupz()/string_dup())
+    time_t after;                   // key for sorting (JudyL)
+    time_t before;                  // the end of the requested timeframe
+    bool start_streaming;           // when true, normal streaming is enabled after replication completes
+    bool found;                     // set by replication_request_get_first_available() to signal a valid result
+};
+
+// replication sort entry in JudyL array
+// used for sorting all requests, across all nodes
+struct replication_sort_entry {
+    struct replication_request req;         // private copy of the request (chart_id is dup'd in replication_sort_entry_create())
+
+    const void *unique_id;                  // used as a key to identify the sort entry - we never access its contents
+    bool executed;                          // set once the request has been picked up, so it is not returned again
+    struct replication_sort_entry *next;    // singly-linked list of entries sharing the same 'after' key in the JudyL
+};
+
+// the global variables for the replication thread
+static struct replication_thread {
+    netdata_mutex_t mutex;                  // protects every member below and the JudyL array
+
+    size_t added;                           // total requests ever added (monotonic counter)
+    size_t removed;                         // total requests ever removed (monotonic counter)
+    time_t first_time_t;                    // oldest 'after' seen; baseline for the completion metric
+    size_t requests_count;                  // pending requests, refreshed by replication_request_get_first_available()
+    struct replication_request *requests;   // NOTE(review): never assigned or read in this file - possibly unused; confirm
+
+    Pvoid_t JudyL_array;                    // JudyL: 'after' (time_t) => linked list of replication_sort_entry
+} rep = {
+    .mutex = NETDATA_MUTEX_INITIALIZER,
+    .added = 0,
+    .removed = 0,
+    .first_time_t = 0,
+    .requests_count = 0,
+    .requests = NULL,
+    .JudyL_array = NULL,
+};
+
+// Per-thread recursion depth, implementing a simple recursive mutex on top of
+// the (non-recursive) rep.mutex: only the first lock / last unlock on a given
+// thread touches the real mutex.
+static __thread int replication_recursive_mutex_recursions = 0;
+
+// Acquire rep.mutex; re-entrant on the same thread (up to a depth of 2,
+// enforced under internal checks).
+static void replication_recursive_lock() {
+    if(++replication_recursive_mutex_recursions == 1)
+        netdata_mutex_lock(&rep.mutex);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
+        fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
+#endif
+}
+
+// Release one level of the recursive lock; the real mutex is unlocked only
+// when the outermost level is released.
+static void replication_recursive_unlock() {
+    if(--replication_recursive_mutex_recursions == 0)
+        netdata_mutex_unlock(&rep.mutex);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
+        fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
+#endif
+}
+
+// ----------------------------------------------------------------------------
+// replication sort entry management
+
+// Allocate a sort entry for a request, taking its own reference on chart_id.
+// 'unique_id' is only an identity token (the dictionary item pointer); it is
+// never dereferenced.
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *r, const void *unique_id) {
+    struct replication_sort_entry *t = mallocz(sizeof(struct replication_sort_entry));
+
+    // copy the request
+    t->req = *r;
+    t->req.chart_id = string_dup(r->chart_id);
+
+
+    t->unique_id = unique_id;
+    t->executed = false;
+    t->next = NULL;
+    return t;
+}
+
+// Free a sort entry, releasing its chart_id reference.
+static void replication_sort_entry_destroy(struct replication_sort_entry *t) {
+    string_freez(t->req.chart_id);
+    freez(t);
+}
+
+// Insert a new sort entry for 'r' into the global JudyL array, keyed by
+// r->after, pushing it at the head of the per-key linked list.
+// Called from the dictionary react callback, under the replication lock.
+static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *r, const void *unique_id) {
+    struct replication_sort_entry *t = replication_sort_entry_create(r, unique_id);
+
+    replication_recursive_lock();
+
+    rep.added++;
+
+    Pvoid_t *PValue;
+
+    // find or create the JudyL slot for this 'after' time
+    PValue = JudyLGet(rep.JudyL_array, (Word_t) r->after, PJE0);
+    if(!PValue)
+        PValue = JudyLIns(&rep.JudyL_array, (Word_t) r->after, PJE0);
+
+    // push at the head of the list for this key
+    t->next = *PValue;
+    *PValue = t;
+
+    // track the oldest requested time (baseline for the completion metric)
+    if(!rep.first_time_t || r->after < rep.first_time_t)
+        rep.first_time_t = r->after;
+
+    replication_recursive_unlock();
+
+    return t;
+}
+
+// Remove from the JudyL array the sort entry identified by 'item' (the
+// dictionary item pointer used as unique_id), unlinking it from the per-key
+// list and deleting the JudyL slot when the list becomes empty.
+// fatal()s if the entry cannot be found - it must exist, since entries are
+// created and deleted in lockstep with the dictionary items.
+static void replication_sort_entry_del(struct sender_state *sender, STRING *chart_id, time_t after, const DICTIONARY_ITEM *item) {
+    Pvoid_t *PValue;
+    struct replication_sort_entry *to_delete = NULL;
+
+    replication_recursive_lock();
+
+    rep.removed++;
+
+    PValue = JudyLGet(rep.JudyL_array, after, PJE0);
+    if(PValue) {
+        struct replication_sort_entry *t = *PValue;
+        t->executed = true; // make sure we don't get it again
+        // NOTE(review): this marks the list HEAD as executed, not necessarily
+        // the entry matching 'item' when it sits deeper in the list - confirm intended
+
+        if(!t->next) {
+            // we are alone here, delete the judy entry
+
+            if(t->unique_id != item)
+                fatal("Item to delete is not matching host '%s', chart '%s', time %ld.",
+                      rrdhost_hostname(sender->host), string2str(chart_id), after);
+
+            to_delete = t;
+            JudyLDel(&rep.JudyL_array, after, PJE0);
+        }
+        else {
+            // find our entry in the linked list
+
+            struct replication_sort_entry *t_old = NULL;
+            do {
+                if(t->unique_id == item) {
+                    to_delete = t;
+
+                    // unlink it, keeping the rest of the list intact
+                    if(t_old)
+                        t_old->next = t->next;
+                    else
+                        *PValue = t->next;
+
+                    break;
+                }
+
+                t_old = t;
+                t = t->next;
+
+            } while(t);
+        }
+    }
+
+    if(!to_delete)
+        fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.",
+              rrdhost_hostname(sender->host), string2str(chart_id), after);
+
+    replication_recursive_unlock();
+
+    // destroy outside the Judy structures (entry is already unlinked)
+    replication_sort_entry_destroy(to_delete);
+}
+
+// Scan the JudyL array in ascending 'after' order and return a copy of the
+// first request that (a) has not been executed, (b) belongs to a sender whose
+// buffer is at most 10% full, and (c) whose sender buffer has not been flushed
+// since the request was queued. The returned copy holds its own chart_id
+// reference; .found is false when no eligible request exists.
+// Also refreshes rep.requests_count as a side effect.
+static struct replication_request replication_request_get_first_available() {
+    struct replication_sort_entry *found = NULL;
+    Pvoid_t *PValue;
+    Word_t Index;
+
+    replication_recursive_lock();
+
+    // NOTE(review): the upper index is hard-coded to 0xFFFFFFFF; time_t keys
+    // beyond that (post-2106 on 64-bit) would be excluded from the count - confirm intent
+    rep.requests_count = JudyLCount(rep.JudyL_array, 0, 0xFFFFFFFF, PJE0);
+    if(!rep.requests_count) {
+        replication_recursive_unlock();
+        return (struct replication_request){ .found = false };
+    }
+
+    Index = 0;
+    PValue = JudyLFirst(rep.JudyL_array, &Index, PJE0);
+    while(!found && PValue) {
+        struct replication_sort_entry *t;
+
+        for(t = *PValue; t ;t = t->next) {
+            if(!t->executed
+                && sender_buffer_used_percent(t->req.sender) <= 10
+                && t->req.sender_last_flush_ut == __atomic_load_n(&t->req.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)
+            ) {
+                found = t;
+                found->executed = true; // claim it, so no other iteration picks it up
+                break;
+            }
+        }
+
+        if(!found)
+            PValue = JudyLNext(rep.JudyL_array, &Index, PJE0);
+    }
+
+    // copy the values we need, while we have the lock
+    struct replication_request ret;
+
+    if(found) {
+        ret = found->req;
+        ret.chart_id = string_dup(ret.chart_id); // caller owns this reference
+        ret.found = true;
+    }
+    else
+        ret.found = false;
+
+    replication_recursive_unlock();
+
+    return ret;
+}
+
+// ----------------------------------------------------------------------------
+// replication request management
+
+// Dictionary react callback: a new replication request was inserted into the
+// sender's dictionary; mirror it into the global JudyL sort array and bump the
+// sender's pending-requests counter.
+static void replication_request_react_callback(const DICTIONARY_ITEM *item, void *value __maybe_unused, void *sender_state __maybe_unused) {
+    struct sender_state *s = sender_state; (void)s;
+    struct replication_request *r = value;
+
+    // IMPORTANT:
+    // We use the react instead of the insert callback
+    // because we want the item to be atomically visible
+    // to our replication thread, immediately after.
+
+    // If we put this at the insert callback, the item is not guaranteed
+    // to be atomically visible to others, so the replication thread
+    // may see the replication sort entry, but fail to find the dictionary item
+    // related to it.
+
+    replication_sort_entry_add(r, item);
+    __atomic_fetch_add(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+}
+
+// Dictionary conflict callback: a duplicate request arrived for a chart that
+// already has one pending. The existing request is kept (returns false) and
+// the new request's chart_id reference is released, since the dictionary will
+// discard the new value.
+static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
+    struct sender_state *s = sender_state; (void)s;
+    struct replication_request *r = old_value; (void)r;
+    struct replication_request *r_new = new_value;
+
+    internal_error(
+        true,
+        "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+        rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+        (unsigned long long)r->after, (unsigned long long)r->before, r->start_streaming ? "true" : "false",
+        (unsigned long long)r_new->after, (unsigned long long)r_new->before, r_new->start_streaming ? "true" : "false");
+
+    string_freez(r_new->chart_id);
+
+    // false = keep the old value in the dictionary
+    return false;
+}
+
+// Dictionary delete callback: remove the matching sort entry from the JudyL
+// array, release the request's chart_id reference and decrement the sender's
+// pending-requests counter.
+static void replication_request_delete_callback(const DICTIONARY_ITEM *item, void *value, void *sender_state __maybe_unused) {
+    struct replication_request *r = value;
+
+    replication_sort_entry_del(r->sender, r->chart_id, r->after, item);
+
+    string_freez(r->chart_id);
+    __atomic_fetch_sub(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+}
+
+
+// ----------------------------------------------------------------------------
+// public API
+
+// Public API: queue a replication request for 'chart_id' on this sender.
+// The STRING created here is owned by the dictionary value; it is released by
+// the conflict callback (duplicate) or the delete callback (normal removal).
+// sender_last_flush_ut is captured now so stale requests can be detected later.
+void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
+    struct replication_request tmp = {
+        .sender = sender,
+        .chart_id = string_strdupz(chart_id),
+        .after = after,
+        .before = before,
+        .start_streaming = start_streaming,
+        .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST),
+    };
+
+    dictionary_set(sender->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
+}
+
+// Public API: drop all pending replication requests of a sender.
+// Holding the recursive replication lock across the flush lets the per-item
+// delete callbacks run without re-acquiring the mutex each time.
+void replication_flush_sender(struct sender_state *sender) {
+    // allow the dictionary destructor to go faster on locks
+    replication_recursive_lock();
+    dictionary_flush(sender->replication_requests);
+    replication_recursive_unlock();
+}
+
+// Public API: create the sender's replication-requests dictionary and wire up
+// the react/conflict/delete callbacks that keep the JudyL sort array in sync.
+void replication_init_sender(struct sender_state *sender) {
+    sender->replication_requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+    dictionary_register_react_callback(sender->replication_requests, replication_request_react_callback, sender);
+    dictionary_register_conflict_callback(sender->replication_requests, replication_request_conflict_callback, sender);
+    dictionary_register_delete_callback(sender->replication_requests, replication_request_delete_callback, sender);
+}
+
+// Public API: destroy the sender's replication-requests dictionary; the delete
+// callbacks remove each pending request from the JudyL array along the way.
+void replication_cleanup_sender(struct sender_state *sender) {
+    // allow the dictionary destructor to go faster on locks
+    replication_recursive_lock();
+    dictionary_destroy(sender->replication_requests);
+    replication_recursive_unlock();
+}
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+// Thread cleanup handler: unregister the worker and mark the static thread
+// as exiting/exited so the main process can account for it.
+static void replication_main_cleanup(void *ptr) {
+    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+    // custom code
+    worker_unregister();
+
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+#define WORKER_JOB_ITERATION 1
+#define WORKER_JOB_REPLAYING 2
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 3
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 4
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 5
+#define WORKER_JOB_CUSTOM_METRIC_DONE 6
+
+// The single replication thread: repeatedly picks the oldest eligible request
+// from the global JudyL array, removes it from its sender's dictionary, sends
+// the replicated timeframe via replicate_chart_response(), and publishes
+// pending/completion worker metrics. Sleeps 1s when idle, 1ms when requests
+// exist but none is currently eligible.
+void *replication_thread_main(void *ptr __maybe_unused) {
+    netdata_thread_cleanup_push(replication_main_cleanup, ptr);
+
+    worker_register("REPLICATION");
+
+    worker_register_job_name(WORKER_JOB_ITERATION, "iteration");
+    worker_register_job_name(WORKER_JOB_REPLAYING, "replaying");
+
+    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
+    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+
+    while(!netdata_exit) {
+        worker_is_busy(WORKER_JOB_ITERATION);
+
+        // this call also updates our statistics
+        struct replication_request r = replication_request_get_first_available();
+        // NOTE(review): r.chart_id is a string_dup()'d reference, but no
+        // string_freez(r.chart_id) is visible anywhere in this loop (including
+        // the 'chart not found' path) - possible STRING reference leak; confirm
+
+        if(r.found) {
+            // delete the request from the dictionary
+            // (this triggers the delete callback, which unlinks the sort entry)
+            dictionary_del(r.sender->replication_requests, string2str(r.chart_id));
+        }
+
+        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count);
+        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added);
+        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.removed);
+
+        // nothing pending at all: report 100% completion and sleep for 1 second
+        if(!r.found && !rep.requests_count) {
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+            worker_is_idle();
+            sleep_usec(1000 * USEC_PER_MS);
+            continue;
+        }
+
+        // requests exist but none is eligible right now: short 1ms sleep
+        if(!r.found) {
+            worker_is_idle();
+            sleep_usec(1 * USEC_PER_MS);
+            continue;
+        }
+
+        RRDSET *st = rrdset_find(r.sender->host, string2str(r.chart_id));
+        if(!st) {
+            internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
+                           string2str(r.chart_id), rrdhost_hostname(r.sender->host));
+
+            continue;
+        }
+
+        worker_is_busy(WORKER_JOB_REPLAYING);
+
+        time_t latest_first_time_t = r.after;
+
+        // track per-sender replication progress markers
+        if(r.after < r.sender->replication_first_time || !r.sender->replication_first_time)
+            r.sender->replication_first_time = r.after;
+
+        if(r.before < r.sender->replication_min_time || !r.sender->replication_min_time)
+            r.sender->replication_min_time = r.before;
+
+        // do not allow cancellation while mid-response
+        netdata_thread_disable_cancelability();
+
+        // send the replication data
+        bool start_streaming = replicate_chart_response(st->rrdhost, st,
+                                                        r.start_streaming, r.after, r.before);
+
+        netdata_thread_enable_cancelability();
+
+        if(start_streaming && r.sender_last_flush_ut == __atomic_load_n(&r.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) {
+            __atomic_fetch_add(&r.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST);
+
+            // enable normal streaming if we have to
+            // but only if the sender buffer has not been flushed since we started
+
+            debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
+                  rrdhost_hostname(r.sender->host), rrdset_id(st));
+
+            rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+        }
+
+        // statistics
+        {
+            time_t now = now_realtime_sec();
+            time_t total = now - rep.first_time_t;
+            time_t done = latest_first_time_t - rep.first_time_t;
+            // NOTE(review): when total == 0 this is a floating-point division by
+            // zero (yields inf/nan in the metric, not a crash) - confirm acceptable
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
+        }
+    }
+
+    netdata_thread_cleanup_pop(1);
+    return NULL;
+}