diff options
-rwxr-xr-x | database/engine/metadata_log/metalogpluginsd.c | 6 | ||||
-rw-r--r-- | database/rrdhost.c | 6 | ||||
-rw-r--r-- | streaming/receiver.c | 10 | ||||
-rw-r--r-- | streaming/rrdpush.c | 9 |
4 files changed, 22 insertions, 9 deletions
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c index 25af24143b..1fa94e315a 100755 --- a/database/engine/metadata_log/metalogpluginsd.c +++ b/database/engine/metadata_log/metalogpluginsd.c @@ -12,7 +12,6 @@ PARSER_RC metalog_pluginsd_host_action( { int history = 5; RRD_MEMORY_MODE mode = RRD_MEMORY_MODE_DBENGINE; - int health_enabled = default_health_enabled; int rrdpush_enabled = default_rrdpush_enabled; char *rrdpush_destination = default_rrdpush_destination; char *rrdpush_api_key = default_rrdpush_api_key; @@ -49,9 +48,6 @@ PARSER_RC metalog_pluginsd_host_action( update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every); if(update_every < 0) update_every = 1; - //health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); - health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled); - //rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled); rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled); @@ -77,7 +73,7 @@ PARSER_RC metalog_pluginsd_host_action( , update_every , history // entries , mode - , health_enabled // health enabled + , 0 // health enabled , rrdpush_enabled // Push enabled , rrdpush_destination //destination , rrdpush_api_key // api key diff --git a/database/rrdhost.c b/database/rrdhost.c index 08bdb1b3ba..c5e0e069cd 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -573,6 +573,12 @@ RRDHOST *rrdhost_find_or_create( , rrdpush_send_charts_matching , system_info); } + if (host) { + rrdhost_wrlock(host); + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + host->senders_disconnected_time = 0; + rrdhost_unlock(host); + } rrdhost_cleanup_orphan_hosts_nolock(host); diff --git a/streaming/receiver.c b/streaming/receiver.c index 57962a3928..8c0a2019b6 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -421,9 +421,7 @@ static int rrdpush_receive(struct receiver_state *rpt) } */ - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_ORPHAN); // rpt->host->connected_senders++; - rpt->host->senders_disconnected_time = 0; rpt->host->labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM; if(health_enabled != CONFIG_BOOLEAN_NO) { @@ -453,17 +451,21 @@ static int rrdpush_receive(struct receiver_state *rpt) // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread if (!netdata_exit && rpt->host) { + rrd_rdlock(); + rrdhost_wrlock(rpt->host); netdata_mutex_lock(&rpt->host->receiver_lock); if (rpt->host->receiver == rpt) { - rrdhost_wrlock(rpt->host); rpt->host->senders_disconnected_time = now_realtime_sec(); rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); if(health_enabled == CONFIG_BOOLEAN_AUTO) rpt->host->health_enabled = 0; - rrdhost_unlock(rpt->host); + } + if (rpt->host->receiver == rpt) { rrdpush_sender_thread_stop(rpt->host); } netdata_mutex_unlock(&rpt->host->receiver_lock); + rrdhost_unlock(rpt->host); + rrd_unlock(); } // cleanup diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 65ca18b5b7..bdbb2fa1c9 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -632,11 +632,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { * lookup to the now-attached structure). */ struct receiver_state *rpt = callocz(1, sizeof(*rpt)); + + rrd_rdlock(); RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ host = NULL; if (host) { + rrdhost_wrlock(host); netdata_mutex_lock(&host->receiver_lock); + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + host->senders_disconnected_time = 0; if (host->receiver != NULL) { time_t age = now_realtime_sec() - host->receiver->last_msg_t; if (age > 30) { @@ -647,6 +652,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { } else { netdata_mutex_unlock(&host->receiver_lock); + rrdhost_unlock(host); + rrd_unlock(); log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED"); info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age); @@ -659,7 +666,9 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { } host->receiver = rpt; netdata_mutex_unlock(&host->receiver_lock); + rrdhost_unlock(host); } + rrd_unlock(); rpt->last_msg_t = now_realtime_sec(); |