author Costa Tsaousis <costa@netdata.cloud> 2022-11-22 22:42:16 +0200
committer GitHub <noreply@github.com> 2022-11-22 22:42:16 +0200
commit 4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch)
tree 4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming/replication.c
parent 77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff)
Replication fixes #3 (#14035)
* cleanup and additional information about replication
* fix deadlock on sender mutex
* do not ignore empty start-streaming requests; when there are duplicate requests, merge them
* flipped the flag
* final touch
* added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- streaming/replication.c | 151
1 file changed, 111 insertions(+), 40 deletions(-)
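The headline change is the handling of duplicate replication requests: instead of being ignored, a duplicate request for the same chart is now merged into the pending one. Below is a minimal sketch of that merge semantics, assuming a request reduces to an [after, before] window plus a start_streaming flag; struct window and merge_requests are illustrative names, not Netdata's actual API.

#include <stdbool.h>
#include <time.h>

struct window {
    time_t after;          // query start; 0 means "from the very beginning"
    time_t before;         // query end
    bool start_streaming;  // switch to normal streaming when the query is done
};

// Merge a duplicate request `b` into the existing request `a`,
// widening the window so one query can satisfy both requests.
static void merge_requests(struct window *a, const struct window *b) {
    // an earlier (non-zero) start widens the window to the left
    if (b->after != 0 && b->after < a->after)
        a->after = b->after;

    if (a->after == 0)
        a->before = 0;               // a zero start implies an open-ended window
    else if (b->before > a->before)
        a->before = b->before;       // a later end widens it to the right

    a->start_streaming = b->start_streaming;  // the newer request wins
}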
diff --git a/streaming/replication.c b/streaming/replication.c
index 3cb8424113..3f2f7939c6 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -290,7 +290,7 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(
st->replay.after != 0 || st->replay.before != 0,
- "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight",
+ "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight",
rrdhost_hostname(st->rrdhost), rrdset_id(st)
);
@@ -403,15 +403,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
// replication request in sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
- struct sender_state *sender;
- usec_t sender_last_flush_ut;
- STRING *chart_id;
- time_t after; // key for sorting (JudyL)
- time_t before;
- Word_t unique_id;
- bool start_streaming;
- bool found;
- bool index_in_judy;
+ struct sender_state *sender; // the sender we should put the reply at
+ STRING *chart_id; // the chart of the request
+ time_t after; // the start time of the query (may be zero); key for sorting (JudyL)
+ time_t before; // the end time of the query (may be zero)
+ bool start_streaming; // true when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
+
+ usec_t sender_last_flush_ut; // the sender's last flush timestamp at the time we indexed this request
+ Word_t unique_id; // auto-increment; later requests have bigger ids
+ bool found; // used as a result boolean for the find call
+ bool indexed_in_judy; // true when the request is indexed in judy
};
// replication sort entry in JudyL array
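For context, the requests deduplicated in the sender dictionary are also indexed for dispatch in a two-level JudyL structure: an outer array keyed by after (so the oldest window is served first) whose values are inner JudyL arrays keyed by unique_id (so ties are served in arrival order). A minimal sketch of that indexing pattern, assuming libJudy and omitting error handling:

#include <Judy.h>

static Pvoid_t outer = NULL;   // Word_t after -> inner JudyL array

// Index a request under (after, unique_id), mirroring the pattern
// replication_sort_entry_add() uses below.
static void index_request(Word_t after, Word_t unique_id, void *rq) {
    // find or create the inner judy for this `after`
    Pvoid_t *inner_judy_ptr = JudyLIns(&outer, after, PJE0);

    // add the request to the inner judy, keyed by its unique id
    Pvoid_t *item = JudyLIns(inner_judy_ptr, unique_id, PJE0);
    *item = rq;
}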
@@ -486,13 +487,15 @@ static void replication_recursive_unlock() {
static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+ rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
+
// copy the request
rse->rq = rq;
rse->unique_id = rep.next_unique_id++;
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
- rq->index_in_judy = false;
+ rq->indexed_in_judy = false;
return rse;
}
@@ -524,7 +527,7 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
// add it to the inner judy, using unique_id as key
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
- rq->index_in_judy = true;
+ rq->indexed_in_judy = true;
if(!rep.first_time_t || rq->after < rep.first_time_t)
rep.first_time_t = rq->after;
@@ -540,7 +543,9 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
rep.removed++;
rep.requests_count--;
- rse->rq->index_in_judy = false;
+ rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
+
+ rse->rq->indexed_in_judy = false;
// delete it from the inner judy
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
@@ -562,7 +567,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
struct replication_sort_entry *rse_to_delete = NULL;
replication_recursive_lock();
- if(rq->index_in_judy) {
+ if(rq->indexed_in_judy) {
inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0);
if (inner_judy_pptr) {
@@ -582,6 +587,13 @@ static void replication_sort_entry_del(struct replication_request *rq) {
replication_recursive_unlock();
}
+static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) {
+ if(unlikely(first))
+ return JudyLFirst(PArray, PIndex, PJE0);
+
+ return JudyLNext(PArray, PIndex, PJE0);
+}
+
static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
@@ -590,16 +602,13 @@ static struct replication_request replication_request_get_first_available() {
struct replication_request rq = (struct replication_request){ .found = false };
- if(rep.last_after && rep.last_unique_id) {
- rep.last_after--;
- rep.last_unique_id--;
- }
- else {
+ if(unlikely(!rep.last_after || !rep.last_unique_id)) {
rep.last_after = 0;
rep.last_unique_id = 0;
}
- while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) {
+ bool find_same_after = true;
+ while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) {
Pvoid_t *our_item_pptr;
while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) {
@@ -610,10 +619,10 @@ static struct replication_request replication_request_get_first_available() {
rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
bool sender_has_been_flushed_since_this_request =
- rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST);
+ rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
bool sender_has_room_to_spare =
- s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
+ s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
rep.skipped_not_connected++;
@@ -635,6 +644,9 @@ static struct replication_request replication_request_get_first_available() {
rep.skipped_no_room++;
}
+ // call JudyLNext from now on
+ find_same_after = false;
+
// prepare for the next iteration on the outer loop
rep.last_unique_id = 0;
}
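The JudyLFirstOrNext() helper exists because JudyLNext() returns the entry strictly after *PIndex, while JudyLFirst() returns the entry at or after it; the old code compensated by decrementing the saved position, which misbehaves when the saved key is 0. A hedged usage sketch of the resume-a-scan pattern, reusing the helper introduced above; process() is a hypothetical consumer, not Netdata code:

#include <Judy.h>
#include <stdbool.h>

extern void process(Word_t key, void *value);  // hypothetical consumer

// Scan `array` starting at (and including) *pos; *pos is left at the
// last visited key, so a later call can resume where this one stopped.
static void scan_from(Pcvoid_t array, Word_t *pos) {
    bool first = true;             // JudyLFirst is ">=", JudyLNext is ">"
    Pvoid_t *slot;

    while ((slot = JudyLFirstOrNext(array, pos, first))) {
        first = false;             // strictly-greater keys from now on
        process(*pos, *slot);
    }
}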
@@ -650,6 +662,14 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
struct sender_state *s = sender_state; (void)s;
struct replication_request *rq = value;
+ RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ if(!st) {
+ internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
+ string2str(rq->chart_id), rrdhost_hostname(rq->sender->host));
+ }
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+
// IMPORTANT:
// We use the react instead of the insert callback
// because we want the item to be atomically visible
@@ -661,7 +681,9 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
// related to it.
replication_sort_entry_add(rq);
- __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
@@ -671,24 +693,63 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
internal_error(
true,
- "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ "STREAM %s [send to %s]: REPLAY ERROR: merging duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
(unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
(unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- string_freez(rq_new->chart_id);
+ bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
- return false;
+ if(rq_new->after < rq->after && rq_new->after != 0)
+ updated_after = true;
+
+ if(rq_new->before > rq->before)
+ updated_before = true;
+
+ if(rq_new->start_streaming != rq->start_streaming)
+ updated_start_streaming = true;
+
+ if(updated_after || updated_before || updated_start_streaming) {
+ replication_recursive_lock();
+
+ if(rq->indexed_in_judy)
+ replication_sort_entry_del(rq);
+
+ if(rq_new->after < rq->after && rq_new->after != 0)
+ rq->after = rq_new->after;
+
+ if(rq->after == 0)
+ rq->before = 0;
+ else if(rq_new->before > rq->before)
+ rq->before = rq_new->before;
+
+ rq->start_streaming = rq_new->start_streaming;
+ replication_sort_entry_add(rq);
+
+ replication_recursive_unlock();
+ updated = true;
+
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false");
+ }
+
+ string_freez(rq_new->chart_id);
+ return updated;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
struct replication_request *rq = value;
- if(rq->index_in_judy)
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_minus_one(rq->sender);
+
+ if(rq->indexed_in_judy)
replication_sort_entry_del(rq);
string_freez(rq->chart_id);
- __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
}
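To make the merge concrete: if the existing request covers [100, 200] and a duplicate arrives for [50, 150], the entry is unlinked from the sort index, widened to [50, 200], and re-indexed under its new after key; if the existing request has after == 0 (replicate from the very beginning), the window stays (0, 0) and only the start_streaming flag can change. The callback now returns updated instead of a hard-coded false, so the dictionary layer can tell whether the stored value actually changed.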
@@ -702,13 +763,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.after = after,
.before = before,
.start_streaming = start_streaming,
- .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST),
+ .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
};
dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request));
}
-void replication_flush_sender(struct sender_state *sender) {
+void replication_sender_delete_pending_requests(struct sender_state *sender) {
// allow the dictionary destructor to go faster on locks
replication_recursive_lock();
dictionary_flush(sender->replication_requests);
@@ -746,7 +807,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
replication_recursive_unlock();
}
- s->replication_sender_buffer_percent_used = percentage;
+ s->buffer_used_percentage = percentage;
}
// ----------------------------------------------------------------------------
@@ -843,12 +904,19 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_busy(WORKER_JOB_FIND_CHART);
RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id));
if(!st) {
- internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
+ internal_error(true, "REPLAY ERROR: chart '%s' not found on host '%s'",
string2str(rq.chart_id), rrdhost_hostname(rq.sender->host));
continue;
}
+ if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ }
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+
worker_is_busy(WORKER_JOB_QUERYING);
latest_first_time_t = rq.after;
@@ -856,8 +924,8 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time)
rq.sender->replication_first_time = rq.after;
- if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time)
- rq.sender->replication_min_time = rq.before;
+ if(rq.before < rq.sender->replication_current_time || !rq.sender->replication_current_time)
+ rq.sender->replication_current_time = rq.before;
netdata_thread_disable_cancelability();
@@ -869,17 +937,20 @@ void *replication_thread_main(void *ptr __maybe_unused) {
rep.executed++;
- if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) {
+ if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) {
worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING);
- __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST);
// enable normal streaming if we have to
// but only if the sender buffer has not been flushed since we started
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(rq.sender->host), rrdset_id(st));
-
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+ }
+ else
+ internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating",
+ string2str(rq.chart_id), rrdhost_hostname(st->rrdhost));
}
}
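Taken together, the new flags give each chart a small replication lifecycle on the sender side: queued when the request is indexed (protecting the chart from the obsoletion service thread), in progress while the replication thread executes queries for it, and finished once the parent's start-streaming condition is met. A minimal sketch of that state machine, with illustrative names rather than the actual RRDSET_FLAG_* bits:

#include <stdio.h>

enum replication_state {
    REPLICATION_QUEUED,       // request indexed; service thread must not obsolete the chart
    REPLICATION_IN_PROGRESS,  // replication thread is executing queries for the chart
    REPLICATION_FINISHED,     // start-streaming received; normal streaming resumes
};

// Advance a chart's state when the parent asks to start streaming,
// mirroring the checks in the hunk above.
static enum replication_state on_start_streaming(enum replication_state state) {
    if (state == REPLICATION_IN_PROGRESS)
        return REPLICATION_FINISHED;

    // a start-streaming command for a chart that is not replicating
    // indicates a protocol error (logged via internal_error() above)
    fprintf(stderr, "REPLAY ERROR: chart is not replicating\n");
    return state;
}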