summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-22 22:42:16 +0200
committerGitHub <noreply@github.com>2022-11-22 22:42:16 +0200
commit4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch)
tree4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming/sender.c
parent77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff)
Replication fixes #3 (#14035)
* cleanup and additional information about replication * fix deadlock on sender mutex * do not ignore start streaming empty requests; when there are duplicate requests, merge them * flipped the flag * final touch * added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c74
1 files changed, 41 insertions, 33 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 72affc2907..85aad3a3e5 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -169,19 +169,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
rrdpush_signal_sender_to_wake_up(s);
}
-
-static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- if(host->sender->rrdpush_sender_socket != -1) {
- close(host->sender->rrdpush_sender_socket);
- host->sender->rrdpush_sender_socket = -1;
- }
-
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
-
- replication_flush_sender(host->sender);
-}
-
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
buffer_sprintf(
wb
@@ -236,21 +223,10 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
- bool receive_has_replication = host != localhost && host->receiver && stream_has_capability(host->receiver, STREAM_CAP_REPLICATION);
- bool send_has_replication = host->sender && stream_has_capability(host->sender, STREAM_CAP_REPLICATION);
-
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
-
- if(!receive_has_replication)
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
-
- if(send_has_replication)
- // it will be enabled once replication is done on the sending side
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- else
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->upstream_resync_time = 0;
@@ -260,21 +236,52 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
+
+ rrdhost_sender_replicating_charts_zero(host);
}
-static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
- __atomic_store_n(&host->sender->last_flush_time_ut, now_realtime_usec(), __ATOMIC_SEQ_CST);
+static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
netdata_mutex_lock(&host->sender->mutex);
+
+ // flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
+
netdata_mutex_unlock(&host->sender->mutex);
+}
+
+static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+ // stop all replication commands inflight
+ replication_sender_delete_pending_requests(host->sender);
+
+ // reset the state of all charts
rrdpush_sender_thread_reset_all_charts(host);
+
+ rrdpush_sender_replicating_charts_zero(host->sender);
+}
+
+static void rrdpush_sender_on_connect(RRDHOST *host) {
+ rrdpush_sender_cbuffer_flush(host);
+ rrdpush_sender_charts_and_replication_reset(host);
rrdpush_sender_thread_send_custom_host_variables(host);
- replication_flush_sender(host->sender);
+}
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ if(host->sender->rrdpush_sender_socket != -1) {
+ close(host->sender->rrdpush_sender_socket);
+ host->sender->rrdpush_sender_socket = -1;
+ }
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- __atomic_store_n(&host->sender->receiving_metrics, 0, __ATOMIC_SEQ_CST);
+ // do not flush the circular buffer here
+ // this function is called sometimes with the mutex lock, sometimes without the lock
+ rrdpush_sender_charts_and_replication_reset(host);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -704,7 +711,7 @@ static bool attempt_to_connect(struct sender_state *state)
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
// reset the buffer, to properly send charts and metrics
- rrdpush_sender_thread_data_flush(state->host);
+ rrdpush_sender_on_connect(state->host);
// send from the beginning
state->begin = 0;
@@ -1185,8 +1192,9 @@ void *rrdpush_sender_thread(void *ptr) {
// If the TCP window never opened then something is wrong, restart connection
if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
- __atomic_load_n(&s->replication_pending_requests, __ATOMIC_SEQ_CST) == 0) &&
- __atomic_load_n(&s->receiving_metrics, __ATOMIC_SEQ_CST) != 0) {
+ !rrdpush_sender_pending_replication_requests(s) &&
+ !rrdpush_sender_replicating_charts(s)
+ )) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);