summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-05-20 16:28:45 +0200
committerGitHub <noreply@github.com>2020-05-20 16:28:45 +0200
commit53efa359d60683cad6dc73ecf84d0df7ee621303 (patch)
treeb1537798907719d4b33ecfed7122be67f789991c /aclk
parentae0d6007f1b2caf32202c95bd47a3e5ccfe80fa0 (diff)
Regenerate topic base on connect (#9044)
Allow agents to be reclaimed while they are running. Fix a race hazard between claiming and the ACLK. Changes the private key, base topic, username and contents of the LWT. Co-authored-by: <hilari@hilarimoragrega.com>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/agent_cloud_link.c136
1 files changed, 79 insertions, 57 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 6f99bf6f15..05c0225ad8 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -24,6 +24,7 @@ static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
int aclk_force_reconnect = 0; // Indication from lower layers
+int aclk_kill_link = 0; // Tell the agent to tear down the link
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
@@ -149,6 +150,9 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
static RSA *aclk_private_key = NULL;
static int create_private_key()
{
+ if (aclk_private_key != NULL)
+ RSA_free(aclk_private_key);
+ aclk_private_key = NULL;
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
@@ -429,29 +433,29 @@ struct aclk_query *aclk_queue_pop()
// This will give the base topic that the agent will publish messages.
// subtopics will be sent under the base topic e.g. base_topic/subtopic
-// This is called by aclk_init(), to compute the base topic once and have
-// it stored internally.
-// Need to check if additional logic should be added to make sure that there
-// is enough information to determine the base topic at init time
+// This is called during the connection, we delete any previous topic
+// in-case the user has changed the agent id and reclaimed.
char *create_publish_base_topic()
{
- if (unlikely(!is_agent_claimed()))
+ char *agent_id = is_agent_claimed();
+ if (unlikely(!agent_id))
return NULL;
ACLK_LOCK;
- if (unlikely(!global_base_topic)) {
- char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
+ if (global_base_topic)
+ freez(global_base_topic);
+ char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
- snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
- tmp = strchr(tmp_topic, '\n');
- if (unlikely(tmp))
- *tmp = '\0';
- global_base_topic = strdupz(tmp_topic);
- }
+ snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
+ tmp = strchr(tmp_topic, '\n');
+ if (unlikely(tmp))
+ *tmp = '\0';
+ global_base_topic = strdupz(tmp_topic);
ACLK_UNLOCK;
+ freez(agent_id);
return global_base_topic;
}
@@ -992,6 +996,39 @@ void *aclk_query_main_thread(void *ptr)
return NULL;
}
+static void aclk_graceful_disconnect()
+{
+ size_t write_q, write_q_bytes, read_q;
+ time_t event_loop_timeout;
+
+ // Send a graceful disconnect message
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, 0, 0);
+ buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
+ aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
+ buffer_free(b);
+
+ event_loop_timeout = now_realtime_sec() + 5;
+ write_q = 1;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+
+ aclk_shutting_down = 1;
+ _link_shutdown();
+ aclk_lws_wss_mqtt_layer_disconect_notif();
+
+ write_q = 1;
+ event_loop_timeout = now_realtime_sec() + 5;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+ aclk_shutting_down = 0;
+}
+
+
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
@@ -1000,36 +1037,12 @@ static void aclk_main_cleanup(void *ptr)
info("cleaning up...");
- if (is_agent_claimed() && aclk_connected) {
- size_t write_q, write_q_bytes, read_q;
- time_t event_loop_timeout;
-
+ char *agent_id = is_agent_claimed();
+ if (agent_id && aclk_connected) {
+ freez(agent_id);
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
- // Send a graceful disconnect message
- BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0);
- buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
- aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
- buffer_free(b);
-
- event_loop_timeout = now_realtime_sec() + 5;
- write_q = 1;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
-
- aclk_shutting_down = 1;
- _link_shutdown();
- aclk_lws_wss_mqtt_layer_disconect_notif();
-
- write_q = 1;
- event_loop_timeout = now_realtime_sec() + 5;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
+ aclk_graceful_disconnect();
}
@@ -1298,23 +1311,32 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
}
if (aclk_password != NULL )
freez(aclk_password);
- if (aclk_username == NULL)
- aclk_username = strdupz(agent_id);
aclk_password = password.result;
+ if (aclk_username != NULL)
+ freez(aclk_username);
+ aclk_username = agent_id;
+ agent_id = NULL;
CLEANUP:
+ if (agent_id != NULL)
+ freez(agent_id);
freez(data_buffer);
return;
}
static void aclk_try_to_connect(char *hostname, char *port, int port_num)
{
+ if (!aclk_private_key) {
+ error("Cannot try to establish the agent cloud link - no private key available!");
+ return;
+ }
info("Attempting to establish the agent cloud link");
aclk_get_challenge(hostname, port);
if (aclk_password == NULL)
return;
int rc;
aclk_connecting = 1;
+ create_publish_base_topic();
rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
if (unlikely(rc)) {
error("Failed to initialize the agent cloud link library");
@@ -1369,18 +1391,21 @@ void *aclk_main(void *ptr)
uint32_t port_num = 0;
info("Waiting for netdata to be claimed");
while(1) {
- while (likely(!is_agent_claimed())) {
+ char *agent_id = is_agent_claimed();
+ while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
goto exited;
+ agent_id = is_agent_claimed();
}
+ freez(agent_id);
// The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error("Do not move the cloud base url out of post_conf_load!!");
goto exited;
- }
+ }
if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
error("Agent is claimed but the configuration is invalid, please fix");
}
@@ -1399,14 +1424,19 @@ void *aclk_main(void *ptr)
}
}
- create_publish_base_topic();
-
usec_t reconnect_expiry = 0; // In usecs
while (!netdata_exit) {
static int first_init = 0;
- size_t write_q, write_q_bytes, read_q;
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ /* size_t write_q, write_q_bytes, read_q;
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
+
+ if (aclk_kill_link) { // User has reloaded the claiming state
+ aclk_kill_link = 0;
+ aclk_graceful_disconnect();
+ create_private_key();
+ continue;
+ }
if (aclk_force_reconnect) {
aclk_lws_wss_destroy_context();
@@ -1577,14 +1607,6 @@ void aclk_disconnect()
aclk_force_reconnect = 1;
}
-void aclk_shutdown()
-{
- info("Shutdown initiated");
- aclk_connected = 0;
- _link_shutdown();
- info("Shutdown complete");
-}
-
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
{
uuid_t uuid;