summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-05-20 16:28:45 +0200
committerGitHub <noreply@github.com>2020-05-20 16:28:45 +0200
commit53efa359d60683cad6dc73ecf84d0df7ee621303 (patch)
treeb1537798907719d4b33ecfed7122be67f789991c
parentae0d6007f1b2caf32202c95bd47a3e5ccfe80fa0 (diff)
Regenerate topic base on connect (#9044)
Allow agents to be reclaimed while they are running. Fix a race hazard between claiming and the ACLK. Changes the private key, base topic, username and contents of the LWT. Co-authored-by: <hilari@hilarimoragrega.com>
-rw-r--r--aclk/agent_cloud_link.c136
-rw-r--r--claim/claim.c42
-rw-r--r--web/api/web_api_v1.c7
3 files changed, 117 insertions, 68 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 6f99bf6f15..05c0225ad8 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -24,6 +24,7 @@ static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
int aclk_force_reconnect = 0; // Indication from lower layers
+int aclk_kill_link = 0; // Tell the agent to tear down the link
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
@@ -149,6 +150,9 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
static RSA *aclk_private_key = NULL;
static int create_private_key()
{
+ if (aclk_private_key != NULL)
+ RSA_free(aclk_private_key);
+ aclk_private_key = NULL;
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
@@ -429,29 +433,29 @@ struct aclk_query *aclk_queue_pop()
// This will give the base topic that the agent will publish messages.
// subtopics will be sent under the base topic e.g. base_topic/subtopic
-// This is called by aclk_init(), to compute the base topic once and have
-// it stored internally.
-// Need to check if additional logic should be added to make sure that there
-// is enough information to determine the base topic at init time
+// This is called during the connection, we delete any previous topic
+// in-case the user has changed the agent id and reclaimed.
char *create_publish_base_topic()
{
- if (unlikely(!is_agent_claimed()))
+ char *agent_id = is_agent_claimed();
+ if (unlikely(!agent_id))
return NULL;
ACLK_LOCK;
- if (unlikely(!global_base_topic)) {
- char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
+ if (global_base_topic)
+ freez(global_base_topic);
+ char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
- snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
- tmp = strchr(tmp_topic, '\n');
- if (unlikely(tmp))
- *tmp = '\0';
- global_base_topic = strdupz(tmp_topic);
- }
+ snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
+ tmp = strchr(tmp_topic, '\n');
+ if (unlikely(tmp))
+ *tmp = '\0';
+ global_base_topic = strdupz(tmp_topic);
ACLK_UNLOCK;
+ freez(agent_id);
return global_base_topic;
}
@@ -992,6 +996,39 @@ void *aclk_query_main_thread(void *ptr)
return NULL;
}
+static void aclk_graceful_disconnect()
+{
+ size_t write_q, write_q_bytes, read_q;
+ time_t event_loop_timeout;
+
+ // Send a graceful disconnect message
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, 0, 0);
+ buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
+ aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
+ buffer_free(b);
+
+ event_loop_timeout = now_realtime_sec() + 5;
+ write_q = 1;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+
+ aclk_shutting_down = 1;
+ _link_shutdown();
+ aclk_lws_wss_mqtt_layer_disconect_notif();
+
+ write_q = 1;
+ event_loop_timeout = now_realtime_sec() + 5;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+ aclk_shutting_down = 0;
+}
+
+
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
@@ -1000,36 +1037,12 @@ static void aclk_main_cleanup(void *ptr)
info("cleaning up...");
- if (is_agent_claimed() && aclk_connected) {
- size_t write_q, write_q_bytes, read_q;
- time_t event_loop_timeout;
-
+ char *agent_id = is_agent_claimed();
+ if (agent_id && aclk_connected) {
+ freez(agent_id);
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
- // Send a graceful disconnect message
- BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0);
- buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
- aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
- buffer_free(b);
-
- event_loop_timeout = now_realtime_sec() + 5;
- write_q = 1;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
-
- aclk_shutting_down = 1;
- _link_shutdown();
- aclk_lws_wss_mqtt_layer_disconect_notif();
-
- write_q = 1;
- event_loop_timeout = now_realtime_sec() + 5;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
+ aclk_graceful_disconnect();
}
@@ -1298,23 +1311,32 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
}
if (aclk_password != NULL )
freez(aclk_password);
- if (aclk_username == NULL)
- aclk_username = strdupz(agent_id);
aclk_password = password.result;
+ if (aclk_username != NULL)
+ freez(aclk_username);
+ aclk_username = agent_id;
+ agent_id = NULL;
CLEANUP:
+ if (agent_id != NULL)
+ freez(agent_id);
freez(data_buffer);
return;
}
static void aclk_try_to_connect(char *hostname, char *port, int port_num)
{
+ if (!aclk_private_key) {
+ error("Cannot try to establish the agent cloud link - no private key available!");
+ return;
+ }
info("Attempting to establish the agent cloud link");
aclk_get_challenge(hostname, port);
if (aclk_password == NULL)
return;
int rc;
aclk_connecting = 1;
+ create_publish_base_topic();
rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
if (unlikely(rc)) {
error("Failed to initialize the agent cloud link library");
@@ -1369,18 +1391,21 @@ void *aclk_main(void *ptr)
uint32_t port_num = 0;
info("Waiting for netdata to be claimed");
while(1) {
- while (likely(!is_agent_claimed())) {
+ char *agent_id = is_agent_claimed();
+ while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
goto exited;
+ agent_id = is_agent_claimed();
}
+ freez(agent_id);
// The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error("Do not move the cloud base url out of post_conf_load!!");
goto exited;
- }
+ }
if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
error("Agent is claimed but the configuration is invalid, please fix");
}
@@ -1399,14 +1424,19 @@ void *aclk_main(void *ptr)
}
}
- create_publish_base_topic();
-
usec_t reconnect_expiry = 0; // In usecs
while (!netdata_exit) {
static int first_init = 0;
- size_t write_q, write_q_bytes, read_q;
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ /* size_t write_q, write_q_bytes, read_q;
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
+
+ if (aclk_kill_link) { // User has reloaded the claiming state
+ aclk_kill_link = 0;
+ aclk_graceful_disconnect();
+ create_private_key();
+ continue;
+ }
if (aclk_force_reconnect) {
aclk_lws_wss_destroy_context();
@@ -1577,14 +1607,6 @@ void aclk_disconnect()
aclk_force_reconnect = 1;
}
-void aclk_shutdown()
-{
- info("Shutdown initiated");
- aclk_connected = 0;
- _link_shutdown();
- info("Shutdown complete");
-}
-
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
{
uuid_t uuid;
diff --git a/claim/claim.c b/claim/claim.c
index af6ec41f76..59c824235d 100644
--- a/claim/claim.c
+++ b/claim/claim.c
@@ -26,12 +26,19 @@ static char *claiming_errors[] = {
"Gateway Timeout", // 16
"Service Unavailable" // 17
};
-
+static netdata_mutex_t claim_mutex = NETDATA_MUTEX_INITIALIZER;
static char *claimed_id = NULL;
-char *is_agent_claimed(void)
+/* Retrieve the claim id for the agent.
+ * Caller owns the string.
+*/
+char *is_agent_claimed()
{
- return claimed_id;
+ char *result;
+ netdata_mutex_lock(&claim_mutex);
+ result = (claimed_id == NULL) ? NULL : strdup(claimed_id);
+ netdata_mutex_unlock(&claim_mutex);
+ return result;
}
#define CLAIMING_COMMAND_LENGTH 16384
@@ -109,12 +116,34 @@ void claim_agent(char *claiming_arguments)
#endif
}
+#ifdef ENABLE_ACLK
+extern int aclk_connected, aclk_kill_link;
+#endif
+
+/* Change the claimed state of the agent.
+ *
+ * This only happens when the user has explicitly requested it:
+ * - via the cli tool by reloading the claiming state
+ * - after spawning the claim because of a command-line argument
+ * If this happens with the ACLK active under an old claim then we MUST KILL THE LINK
+ */
void load_claiming_state(void)
{
+ // --------------------------------------------------------------------
+ // Check if the cloud is enabled
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
+ netdata_cloud_setting = 0;
+#else
+ netdata_mutex_lock(&claim_mutex);
if (claimed_id != NULL) {
freez(claimed_id);
claimed_id = NULL;
}
+ if (aclk_connected)
+ {
+ info("Agent was already connected to Cloud - forcing reconnection under new credentials");
+ aclk_kill_link = 1;
+ }
// Propagate into aclk and registry. Be kind of atomic...
appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", DEFAULT_CLOUD_BASE_URL);
@@ -124,18 +153,13 @@ void load_claiming_state(void)
long bytes_read;
claimed_id = read_by_filename(filename, &bytes_read);
+ netdata_mutex_unlock(&claim_mutex); // Only the main thread can call this function, safe to release and then read
if (!claimed_id) {
info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename);
return;
}
info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename);
-
- // --------------------------------------------------------------------
- // Check if the cloud is enabled
-#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
- netdata_cloud_setting = 0;
-#else
netdata_cloud_setting = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "enabled", 1);
#endif
}
diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c
index a31f7df42e..b2a3028afa 100644
--- a/web/api/web_api_v1.c
+++ b/web/api/web_api_v1.c
@@ -874,10 +874,13 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#else
buffer_strcat(wb, "\t\"cloud-available\": false,\n");
#endif
- if (is_agent_claimed() == NULL)
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
buffer_strcat(wb, "\t\"agent-claimed\": false,\n");
- else
+ else {
buffer_strcat(wb, "\t\"agent-claimed\": true,\n");
+ freez(agent_id);
+ }
#ifdef ENABLE_ACLK
if (aclk_connected)
buffer_strcat(wb, "\t\"aclk-available\": true\n");