diff options
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 71 | ||||
-rw-r--r-- | aclk/aclk_query.c | 9 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 4 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 4 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 4 |
5 files changed, 51 insertions, 41 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 5afbba274e..7e5b1f8f8b 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -157,7 +157,7 @@ static int wait_till_cloud_enabled() info("Waiting for Cloud to be enabled"); while (!netdata_cloud_setting) { sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; } return 0; @@ -176,7 +176,7 @@ static int wait_till_agent_claimed(void) char *agent_id = get_agent_claimid(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; agent_id = get_agent_claimid(); } @@ -196,7 +196,7 @@ static int wait_till_agent_claimed(void) static int wait_till_agent_claim_ready() { url_t url; - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { if (wait_till_agent_claimed()) return 1; @@ -330,7 +330,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client); static int handle_connection(mqtt_wss_client client) { time_t last_periodic_query_wakeup = now_monotonic_sec(); - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { // timeout 1000 to check at least once a second // for netdata_exit if (mqtt_wss_service(client, 1000) < 0){ @@ -463,7 +463,7 @@ static int aclk_block_till_recon_allowed() { // we want to wake up from time to time to check netdata_exit while (recon_delay) { - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; if (recon_delay > NETDATA_EXIT_POLL_MS) { sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS); @@ -473,7 +473,7 @@ static int aclk_block_till_recon_allowed() { sleep_usec(recon_delay * USEC_PER_MS); recon_delay = 0; } - return netdata_exit; + return !service_running(SERVICE_ACLK); } #ifndef ACLK_DISABLE_CHALLENGE @@ -516,7 +516,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { error_report("Do not move the cloud base url out of post_conf_load!!"); @@ -564,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) continue; } - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; if (aclk_env->encoding != ACLK_ENC_PROTO) { @@ -748,7 +748,7 @@ void *aclk_main(void *ptr) aclk_connected = 0; log_access("ACLK DISCONNECTED"); } - } while (!netdata_exit); + } while (service_running(SERVICE_ACLK)); aclk_graceful_disconnect(mqttwss_client); @@ -783,35 +783,40 @@ exit: void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; - int ret; + int ret = 0; if (!aclk_connected) return; - ret = get_node_id(&host->host_uuid, &node_id); - if (ret > 0) { - // this means we were not able to check if node_id already present - error("Unable to check for node_id. Ignoring the host state update."); - return; + if (host->node_id && !uuid_is_null(*host->node_id)) { + uuid_copy(node_id, *host->node_id); } - if (ret < 0) { - // node_id not found - aclk_query_t create_query; - create_query = aclk_query_new(REGISTER_NODE); - rrdhost_aclk_state_lock(localhost); - node_instance_creation_t node_instance_creation = { - .claim_id = localhost->aclk_state.claimed_id, - .hops = host->system_info->hops, - .hostname = rrdhost_hostname(host), - .machine_guid = host->machine_guid - }; - create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); - rrdhost_aclk_state_unlock(localhost); - create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; - create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); - aclk_queue_query(create_query); - return; + else { + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = rrdhost_hostname(host), + .machine_guid = host->machine_guid}; + create_query->data.bin_payload.payload = + generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + rrdhost_aclk_state_unlock(localhost); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; + info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); + return; + } } aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 249e2b5363..2d14badeef 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -327,6 +327,11 @@ static void worker_aclk_register(void) { } } +static void aclk_query_request_cancel(void *data) +{ + pthread_cond_broadcast((pthread_cond_t *) data); +} + /** * Main query processing thread */ @@ -336,7 +341,9 @@ void *aclk_query_main_thread(void *ptr) struct aclk_query_thread *query_thr = ptr; - while (!netdata_exit) { + service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false); + + while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) { aclk_query_process_msgs(query_thr); worker_is_idle(); diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 9a450571e1..e7cad5ded6 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -26,7 +26,7 @@ static inline int _aclk_queue_query(aclk_query_t query) ACLK_QUEUE_LOCK; if (aclk_query_queue.block_push) { ACLK_QUEUE_UNLOCK; - if(!netdata_exit) + if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); aclk_query_free(query); return 1; @@ -66,7 +66,7 @@ aclk_query_t aclk_queue_pop(void) ACLK_QUEUE_LOCK; if (aclk_query_queue.block_push) { ACLK_QUEUE_UNLOCK; - if(!netdata_exit) + if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); return NULL; } diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 83bc5508be..104fbcb3ec 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -283,9 +283,7 @@ int create_node_instance_result(const char *msg, size_t msg_len) node_state_update.live = 1; node_state_update.hops = 0; } else { - netdata_mutex_lock(&host->receiver_lock); - node_state_update.live = (host->receiver != NULL); - netdata_mutex_unlock(&host->receiver_lock); + node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)); node_state_update.hops = host->system_info->hops; } } diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 511ba952dc..4b6c03ed52 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -314,13 +314,13 @@ void *aclk_stats_main_thread(void *ptr) struct aclk_metrics_per_sample per_sample; struct aclk_metrics permanent; - while (!netdata_exit) { + while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) { netdata_thread_testcancel(); // ------------------------------------------------------------------------ // Wait for the next iteration point. heartbeat_next(&hb, step_ut); - if (netdata_exit) break; + if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break; ACLK_STATS_LOCK; // to not hold lock longer than necessary, especially not to hold it |