summary | refs | log | tree | commit | diff | stats
path: root/aclk/aclk_query.c
diff options
context:
space:
mode:
author Timotej S <6674623+underhood@users.noreply.github.com> 2020-11-26 17:26:01 +0100
committer GitHub <noreply@github.com> 2020-11-26 17:26:01 +0100
commit f1db235a36d85f81c5a19a7087567405771874f7 (patch)
tree f99084d252e0af0496bf4776fa2c118f15dbee1a /aclk/aclk_query.c
parent 4f867a58e5287e54aa5ecae023868615cafb9f93 (diff)
ACLK Child Availability Messages (#9918)
* new ACLK messages for Claiming MVP1
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- aclk/aclk_query.c 124
1 files changed, 92 insertions, 32 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 5e4b879512..be58c2209b 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -512,6 +512,13 @@ cleanup:
return retval;
}
+#define ACLK_HOST_PTR_COMPULSORY(x) /* x: string literal naming the command; it is concatenated in front of the log text */ \
+ if (unlikely(!host)) { /* tests the variable `host` that must be in scope at the expansion site */ \
+ errno = 0; /* presumably keeps error() from reporting a stale errno - TODO confirm */ \
+ error(x " needs host pointer"); \
+ break; /* leaves the enclosing switch/loop at the expansion site, so this macro cannot be wrapped in do { } while (0) */ \
+ }
+
/*
* This function will fetch the next pending command and process it
*
@@ -546,25 +553,40 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
switch (this_query->cmd) {
case ACLK_CMD_ONCONNECT:
- debug(D_ACLK, "EXECUTING on connect metadata command");
- ACLK_SHARED_STATE_LOCK;
- meta_state = aclk_shared_state.metadata_submitted;
- aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT;
- ACLK_SHARED_STATE_UNLOCK;
- aclk_send_metadata(meta_state);
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
+#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
+ if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+ error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
+ break;
+ }
+#else
+#warning "This check became unnecessary. Remove"
+#endif
+
+ debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
+ host->hostname,
+ host->machine_guid);
+
+ rrdhost_aclk_state_lock(host);
+ meta_state = host->aclk_state.metadata;
+ host->aclk_state.metadata = ACLK_METADATA_SENT;
+ rrdhost_aclk_state_unlock(host);
+ aclk_send_metadata(meta_state, host);
break;
case ACLK_CMD_CHART:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
+
debug(D_ACLK, "EXECUTING a chart update command");
- if (!host)
- fatal("Pointer to host compulsory");
- aclk_send_single_chart(host->hostname, this_query->query);
+ aclk_send_single_chart(host, this_query->query);
break;
case ACLK_CMD_CHARTDEL:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
+
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This send the info metadata for now
- aclk_send_info_metadata(ACLK_METADATA_SENT);
+ aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@@ -581,7 +603,19 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
aclk_execute_query_v2(this_query);
break;
+ case ACLK_CMD_CHILD_CONNECT:
+ case ACLK_CMD_CHILD_DISCONNECT:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
+
+ debug(
+ D_ACLK, "Execution Child %s command",
+ this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
+ aclk_send_info_child_connection(host, this_query->cmd);
+ break;
+
default:
+ errno = 0;
+ error("Unknown ACLK Query Command");
break;
}
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
@@ -634,6 +668,39 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
}
/**
+ * Checks and, when due, updates the ACLK popcorning state of an rrdhost.
+ *
+ * Every read and write of host->aclk_state is done between
+ * rrdhost_aclk_state_lock() and rrdhost_aclk_state_unlock(); the lock is
+ * released on every return path.
+ *
+ * The state is left untouched when it is not ACLK_HOST_INITIALIZING, or when
+ * t_last_popcorn_update is 0. Otherwise, once the value of
+ * now_monotonic_sec() minus t_last_popcorn_update reaches ACLK_STABLE_TIMEOUT,
+ * the host is switched to ACLK_HOST_STABLE, t_last_popcorn_update is reset
+ * to 0 and an info message is logged.
+ *
+ * host - rrdhost to check; it is dereferenced without a NULL check.
+ *
+ * Returns the popcorning state after the check: ACLK_HOST_STABLE when this
+ * call performed the transition, otherwise the state read on entry.
+ *
+ * NOTE(review): t_diff is a time_t but is logged with "%ld"; this matches only
+ * where time_t is long - confirm for all supported targets or add a cast.
+ */
+
+ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+{
+ rrdhost_aclk_state_lock(host);
+ ACLK_POPCORNING_STATE ret = host->aclk_state.state; // snapshot of the state, taken under the lock
+ if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ // any other state is reported unchanged
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ if (!host->aclk_state.t_last_popcorn_update){ // timestamp is 0 (presumably "not set yet" - confirm): report state unchanged
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; // elapsed time since the recorded popcorn update
+
+ if (t_diff >= ACLK_STABLE_TIMEOUT) {
+ host->aclk_state.state = ACLK_HOST_STABLE;
+ host->aclk_state.t_last_popcorn_update = 0; // clear the timestamp once the host is marked stable
+ rrdhost_aclk_state_unlock(host); // lock is dropped before the log call below
+ info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff);
+ return ACLK_HOST_STABLE;
+ }
+
+ rrdhost_aclk_state_unlock(host); // timeout not reached yet: still initializing
+ return ret;
+}
+
+/**
* Main query processing thread
*
* On startup wait for the agent collectors to initialize
@@ -644,32 +711,14 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
- time_t previous_popcorn_interrupt = 0;
while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
- ACLK_SHARED_STATE_UNLOCK;
- break;
- }
-
- time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
-
- if (checkpoint > ACLK_STABLE_TIMEOUT) {
- aclk_shared_state.agent_state = AGENT_STABLE;
- ACLK_SHARED_STATE_UNLOCK;
- info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
+ if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
#ifdef ACLK_DEBUG
_dump_collector_list();
#endif
break;
}
-
- if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) {
- info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT);
- previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt;
- }
- ACLK_SHARED_STATE_UNLOCK;
sleep_usec(USEC_PER_SEC * 1);
}
@@ -692,15 +741,26 @@ void *aclk_query_main_thread(void *ptr)
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
aclk_set_rx_handlers(aclk_shared_state.version_neg);
}
- if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) {
- if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ ACLK_SHARED_STATE_UNLOCK;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
+ if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
ACLK_SHARED_STATE_UNLOCK;
errno = 0;
error("ACLK failed to queue on_connect command");
sleep(1);
continue;
}
- aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED;
+ localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
+ }
+ rrdhost_aclk_state_unlock(localhost);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+ aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ aclk_shared_state.next_popcorn_host = NULL;
+ aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;