diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-11-26 17:26:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-26 17:26:01 +0100 |
commit | f1db235a36d85f81c5a19a7087567405771874f7 (patch) | |
tree | f99084d252e0af0496bf4776fa2c118f15dbee1a /aclk/aclk_query.c | |
parent | 4f867a58e5287e54aa5ecae023868615cafb9f93 (diff) |
ACLK Child Availability Messages (#9918)
* new ACLK messages for Claiming MVP1
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- | aclk/aclk_query.c | 124 |
1 files changed, 92 insertions, 32 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 5e4b879512..be58c2209b 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -512,6 +512,13 @@ cleanup: return retval; } +#define ACLK_HOST_PTR_COMPULSORY(x) \ + if (unlikely(!host)) { \ + errno = 0; \ + error(x " needs host pointer"); \ + break; \ + } + /* * This function will fetch the next pending command and process it * @@ -546,25 +553,40 @@ static int aclk_process_query(struct aclk_query_thread *t_info) switch (this_query->cmd) { case ACLK_CMD_ONCONNECT: - debug(D_ACLK, "EXECUTING on connect metadata command"); - ACLK_SHARED_STATE_LOCK; - meta_state = aclk_shared_state.metadata_submitted; - aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT; - ACLK_SHARED_STATE_UNLOCK; - aclk_send_metadata(meta_state); + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); + break; + } +#else +#warning "This check became unnecessary. Remove" +#endif + + debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", + host->hostname, + host->machine_guid); + + rrdhost_aclk_state_lock(host); + meta_state = host->aclk_state.metadata; + host->aclk_state.metadata = ACLK_METADATA_SENT; + rrdhost_aclk_state_unlock(host); + aclk_send_metadata(meta_state, host); break; case ACLK_CMD_CHART: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); + debug(D_ACLK, "EXECUTING a chart update command"); - if (!host) - fatal("Pointer to host compulsory"); - aclk_send_single_chart(host->hostname, this_query->query); + aclk_send_single_chart(host, this_query->query); break; case ACLK_CMD_CHARTDEL: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); + debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now - aclk_send_info_metadata(ACLK_METADATA_SENT); + aclk_send_info_metadata(ACLK_METADATA_SENT, host); break; case ACLK_CMD_ALARM: @@ -581,7 +603,19 @@ static int aclk_process_query(struct aclk_query_thread *t_info) aclk_execute_query_v2(this_query); break; + case ACLK_CMD_CHILD_CONNECT: + case ACLK_CMD_CHILD_DISCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); + + debug( + D_ACLK, "Execution Child %s command", + this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); + aclk_send_info_child_connection(host, this_query->cmd); + break; + default: + errno = 0; + error("Unknown ACLK Query Command"); break; } debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); @@ -634,6 +668,39 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) } /** + * Checks and updates popcorning state of rrdhost + * returns actual/updated popcorning state + */ + +ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + ACLK_POPCORNING_STATE ret = host->aclk_state.state; + if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + if (!host->aclk_state.t_last_popcorn_update){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; + + if (t_diff >= ACLK_STABLE_TIMEOUT) { + host->aclk_state.state = ACLK_HOST_STABLE; + host->aclk_state.t_last_popcorn_update = 0; + rrdhost_aclk_state_unlock(host); + info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); + return ACLK_HOST_STABLE; + } + + rrdhost_aclk_state_unlock(host); + return ret; +} + +/** * Main query processing thread * * On startup wait for the agent collectors to initialize @@ -644,32 +711,14 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) void *aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; - time_t previous_popcorn_interrupt = 0; while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - ACLK_SHARED_STATE_UNLOCK; - break; - } - - time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - - if (checkpoint > ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = AGENT_STABLE; - ACLK_SHARED_STATE_UNLOCK; - info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); + if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { #ifdef ACLK_DEBUG _dump_collector_list(); #endif break; } - - if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) { - info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT); - previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt; - } - ACLK_SHARED_STATE_UNLOCK; sleep_usec(USEC_PER_SEC * 1); } @@ -692,15 +741,26 @@ void *aclk_query_main_thread(void *ptr) aclk_shared_state.version_neg = ACLK_VERSION_MIN; aclk_set_rx_handlers(aclk_shared_state.version_neg); } - if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) { - if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + ACLK_SHARED_STATE_UNLOCK; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { ACLK_SHARED_STATE_UNLOCK; errno = 0; error("ACLK failed to queue on_connect command"); sleep(1); continue; } - aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED; + localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; + } + rrdhost_aclk_state_unlock(localhost); + + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); } ACLK_SHARED_STATE_UNLOCK; |