summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdatabase/engine/metadata_log/metalogpluginsd.c6
-rw-r--r--database/rrdhost.c6
-rw-r--r--streaming/receiver.c10
-rw-r--r--streaming/rrdpush.c9
4 files changed, 22 insertions, 9 deletions
diff --git a/database/engine/metadata_log/metalogpluginsd.c b/database/engine/metadata_log/metalogpluginsd.c
index 25af24143b..1fa94e315a 100755
--- a/database/engine/metadata_log/metalogpluginsd.c
+++ b/database/engine/metadata_log/metalogpluginsd.c
@@ -12,7 +12,6 @@ PARSER_RC metalog_pluginsd_host_action(
{
int history = 5;
RRD_MEMORY_MODE mode = RRD_MEMORY_MODE_DBENGINE;
- int health_enabled = default_health_enabled;
int rrdpush_enabled = default_rrdpush_enabled;
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
@@ -49,9 +48,6 @@ PARSER_RC metalog_pluginsd_host_action(
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
- //health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
-
//rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
@@ -77,7 +73,7 @@ PARSER_RC metalog_pluginsd_host_action(
, update_every
, history // entries
, mode
- , health_enabled // health enabled
+ , 0 // health enabled
, rrdpush_enabled // Push enabled
, rrdpush_destination //destination
, rrdpush_api_key // api key
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 08bdb1b3ba..c5e0e069cd 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -573,6 +573,12 @@ RRDHOST *rrdhost_find_or_create(
, rrdpush_send_charts_matching
, system_info);
}
+ if (host) {
+ rrdhost_wrlock(host);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
+ rrdhost_unlock(host);
+ }
rrdhost_cleanup_orphan_hosts_nolock(host);
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 57962a3928..8c0a2019b6 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -421,9 +421,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
*/
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_ORPHAN);
// rpt->host->connected_senders++;
- rpt->host->senders_disconnected_time = 0;
rpt->host->labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
if(health_enabled != CONFIG_BOOLEAN_NO) {
@@ -453,17 +451,21 @@ static int rrdpush_receive(struct receiver_state *rpt)
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
if (!netdata_exit && rpt->host) {
+ rrd_rdlock();
+ rrdhost_wrlock(rpt->host);
netdata_mutex_lock(&rpt->host->receiver_lock);
if (rpt->host->receiver == rpt) {
- rrdhost_wrlock(rpt->host);
rpt->host->senders_disconnected_time = now_realtime_sec();
rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
if(health_enabled == CONFIG_BOOLEAN_AUTO)
rpt->host->health_enabled = 0;
- rrdhost_unlock(rpt->host);
+ }
+ if (rpt->host->receiver == rpt) {
rrdpush_sender_thread_stop(rpt->host);
}
netdata_mutex_unlock(&rpt->host->receiver_lock);
+ rrdhost_unlock(rpt->host);
+ rrd_unlock();
}
// cleanup
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 65ca18b5b7..bdbb2fa1c9 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -632,11 +632,16 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
* lookup to the now-attached structure).
*/
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+
+ rrd_rdlock();
RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
host = NULL;
if (host) {
+ rrdhost_wrlock(host);
netdata_mutex_lock(&host->receiver_lock);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
if (host->receiver != NULL) {
time_t age = now_realtime_sec() - host->receiver->last_msg_t;
if (age > 30) {
@@ -647,6 +652,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
else {
netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ rrd_unlock();
log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
"REJECTED - ALREADY CONNECTED");
info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age);
@@ -659,7 +666,9 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
host->receiver = rpt;
netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
}
+ rrd_unlock();
rpt->last_msg_t = now_realtime_sec();