diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-04-08 19:34:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-08 19:34:32 +0200 |
commit | 6b63ba8fe52b490f83c1983645c16f1f49591149 (patch) | |
tree | e7a7a02d749b5cfa5f1d116cd6bd1f37f268750d /aclk | |
parent | 87b17438fc076975a5d74733d775d5e1ce457758 (diff) |
Add session-id using connect timestamp (#8633)
Added a session-id (the timestamp recorded when the MQTT connection is created) to the ACLK messages to overcome a problem with the LWT (MQTT Last Will and Testament) timestamp being out of sequence with the rest of the message flow.
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/agent_cloud_link.c | 82 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 3 | ||||
-rw-r--r-- | aclk/mqtt.c | 33 |
3 files changed, 59 insertions, 59 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 1adaf6bcce..dfa1ad43d7 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -23,6 +23,8 @@ static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; @@ -750,8 +752,8 @@ int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id); - + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data); buffer_sprintf( @@ -821,11 +823,6 @@ int aclk_process_query() aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); break; - case ACLK_CMD_ALARMS: - debug(D_ACLK, "EXECUTING an alarms update command"); - aclk_send_alarm_metadata(); - break; - case ACLK_CMD_CLOUD: debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); @@ -939,7 +936,6 @@ void *aclk_query_main_thread(void *ptr) // Thread cleanup static void aclk_main_cleanup(void *ptr) { - char payload[512]; struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; @@ -952,24 +948,11 @@ static void aclk_main_cleanup(void *ptr) // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; // Send a graceful disconnect message - char *msg_id = create_uuid(); - - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; - - 
snprintfz( - payload, 511, - "{ \"type\": \"disconnect\"," - " \"msg-id\": \"%s\"," - " \"timestamp\": %ld," - " \"timestamp-offset-usec\": %llu," - " \"version\": %d," - " \"payload\": \"graceful\" }", - msg_id, time_created, time_created_offset_usec, ACLK_VERSION); - - aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id); - freez(msg_id); + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); event_loop_timeout = now_realtime_sec() + 5; write_q = 1; @@ -1514,7 +1497,7 @@ void aclk_shutdown() info("Shutdown complete"); } -inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) { uuid_t uuid; char uuid_str[36 + 1]; @@ -1525,9 +1508,11 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) msg_id = uuid_str; } - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } buffer_sprintf( dest, @@ -1535,11 +1520,12 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) "\t\"msg-id\": \"%s\",\n" "\t\"timestamp\": %ld,\n" "\t\"timestamp-offset-usec\": %llu,\n" - "\t\"version\": %d,\n" - "\t\"payload\": ", - type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION); + "\t\"connect\": %ld,\n" + "\t\"connect-offset-usec\": %llu,\n" + "\t\"version\": %d", + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION); - debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created); + debug(D_ACLK, "Sending v%d msgid [%s] type 
[%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs); } /* @@ -1599,7 +1585,15 @@ void aclk_send_alarm_metadata() debug(D_ACLK, "Metadata alarms start"); - aclk_create_header(local_buffer, "connect_alarms", msg_id); + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (aclk_metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); + else + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); @@ -1635,7 +1629,16 @@ int aclk_send_info_metadata() buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "connect", msg_id); + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. 
+ if (aclk_metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect", msg_id, 0, 0); + else + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + buffer_sprintf(local_buffer, "{\n\t \"info\" : "); web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); @@ -1728,7 +1731,9 @@ int aclk_send_single_chart(char *hostname, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer, "\t\n}"); @@ -1793,7 +1798,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); health_alarm_entry2json_nolock(local_buffer, ae, host); diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index faf4932f84..9e6954bb35 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -44,7 +44,6 @@ typedef enum aclk_cmd { ACLK_CMD_CHART, ACLK_CMD_CHARTDEL, ACLK_CMD_ALARM, - ACLK_CMD_ALARMS, ACLK_CMD_MAX } ACLK_CMD; @@ -98,7 +97,7 @@ struct aclk_query * aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -void aclk_create_header(BUFFER *dest, char *type, char *msg_id); +void aclk_create_header(BUFFER *dest, char *type, 
char *msg_id, time_t ts_secs, usec_t ts_us); int aclk_handle_cloud_request(char *payload); int aclk_submit_request(struct aclk_request *); void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index dad32b578b..72944c958f 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -5,6 +5,9 @@ #include "mqtt.h" #include "aclk_lws_wss_client.h" +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + inline const char *_link_strerror(int rc) { return mosquitto_strerror(rc); @@ -131,6 +134,11 @@ static int _mqtt_create_connection(char *username, char *password) return MOSQ_ERR_UNKNOWN; } + // Record the session start time to allow a nominal LWT timestamp + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + _link_set_lwt("outbound/meta", 2); mosquitto_connect_callback_set(mosq, connect_callback); @@ -259,7 +267,6 @@ int _link_set_lwt(char *sub_topic, int qos) { int rc; char topic[ACLK_MAX_TOPIC + 1]; - char payload[512]; char *final_topic; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); @@ -269,25 +276,13 @@ int _link_set_lwt(char *sub_topic, int qos) return 1; } - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; - - char *msg_id = create_uuid(); - - snprintfz( - payload, 511, - "{ \"type\": \"disconnect\"," - " \"msg-id\": \"%s\"," - " \"timestamp\": %ld," - " \"timestamp-offset-usec\": %llu," - " \"version\": %d," - " \"payload\": \"unexpected\" }", - msg_id, time_created, time_created_offset_usec, ACLK_VERSION); - - freez(msg_id); + usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC); + buffer_strcat(b, ", \"payload\": 
\"unexpected\" }"); + rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); + buffer_free(b); - rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0); return rc; } |