diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-22 22:42:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-22 22:42:16 +0200 |
commit | 4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch) | |
tree | 4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming/replication.c | |
parent | 77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff) |
Replication fixes #3 (#14035)
* cleanup and additional information about replication
* fix deadlock on sender mutex
* do not ignore start streaming empty requests; when there duplicate requests, merge them
* flipped the flag
* final touch
* added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 151 |
1 files changed, 111 insertions, 40 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 3cb8424113..3f2f7939c6 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -290,7 +290,7 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR #ifdef NETDATA_INTERNAL_CHECKS internal_error( st->replay.after != 0 || st->replay.before != 0, - "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight", + "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight", rrdhost_hostname(st->rrdhost), rrdset_id(st) ); @@ -403,15 +403,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST // replication request in sender DICTIONARY // used for de-duplicating the requests struct replication_request { - struct sender_state *sender; - usec_t sender_last_flush_ut; - STRING *chart_id; - time_t after; // key for sorting (JudyL) - time_t before; - Word_t unique_id; - bool start_streaming; - bool found; - bool index_in_judy; + struct sender_state *sender; // the sender we should put the reply at + STRING *chart_id; // the chart of the request + time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) + time_t before; // the end time of the query (maybe zero) + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming + + usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request + Word_t unique_id; // auto-increment, later requests have bigger + bool found; // used as a result boolean for the find call + bool indexed_in_judy; // true when the request is indexed in judy }; // replication sort entry in JudyL array @@ -486,13 +487,15 @@ static void replication_recursive_unlock() { static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); + rrdpush_sender_pending_replication_requests_plus_one(rq->sender); + // copy the request rse->rq = rq; rse->unique_id = rep.next_unique_id++; // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; - rq->index_in_judy = false; + rq->indexed_in_judy = false; return rse; } @@ -524,7 +527,7 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat // add it to the inner judy, using unique_id as key Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; - rq->index_in_judy = true; + rq->indexed_in_judy = true; if(!rep.first_time_t || rq->after < rep.first_time_t) rep.first_time_t = rq->after; @@ -540,7 +543,9 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor rep.removed++; rep.requests_count--; - rse->rq->index_in_judy = false; + rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); + + rse->rq->indexed_in_judy = false; // delete it from the inner judy JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); @@ -562,7 +567,7 @@ static void replication_sort_entry_del(struct replication_request *rq) { struct replication_sort_entry *rse_to_delete = NULL; replication_recursive_lock(); - if(rq->index_in_judy) { + if(rq->indexed_in_judy) { inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0); if (inner_judy_pptr) { @@ -582,6 +587,13 @@ static void replication_sort_entry_del(struct replication_request *rq) { replication_recursive_unlock(); } +static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) { + if(unlikely(first)) + return JudyLFirst(PArray, PIndex, PJE0); + + return JudyLNext(PArray, PIndex, PJE0); +} + static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; @@ -590,16 +602,13 @@ static struct replication_request replication_request_get_first_available() { struct replication_request rq = (struct replication_request){ .found = false }; - if(rep.last_after && rep.last_unique_id) { - rep.last_after--; - rep.last_unique_id--; - } - else { + if(unlikely(!rep.last_after || !rep.last_unique_id)) { rep.last_after = 0; rep.last_unique_id = 0; } - while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) { + bool find_same_after = true; + while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) { Pvoid_t *our_item_pptr; while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) { @@ -610,10 +619,10 @@ static struct replication_request replication_request_get_first_available() { rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); bool sender_has_been_flushed_since_this_request = - rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST); + rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); bool sender_has_room_to_spare = - s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; + s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED; if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { rep.skipped_not_connected++; @@ -635,6 +644,9 @@ static struct replication_request replication_request_get_first_available() { rep.skipped_no_room++; } + // call JudyLNext from now on + find_same_after = false; + // prepare for the next iteration on the outer loop rep.last_unique_id = 0; } @@ -650,6 +662,14 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may struct sender_state *s = sender_state; (void)s; struct replication_request *rq = value; + RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + if(!st) { + internal_error(true, "REPLAY: chart '%s' not found on host '%s'", + string2str(rq->chart_id), rrdhost_hostname(rq->sender->host)); + } + else + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + // IMPORTANT: // We use the react instead of the insert callback // because we want the item to be atomically visible @@ -661,7 +681,9 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may // related to it. replication_sort_entry_add(rq); - __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_plus_one(s); } static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { @@ -671,24 +693,63 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ internal_error( true, - "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + "STREAM %s [send to %s]: REPLAY ERROR: merging duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); - string_freez(rq_new->chart_id); + bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false; - return false; + if(rq_new->after < rq->after && rq_new->after != 0) + updated_after = true; + + if(rq_new->before > rq->before) + updated_before = true; + + if(rq_new->start_streaming != rq->start_streaming) + updated_start_streaming = true; + + if(updated_after || updated_before || updated_start_streaming) { + replication_recursive_lock(); + + if(rq->indexed_in_judy) + replication_sort_entry_del(rq); + + if(rq_new->after < rq->after && rq_new->after != 0) + rq->after = rq_new->after; + + if(rq->after == 0) + rq->before = 0; + else if(rq_new->before > rq->before) + rq->before = rq_new->before; + + rq->start_streaming = rq->start_streaming; + replication_sort_entry_add(rq); + + replication_recursive_unlock(); + updated = true; + + internal_error( + true, + "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false"); + } + + string_freez(rq_new->chart_id); + return updated; } static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { struct replication_request *rq = value; - if(rq->index_in_judy) + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_minus_one(rq->sender); + + if(rq->indexed_in_judy) replication_sort_entry_del(rq); string_freez(rq->chart_id); - __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST); } @@ -702,13 +763,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .after = after, .before = before, .start_streaming = start_streaming, - .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST), + .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), }; dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request)); } -void replication_flush_sender(struct sender_state *sender) { +void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks replication_recursive_lock(); dictionary_flush(sender->replication_requests); @@ -746,7 +807,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { replication_recursive_unlock(); } - s->replication_sender_buffer_percent_used = percentage; + s->buffer_used_percentage = percentage; } // ---------------------------------------------------------------------------- @@ -843,12 +904,19 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_busy(WORKER_JOB_FIND_CHART); RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id)); if(!st) { - internal_error(true, "REPLAY: chart '%s' not found on host '%s'", + internal_error(true, "REPLAY ERROR: chart '%s' not found on host '%s'", string2str(rq.chart_id), rrdhost_hostname(rq.sender->host)); continue; } + if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_plus_one(st->rrdhost); + } + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED); + worker_is_busy(WORKER_JOB_QUERYING); latest_first_time_t = rq.after; @@ -856,8 +924,8 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time) rq.sender->replication_first_time = rq.after; - if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time) - rq.sender->replication_min_time = rq.before; + if(rq.before < rq.sender->replication_current_time || !rq.sender->replication_current_time) + rq.sender->replication_current_time = rq.before; netdata_thread_disable_cancelability(); @@ -869,17 +937,20 @@ void *replication_thread_main(void *ptr __maybe_unused) { rep.executed++; - if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) { + if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) { worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING); - __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST); // enable normal streaming if we have to // but only if the sender buffer has not been flushed since we started - debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s", - rrdhost_hostname(rq.sender->host), rrdset_id(st)); - - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + } + else + internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating", + string2str(rq.chart_id), rrdhost_hostname(st->rrdhost)); } } |