diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-05-20 16:28:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-20 16:28:45 +0200 |
commit | 53efa359d60683cad6dc73ecf84d0df7ee621303 (patch) | |
tree | b1537798907719d4b33ecfed7122be67f789991c /aclk | |
parent | ae0d6007f1b2caf32202c95bd47a3e5ccfe80fa0 (diff) |
Regenerate topic base on connect (#9044)
Allow agents to be reclaimed while they are running. Fix a race hazard between claiming and the ACLK. Changes the private key, base topic, username and contents of the LWT.
Co-authored-by: <hilari@hilarimoragrega.com>
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/agent_cloud_link.c | 136 |
1 files changed, 79 insertions, 57 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 6f99bf6f15..05c0225ad8 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -24,6 +24,7 @@ static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api int aclk_force_reconnect = 0; // Indication from lower layers +int aclk_kill_link = 0; // Tell the agent to tear down the link usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer @@ -149,6 +150,9 @@ int cloud_to_agent_parse(JSON_ENTRY *e) static RSA *aclk_private_key = NULL; static int create_private_key() { + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); @@ -429,29 +433,29 @@ struct aclk_query *aclk_queue_pop() // This will give the base topic that the agent will publish messages. // subtopics will be sent under the base topic e.g. base_topic/subtopic -// This is called by aclk_init(), to compute the base topic once and have -// it stored internally. -// Need to check if additional logic should be added to make sure that there -// is enough information to determine the base topic at init time +// This is called during the connection, we delete any previous topic +// in-case the user has changed the agent id and reclaimed. char *create_publish_base_topic() { - if (unlikely(!is_agent_claimed())) + char *agent_id = is_agent_claimed(); + if (unlikely(!agent_id)) return NULL; ACLK_LOCK; - if (unlikely(!global_base_topic)) { - char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + if (global_base_topic) + freez(global_base_topic); + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; - snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); - tmp = strchr(tmp_topic, '\n'); - if (unlikely(tmp)) - *tmp = '\0'; - global_base_topic = strdupz(tmp_topic); - } + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); ACLK_UNLOCK; + freez(agent_id); return global_base_topic; } @@ -992,6 +996,39 @@ void *aclk_query_main_thread(void *ptr) return NULL; } +static void aclk_graceful_disconnect() +{ + size_t write_q, write_q_bytes, read_q; + time_t event_loop_timeout; + + // Send a graceful disconnect message + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); + + event_loop_timeout = now_realtime_sec() + 5; + write_q = 1; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + + aclk_shutting_down = 1; + _link_shutdown(); + aclk_lws_wss_mqtt_layer_disconect_notif(); + + write_q = 1; + event_loop_timeout = now_realtime_sec() + 5; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + aclk_shutting_down = 0; +} + + // Thread cleanup static void aclk_main_cleanup(void *ptr) { @@ -1000,36 +1037,12 @@ static void aclk_main_cleanup(void *ptr) info("cleaning up..."); - if (is_agent_claimed() && aclk_connected) { - size_t write_q, write_q_bytes, read_q; - time_t event_loop_timeout; - + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; - // Send a graceful disconnect message - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0); - buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); - aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); - buffer_free(b); - - event_loop_timeout = now_realtime_sec() + 5; - write_q = 1; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } - - aclk_shutting_down = 1; - _link_shutdown(); - aclk_lws_wss_mqtt_layer_disconect_notif(); - - write_q = 1; - event_loop_timeout = now_realtime_sec() + 5; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } + aclk_graceful_disconnect(); } @@ -1298,23 +1311,32 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port) } if (aclk_password != NULL ) freez(aclk_password); - if (aclk_username == NULL) - aclk_username = strdupz(agent_id); aclk_password = password.result; + if (aclk_username != NULL) + freez(aclk_username); + aclk_username = agent_id; + agent_id = NULL; CLEANUP: + if (agent_id != NULL) + freez(agent_id); freez(data_buffer); return; } static void aclk_try_to_connect(char *hostname, char *port, int port_num) { + if (!aclk_private_key) { + error("Cannot try to establish the agent cloud link - no private key available!"); + return; + } info("Attempting to establish the agent cloud link"); aclk_get_challenge(hostname, port); if (aclk_password == NULL) return; int rc; aclk_connecting = 1; + create_publish_base_topic(); rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password); if (unlikely(rc)) { error("Failed to initialize the agent cloud link library"); @@ -1369,18 +1391,21 @@ void *aclk_main(void *ptr) uint32_t port_num = 0; info("Waiting for netdata to be claimed"); while(1) { - while (likely(!is_agent_claimed())) { + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) goto exited; + agent_id = is_agent_claimed(); } + freez(agent_id); // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { error("Do not move the cloud base url out of post_conf_load!!"); goto exited; - } + } if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { error("Agent is claimed but the configuration is invalid, please fix"); } @@ -1399,14 +1424,19 @@ void *aclk_main(void *ptr) } } - create_publish_base_topic(); - usec_t reconnect_expiry = 0; // In usecs while (!netdata_exit) { static int first_init = 0; - size_t write_q, write_q_bytes, read_q; - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + /* size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + + if (aclk_kill_link) { // User has reloaded the claiming state + aclk_kill_link = 0; + aclk_graceful_disconnect(); + create_private_key(); + continue; + } if (aclk_force_reconnect) { aclk_lws_wss_destroy_context(); @@ -1577,14 +1607,6 @@ void aclk_disconnect() aclk_force_reconnect = 1; } -void aclk_shutdown() -{ - info("Shutdown initiated"); - aclk_connected = 0; - _link_shutdown(); - info("Shutdown complete"); -} - inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) { uuid_t uuid; |