summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-22 22:42:16 +0200
committerGitHub <noreply@github.com>2022-11-22 22:42:16 +0200
commit4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch)
tree4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming
parent77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff)
Replication fixes #3 (#14035)
* cleanup and additional information about replication * fix deadlock on sender mutex * do not ignore start streaming empty requests; when there are duplicate requests, merge them * flipped the flag * final touch * added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c28
-rw-r--r--streaming/replication.c151
-rw-r--r--streaming/replication.h2
-rw-r--r--streaming/rrdpush.h34
-rw-r--r--streaming/sender.c74
5 files changed, 189 insertions, 100 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 32b510e7ce..61ee33bc45 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -431,6 +431,15 @@ done:
return result;
}
+static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) {
+ RRDSET *st;
+ rrdset_foreach_read(st, rpt->host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ }
+ rrdset_foreach_done(st);
+ rrdhost_receiver_replicating_charts_zero(rpt->host);
+}
static int rrdpush_receive(struct receiver_state *rpt)
{
@@ -721,14 +730,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(++localhost->senders_count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
+ rrdpush_receiver_replication_reset(rpt);
rrdcontext_host_child_connected(rpt->host);
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
@@ -750,16 +752,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
rpt->hostname, rpt->client_ip, rpt->client_port, count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
rrdcontext_host_child_disconnected(rpt->host);
+ rrdpush_receiver_replication_reset(rpt);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
diff --git a/streaming/replication.c b/streaming/replication.c
index 3cb8424113..3f2f7939c6 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -290,7 +290,7 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(
st->replay.after != 0 || st->replay.before != 0,
- "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight",
+ "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight",
rrdhost_hostname(st->rrdhost), rrdset_id(st)
);
@@ -403,15 +403,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
// replication request in sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
- struct sender_state *sender;
- usec_t sender_last_flush_ut;
- STRING *chart_id;
- time_t after; // key for sorting (JudyL)
- time_t before;
- Word_t unique_id;
- bool start_streaming;
- bool found;
- bool index_in_judy;
+ struct sender_state *sender; // the sender we should put the reply at
+ STRING *chart_id; // the chart of the request
+ time_t after; // the start time of the query (maybe zero) key for sorting (JudyL)
+ time_t before; // the end time of the query (maybe zero)
+ bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
+
+ usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request
+ Word_t unique_id; // auto-increment, later requests have bigger
+ bool found; // used as a result boolean for the find call
+ bool indexed_in_judy; // true when the request is indexed in judy
};
// replication sort entry in JudyL array
@@ -486,13 +487,15 @@ static void replication_recursive_unlock() {
static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+ rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
+
// copy the request
rse->rq = rq;
rse->unique_id = rep.next_unique_id++;
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
- rq->index_in_judy = false;
+ rq->indexed_in_judy = false;
return rse;
}
@@ -524,7 +527,7 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
// add it to the inner judy, using unique_id as key
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
- rq->index_in_judy = true;
+ rq->indexed_in_judy = true;
if(!rep.first_time_t || rq->after < rep.first_time_t)
rep.first_time_t = rq->after;
@@ -540,7 +543,9 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
rep.removed++;
rep.requests_count--;
- rse->rq->index_in_judy = false;
+ rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
+
+ rse->rq->indexed_in_judy = false;
// delete it from the inner judy
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
@@ -562,7 +567,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
struct replication_sort_entry *rse_to_delete = NULL;
replication_recursive_lock();
- if(rq->index_in_judy) {
+ if(rq->indexed_in_judy) {
inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0);
if (inner_judy_pptr) {
@@ -582,6 +587,13 @@ static void replication_sort_entry_del(struct replication_request *rq) {
replication_recursive_unlock();
}
+static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) {
+ if(unlikely(first))
+ return JudyLFirst(PArray, PIndex, PJE0);
+
+ return JudyLNext(PArray, PIndex, PJE0);
+}
+
static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
@@ -590,16 +602,13 @@ static struct replication_request replication_request_get_first_available() {
struct replication_request rq = (struct replication_request){ .found = false };
- if(rep.last_after && rep.last_unique_id) {
- rep.last_after--;
- rep.last_unique_id--;
- }
- else {
+ if(unlikely(!rep.last_after || !rep.last_unique_id)) {
rep.last_after = 0;
rep.last_unique_id = 0;
}
- while(!rq.found && (inner_judy_pptr = JudyLNext(rep.JudyL_array, &rep.last_after, PJE0))) {
+ bool find_same_after = true;
+ while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) {
Pvoid_t *our_item_pptr;
while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) {
@@ -610,10 +619,10 @@ static struct replication_request replication_request_get_first_available() {
rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
bool sender_has_been_flushed_since_this_request =
- rse->rq->sender_last_flush_ut != __atomic_load_n(&s->last_flush_time_ut, __ATOMIC_SEQ_CST);
+ rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
bool sender_has_room_to_spare =
- s->replication_sender_buffer_percent_used <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
+ s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
rep.skipped_not_connected++;
@@ -635,6 +644,9 @@ static struct replication_request replication_request_get_first_available() {
rep.skipped_no_room++;
}
+ // call JudyLNext from now on
+ find_same_after = false;
+
// prepare for the next iteration on the outer loop
rep.last_unique_id = 0;
}
@@ -650,6 +662,14 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
struct sender_state *s = sender_state; (void)s;
struct replication_request *rq = value;
+ RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ if(!st) {
+ internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
+ string2str(rq->chart_id), rrdhost_hostname(rq->sender->host));
+ }
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+
// IMPORTANT:
// We use the react instead of the insert callback
// because we want the item to be atomically visible
@@ -661,7 +681,9 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
// related to it.
replication_sort_entry_add(rq);
- __atomic_fetch_add(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
@@ -671,24 +693,63 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
internal_error(
true,
- "STREAM %s [send to %s]: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ "STREAM %s [send to %s]: REPLAY ERROR: merging duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
(unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
(unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- string_freez(rq_new->chart_id);
+ bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
- return false;
+ if(rq_new->after < rq->after && rq_new->after != 0)
+ updated_after = true;
+
+ if(rq_new->before > rq->before)
+ updated_before = true;
+
+ if(rq_new->start_streaming != rq->start_streaming)
+ updated_start_streaming = true;
+
+ if(updated_after || updated_before || updated_start_streaming) {
+ replication_recursive_lock();
+
+ if(rq->indexed_in_judy)
+ replication_sort_entry_del(rq);
+
+ if(rq_new->after < rq->after && rq_new->after != 0)
+ rq->after = rq_new->after;
+
+ if(rq->after == 0)
+ rq->before = 0;
+ else if(rq_new->before > rq->before)
+ rq->before = rq_new->before;
+
+        rq->start_streaming = rq_new->start_streaming;
+ replication_sort_entry_add(rq);
+
+ replication_recursive_unlock();
+ updated = true;
+
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false");
+ }
+
+ string_freez(rq_new->chart_id);
+ return updated;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
struct replication_request *rq = value;
- if(rq->index_in_judy)
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_minus_one(rq->sender);
+
+ if(rq->indexed_in_judy)
replication_sort_entry_del(rq);
string_freez(rq->chart_id);
- __atomic_fetch_sub(&rq->sender->replication_pending_requests, 1, __ATOMIC_SEQ_CST);
}
@@ -702,13 +763,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.after = after,
.before = before,
.start_streaming = start_streaming,
- .sender_last_flush_ut = __atomic_load_n(&sender->last_flush_time_ut, __ATOMIC_SEQ_CST),
+ .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
};
dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request));
}
-void replication_flush_sender(struct sender_state *sender) {
+void replication_sender_delete_pending_requests(struct sender_state *sender) {
// allow the dictionary destructor to go faster on locks
replication_recursive_lock();
dictionary_flush(sender->replication_requests);
@@ -746,7 +807,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
replication_recursive_unlock();
}
- s->replication_sender_buffer_percent_used = percentage;
+ s->buffer_used_percentage = percentage;
}
// ----------------------------------------------------------------------------
@@ -843,12 +904,19 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_busy(WORKER_JOB_FIND_CHART);
RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id));
if(!st) {
- internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
+ internal_error(true, "REPLAY ERROR: chart '%s' not found on host '%s'",
string2str(rq.chart_id), rrdhost_hostname(rq.sender->host));
continue;
}
+ if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ }
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+
worker_is_busy(WORKER_JOB_QUERYING);
latest_first_time_t = rq.after;
@@ -856,8 +924,8 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time)
rq.sender->replication_first_time = rq.after;
- if(rq.before < rq.sender->replication_min_time || !rq.sender->replication_min_time)
- rq.sender->replication_min_time = rq.before;
+ if(rq.before < rq.sender->replication_current_time || !rq.sender->replication_current_time)
+ rq.sender->replication_current_time = rq.before;
netdata_thread_disable_cancelability();
@@ -869,17 +937,20 @@ void *replication_thread_main(void *ptr __maybe_unused) {
rep.executed++;
- if(start_streaming && rq.sender_last_flush_ut == __atomic_load_n(&rq.sender->last_flush_time_ut, __ATOMIC_SEQ_CST)) {
+ if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) {
worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING);
- __atomic_fetch_add(&rq.sender->receiving_metrics, 1, __ATOMIC_SEQ_CST);
// enable normal streaming if we have to
// but only if the sender buffer has not been flushed since we started
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(rq.sender->host), rrdset_id(st));
-
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+ }
+ else
+ internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating",
+ string2str(rq.chart_id), rrdhost_hostname(st->rrdhost));
}
}
diff --git a/streaming/replication.h b/streaming/replication.h
index 7542e43d08..dde3cc1a96 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -16,7 +16,7 @@ bool replicate_chart_request(send_command callback, void *callback_data,
void replication_init_sender(struct sender_state *sender);
void replication_cleanup_sender(struct sender_state *sender);
-void replication_flush_sender(struct sender_state *sender);
+void replication_sender_delete_pending_requests(struct sender_state *sender);
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index ed3b30bc5c..079c63acc5 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -159,20 +159,36 @@ struct sender_state {
struct compressor_state *compressor;
#endif
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl; // Structure used to encrypt the connection
+ struct netdata_ssl ssl; // structure used to encrypt the connection
#endif
- DICTIONARY *replication_requests;
- size_t replication_pending_requests;
- time_t replication_first_time;
- time_t replication_min_time;
- size_t replication_sender_buffer_percent_used;
- bool replication_reached_max;
+ DICTIONARY *replication_requests; // de-duplication of replication requests, per chart
- usec_t last_flush_time_ut;
- size_t receiving_metrics;
+ size_t replication_pending_requests; // the currently outstanding replication requests
+ size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+
+ time_t replication_first_time; // the oldest time that has been requested to be replicated
+ time_t replication_current_time; // the minimum(before) of the executed replication requests
+
+ bool replication_reached_max; // used to avoid resetting the replication thread too frequently
+
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
};
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED)
+
struct receiver_state {
RRDHOST *host;
netdata_thread_t thread;
diff --git a/streaming/sender.c b/streaming/sender.c
index 72affc2907..85aad3a3e5 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -169,19 +169,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
rrdpush_signal_sender_to_wake_up(s);
}
-
-static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- if(host->sender->rrdpush_sender_socket != -1) {
- close(host->sender->rrdpush_sender_socket);
- host->sender->rrdpush_sender_socket = -1;
- }
-
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
-
- replication_flush_sender(host->sender);
-}
-
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
buffer_sprintf(
wb
@@ -236,21 +223,10 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
- bool receive_has_replication = host != localhost && host->receiver && stream_has_capability(host->receiver, STREAM_CAP_REPLICATION);
- bool send_has_replication = host->sender && stream_has_capability(host->sender, STREAM_CAP_REPLICATION);
-
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
-
- if(!receive_has_replication)
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
-
- if(send_has_replication)
- // it will be enabled once replication is done on the sending side
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- else
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->upstream_resync_time = 0;
@@ -260,21 +236,52 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
+
+ rrdhost_sender_replicating_charts_zero(host);
}
-static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
- __atomic_store_n(&host->sender->last_flush_time_ut, now_realtime_usec(), __ATOMIC_SEQ_CST);
+static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
netdata_mutex_lock(&host->sender->mutex);
+
+ // flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
+
netdata_mutex_unlock(&host->sender->mutex);
+}
+
+static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+ // stop all replication commands inflight
+ replication_sender_delete_pending_requests(host->sender);
+
+ // reset the state of all charts
rrdpush_sender_thread_reset_all_charts(host);
+
+ rrdpush_sender_replicating_charts_zero(host->sender);
+}
+
+static void rrdpush_sender_on_connect(RRDHOST *host) {
+ rrdpush_sender_cbuffer_flush(host);
+ rrdpush_sender_charts_and_replication_reset(host);
rrdpush_sender_thread_send_custom_host_variables(host);
- replication_flush_sender(host->sender);
+}
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ if(host->sender->rrdpush_sender_socket != -1) {
+ close(host->sender->rrdpush_sender_socket);
+ host->sender->rrdpush_sender_socket = -1;
+ }
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- __atomic_store_n(&host->sender->receiving_metrics, 0, __ATOMIC_SEQ_CST);
+ // do not flush the circular buffer here
+ // this function is called sometimes with the mutex lock, sometimes without the lock
+ rrdpush_sender_charts_and_replication_reset(host);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -704,7 +711,7 @@ static bool attempt_to_connect(struct sender_state *state)
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
// reset the buffer, to properly send charts and metrics
- rrdpush_sender_thread_data_flush(state->host);
+ rrdpush_sender_on_connect(state->host);
// send from the beginning
state->begin = 0;
@@ -1185,8 +1192,9 @@ void *rrdpush_sender_thread(void *ptr) {
// If the TCP window never opened then something is wrong, restart connection
if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
- __atomic_load_n(&s->replication_pending_requests, __ATOMIC_SEQ_CST) == 0) &&
- __atomic_load_n(&s->receiving_metrics, __ATOMIC_SEQ_CST) != 0) {
+ !rrdpush_sender_pending_replication_requests(s) &&
+ !rrdpush_sender_replicating_charts(s)
+ )) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);