diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-11-26 17:26:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-26 17:26:01 +0100 |
commit | f1db235a36d85f81c5a19a7087567405771874f7 (patch) | |
tree | f99084d252e0af0496bf4776fa2c118f15dbee1a /streaming | |
parent | 4f867a58e5287e54aa5ecae023868615cafb9f93 (diff) |
ACLK Child Availability Messages (#9918)
* new ACLK messages for Claiming MVP1
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 23 | ||||
-rw-r--r-- | streaming/rrdpush.c | 6 |
2 files changed, 21 insertions, 8 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 9165038cda..495a40c017 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -126,11 +126,11 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; //the message is OK problem must be somewehere else } - netdata_mutex_lock(&host->claimed_id_lock); - if (host->claimed_id) - freez(host->claimed_id); - host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; - netdata_mutex_unlock(&host->claimed_id_lock); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + freez(host->aclk_state.claimed_id); + host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + rrdhost_aclk_state_unlock(host); rrdpush_claimed_id(host); @@ -440,6 +440,12 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // new slave connected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT); +#endif size_t count = streaming_parser(rpt, &cd, fp); @@ -448,6 +454,13 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // new slave connected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT); +#endif + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread if (!netdata_exit && rpt->host) { rrd_rdlock(); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index bdbb2fa1c9..3b813e01fe 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -374,11 +374,11 @@ void rrdpush_claimed_id(RRDHOST *host) return; sender_start(host->sender); - netdata_mutex_lock(&host->claimed_id_lock); + rrdhost_aclk_state_lock(host); - buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") ); + buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); - netdata_mutex_unlock(&host->claimed_id_lock); + rrdhost_aclk_state_unlock(host); sender_commit(host->sender); // signal the sender there are more data |