diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-22 02:08:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-22 02:08:51 +0200 |
commit | 4a7048fc1ffc5d5d9a1160debbfb191a4aee99b2 (patch) | |
tree | fb37d14154b71a09fce2b371b199caa6389bf48e /streaming | |
parent | b8d50ecc953f7962587aaae1a793827faf4ce992 (diff) |
use 2 levels of judy arrays to speed up replication on very busy parents (#14031)
* use 2 levels of judy arrays to speed up replication on very busy parents
* delete requests from judy when executed
* do not process requests when the sender is not connected; not all requests removed are executed, so count the executed accurately
* flush replication data on sender disconnect
* cache used buffer ratio in sender structure
* cleanup replication requests when they are not valid any more
* properly update inner and outer judy arrays on deletions
* detailed replication stats
* fix bug in dictionary where deletion and flushes lead to crashes
* replication should only report retention of tier 0
* replication now has 2 buffer limits 10 -> 30 %
* detailed statistics about resets and skipped requests
* register worker metrics
* added counter for waits
* make new requests discoverable
* make it continue on iterations properly
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 415 | ||||
-rw-r--r-- | streaming/replication.h | 1 | ||||
-rw-r--r-- | streaming/rrdpush.c | 2 | ||||
-rw-r--r-- | streaming/rrdpush.h | 2 | ||||
-rw-r--r-- | streaming/sender.c | 6 |
5 files changed, 251 insertions, 175 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 851b5b4c7c..3cb8424113 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,6 +3,9 @@ #include "replication.h" #include "Judy.h" +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 30 +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 + static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) { size_t dimensions = rrdset_number_of_dimensions(st); @@ -395,17 +398,6 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // ---------------------------------------------------------------------------- - -static size_t sender_buffer_used_percent(struct sender_state *s) { - netdata_mutex_lock(&s->mutex); - size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); - netdata_mutex_unlock(&s->mutex); - - return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size; -} - - -// ---------------------------------------------------------------------------- // replication thread // replication request in sender DICTIONARY @@ -416,18 +408,18 @@ struct replication_request { STRING *chart_id; time_t after; // key for sorting (JudyL) time_t before; + Word_t unique_id; bool start_streaming; bool found; + bool index_in_judy; }; // replication sort entry in JudyL array // used for sorting all requests, across all nodes struct replication_sort_entry { - struct replication_request req; + struct replication_request *rq; - const void *unique_id; // used as a key to identify the sort entry - we never access its contents - bool executed; - struct replication_sort_entry *next; + size_t unique_id; // used as a key to identify the sort entry - we never access its contents }; // the global variables for the replication thread @@ -435,18 +427,33 @@ static struct replication_thread { netdata_mutex_t mutex; size_t added; + size_t executed; size_t removed; time_t first_time_t; size_t requests_count; + Word_t next_unique_id; struct replication_request *requests; + Word_t last_after; + Word_t last_unique_id; + + size_t skipped_not_connected; + size_t skipped_no_room; + size_t sender_resets; + size_t waits; + Pvoid_t JudyL_array; } rep = { .mutex = NETDATA_MUTEX_INITIALIZER, .added = 0, - .removed = 0, + .executed = 0, .first_time_t = 0, .requests_count = 0, + .next_unique_id = 1, + .skipped_no_room = 0, + .skipped_not_connected = 0, + .sender_resets = 0, + .waits = 0, .requests = NULL, .JudyL_array = NULL, }; @@ -476,159 +483,172 @@ static void replication_recursive_unlock() { // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *r, const void *unique_id) { - struct replication_sort_entry *t = mallocz(sizeof(struct replication_sort_entry)); +static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { + struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); // copy the request - t->req = *r; - t->req.chart_id = string_dup(r->chart_id); - + rse->rq = rq; + rse->unique_id = rep.next_unique_id++; - t->unique_id = unique_id; - t->executed = false; - t->next = NULL; - return t; + // save the unique id into the request, to be able to delete it later + rq->unique_id = rse->unique_id; + rq->index_in_judy = false; + return rse; } -static void replication_sort_entry_destroy(struct replication_sort_entry *t) { - string_freez(t->req.chart_id); - freez(t); +static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { + freez(rse); } -static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *r, const void *unique_id) { - struct replication_sort_entry *t = replication_sort_entry_create(r, unique_id); - +static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) { replication_recursive_lock(); + struct replication_sort_entry *rse = replication_sort_entry_create(rq); + + if(rq->after < (time_t)rep.last_after) { + // make it find this request first + rep.last_after = rq->after; + rep.last_unique_id = rq->unique_id; + } + rep.added++; + rep.requests_count++; - Pvoid_t *PValue; + Pvoid_t *inner_judy_ptr; - PValue = JudyLGet(rep.JudyL_array, (Word_t) r->after, PJE0); - if(!PValue) - PValue = JudyLIns(&rep.JudyL_array, (Word_t) r->after, PJE0); + // find the outer judy entry, using after as key + inner_judy_ptr = JudyLGet(rep.JudyL_array, (Word_t) rq->after, PJE0); + if(!inner_judy_ptr) + inner_judy_ptr = JudyLIns(&rep.JudyL_array, (Word_t) rq->after, PJE0); - t->next = *PValue; - *PValue = t; + // add it to the inner judy, using unique_id as key + Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); + *item = rse; + rq->index_in_judy = true; - if(!rep.first_time_t || r->after < rep.first_time_t) - rep.first_time_t = r->after; + if(!rep.first_time_t || rq->after < rep.first_time_t) + rep.first_time_t = rq->after; replication_recursive_unlock(); - return t; + return rse; } -static void replication_sort_entry_del(struct sender_state *sender, STRING *chart_id, time_t after, const DICTIONARY_ITEM *item) { - Pvoid_t *PValue; - struct replication_sort_entry *to_delete = NULL; - - replication_recursive_lock(); +static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { + bool inner_judy_deleted = false; rep.removed++; + rep.requests_count--; - PValue = JudyLGet(rep.JudyL_array, after, PJE0); - if(PValue) { - struct replication_sort_entry *t = *PValue; - t->executed = true; // make sure we don't get it again + rse->rq->index_in_judy = false; - if(!t->next) { - // we are alone here, delete the judy entry + // delete it from the inner judy + JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); - if(t->unique_id != item) - fatal("Item to delete is not matching host '%s', chart '%s', time %ld.", - rrdhost_hostname(sender->host), string2str(chart_id), after); + // if no items left, delete it from the outer judy + if(**inner_judy_ppptr == NULL) { + JudyLDel(&rep.JudyL_array, rse->rq->after, PJE0); + inner_judy_deleted = true; + } - to_delete = t; - JudyLDel(&rep.JudyL_array, after, PJE0); - } - else { - // find our entry in the linked list + // free memory + replication_sort_entry_destroy(rse); - struct replication_sort_entry *t_old = NULL; - do { - if(t->unique_id == item) { - to_delete = t; + return inner_judy_deleted; +} - if(t_old) - t_old->next = t->next; - else - *PValue = t->next; +static void replication_sort_entry_del(struct replication_request *rq) { + Pvoid_t *inner_judy_pptr; + struct replication_sort_entry *rse_to_delete = NULL; - break; - } + replication_recursive_lock(); + if(rq->index_in_judy) { + + inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0); + if (inner_judy_pptr) { + Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); + if (our_item_pptr) { + rse_to_delete = *our_item_pptr; + replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + } + } - t_old = t; - t = t->next; + if (!rse_to_delete) + fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); - } while(t); - } } - if(!to_delete) - fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.", - rrdhost_hostname(sender->host), string2str(chart_id), after); - replication_recursive_unlock(); - - replication_sort_entry_destroy(to_delete); } static struct replication_request replication_request_get_first_available() { - struct replication_sort_entry *found = NULL; - Pvoid_t *PValue; - Word_t Index; + Pvoid_t *inner_judy_pptr; replication_recursive_lock(); - rep.requests_count = JudyLCount(rep.JudyL_array, 0, 0xFFFFFFFF, PJE0); - if(!rep.requests_count) { - replication_recursive_unlock(); - return (struct replication_request){ .found = false }; + struct replication_request rq = (struct replication_request){ .found = false }; + + + if(rep.last_after && rep.last_unique_id) { + rep.last_after--; + rep.last_unique_id--; + } + else { + rep.last_after = 0; + rep.last_unique_id = 0; } - Index = 0; - PValue = JudyLFirst(rep.JudyL_array, &Index, PJE0); - while(!found && PValue) { - struct replication_sort_entry *t; - - for(t = *PValue; t ;t = t->next) { - if(!t->executed - && sender_buffer_used_percent(t->req.sender) <= 10 - && t->req.sender_last_flush_ut == __atomic_load_n(&t->req.sender->last_flush_time_ut, __ATOMIC_SEQ_CST) - ) { - found = t; - found->executed = true; - break; + while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) { + Pvoid_t *our_item_pptr; + + while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) { + struct replication_sort_entry *rse = *our_item_pptr; + struct sender_state *s = rse->rq->sender; + + bool sender_is_connected = + rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + bool sender_has_been_flushed_since_this_request = + rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST); + + bool sender_has_room_to_spare = + s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; + + if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { + rep.skipped_not_connected++; + if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + break; } - } - if(!found) - PValue = JudyLNext(rep.JudyL_array, &Index, PJE0); - } + else if(sender_has_room_to_spare) { + // copy the request to return it + rq = *rse->rq; + + // set the return result to found + rq.found = true; - // copy the values we need, while we have the lock - struct replication_request ret; + if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + break; + } + else + rep.skipped_no_room++; + } - if(found) { - ret = found->req; - ret.chart_id = string_dup(ret.chart_id); - ret.found = true; + // prepare for the next iteration on the outer loop + rep.last_unique_id = 0; } - else - ret.found = false; replication_recursive_unlock(); - - return ret; + return rq; } // ---------------------------------------------------------------------------- // replication request management -static void replication_request_react_callback(const DICTIONARY_ITEM *item, void *value __maybe_unused, void *sender_state __maybe_unused) { +static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) { struct sender_state *s = sender_state; (void)s; - struct replication_request *r = value; + struct replication_request *rq = value; // IMPORTANT: // We use the react instead of the insert callback @@ -640,34 +660,35 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item, void // may see the replication sort entry, but fail to find the dictionary item // related to it. - replication_sort_entry_add(r, item); - __atomic_fetch_add(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); + replication_sort_entry_add(rq); + __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); } static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { struct sender_state *s = sender_state; (void)s; - struct replication_request *r = old_value; (void)r; - struct replication_request *r_new = new_value; + struct replication_request *rq = old_value; (void)rq; + struct replication_request *rq_new = new_value; internal_error( true, "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), - (unsigned long long)r->after, (unsigned long long)r->before, r->start_streaming ? "true" : "false", - (unsigned long long)r_new->after, (unsigned long long)r_new->before, r_new->start_streaming ? "true" : "false"); + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); - string_freez(r_new->chart_id); + string_freez(rq_new->chart_id); return false; } -static void replication_request_delete_callback(const DICTIONARY_ITEM *item, void *value, void *sender_state __maybe_unused) { - struct replication_request *r = value; +static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { + struct replication_request *rq = value; - replication_sort_entry_del(r->sender, r->chart_id, r->after, item); + if(rq->index_in_judy) + replication_sort_entry_del(rq); - string_freez(r->chart_id); - __atomic_fetch_sub(&r->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); + string_freez(rq->chart_id); + __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); } @@ -675,7 +696,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item, voi // public API void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) { - struct replication_request tmp = { + struct replication_request rq = { .sender = sender, .chart_id = string_strdupz(chart_id), .after = after, @@ -684,7 +705,7 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST), }; - dictionary_set(sender->replication_requests, chart_id, &tmp, sizeof(struct replication_request)); + dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request)); } void replication_flush_sender(struct sender_state *sender) { @@ -708,6 +729,26 @@ void replication_cleanup_sender(struct sender_state *sender) { replication_recursive_unlock(); } +void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { + size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); + size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; + + if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) + s->replication_reached_max = true; + + if(s->replication_reached_max && + percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { + s->replication_reached_max = false; + replication_recursive_lock(); + rep.last_after = 0; + rep.last_unique_id = 0; + rep.sender_resets++; + replication_recursive_unlock(); + } + + s->replication_sender_buffer_percent_used = percentage; +} + // ---------------------------------------------------------------------------- // replication thread @@ -721,99 +762,125 @@ static void replication_main_cleanup(void *ptr) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } -#define WORKER_JOB_ITERATION 1 -#define WORKER_JOB_REPLAYING 2 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 3 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 4 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 5 -#define WORKER_JOB_CUSTOM_METRIC_DONE 6 +#define WORKER_JOB_FIND_NEXT 1 +#define WORKER_JOB_QUERYING 2 +#define WORKER_JOB_DELETE_ENTRY 3 +#define WORKER_JOB_FIND_CHART 4 +#define WORKER_JOB_STATISTICS 5 +#define WORKER_JOB_ACTIVATE_ENABLE_STREAMING 6 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 7 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 8 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 9 +#define WORKER_JOB_CUSTOM_METRIC_DONE 10 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 11 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13 +#define WORKER_JOB_CUSTOM_METRIC_WAITS 14 void *replication_thread_main(void *ptr __maybe_unused) { netdata_thread_cleanup_push(replication_main_cleanup, ptr); worker_register("REPLICATION"); - worker_register_job_name(WORKER_JOB_ITERATION, "iteration"); - worker_register_job_name(WORKER_JOB_REPLAYING, "replaying"); + worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next"); + worker_register_job_name(WORKER_JOB_QUERYING, "querying"); + worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); + worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); + worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming"); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL); - while(!netdata_exit) { - worker_is_busy(WORKER_JOB_ITERATION); - - // this call also updates our statistics - struct replication_request r = replication_request_get_first_available(); + time_t latest_first_time_t = 0; - if(r.found) { - // delete the request from the dictionary - dictionary_del(r.sender->replication_requests, string2str(r.chart_id)); - } + while(!netdata_exit) { + worker_is_busy(WORKER_JOB_FIND_NEXT); + struct replication_request rq = replication_request_get_first_available(); + worker_is_busy(WORKER_JOB_STATISTICS); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.removed); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.executed); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)rep.skipped_not_connected); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)rep.skipped_no_room); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)rep.sender_resets); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)rep.waits); - if(!r.found && !rep.requests_count) { - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); - worker_is_idle(); - sleep_usec(1000 * USEC_PER_MS); - continue; + if(latest_first_time_t) { + time_t now = now_realtime_sec(); + time_t total = now - rep.first_time_t; + time_t done = latest_first_time_t - rep.first_time_t; + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total); } - if(!r.found) { + if(!rq.found) { worker_is_idle(); - sleep_usec(1 * USEC_PER_MS); + + if(!rep.requests_count) + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + + // make it start from the beginning + rep.last_after = 0; + rep.last_unique_id = 0; + + rep.waits++; + + sleep_usec(1000 * USEC_PER_MS); continue; } + else { + // delete the request from the dictionary + worker_is_busy(WORKER_JOB_DELETE_ENTRY); + dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)); + } - RRDSET *st = rrdset_find(r.sender->host, string2str(r.chart_id)); + worker_is_busy(WORKER_JOB_FIND_CHART); + RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id)); if(!st) { internal_error(true, "REPLAY: chart '%s' not found on host '%s'", - string2str(r.chart_id), rrdhost_hostname(r.sender->host)); + string2str(rq.chart_id), rrdhost_hostname(rq.sender->host)); continue; } - worker_is_busy(WORKER_JOB_REPLAYING); + worker_is_busy(WORKER_JOB_QUERYING); - time_t latest_first_time_t = r.after; + latest_first_time_t = rq.after; - if(r.after < r.sender->replication_first_time || !r.sender->replication_first_time) - r.sender->replication_first_time = r.after; + if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time) + rq.sender->replication_first_time = rq.after; - if(r.before < r.sender->replication_min_time || !r.sender->replication_min_time) - r.sender->replication_min_time = r.before; + if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time) + rq.sender->replication_min_time = rq.before; netdata_thread_disable_cancelability(); // send the replication data bool start_streaming = replicate_chart_response(st->rrdhost, st, - r.start_streaming, r.after, r.before); + rq.start_streaming, rq.after, rq.before); netdata_thread_enable_cancelability(); - if(start_streaming && r.sender_last_flush_ut == __atomic_load_n(&r.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) { - __atomic_fetch_add(&r.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST); + rep.executed++; + + if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) { + worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING); + __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST); // enable normal streaming if we have to // but only if the sender buffer has not been flushed since we started debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s", - rrdhost_hostname(r.sender->host), rrdset_id(st)); + rrdhost_hostname(rq.sender->host), rrdset_id(st)); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); } - - // statistics - { - time_t now = now_realtime_sec(); - time_t total = now - rep.first_time_t; - time_t done = latest_first_time_t - rep.first_time_t; - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total); - } } netdata_thread_cleanup_pop(1); diff --git a/streaming/replication.h b/streaming/replication.h index 684efdc120..7542e43d08 100644 --- a/streaming/replication.h +++ b/streaming/replication.h @@ -18,5 +18,6 @@ void replication_init_sender(struct sender_state *sender); void replication_cleanup_sender(struct sender_state *sender); void replication_flush_sender(struct sender_state *sender); void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming); +void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s); #endif /* REPLICATION_H */ diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 2369065783..d19335256e 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -293,7 +293,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { - time_t first_entry_local = rrdset_first_entry_t(st); + time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0); time_t last_entry_local = st->last_updated.tv_sec; if(!last_entry_local) { diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index e8c792566d..ed3b30bc5c 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -166,6 +166,8 @@ struct sender_state { size_t replication_pending_requests; time_t replication_first_time; time_t replication_min_time; + size_t replication_sender_buffer_percent_used; + bool replication_reached_max; usec_t last_flush_time_ut; size_t receiving_metrics; diff --git a/streaming/sender.c b/streaming/sender.c index a5219b14d7..72affc2907 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -163,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { s->flags |= SENDER_FLAG_OVERFLOW; #endif + replication_recalculate_buffer_used_ratio_unsafe(s); + netdata_mutex_unlock(&s->mutex); rrdpush_signal_sender_to_wake_up(s); } @@ -176,6 +178,8 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + + replication_flush_sender(host->sender); } static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { @@ -263,6 +267,7 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { netdata_mutex_lock(&host->sender->mutex); cbuffer_flush(host->sender->buffer); + replication_recalculate_buffer_used_ratio_unsafe(host->sender); netdata_mutex_unlock(&host->sender->mutex); rrdpush_sender_thread_reset_all_charts(host); @@ -770,6 +775,7 @@ static ssize_t attempt_to_send(struct sender_state *s) { else debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); + replication_recalculate_buffer_used_ratio_unsafe(s); netdata_mutex_unlock(&s->mutex); return ret; |