summary refs log tree commit diff stats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r-- aclk/aclk.c 71
-rw-r--r-- aclk/aclk_query.c 9
-rw-r--r-- aclk/aclk_query_queue.c 4
-rw-r--r-- aclk/aclk_rx_msgs.c 4
-rw-r--r-- aclk/aclk_stats.c 4
5 files changed, 51 insertions, 41 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 5afbba274e..7e5b1f8f8b 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -157,7 +157,7 @@ static int wait_till_cloud_enabled()
info("Waiting for Cloud to be enabled");
while (!netdata_cloud_setting) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
}
return 0;
@@ -176,7 +176,7 @@ static int wait_till_agent_claimed(void)
char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
agent_id = get_agent_claimid();
}
@@ -196,7 +196,7 @@ static int wait_till_agent_claimed(void)
static int wait_till_agent_claim_ready()
{
url_t url;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
if (wait_till_agent_claimed())
return 1;
@@ -330,7 +330,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
static int handle_connection(mqtt_wss_client client)
{
time_t last_periodic_query_wakeup = now_monotonic_sec();
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
@@ -463,7 +463,7 @@ static int aclk_block_till_recon_allowed() {
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (recon_delay > NETDATA_EXIT_POLL_MS) {
sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
@@ -473,7 +473,7 @@ static int aclk_block_till_recon_allowed() {
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
- return netdata_exit;
+ return !service_running(SERVICE_ACLK);
}
#ifndef ACLK_DISABLE_CHALLENGE
@@ -516,7 +516,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
@@ -564,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
continue;
}
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (aclk_env->encoding != ACLK_ENC_PROTO) {
@@ -748,7 +748,7 @@ void *aclk_main(void *ptr)
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
- } while (!netdata_exit);
+ } while (service_running(SERVICE_ACLK));
aclk_graceful_disconnect(mqttwss_client);
@@ -783,35 +783,40 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
- int ret;
+ int ret = 0;
if (!aclk_connected)
return;
- ret = get_node_id(&host->host_uuid, &node_id);
- if (ret > 0) {
- // this means we were not able to check if node_id already present
- error("Unable to check for node_id. Ignoring the host state update.");
- return;
+ if (host->node_id && !uuid_is_null(*host->node_id)) {
+ uuid_copy(node_id, *host->node_id);
}
- if (ret < 0) {
- // node_id not found
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
- rrdhost_aclk_state_lock(localhost);
- node_instance_creation_t node_instance_creation = {
- .claim_id = localhost->aclk_state.claimed_id,
- .hops = host->system_info->hops,
- .hostname = rrdhost_hostname(host),
- .machine_guid = host->machine_guid
- };
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
- return;
+ else {
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid};
+ create_query->data.bin_payload.payload =
+ generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
+ }
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 249e2b5363..2d14badeef 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -327,6 +327,11 @@ static void worker_aclk_register(void) {
}
}
+static void aclk_query_request_cancel(void *data)
+{
+ pthread_cond_broadcast((pthread_cond_t *) data);
+}
+
/**
* Main query processing thread
*/
@@ -336,7 +341,9 @@ void *aclk_query_main_thread(void *ptr)
struct aclk_query_thread *query_thr = ptr;
- while (!netdata_exit) {
+ service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false);
+
+ while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) {
aclk_query_process_msgs(query_thr);
worker_is_idle();
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 9a450571e1..e7cad5ded6 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -26,7 +26,7 @@ static inline int _aclk_queue_query(aclk_query_t query)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
aclk_query_free(query);
return 1;
@@ -66,7 +66,7 @@ aclk_query_t aclk_queue_pop(void)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
return NULL;
}
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 83bc5508be..104fbcb3ec 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -283,9 +283,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
node_state_update.live = 1;
node_state_update.hops = 0;
} else {
- netdata_mutex_lock(&host->receiver_lock);
- node_state_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
+ node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
node_state_update.hops = host->system_info->hops;
}
}
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 511ba952dc..4b6c03ed52 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -314,13 +314,13 @@ void *aclk_stats_main_thread(void *ptr)
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
netdata_thread_testcancel();
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
- if (netdata_exit) break;
+ if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it