summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-04-08 19:34:32 +0200
committerGitHub <noreply@github.com>2020-04-08 19:34:32 +0200
commit6b63ba8fe52b490f83c1983645c16f1f49591149 (patch)
treee7a7a02d749b5cfa5f1d116cd6bd1f37f268750d /aclk
parent87b17438fc076975a5d74733d775d5e1ce457758 (diff)
Add session-id using connect timestamp (#8633)
Added a session-id to the ACLK messages to overcome a problem with the LWT timestamp being out of sequence with the rest of the message flow.
Diffstat (limited to 'aclk')
-rw-r--r--aclk/agent_cloud_link.c82
-rw-r--r--aclk/agent_cloud_link.h3
-rw-r--r--aclk/mqtt.c33
3 files changed, 59 insertions, 59 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 1adaf6bcce..dfa1ad43d7 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -23,6 +23,8 @@ static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
+usec_t aclk_session_us = 0; // Used by the mqtt layer
+time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -750,8 +752,8 @@ int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id);
-
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data);
buffer_sprintf(
@@ -821,11 +823,6 @@ int aclk_process_query()
aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
break;
- case ACLK_CMD_ALARMS:
- debug(D_ACLK, "EXECUTING an alarms update command");
- aclk_send_alarm_metadata();
- break;
-
case ACLK_CMD_CLOUD:
debug(D_ACLK, "EXECUTING a cloud command");
aclk_execute_query(this_query);
@@ -939,7 +936,6 @@ void *aclk_query_main_thread(void *ptr)
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
- char payload[512];
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
@@ -952,24 +948,11 @@ static void aclk_main_cleanup(void *ptr)
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
// Send a graceful disconnect message
- char *msg_id = create_uuid();
-
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
-
- snprintfz(
- payload, 511,
- "{ \"type\": \"disconnect\","
- " \"msg-id\": \"%s\","
- " \"timestamp\": %ld,"
- " \"timestamp-offset-usec\": %llu,"
- " \"version\": %d,"
- " \"payload\": \"graceful\" }",
- msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
-
- aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
- freez(msg_id);
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, 0, 0);
+ buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
+ aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
+ buffer_free(b);
event_loop_timeout = now_realtime_sec() + 5;
write_q = 1;
@@ -1514,7 +1497,7 @@ void aclk_shutdown()
info("Shutdown complete");
}
-inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
+inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
{
uuid_t uuid;
char uuid_str[36 + 1];
@@ -1525,9 +1508,11 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
msg_id = uuid_str;
}
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
buffer_sprintf(
dest,
@@ -1535,11 +1520,12 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
"\t\"msg-id\": \"%s\",\n"
"\t\"timestamp\": %ld,\n"
"\t\"timestamp-offset-usec\": %llu,\n"
- "\t\"version\": %d,\n"
- "\t\"payload\": ",
- type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
+ "\t\"connect\": %ld,\n"
+ "\t\"connect-offset-usec\": %llu,\n"
+ "\t\"version\": %d",
+ type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
- debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
+ debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
}
/*
@@ -1599,7 +1585,15 @@ void aclk_send_alarm_metadata()
debug(D_ACLK, "Metadata alarms start");
- aclk_create_header(local_buffer, "connect_alarms", msg_id);
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
+ else
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
health_alarms2json(localhost, local_buffer, 1);
@@ -1635,7 +1629,16 @@ int aclk_send_info_metadata()
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "connect", msg_id);
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "connect", msg_id, 0, 0);
+ else
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
@@ -1728,7 +1731,9 @@ int aclk_send_single_chart(char *hostname, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "chart", msg_id);
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
rrdset2json(st, local_buffer, NULL, NULL, 1);
buffer_sprintf(local_buffer, "\t\n}");
@@ -1793,7 +1798,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
health_alarm_entry2json_nolock(local_buffer, ae, host);
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index faf4932f84..9e6954bb35 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -44,7 +44,6 @@ typedef enum aclk_cmd {
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
- ACLK_CMD_ALARMS,
ACLK_CMD_MAX
} ACLK_CMD;
@@ -98,7 +97,7 @@ struct aclk_query *
aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-void aclk_create_header(BUFFER *dest, char *type, char *msg_id);
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us);
int aclk_handle_cloud_request(char *payload);
int aclk_submit_request(struct aclk_request *);
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index dad32b578b..72944c958f 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -5,6 +5,9 @@
#include "mqtt.h"
#include "aclk_lws_wss_client.h"
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
inline const char *_link_strerror(int rc)
{
return mosquitto_strerror(rc);
@@ -131,6 +134,11 @@ static int _mqtt_create_connection(char *username, char *password)
return MOSQ_ERR_UNKNOWN;
}
+ // Record the session start time to allow a nominal LWT timestamp
+ usec_t now = now_realtime_usec();
+ aclk_session_sec = now / USEC_PER_SEC;
+ aclk_session_us = now % USEC_PER_SEC;
+
_link_set_lwt("outbound/meta", 2);
mosquitto_connect_callback_set(mosq, connect_callback);
@@ -259,7 +267,6 @@ int _link_set_lwt(char *sub_topic, int qos)
{
int rc;
char topic[ACLK_MAX_TOPIC + 1];
- char payload[512];
char *final_topic;
final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
@@ -269,25 +276,13 @@ int _link_set_lwt(char *sub_topic, int qos)
return 1;
}
- usec_t time_created_offset_usec = now_realtime_usec();
- time_t time_created = time_created_offset_usec / USEC_PER_SEC;
- time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
-
- char *msg_id = create_uuid();
-
- snprintfz(
- payload, 511,
- "{ \"type\": \"disconnect\","
- " \"msg-id\": \"%s\","
- " \"timestamp\": %ld,"
- " \"timestamp-offset-usec\": %llu,"
- " \"version\": %d,"
- " \"payload\": \"unexpected\" }",
- msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
-
- freez(msg_id);
+ usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC);
+ buffer_strcat(b, ", \"payload\": \"unexpected\" }");
+ rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
+ buffer_free(b);
- rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0);
return rc;
}