summary | refs | log | tree | commit | diff | stats
path: root/aclk
diff options
context:
space:
mode:
author Austin S. Hemmelgarn <austin@netdata.cloud> 2020-04-13 10:32:33 -0400
committer Austin S. Hemmelgarn <austin@netdata.cloud> 2020-04-13 10:32:33 -0400
commit 983a26d1a2c110b35db252b4b79c3f03eb4eeb4b (patch)
tree 85d42c30bc81514bd5c18aa564497e439290523b /aclk
parent 5a12b4a7e42587058b9b42871a1316545d527a57 (diff)
Revert "Revert changes since v1.21 in preparation for hotfix release."
Diffstat (limited to 'aclk')
-rw-r--r-- aclk/agent_cloud_link.c 120
-rw-r--r-- aclk/agent_cloud_link.h 7
-rw-r--r-- aclk/mqtt.c 40
3 files changed, 88 insertions, 79 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 1adaf6bcce..a41d17e7bd 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -23,6 +23,8 @@ static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
+usec_t aclk_session_us = 0; // Used by the mqtt layer
+time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -185,7 +187,7 @@ biofailed:
* should be called with
*
* mode 0 to reset the delay
- * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
*
*/
unsigned long int aclk_reconnect_delay(int mode)
@@ -208,8 +210,6 @@ unsigned long int aclk_reconnect_delay(int mode)
delay = (delay * 1000) + (random() % 1000);
}
- // sleep_usec(USEC_PER_MS * delay);
-
return delay;
}
@@ -306,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
if (tmp_query->run_after == run_after) {
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
- return 1;
+ return 0;
}
if (last_query)
@@ -750,8 +750,8 @@ int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id);
-
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data);
buffer_sprintf(
@@ -821,11 +821,6 @@ int aclk_process_query()
aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
break;
- case ACLK_CMD_ALARMS:
- debug(D_ACLK, "EXECUTING an alarms update command");
- aclk_send_alarm_metadata();
- break;
-
case ACLK_CMD_CLOUD:
debug(D_ACLK, "EXECUTING a cloud command");
aclk_execute_query(this_query);
@@ -868,18 +863,22 @@ int aclk_process_queries()
static void aclk_query_thread_cleanup(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
info("cleaning up...");
- COLLECTOR_LOCK;
-
_reset_collector_list();
freez(collector_list);
- COLLECTOR_UNLOCK;
+ // Clean memory for pending queries if any
+ struct aclk_query *this_query;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ do {
+ this_query = aclk_queue_pop();
+ aclk_query_free(this_query);
+ } while (this_query);
+
+ freez(static_thread->thread);
+ freez(static_thread);
}
/**
@@ -916,7 +915,7 @@ void *aclk_query_main_thread(void *ptr)
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
errno = 0;
error("ACLK failed to queue on_connect command");
- aclk_metadata_submitted = 0;
+ aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
}
}
@@ -939,7 +938,6 @@ void *aclk_query_main_thread(void *ptr)
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
- char payload[512];
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
@@ -952,24 +950,11 @@ static void aclk_main_cleanup(void *ptr)
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
// Send a graceful disconnect message
- char *msg_id = create_uuid();
-
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
-
- snprintfz(
- payload, 511,
- "{ \"type\": \"disconnect\","
- " \"msg-id\": \"%s\","
- " \"timestamp\": %ld,"
- " \"timestamp-offset-usec\": %llu,"
- " \"version\": %d,"
- " \"payload\": \"graceful\" }",
- msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
-
- aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
- freez(msg_id);
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, 0, 0);
+ buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
+ aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
+ buffer_free(b);
event_loop_timeout = now_realtime_sec() + 5;
write_q = 1;
@@ -990,7 +975,6 @@ static void aclk_main_cleanup(void *ptr)
}
}
- info("Disconnected");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
@@ -1295,7 +1279,6 @@ void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;
- netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
if (!netdata_cloud_setting) {
info("Killing ACLK thread -> cloud functionality has been disabled");
return NULL;
@@ -1335,10 +1318,11 @@ void *aclk_main(void *ptr)
sleep_usec(USEC_PER_SEC * 60);
}
create_publish_base_topic();
- create_private_key();
usec_t reconnect_expiry = 0; // In usecs
+ netdata_thread_disable_cancelability();
+
while (!netdata_exit) {
static int first_init = 0;
size_t write_q, write_q_bytes, read_q;
@@ -1392,7 +1376,8 @@ void *aclk_main(void *ptr)
}
} // forever
exited:
- aclk_shutdown();
+ // Wakeup query thread to cleanup
+ QUERY_THREAD_WAKEUP;
freez(aclk_username);
freez(aclk_password);
@@ -1401,7 +1386,7 @@ exited:
if (aclk_private_key != NULL)
RSA_free(aclk_private_key);
- netdata_thread_cleanup_pop(1);
+ aclk_main_cleanup(ptr);
return NULL;
}
@@ -1514,7 +1499,7 @@ void aclk_shutdown()
info("Shutdown complete");
}
-inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
+inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
{
uuid_t uuid;
char uuid_str[36 + 1];
@@ -1525,9 +1510,11 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
msg_id = uuid_str;
}
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
buffer_sprintf(
dest,
@@ -1535,11 +1522,12 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
"\t\"msg-id\": \"%s\",\n"
"\t\"timestamp\": %ld,\n"
"\t\"timestamp-offset-usec\": %llu,\n"
- "\t\"version\": %d,\n"
- "\t\"payload\": ",
- type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
+ "\t\"connect\": %ld,\n"
+ "\t\"connect-offset-usec\": %llu,\n"
+ "\t\"version\": %d",
+ type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
- debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
+ debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
}
/*
@@ -1599,7 +1587,15 @@ void aclk_send_alarm_metadata()
debug(D_ACLK, "Metadata alarms start");
- aclk_create_header(local_buffer, "connect_alarms", msg_id);
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
+ else
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
health_alarms2json(localhost, local_buffer, 1);
@@ -1635,7 +1631,16 @@ int aclk_send_info_metadata()
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "connect", msg_id);
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "connect", msg_id, 0, 0);
+ else
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
@@ -1728,7 +1733,9 @@ int aclk_send_single_chart(char *hostname, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "chart", msg_id);
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
rrdset2json(st, local_buffer, NULL, NULL, 1);
buffer_sprintf(local_buffer, "\t\n}");
@@ -1793,7 +1800,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
health_alarm_entry2json_nolock(local_buffer, ae, host);
@@ -1863,6 +1871,12 @@ int aclk_handle_cloud_request(char *payload)
return 1;
}
+ // Checked to be "http", not needed anymore
+ if (likely(cloud_to_agent.type_id)) {
+ freez(cloud_to_agent.type_id);
+ cloud_to_agent.type_id = NULL;
+ }
+
if (unlikely(aclk_submit_request(&cloud_to_agent)))
debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index faf4932f84..f147669e5d 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -44,7 +44,6 @@ typedef enum aclk_cmd {
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
- ACLK_CMD_ALARMS,
ACLK_CMD_MAX
} ACLK_CMD;
@@ -74,16 +73,12 @@ void *aclk_main(void *ptr);
extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
-//int aclk_init();
-//char *get_base_topic();
-
extern char *is_agent_claimed(void);
extern void aclk_lws_wss_mqtt_layer_disconect_notif();
char *create_uuid();
// callbacks for agent cloud link
int aclk_subscribe(char *topic, int qos);
-void aclk_shutdown();
int cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
@@ -98,7 +93,7 @@ struct aclk_query *
aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-void aclk_create_header(BUFFER *dest, char *type, char *msg_id);
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us);
int aclk_handle_cloud_request(char *payload);
int aclk_submit_request(struct aclk_request *);
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index dad32b578b..b070f7fb09 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -5,6 +5,9 @@
#include "mqtt.h"
#include "aclk_lws_wss_client.h"
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
inline const char *_link_strerror(int rc)
{
return mosquitto_strerror(rc);
@@ -49,7 +52,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(obj);
UNUSED(rc);
- info("Connection to cloud failed");
+ if (netdata_exit)
+ info("Connection to cloud terminated due to agent shutdown");
+ else {
+ errno = 0;
+ error("Connection to cloud failed");
+ }
aclk_disconnect();
aclk_lws_wss_mqtt_layer_disconect_notif();
@@ -131,6 +139,11 @@ static int _mqtt_create_connection(char *username, char *password)
return MOSQ_ERR_UNKNOWN;
}
+ // Record the session start time to allow a nominal LWT timestamp
+ usec_t now = now_realtime_usec();
+ aclk_session_sec = now / USEC_PER_SEC;
+ aclk_session_us = now % USEC_PER_SEC;
+
_link_set_lwt("outbound/meta", 2);
mosquitto_connect_callback_set(mosq, connect_callback);
@@ -259,7 +272,6 @@ int _link_set_lwt(char *sub_topic, int qos)
{
int rc;
char topic[ACLK_MAX_TOPIC + 1];
- char payload[512];
char *final_topic;
final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
@@ -269,25 +281,13 @@ int _link_set_lwt(char *sub_topic, int qos)
return 1;
}
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
-
- char *msg_id = create_uuid();
-
- snprintfz(
- payload, 511,
- "{ \"type\": \"disconnect\","
- " \"msg-id\": \"%s\","
- " \"timestamp\": %ld,"
- " \"timestamp-offset-usec\": %llu,"
- " \"version\": %d,"
- " \"payload\": \"unexpected\" }",
- msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
-
- freez(msg_id);
+ usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC);
+ buffer_strcat(b, ", \"payload\": \"unexpected\" }");
+ rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
+ buffer_free(b);
- rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0);
return rc;
}