summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2020-09-04 13:03:25 +0300
committerGitHub <noreply@github.com>2020-09-04 13:03:25 +0300
commitadf253f710f62106193bbcf5f46543ba6fd93d2a (patch)
tree8bba2a75dc41ce88d4d816a9b025cc855c7bfdfe /streaming
parentbe5945212363633b7c5fc704739f93a00ccf69ea (diff)
Fix race condition with orphan hosts (#9862)
* Fix race condition between orphan host cleanup and new streaming connections. * Remove health enabling from log replay, it will be handled at streaming connection time.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c10
-rw-r--r--streaming/rrdpush.c9
2 files changed, 15 insertions, 4 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 57962a3928..8c0a2019b6 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -421,9 +421,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
*/
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_ORPHAN);
// rpt->host->connected_senders++;
- rpt->host->senders_disconnected_time = 0;
rpt->host->labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
if(health_enabled != CONFIG_BOOLEAN_NO) {
@@ -453,17 +451,21 @@ static int rrdpush_receive(struct receiver_state *rpt)
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
if (!netdata_exit && rpt->host) {
+ rrd_rdlock();
+ rrdhost_wrlock(rpt->host);
netdata_mutex_lock(&rpt->host->receiver_lock);
if (rpt->host->receiver == rpt) {
- rrdhost_wrlock(rpt->host);
rpt->host->senders_disconnected_time = now_realtime_sec();
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
if(health_enabled == CONFIG_BOOLEAN_AUTO)
rpt->host->health_enabled = 0;
- rrdhost_unlock(rpt->host);
+ }
+ if (rpt->host->receiver == rpt) {
rrdpush_sender_thread_stop(rpt->host);
}
netdata_mutex_unlock(&rpt->host->receiver_lock);
+ rrdhost_unlock(rpt->host);
+ rrd_unlock();
}
// cleanup
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 65ca18b5b7..bdbb2fa1c9 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -632,11 +632,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
* lookup to the now-attached structure).
*/
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+
+ rrd_rdlock();
RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
host = NULL;
if (host) {
+ rrdhost_wrlock(host);
netdata_mutex_lock(&host->receiver_lock);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
if (host->receiver != NULL) {
time_t age = now_realtime_sec() - host->receiver->last_msg_t;
if (age > 30) {
@@ -647,6 +652,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
else {
netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ rrd_unlock();
log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
"REJECTED - ALREADY CONNECTED");
info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age);
@@ -659,7 +666,9 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
host->receiver = rpt;
netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
}
+ rrd_unlock();
rpt->last_msg_t = now_realtime_sec();