summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-11-26 17:26:01 +0100
committerGitHub <noreply@github.com>2020-11-26 17:26:01 +0100
commitf1db235a36d85f81c5a19a7087567405771874f7 (patch)
treef99084d252e0af0496bf4776fa2c118f15dbee1a /streaming
parent4f867a58e5287e54aa5ecae023868615cafb9f93 (diff)
ACLK Child Availability Messages (#9918)
* new ACLK messages for Claiming MVP1
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c23
-rw-r--r--streaming/rrdpush.c6
2 files changed, 21 insertions, 8 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 9165038cda..495a40c017 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -126,11 +126,11 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK; //the message is OK problem must be somewehere else
}
- netdata_mutex_lock(&host->claimed_id_lock);
- if (host->claimed_id)
- freez(host->claimed_id);
- host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
- netdata_mutex_unlock(&host->claimed_id_lock);
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+ host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ rrdhost_aclk_state_unlock(host);
rrdpush_claimed_id(host);
@@ -440,6 +440,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT);
+#endif
size_t count = streaming_parser(rpt, &cd, fp);
@@ -448,6 +454,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT);
+#endif
+
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
if (!netdata_exit && rpt->host) {
rrd_rdlock();
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index bdbb2fa1c9..3b813e01fe 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -374,11 +374,11 @@ void rrdpush_claimed_id(RRDHOST *host)
return;
sender_start(host->sender);
- netdata_mutex_lock(&host->claimed_id_lock);
+ rrdhost_aclk_state_lock(host);
- buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") );
+ buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
- netdata_mutex_unlock(&host->claimed_id_lock);
+ rrdhost_aclk_state_unlock(host);
sender_commit(host->sender);
// signal the sender there are more data