diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-09 20:43:52 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-09 20:43:52 +0200 |
commit | 96ba2dadb48439e667c62b721a6d8c41a92a5187 (patch) | |
tree | c45ae146c1c1a55d3d63981ab04dcbefaf1c09b9 /streaming | |
parent | 4b39609c35fceae5374fd051c0b987eb708312f6 (diff) |
break active-active loop from replicating non-existing child to each other (#13968)
break active-active loop from replicating non-existing child to each-other
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 5 | ||||
-rw-r--r-- | streaming/rrdpush.c | 8 | ||||
-rw-r--r-- | streaming/sender.c | 6 |
3 files changed, 10 insertions, 9 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index ed5d51fe3d..40673f05b4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -726,6 +726,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdcontext_host_child_connected(rpt->host); + + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, #ifdef ENABLE_HTTPS (rpt->ssl.conn) ? &rpt->ssl : NULL @@ -734,6 +737,8 @@ static int rrdpush_receive(struct receiver_state *rpt) #endif ); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED"); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 0090f5f22f..187d9a9ebb 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -378,16 +378,12 @@ void rrdset_done_push(RRDSET *st) { RRDHOST *host = st->rrdhost; // fetch the flags we need to check with one atomic operation - RRDHOST_FLAGS host_flags = rrdhost_flag_check(host, - RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS - | RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS - | RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN - ); + RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST); // check if we are not connected if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { - if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN))) + if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED)))) rrdpush_sender_thread_spawn(host); if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { diff --git a/streaming/sender.c b/streaming/sender.c index 8579f81475..0682597286 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -165,13 +165,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - if(host->sender->rrdpush_sender_socket != -1) { close(host->sender->rrdpush_sender_socket); host->sender->rrdpush_sender_socket = -1; } + + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); } static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) { |