From 53efa359d60683cad6dc73ecf84d0df7ee621303 Mon Sep 17 00:00:00 2001 From: Andrew Moss <1043609+amoss@users.noreply.github.com> Date: Wed, 20 May 2020 16:28:45 +0200 Subject: Regenerate topic base on connect (#9044) Allow agents to be reclaimed while they are running. Fix a race hazard between claiming and the ACLK. Changes the private key, base topic, username and contents of the LWT. Co-authored-by: --- aclk/agent_cloud_link.c | 136 ++++++++++++++++++++++++++++-------------------- claim/claim.c | 42 +++++++++++---- web/api/web_api_v1.c | 7 ++- 3 files changed, 117 insertions(+), 68 deletions(-) diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 6f99bf6f15..05c0225ad8 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -24,6 +24,7 @@ static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api int aclk_force_reconnect = 0; // Indication from lower layers +int aclk_kill_link = 0; // Tell the agent to tear down the link usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer @@ -149,6 +150,9 @@ int cloud_to_agent_parse(JSON_ENTRY *e) static RSA *aclk_private_key = NULL; static int create_private_key() { + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); @@ -429,29 +433,29 @@ struct aclk_query *aclk_queue_pop() // This will give the base topic that the agent will publish messages. // subtopics will be sent under the base topic e.g. base_topic/subtopic -// This is called by aclk_init(), to compute the base topic once and have -// it stored internally. -// Need to check if additional logic should be added to make sure that there -// is enough information to determine the base topic at init time +// This is called during the connection, we delete any previous topic +// in-case the user has changed the agent id and reclaimed. char *create_publish_base_topic() { - if (unlikely(!is_agent_claimed())) + char *agent_id = is_agent_claimed(); + if (unlikely(!agent_id)) return NULL; ACLK_LOCK; - if (unlikely(!global_base_topic)) { - char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + if (global_base_topic) + freez(global_base_topic); + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; - snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); - tmp = strchr(tmp_topic, '\n'); - if (unlikely(tmp)) - *tmp = '\0'; - global_base_topic = strdupz(tmp_topic); - } + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); ACLK_UNLOCK; + freez(agent_id); return global_base_topic; } @@ -992,6 +996,39 @@ void *aclk_query_main_thread(void *ptr) return NULL; } +static void aclk_graceful_disconnect() +{ + size_t write_q, write_q_bytes, read_q; + time_t event_loop_timeout; + + // Send a graceful disconnect message + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); + + event_loop_timeout = now_realtime_sec() + 5; + write_q = 1; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + + aclk_shutting_down = 1; + _link_shutdown(); + aclk_lws_wss_mqtt_layer_disconect_notif(); + + write_q = 1; + event_loop_timeout = now_realtime_sec() + 5; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + aclk_shutting_down = 0; +} + + // Thread cleanup static void aclk_main_cleanup(void *ptr) { @@ -1000,36 +1037,12 @@ static void aclk_main_cleanup(void *ptr) info("cleaning up..."); - if (is_agent_claimed() && aclk_connected) { - size_t write_q, write_q_bytes, read_q; - time_t event_loop_timeout; - + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; - // Send a graceful disconnect message - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0); - buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); - aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); - buffer_free(b); - - event_loop_timeout = now_realtime_sec() + 5; - write_q = 1; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } - - aclk_shutting_down = 1; - _link_shutdown(); - aclk_lws_wss_mqtt_layer_disconect_notif(); - - write_q = 1; - event_loop_timeout = now_realtime_sec() + 5; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } + aclk_graceful_disconnect(); } @@ -1298,23 +1311,32 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port) } if (aclk_password != NULL ) freez(aclk_password); - if (aclk_username == NULL) - aclk_username = strdupz(agent_id); aclk_password = password.result; + if (aclk_username != NULL) + freez(aclk_username); + aclk_username = agent_id; + agent_id = NULL; CLEANUP: + if (agent_id != NULL) + freez(agent_id); freez(data_buffer); return; } static void aclk_try_to_connect(char *hostname, char *port, int port_num) { + if (!aclk_private_key) { + error("Cannot try to establish the agent cloud link - no private key available!"); + return; + } info("Attempting to establish the agent cloud link"); aclk_get_challenge(hostname, port); if (aclk_password == NULL) return; int rc; aclk_connecting = 1; + create_publish_base_topic(); rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password); if (unlikely(rc)) { error("Failed to initialize the agent cloud link library"); @@ -1369,18 +1391,21 @@ void *aclk_main(void *ptr) uint32_t port_num = 0; info("Waiting for netdata to be claimed"); while(1) { - while (likely(!is_agent_claimed())) { + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) goto exited; + agent_id = is_agent_claimed(); } + freez(agent_id); // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { error("Do not move the cloud base url out of post_conf_load!!"); goto exited; - } + } if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { error("Agent is claimed but the configuration is invalid, please fix"); } @@ -1399,14 +1424,19 @@ void *aclk_main(void *ptr) } } - create_publish_base_topic(); - usec_t reconnect_expiry = 0; // In usecs while (!netdata_exit) { static int first_init = 0; - size_t write_q, write_q_bytes, read_q; - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + /* size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + + if (aclk_kill_link) { // User has reloaded the claiming state + aclk_kill_link = 0; + aclk_graceful_disconnect(); + create_private_key(); + continue; + } if (aclk_force_reconnect) { aclk_lws_wss_destroy_context(); @@ -1577,14 +1607,6 @@ void aclk_disconnect() aclk_force_reconnect = 1; } -void aclk_shutdown() -{ - info("Shutdown initiated"); - aclk_connected = 0; - _link_shutdown(); - info("Shutdown complete"); -} - inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) { uuid_t uuid; diff --git a/claim/claim.c b/claim/claim.c index af6ec41f76..59c824235d 100644 --- a/claim/claim.c +++ b/claim/claim.c @@ -26,12 +26,19 @@ static char *claiming_errors[] = { "Gateway Timeout", // 16 "Service Unavailable" // 17 }; - +static netdata_mutex_t claim_mutex = NETDATA_MUTEX_INITIALIZER; static char *claimed_id = NULL; -char *is_agent_claimed(void) +/* Retrieve the claim id for the agent. + * Caller owns the string. +*/ +char *is_agent_claimed() { - return claimed_id; + char *result; + netdata_mutex_lock(&claim_mutex); + result = (claimed_id == NULL) ? NULL : strdup(claimed_id); + netdata_mutex_unlock(&claim_mutex); + return result; } #define CLAIMING_COMMAND_LENGTH 16384 @@ -109,12 +116,34 @@ void claim_agent(char *claiming_arguments) #endif } +#ifdef ENABLE_ACLK +extern int aclk_connected, aclk_kill_link; +#endif + +/* Change the claimed state of the agent. + * + * This only happens when the user has explicitly requested it: + * - via the cli tool by reloading the claiming state + * - after spawning the claim because of a command-line argument + * If this happens with the ACLK active under an old claim then we MUST KILL THE LINK + */ void load_claiming_state(void) { + // -------------------------------------------------------------------- + // Check if the cloud is enabled +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) + netdata_cloud_setting = 0; +#else + netdata_mutex_lock(&claim_mutex); if (claimed_id != NULL) { freez(claimed_id); claimed_id = NULL; } + if (aclk_connected) + { + info("Agent was already connected to Cloud - forcing reconnection under new credentials"); + aclk_kill_link = 1; + } // Propagate into aclk and registry. Be kind of atomic... appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", DEFAULT_CLOUD_BASE_URL); @@ -124,18 +153,13 @@ void load_claiming_state(void) long bytes_read; claimed_id = read_by_filename(filename, &bytes_read); + netdata_mutex_unlock(&claim_mutex); // Only the main thread can call this function, safe to release and then read if (!claimed_id) { info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename); return; } info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename); - - // -------------------------------------------------------------------- - // Check if the cloud is enabled -#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) - netdata_cloud_setting = 0; -#else netdata_cloud_setting = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "enabled", 1); #endif } diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index a31f7df42e..b2a3028afa 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -874,10 +874,13 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb) #else buffer_strcat(wb, "\t\"cloud-available\": false,\n"); #endif - if (is_agent_claimed() == NULL) + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) buffer_strcat(wb, "\t\"agent-claimed\": false,\n"); - else + else { buffer_strcat(wb, "\t\"agent-claimed\": true,\n"); + freez(agent_id); + } #ifdef ENABLE_ACLK if (aclk_connected) buffer_strcat(wb, "\t\"aclk-available\": true\n"); -- cgit v1.2.3