author     Costa Tsaousis <costa@netdata.cloud>  2022-11-22 02:08:51 +0200
committer  GitHub <noreply@github.com>           2022-11-22 02:08:51 +0200
commit     4a7048fc1ffc5d5d9a1160debbfb191a4aee99b2 (patch)
tree       fb37d14154b71a09fce2b371b199caa6389bf48e /streaming
parent     b8d50ecc953f7962587aaae1a793827faf4ce992 (diff)
use 2 levels of judy arrays to speed up replication on very busy parents (#14031)
* use 2 levels of judy arrays to speed up replication on very busy parents
* delete requests from judy when executed
* do not process requests when the sender is not connected; not all requests removed are executed, so count the executed accurately
* flush replication data on sender disconnect
* cache used buffer ratio in sender structure
* cleanup replication requests when they are not valid any more
* properly update inner and outer judy arrays on deletions
* detailed replication stats
* fix bug in dictionary where deletion and flushes lead to crashes
* replication should only report retention of tier 0
* replication now has 2 buffer limits 10 -> 30 %
* detailed statistics about resets and skipped requests
* register worker metrics
* added counter for waits
* make new requests discoverable
* make it continue on iterations properly
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  415
-rw-r--r--  streaming/replication.h    1
-rw-r--r--  streaming/rrdpush.c        2
-rw-r--r--  streaming/rrdpush.h        2
-rw-r--r--  streaming/sender.c         6
5 files changed, 251 insertions(+), 175 deletions(-)
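The heart of the change is the index restructuring: the old single JudyL array, whose values were linked lists of sort entries per timestamp, becomes an outer JudyL keyed by each request's `after` time, whose values are inner JudyL arrays keyed by a globally increasing unique_id. A minimal sketch of the insert path, with simplified names (illustrative, not the exact Netdata code; Judy error handling omitted):

    #include <Judy.h>

    /* outer JudyL: key = request 'after' time -> value = inner JudyL
       inner JudyL: key = increasing unique_id -> value = sort entry
       Entries iterate sorted by time first, insertion order second. */
    static Pvoid_t outer = NULL;

    static void index_add(Word_t after, Word_t unique_id, void *entry) {
        /* find or create the inner array for this timestamp */
        Pvoid_t *inner = JudyLGet(outer, after, PJE0);
        if (!inner)
            inner = JudyLIns(&outer, after, PJE0);

        /* file the entry in the inner array under its unique_id */
        Pvoid_t *slot = JudyLIns(inner, unique_id, PJE0);
        *slot = entry;
    }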
diff --git a/streaming/replication.c b/streaming/replication.c
index 851b5b4c7c..3cb8424113 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -3,6 +3,9 @@
#include "replication.h"
#include "Judy.h"
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 30
+#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
size_t dimensions = rrdset_number_of_dimensions(st);
@@ -395,17 +398,6 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
}
// ----------------------------------------------------------------------------
-
-static size_t sender_buffer_used_percent(struct sender_state *s) {
- netdata_mutex_lock(&s->mutex);
- size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
- netdata_mutex_unlock(&s->mutex);
-
- return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
-}
-
-
-// ----------------------------------------------------------------------------
// replication thread
// replication request in sender DICTIONARY
@@ -416,18 +408,18 @@ struct replication_request {
STRING *chart_id;
time_t after; // key for sorting (JudyL)
time_t before;
+ Word_t unique_id;
bool start_streaming;
bool found;
+ bool index_in_judy;
};
// replication sort entry in JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
- struct replication_request req;
+ struct replication_request *rq;
- const void *unique_id; // used as a key to identify the sort entry - we never access its contents
- bool executed;
- struct replication_sort_entry *next;
+ size_t unique_id; // used as a key to identify the sort entry - we never access its contents
};
// the global variables for the replication thread
@@ -435,18 +427,33 @@ static struct replication_thread {
netdata_mutex_t mutex;
size_t added;
+ size_t executed;
size_t removed;
time_t first_time_t;
size_t requests_count;
+ Word_t next_unique_id;
struct replication_request *requests;
+ Word_t last_after;
+ Word_t last_unique_id;
+
+ size_t skipped_not_connected;
+ size_t skipped_no_room;
+ size_t sender_resets;
+ size_t waits;
+
Pvoid_t JudyL_array;
} rep = {
.mutex = NETDATA_MUTEX_INITIALIZER,
.added = 0,
- .removed = 0,
+ .executed = 0,
.first_time_t = 0,
.requests_count = 0,
+ .next_unique_id = 1,
+ .skipped_no_room = 0,
+ .skipped_not_connected = 0,
+ .sender_resets = 0,
+ .waits = 0,
.requests = NULL,
.JudyL_array = NULL,
};
@@ -476,159 +483,172 @@ static void replication_recursive_unlock() {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *r, const void *unique_id) {
- struct replication_sort_entry *t = mallocz(sizeof(struct replication_sort_entry));
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+ struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
// copy the request
- t->req = *r;
- t->req.chart_id = string_dup(r->chart_id);
-
+ rse->rq = rq;
+ rse->unique_id = rep.next_unique_id++;
- t->unique_id = unique_id;
- t->executed = false;
- t->next = NULL;
- return t;
+ // save the unique id into the request, to be able to delete it later
+ rq->unique_id = rse->unique_id;
+ rq->index_in_judy = false;
+ return rse;
}
-static void replication_sort_entry_destroy(struct replication_sort_entry *t) {
- string_freez(t->req.chart_id);
- freez(t);
+static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
+ freez(rse);
}
-static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *r, const void *unique_id) {
- struct replication_sort_entry *t = replication_sort_entry_create(r, unique_id);
-
+static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
replication_recursive_lock();
+ struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+
+ if(rq->after < (time_t)rep.last_after) {
+ // make it find this request first
+ rep.last_after = rq->after;
+ rep.last_unique_id = rq->unique_id;
+ }
+
rep.added++;
+ rep.requests_count++;
- Pvoid_t *PValue;
+ Pvoid_t *inner_judy_ptr;
- PValue = JudyLGet(rep.JudyL_array, (Word_t) r->after, PJE0);
- if(!PValue)
- PValue = JudyLIns(&rep.JudyL_array, (Word_t) r->after, PJE0);
+ // find the outer judy entry, using after as key
+ inner_judy_ptr = JudyLGet(rep.JudyL_array, (Word_t) rq->after, PJE0);
+ if(!inner_judy_ptr)
+ inner_judy_ptr = JudyLIns(&rep.JudyL_array, (Word_t) rq->after, PJE0);
- t->next = *PValue;
- *PValue = t;
+ // add it to the inner judy, using unique_id as key
+ Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ *item = rse;
+ rq->index_in_judy = true;
- if(!rep.first_time_t || r->after < rep.first_time_t)
- rep.first_time_t = r->after;
+ if(!rep.first_time_t || rq->after < rep.first_time_t)
+ rep.first_time_t = rq->after;
replication_recursive_unlock();
- return t;
+ return rse;
}
-static void replication_sort_entry_del(struct sender_state *sender, STRING *chart_id, time_t after, const DICTIONARY_ITEM *item) {
- Pvoid_t *PValue;
- struct replication_sort_entry *to_delete = NULL;
-
- replication_recursive_lock();
+static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
+ bool inner_judy_deleted = false;
rep.removed++;
+ rep.requests_count--;
- PValue = JudyLGet(rep.JudyL_array, after, PJE0);
- if(PValue) {
- struct replication_sort_entry *t = *PValue;
- t->executed = true; // make sure we don't get it again
+ rse->rq->index_in_judy = false;
- if(!t->next) {
- // we are alone here, delete the judy entry
+ // delete it from the inner judy
+ JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
- if(t->unique_id != item)
- fatal("Item to delete is not matching host '%s', chart '%s', time %ld.",
- rrdhost_hostname(sender->host), string2str(chart_id), after);
+ // if no items left, delete it from the outer judy
+ if(**inner_judy_ppptr == NULL) {
+ JudyLDel(&rep.JudyL_array, rse->rq->after, PJE0);
+ inner_judy_deleted = true;
+ }
- to_delete = t;
- JudyLDel(&rep.JudyL_array, after, PJE0);
- }
- else {
- // find our entry in the linked list
+ // free memory
+ replication_sort_entry_destroy(rse);
- struct replication_sort_entry *t_old = NULL;
- do {
- if(t->unique_id == item) {
- to_delete = t;
+ return inner_judy_deleted;
+}
- if(t_old)
- t_old->next = t->next;
- else
- *PValue = t->next;
+static void replication_sort_entry_del(struct replication_request *rq) {
+ Pvoid_t *inner_judy_pptr;
+ struct replication_sort_entry *rse_to_delete = NULL;
- break;
- }
+ replication_recursive_lock();
+ if(rq->index_in_judy) {
+
+ inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0);
+ if (inner_judy_pptr) {
+ Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
+ if (our_item_pptr) {
+ rse_to_delete = *our_item_pptr;
+ replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+ }
+ }
- t_old = t;
- t = t->next;
+ if (!rse_to_delete)
+ fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
- } while(t);
- }
}
- if(!to_delete)
- fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.",
- rrdhost_hostname(sender->host), string2str(chart_id), after);
-
replication_recursive_unlock();
-
- replication_sort_entry_destroy(to_delete);
}
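Deletion is the mirror image, with the extra step the commit message calls "properly update inner and outer judy arrays on deletions": once the inner array empties, its slot must be pruned from the outer array too. Continuing the illustrative sketch above (same hypothetical `outer`; Judy represents an empty array as a NULL pointer, which is what the emptiness test relies on):

    static void index_del(Word_t after, Word_t unique_id) {
        Pvoid_t *inner = JudyLGet(outer, after, PJE0);
        if (!inner)
            return;

        JudyLDel(inner, unique_id, PJE0);    /* remove from the inner array  */

        if (*inner == NULL)                  /* inner array is now empty...  */
            JudyLDel(&outer, after, PJE0);   /* ...so prune the outer slot   */
    }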
static struct replication_request replication_request_get_first_available() {
- struct replication_sort_entry *found = NULL;
- Pvoid_t *PValue;
- Word_t Index;
+ Pvoid_t *inner_judy_pptr;
replication_recursive_lock();
- rep.requests_count = JudyLCount(rep.JudyL_array, 0, 0xFFFFFFFF, PJE0);
- if(!rep.requests_count) {
- replication_recursive_unlock();
- return (struct replication_request){ .found = false };
+ struct replication_request rq = (struct replication_request){ .found = false };
+
+
+ if(rep.last_after && rep.last_unique_id) {
+ rep.last_after--;
+ rep.last_unique_id--;
+ }
+ else {
+ rep.last_after = 0;
+ rep.last_unique_id = 0;
}
- Index = 0;
- PValue = JudyLFirst(rep.JudyL_array, &Index, PJE0);
- while(!found && PValue) {
- struct replication_sort_entry *t;
-
- for(t = *PValue; t ;t = t->next) {
- if(!t->executed
- && sender_buffer_used_percent(t->req.sender) <= 10
- && t->req.sender_last_flush_ut == __atomic_load_n(&t->req.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)
- ) {
- found = t;
- found->executed = true;
- break;
+ while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) {
+ Pvoid_t *our_item_pptr;
+
+ while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) {
+ struct replication_sort_entry *rse = *our_item_pptr;
+ struct sender_state *s = rse->rq->sender;
+
+ bool sender_is_connected =
+ rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ bool sender_has_been_flushed_since_this_request =
+ rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST);
+
+ bool sender_has_room_to_spare =
+ s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
+
+ if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
+ rep.skipped_not_connected++;
+ if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ break;
}
- }
- if(!found)
- PValue = JudyLNext(rep.JudyL_array, &Index, PJE0);
- }
+ else if(sender_has_room_to_spare) {
+ // copy the request to return it
+ rq = *rse->rq;
+
+ // set the return result to found
+ rq.found = true;
- // copy the values we need, while we have the lock
- struct replication_request ret;
+ if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ break;
+ }
+ else
+ rep.skipped_no_room++;
+ }
- if(found) {
- ret = found->req;
- ret.chart_id = string_dup(ret.chart_id);
- ret.found = true;
+ // prepare for the next iteration on the outer loop
+ rep.last_unique_id = 0;
}
- else
- ret.found = false;
replication_recursive_unlock();
-
- return ret;
+ return rq;
}
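The scan relies on JudyLNext() returning the smallest key strictly greater than the cursor, which is why the function decrements both saved cursors on entry: the previously returned entry has already been unlinked from the index, so stepping back one position re-finds the next remaining entry at the same timestamp. A sketch of that resumable walk (illustrative; the real loop also checks connection state and buffer room, and unique_ids start at 1 so a zero cursor scans everything):

    static Pvoid_t outer;   /* as in the earlier sketches */

    /* returns the next entry, assuming the caller unlinks each
       returned entry before calling again (as the thread does) */
    static void *index_next(Word_t *after_cursor, Word_t *id_cursor) {
        if (*after_cursor && *id_cursor) {
            (*after_cursor)--;   /* step back so JudyLNext (strictly */
            (*id_cursor)--;      /* greater) re-enters where we left */
        }

        Pvoid_t *inner;
        while ((inner = JudyLNext(outer, after_cursor, PJE0))) {
            Pvoid_t *slot = JudyLNext(*inner, id_cursor, PJE0);
            if (slot)
                return *slot;    /* cursors now name this entry     */

            *id_cursor = 0;      /* inner exhausted: next timestamp */
        }
        return NULL;             /* scan done: caller resets cursors */
    }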
// ----------------------------------------------------------------------------
// replication request management
-static void replication_request_react_callback(const DICTIONARY_ITEM *item, void *value __maybe_unused, void *sender_state __maybe_unused) {
+static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
struct sender_state *s = sender_state; (void)s;
- struct replication_request *r = value;
+ struct replication_request *rq = value;
// IMPORTANT:
// We use the react instead of the insert callback
@@ -640,34 +660,35 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item, void
// may see the replication sort entry, but fail to find the dictionary item
// related to it.
- replication_sort_entry_add(r, item);
- __atomic_fetch_add(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+ replication_sort_entry_add(rq);
+ __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
struct sender_state *s = sender_state; (void)s;
- struct replication_request *r = old_value; (void)r;
- struct replication_request *r_new = new_value;
+ struct replication_request *rq = old_value; (void)rq;
+ struct replication_request *rq_new = new_value;
internal_error(
true,
"STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)r->after, (unsigned long long)r->before, r->start_streaming ? "true" : "false",
- (unsigned long long)r_new->after, (unsigned long long)r_new->before, r_new->start_streaming ? "true" : "false");
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- string_freez(r_new->chart_id);
+ string_freez(rq_new->chart_id);
return false;
}
-static void replication_request_delete_callback(const DICTIONARY_ITEM *item, void *value, void *sender_state __maybe_unused) {
- struct replication_request *r = value;
+static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
+ struct replication_request *rq = value;
- replication_sort_entry_del(r->sender, r->chart_id, r->after, item);
+ if(rq->index_in_judy)
+ replication_sort_entry_del(rq);
- string_freez(r->chart_id);
- __atomic_fetch_sub(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+ string_freez(rq->chart_id);
+ __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
}
@@ -675,7 +696,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item, voi
// public API
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
- struct replication_request tmp = {
+ struct replication_request rq = {
.sender = sender,
.chart_id = string_strdupz(chart_id),
.after = after,
@@ -684,7 +705,7 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST),
};
- dictionary_set(sender->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
+ dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request));
}
void replication_flush_sender(struct sender_state *sender) {
@@ -708,6 +729,26 @@ void replication_cleanup_sender(struct sender_state *sender) {
replication_recursive_unlock();
}
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+ size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
+
+ if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
+ s->replication_reached_max = true;
+
+ if(s->replication_reached_max &&
+ percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
+ s->replication_reached_max = false;
+ replication_recursive_lock();
+ rep.last_after = 0;
+ rep.last_unique_id = 0;
+ rep.sender_resets++;
+ replication_recursive_unlock();
+ }
+
+ s->replication_sender_buffer_percent_used = percentage;
+}
+
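The new 10%/30% pair implements hysteresis rather than a single cutoff: a sender that crosses 30% buffer usage stops being picked by the scan (counted as skipped_no_room), and only re-qualifies once it drains below 10%, at which point the global scan cursor is rewound and a sender reset is counted. A simplified sketch of that state machine (illustrative struct and field names, not the Netdata sender_state):

    #define MAX_USED_PCT 30   /* stop scheduling above this       */
    #define MIN_USED_PCT 10   /* resume only after draining below */

    struct sender_sketch {
        size_t max_size, available;   /* circular buffer accounting */
        size_t percent_used;
        bool reached_max;
    };

    static void recalc_used(struct sender_sketch *s) {
        size_t pct = (s->max_size - s->available) * 100 / s->max_size;

        if (pct > MAX_USED_PCT)
            s->reached_max = true;    /* the scan now skips this sender */
        else if (s->reached_max && pct <= MIN_USED_PCT)
            s->reached_max = false;   /* real code also rewinds the scan
                                         cursor and counts a reset here */

        s->percent_used = pct;
    }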
// ----------------------------------------------------------------------------
// replication thread
@@ -721,99 +762,125 @@ static void replication_main_cleanup(void *ptr) {
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
-#define WORKER_JOB_ITERATION 1
-#define WORKER_JOB_REPLAYING 2
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 3
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 4
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 5
-#define WORKER_JOB_CUSTOM_METRIC_DONE 6
+#define WORKER_JOB_FIND_NEXT 1
+#define WORKER_JOB_QUERYING 2
+#define WORKER_JOB_DELETE_ENTRY 3
+#define WORKER_JOB_FIND_CHART 4
+#define WORKER_JOB_STATISTICS 5
+#define WORKER_JOB_ACTIVATE_ENABLE_STREAMING 6
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 7
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 8
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 9
+#define WORKER_JOB_CUSTOM_METRIC_DONE 10
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 11
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13
+#define WORKER_JOB_CUSTOM_METRIC_WAITS 14
void *replication_thread_main(void *ptr __maybe_unused) {
netdata_thread_cleanup_push(replication_main_cleanup, ptr);
worker_register("REPLICATION");
- worker_register_job_name(WORKER_JOB_ITERATION, "iteration");
- worker_register_job_name(WORKER_JOB_REPLAYING, "replaying");
+ worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
+ worker_register_job_name(WORKER_JOB_QUERYING, "querying");
+ worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
+ worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
+ worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming");
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- while(!netdata_exit) {
- worker_is_busy(WORKER_JOB_ITERATION);
-
- // this call also updates our statistics
- struct replication_request r = replication_request_get_first_available();
+ time_t latest_first_time_t = 0;
- if(r.found) {
- // delete the request from the dictionary
- dictionary_del(r.sender->replication_requests, string2str(r.chart_id));
- }
+ while(!netdata_exit) {
+ worker_is_busy(WORKER_JOB_FIND_NEXT);
+ struct replication_request rq = replication_request_get_first_available();
+ worker_is_busy(WORKER_JOB_STATISTICS);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.removed);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.executed);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)rep.skipped_not_connected);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)rep.skipped_no_room);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)rep.sender_resets);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)rep.waits);
- if(!r.found && !rep.requests_count) {
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
- worker_is_idle();
- sleep_usec(1000 * USEC_PER_MS);
- continue;
+ if(latest_first_time_t) {
+ time_t now = now_realtime_sec();
+ time_t total = now - rep.first_time_t;
+ time_t done = latest_first_time_t - rep.first_time_t;
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
}
- if(!r.found) {
+ if(!rq.found) {
worker_is_idle();
- sleep_usec(1 * USEC_PER_MS);
+
+ if(!rep.requests_count)
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
+ // make it start from the beginning
+ rep.last_after = 0;
+ rep.last_unique_id = 0;
+
+ rep.waits++;
+
+ sleep_usec(1000 * USEC_PER_MS);
continue;
}
+ else {
+ // delete the request from the dictionary
+ worker_is_busy(WORKER_JOB_DELETE_ENTRY);
+ dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id));
+ }
- RRDSET *st = rrdset_find(r.sender->host, string2str(r.chart_id));
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+ RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id));
if(!st) {
internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
- string2str(r.chart_id), rrdhost_hostname(r.sender->host));
+ string2str(rq.chart_id), rrdhost_hostname(rq.sender->host));
continue;
}
- worker_is_busy(WORKER_JOB_REPLAYING);
+ worker_is_busy(WORKER_JOB_QUERYING);
- time_t latest_first_time_t = r.after;
+ latest_first_time_t = rq.after;
- if(r.after < r.sender->replication_first_time || !r.sender->replication_first_time)
- r.sender->replication_first_time = r.after;
+ if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time)
+ rq.sender->replication_first_time = rq.after;
- if(r.before < r.sender->replication_min_time || !r.sender->replication_min_time)
- r.sender->replication_min_time = r.before;
+ if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time)
+ rq.sender->replication_min_time = rq.before;
netdata_thread_disable_cancelability();
// send the replication data
bool start_streaming = replicate_chart_response(st->rrdhost, st,
- r.start_streaming, r.after, r.before);
+ rq.start_streaming, rq.after, rq.before);
netdata_thread_enable_cancelability();
- if(start_streaming && r.sender_last_flush_ut == __atomic_load_n(&r.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) {
- __atomic_fetch_add(&r.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST);
+ rep.executed++;
+
+ if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) {
+ worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING);
+ __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST);
// enable normal streaming if we have to
// but only if the sender buffer has not been flushed since we started
debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(r.sender->host), rrdset_id(st));
+ rrdhost_hostname(rq.sender->host), rrdset_id(st));
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
}
-
- // statistics
- {
- time_t now = now_realtime_sec();
- time_t total = now - rep.first_time_t;
- time_t done = latest_first_time_t - rep.first_time_t;
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
- }
}
netdata_thread_cleanup_pop(1);
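For reference, the completion metric computed above reads as the fraction of the historical window already walked: with rep.first_time_t the oldest `after` ever queued and latest_first_time_t the `after` of the most recently served request, completion is (latest_first_time_t - first_time_t) * 100 / (now - first_time_t). For example, with first_time_t = 1000, now = 2000 and the last served request starting at 1750, the thread reports (1750 - 1000) * 100 / (2000 - 1000) = 75%.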
diff --git a/streaming/replication.h b/streaming/replication.h
index 684efdc120..7542e43d08 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -18,5 +18,6 @@ void replication_init_sender(struct sender_state *sender);
void replication_cleanup_sender(struct sender_state *sender);
void replication_flush_sender(struct sender_state *sender);
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
#endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 2369065783..d19335256e 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -293,7 +293,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
- time_t first_entry_local = rrdset_first_entry_t(st);
+ time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
time_t last_entry_local = st->last_updated.tv_sec;
if(!last_entry_local) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index e8c792566d..ed3b30bc5c 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -166,6 +166,8 @@ struct sender_state {
size_t replication_pending_requests;
time_t replication_first_time;
time_t replication_min_time;
+ size_t replication_sender_buffer_percent_used;
+ bool replication_reached_max;
usec_t last_flush_time_ut;
size_t receiving_metrics;
diff --git a/streaming/sender.c b/streaming/sender.c
index a5219b14d7..72affc2907 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -163,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
s->flags |= SENDER_FLAG_OVERFLOW;
#endif
+ replication_recalculate_buffer_used_ratio_unsafe(s);
+
netdata_mutex_unlock(&s->mutex);
rrdpush_signal_sender_to_wake_up(s);
}
@@ -176,6 +178,8 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ replication_flush_sender(host->sender);
}
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -263,6 +267,7 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
netdata_mutex_lock(&host->sender->mutex);
cbuffer_flush(host->sender->buffer);
+ replication_recalculate_buffer_used_ratio_unsafe(host->sender);
netdata_mutex_unlock(&host->sender->mutex);
rrdpush_sender_thread_reset_all_charts(host);
@@ -770,6 +775,7 @@ static ssize_t attempt_to_send(struct sender_state *s) {
else
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+ replication_recalculate_buffer_used_ratio_unsafe(s);
netdata_mutex_unlock(&s->mutex);
return ret;
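All three new call sites invoke replication_recalculate_buffer_used_ratio_unsafe() between netdata_mutex_lock(&s->mutex) and netdata_mutex_unlock(&s->mutex), consistent with the codebase's `_unsafe` naming convention for functions that expect the caller to hold the lock. The expected pattern, roughly (sender_buffer_changed is a hypothetical wrapper, not a function in the tree):

    static void sender_buffer_changed(struct sender_state *s) {
        netdata_mutex_lock(&s->mutex);
        /* ... grow or drain s->buffer here ... */
        replication_recalculate_buffer_used_ratio_unsafe(s);  /* lock held */
        netdata_mutex_unlock(&s->mutex);
    }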