summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-09 20:43:52 +0200
committerGitHub <noreply@github.com>2022-11-09 20:43:52 +0200
commit96ba2dadb48439e667c62b721a6d8c41a92a5187 (patch)
treec45ae146c1c1a55d3d63981ab04dcbefaf1c09b9 /streaming
parent4b39609c35fceae5374fd051c0b987eb708312f6 (diff)
break active-active loop from replicating non-existing child to each other (#13968)
break active-active loop from replicating non-existing child to each-other
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c5
-rw-r--r--streaming/rrdpush.c8
-rw-r--r--streaming/sender.c6
3 files changed, 10 insertions, 9 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index ed5d51fe3d..40673f05b4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -726,6 +726,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdcontext_host_child_connected(rpt->host);
+
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
size_t count = streaming_parser(rpt, &cd, fp_in, fp_out,
#ifdef ENABLE_HTTPS
(rpt->ssl.conn) ? &rpt->ssl : NULL
@@ -734,6 +737,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
#endif
);
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
log_stream_connection(rpt->client_ip, rpt->client_port,
rpt->key, rpt->host->machine_guid, rpt->hostname,
"DISCONNECTED");
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 0090f5f22f..187d9a9ebb 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -378,16 +378,12 @@ void rrdset_done_push(RRDSET *st) {
RRDHOST *host = st->rrdhost;
// fetch the flags we need to check with one atomic operation
- RRDHOST_FLAGS host_flags = rrdhost_flag_check(host,
- RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS
- | RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS
- | RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN
- );
+ RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
// check if we are not connected
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
- if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)))
+ if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
rrdpush_sender_thread_spawn(host);
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
diff --git a/streaming/sender.c b/streaming/sender.c
index 8579f81475..0682597286 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -165,13 +165,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
-
if(host->sender->rrdpush_sender_socket != -1) {
close(host->sender->rrdpush_sender_socket);
host->sender->rrdpush_sender_socket = -1;
}
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
}
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {