summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2020-04-09 18:26:26 +0300
committerGitHub <noreply@github.com>2020-04-09 18:26:26 +0300
commit764a0676e82c89b4a4516a31a7782ea606071fa5 (patch)
tree8ebde6a0252daa116fd0c69960201b9eb34b2189 /aclk
parentde1ff28879066d333dcf5c3809d9ec1ad3e00c4a (diff)
Improved ACLK memory management and shutdown sequence (#8611)
Improved ACLK memory management and shutdown sequence
Diffstat (limited to 'aclk')
-rw-r--r--aclk/agent_cloud_link.c38
-rw-r--r--aclk/agent_cloud_link.h4
-rw-r--r--aclk/mqtt.c7
3 files changed, 29 insertions, 20 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index dfa1ad43d7..a41d17e7bd 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -187,7 +187,7 @@ biofailed:
* should be called with
*
* mode 0 to reset the delay
- * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
*
*/
unsigned long int aclk_reconnect_delay(int mode)
@@ -210,8 +210,6 @@ unsigned long int aclk_reconnect_delay(int mode)
delay = (delay * 1000) + (random() % 1000);
}
- // sleep_usec(USEC_PER_MS * delay);
-
return delay;
}
@@ -308,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
if (tmp_query->run_after == run_after) {
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
- return 1;
+ return 0;
}
if (last_query)
@@ -865,18 +863,22 @@ int aclk_process_queries()
static void aclk_query_thread_cleanup(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
info("cleaning up...");
- COLLECTOR_LOCK;
-
_reset_collector_list();
freez(collector_list);
- COLLECTOR_UNLOCK;
+ // Clean memory for pending queries if any
+ struct aclk_query *this_query;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ do {
+ this_query = aclk_queue_pop();
+ aclk_query_free(this_query);
+ } while (this_query);
+
+ freez(static_thread->thread);
+ freez(static_thread);
}
/**
@@ -913,7 +915,7 @@ void *aclk_query_main_thread(void *ptr)
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
errno = 0;
error("ACLK failed to queue on_connect command");
- aclk_metadata_submitted = 0;
+ aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
}
}
@@ -973,7 +975,6 @@ static void aclk_main_cleanup(void *ptr)
}
}
- info("Disconnected");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
@@ -1278,7 +1279,6 @@ void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;
- netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
if (!netdata_cloud_setting) {
info("Killing ACLK thread -> cloud functionality has been disabled");
return NULL;
@@ -1318,10 +1318,11 @@ void *aclk_main(void *ptr)
sleep_usec(USEC_PER_SEC * 60);
}
create_publish_base_topic();
- create_private_key();
usec_t reconnect_expiry = 0; // In usecs
+ netdata_thread_disable_cancelability();
+
while (!netdata_exit) {
static int first_init = 0;
size_t write_q, write_q_bytes, read_q;
@@ -1375,7 +1376,8 @@ void *aclk_main(void *ptr)
}
} // forever
exited:
- aclk_shutdown();
+ // Wakeup query thread to cleanup
+ QUERY_THREAD_WAKEUP;
freez(aclk_username);
freez(aclk_password);
@@ -1384,7 +1386,7 @@ exited:
if (aclk_private_key != NULL)
RSA_free(aclk_private_key);
- netdata_thread_cleanup_pop(1);
+ aclk_main_cleanup(ptr);
return NULL;
}
@@ -1869,6 +1871,12 @@ int aclk_handle_cloud_request(char *payload)
return 1;
}
+ // Checked to be "http", not needed anymore
+ if (likely(cloud_to_agent.type_id)) {
+ freez(cloud_to_agent.type_id);
+ cloud_to_agent.type_id = NULL;
+ }
+
if (unlikely(aclk_submit_request(&cloud_to_agent)))
debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index 9e6954bb35..f147669e5d 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -73,16 +73,12 @@ void *aclk_main(void *ptr);
extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
-//int aclk_init();
-//char *get_base_topic();
-
extern char *is_agent_claimed(void);
extern void aclk_lws_wss_mqtt_layer_disconect_notif();
char *create_uuid();
// callbacks for agent cloud link
int aclk_subscribe(char *topic, int qos);
-void aclk_shutdown();
int cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index 72944c958f..b070f7fb09 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -52,7 +52,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(obj);
UNUSED(rc);
- info("Connection to cloud failed");
+ if (netdata_exit)
+ info("Connection to cloud terminated due to agent shutdown");
+ else {
+ errno = 0;
+ error("Connection to cloud failed");
+ }
aclk_disconnect();
aclk_lws_wss_mqtt_layer_disconect_notif();