diff options
56 files changed, 1647 insertions, 442 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c5254b6e6..ca0ab99946 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1112,6 +1112,8 @@ endif() -Wl,--wrap=recv -Wl,--wrap=send -Wl,--wrap=connect_to_one_of + -Wl,--wrap=create_main_rusage_chart + -Wl,--wrap=send_main_rusage ${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS} ${KINESIS_LINK_OPTIONS} ${MONGODB_LINK_OPTIONS} diff --git a/Makefile.am b/Makefile.am index e4e240cbda..be84fc09ac 100644 --- a/Makefile.am +++ b/Makefile.am @@ -573,6 +573,7 @@ NETDATA_FILES = \ $(LIBNETDATA_FILES) \ $(API_PLUGIN_FILES) \ $(BACKENDS_PLUGIN_FILES) \ + $(EXPORTING_ENGINE_FILES) \ $(CHECKS_PLUGIN_FILES) \ $(HEALTH_PLUGIN_FILES) \ $(IDLEJITTER_PLUGIN_FILES) \ @@ -608,12 +609,6 @@ if LINUX endif -if ENABLE_EXPORTING - NETDATA_FILES += \ - $(EXPORTING_ENGINE_FILES) \ - $(NULL) -endif - NETDATA_COMMON_LIBS = \ $(OPTIONAL_MATH_LIBS) \ $(OPTIONAL_ZLIB_LIBS) \ @@ -745,23 +740,13 @@ if ENABLE_PLUGIN_SLABINFO $(NULL) endif -if ENABLE_EXPORTING -if ENABLE_BACKEND_KINESIS - netdata_SOURCES += $(KINESIS_EXPORTING_FILES) - netdata_LDADD += $(OPTIONAL_KINESIS_LIBS) -endif -endif - if ENABLE_BACKEND_KINESIS - netdata_SOURCES += $(KINESIS_BACKEND_FILES) + netdata_SOURCES += $(KINESIS_BACKEND_FILES) $(KINESIS_EXPORTING_FILES) netdata_LDADD += $(OPTIONAL_KINESIS_LIBS) endif if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE -if ENABLE_EXPORTING - netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) -endif - netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) + netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) BUILT_SOURCES = \ exporting/prometheus/remote_write/remote_write.pb.cc \ @@ -775,15 +760,8 @@ exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote endif -if ENABLE_EXPORTING -if ENABLE_BACKEND_MONGODB - netdata_SOURCES += $(MONGODB_EXPORTING_FILES) - netdata_LDADD += $(OPTIONAL_MONGOC_LIBS) -endif -endif - if ENABLE_BACKEND_MONGODB - netdata_SOURCES += $(MONGODB_BACKEND_FILES) + netdata_SOURCES += $(MONGODB_BACKEND_FILES) $(MONGODB_EXPORTING_FILES) netdata_LDADD += $(OPTIONAL_MONGOC_LIBS) endif @@ -895,6 +873,8 @@ if ENABLE_UNITTESTS -Wl,--wrap=recv \ -Wl,--wrap=send \ -Wl,--wrap=connect_to_one_of \ + -Wl,--wrap=create_main_rusage_chart \ + -Wl,--wrap=send_main_rusage \ $(TEST_LDFLAGS) \ $(NULL) exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) @@ -1,6 +1,7 @@ <!-- --- title: "Netdata" +date: 2020-04-06 custom_edit_url: https://github.com/netdata/netdata/edit/master/README.md --- --> @@ -33,7 +34,7 @@ granularity. Run this long-term storage autonomously, or integrate Netdata with Netdata is **fast** and **efficient**, designed to permanently run on all systems (**physical** and **virtual** servers, **containers**, **IoT** devices), without disrupting their core function. -Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **MacOS**, along with +Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **macOS**, along with other systems derived from them, such as **Kubernetes** and **Docker**. Netdata is not hosted by the CNCF but is the 3rd most starred open-source project in the [Cloud Native Computing diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 1adaf6bcce..a41d17e7bd 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -23,6 +23,8 @@ static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; @@ -185,7 +187,7 @@ biofailed: * should be called with * * mode 0 to reset the delay - * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms * */ unsigned long int aclk_reconnect_delay(int mode) @@ -208,8 +210,6 @@ unsigned long int aclk_reconnect_delay(int mode) delay = (delay * 1000) + (random() % 1000); } - // sleep_usec(USEC_PER_MS * delay); - return delay; } @@ -306,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run if (tmp_query->run_after == run_after) { QUERY_UNLOCK; QUERY_THREAD_WAKEUP; - return 1; + return 0; } if (last_query) @@ -750,8 +750,8 @@ int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id); - + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data); buffer_sprintf( @@ -821,11 +821,6 @@ int aclk_process_query() aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); break; - case ACLK_CMD_ALARMS: - debug(D_ACLK, "EXECUTING an alarms update command"); - aclk_send_alarm_metadata(); - break; - case ACLK_CMD_CLOUD: debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); @@ -868,18 +863,22 @@ int aclk_process_queries() static void aclk_query_thread_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; info("cleaning up..."); - COLLECTOR_LOCK; - _reset_collector_list(); freez(collector_list); - COLLECTOR_UNLOCK; + // Clean memory for pending queries if any + struct aclk_query *this_query; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + do { + this_query = aclk_queue_pop(); + aclk_query_free(this_query); + } while (this_query); + + freez(static_thread->thread); + freez(static_thread); } /** @@ -916,7 +915,7 @@ void *aclk_query_main_thread(void *ptr) if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { errno = 0; error("ACLK failed to queue on_connect command"); - aclk_metadata_submitted = 0; + aclk_metadata_submitted = ACLK_METADATA_REQUIRED; } } @@ -939,7 +938,6 @@ void *aclk_query_main_thread(void *ptr) // Thread cleanup static void aclk_main_cleanup(void *ptr) { - char payload[512]; struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; @@ -952,24 +950,11 @@ static void aclk_main_cleanup(void *ptr) // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; // Send a graceful disconnect message - char *msg_id = create_uuid(); - - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; - - snprintfz( - payload, 511, - "{ \"type\": \"disconnect\"," - " \"msg-id\": \"%s\"," - " \"timestamp\": %ld," - " \"timestamp-offset-usec\": %llu," - " \"version\": %d," - " \"payload\": \"graceful\" }", - msg_id, time_created, time_created_offset_usec, ACLK_VERSION); - - aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id); - freez(msg_id); + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); event_loop_timeout = now_realtime_sec() + 5; write_q = 1; @@ -990,7 +975,6 @@ static void aclk_main_cleanup(void *ptr) } } - info("Disconnected"); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } @@ -1295,7 +1279,6 @@ void *aclk_main(void *ptr) { struct netdata_static_thread *query_thread; - netdata_thread_cleanup_push(aclk_main_cleanup, ptr); if (!netdata_cloud_setting) { info("Killing ACLK thread -> cloud functionality has been disabled"); return NULL; @@ -1335,10 +1318,11 @@ void *aclk_main(void *ptr) sleep_usec(USEC_PER_SEC * 60); } create_publish_base_topic(); - create_private_key(); usec_t reconnect_expiry = 0; // In usecs + netdata_thread_disable_cancelability(); + while (!netdata_exit) { static int first_init = 0; size_t write_q, write_q_bytes, read_q; @@ -1392,7 +1376,8 @@ void *aclk_main(void *ptr) } } // forever exited: - aclk_shutdown(); + // Wakeup query thread to cleanup + QUERY_THREAD_WAKEUP; freez(aclk_username); freez(aclk_password); @@ -1401,7 +1386,7 @@ exited: if (aclk_private_key != NULL) RSA_free(aclk_private_key); - netdata_thread_cleanup_pop(1); + aclk_main_cleanup(ptr); return NULL; } @@ -1514,7 +1499,7 @@ void aclk_shutdown() info("Shutdown complete"); } -inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) { uuid_t uuid; char uuid_str[36 + 1]; @@ -1525,9 +1510,11 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) msg_id = uuid_str; } - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } buffer_sprintf( dest, @@ -1535,11 +1522,12 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) "\t\"msg-id\": \"%s\",\n" "\t\"timestamp\": %ld,\n" "\t\"timestamp-offset-usec\": %llu,\n" - "\t\"version\": %d,\n" - "\t\"payload\": ", - type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION); + "\t\"connect\": %ld,\n" + "\t\"connect-offset-usec\": %llu,\n" + "\t\"version\": %d", + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION); - debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created); + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs); } /* @@ -1599,7 +1587,15 @@ void aclk_send_alarm_metadata() debug(D_ACLK, "Metadata alarms start"); - aclk_create_header(local_buffer, "connect_alarms", msg_id); + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (aclk_metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); + else + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); @@ -1635,7 +1631,16 @@ int aclk_send_info_metadata() buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "connect", msg_id); + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (aclk_metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect", msg_id, 0, 0); + else + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + buffer_sprintf(local_buffer, "{\n\t \"info\" : "); web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); @@ -1728,7 +1733,9 @@ int aclk_send_single_chart(char *hostname, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer, "\t\n}"); @@ -1793,7 +1800,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); health_alarm_entry2json_nolock(local_buffer, ae, host); @@ -1863,6 +1871,12 @@ int aclk_handle_cloud_request(char *payload) return 1; } + // Checked to be "http", not needed anymore + if (likely(cloud_to_agent.type_id)) { + freez(cloud_to_agent.type_id); + cloud_to_agent.type_id = NULL; + } + if (unlikely(aclk_submit_request(&cloud_to_agent))) debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload); diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index faf4932f84..f147669e5d 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -44,7 +44,6 @@ typedef enum aclk_cmd { ACLK_CMD_CHART, ACLK_CMD_CHARTDEL, ACLK_CMD_ALARM, - ACLK_CMD_ALARMS, ACLK_CMD_MAX } ACLK_CMD; @@ -74,16 +73,12 @@ void *aclk_main(void *ptr); extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); -//int aclk_init(); -//char *get_base_topic(); - extern char *is_agent_claimed(void); extern void aclk_lws_wss_mqtt_layer_disconect_notif(); char *create_uuid(); // callbacks for agent cloud link int aclk_subscribe(char *topic, int qos); -void aclk_shutdown(); int cloud_to_agent_parse(JSON_ENTRY *e); void aclk_disconnect(); void aclk_connect(); @@ -98,7 +93,7 @@ struct aclk_query * aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -void aclk_create_header(BUFFER *dest, char *type, char *msg_id); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us); int aclk_handle_cloud_request(char *payload); int aclk_submit_request(struct aclk_request *); void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index dad32b578b..b070f7fb09 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -5,6 +5,9 @@ #include "mqtt.h" #include "aclk_lws_wss_client.h" +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + inline const char *_link_strerror(int rc) { return mosquitto_strerror(rc); @@ -49,7 +52,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) UNUSED(obj); UNUSED(rc); - info("Connection to cloud failed"); + if (netdata_exit) + info("Connection to cloud terminated due to agent shutdown"); + else { + errno = 0; + error("Connection to cloud failed"); + } aclk_disconnect(); aclk_lws_wss_mqtt_layer_disconect_notif(); @@ -131,6 +139,11 @@ static int _mqtt_create_connection(char *username, char *password) return MOSQ_ERR_UNKNOWN; } + // Record the session start time to allow a nominal LWT timestamp + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + _link_set_lwt("outbound/meta", 2); mosquitto_connect_callback_set(mosq, connect_callback); @@ -259,7 +272,6 @@ int _link_set_lwt(char *sub_topic, int qos) { int rc; char topic[ACLK_MAX_TOPIC + 1]; - char payload[512]; char *final_topic; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); @@ -269,25 +281,13 @@ int _link_set_lwt(char *sub_topic, int qos) return 1; } - usec_t time_created_offset_usec = now_realtime_usec(); - time_t time_created = time_created_offset_usec / USEC_PER_SEC; - time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; - - char *msg_id = create_uuid(); - - snprintfz( - payload, 511, - "{ \"type\": \"disconnect\"," - " \"msg-id\": \"%s\"," - " \"timestamp\": %ld," - " \"timestamp-offset-usec\": %llu," - " \"version\": %d," - " \"payload\": \"unexpected\" }", - msg_id, time_created, time_created_offset_usec, ACLK_VERSION); - - freez(msg_id); + usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC); + buffer_strcat(b, ", \"payload\": \"unexpected\" }"); + rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); + buffer_free(b); - rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0); return rc; } diff --git a/backends/backends.h b/backends/backends.h index 212823a078..efa88a7f22 100644 --- a/backends/backends.h +++ b/backends/backends.h @@ -27,10 +27,6 @@ typedef enum backend_types { BACKEND_TYPE_NUM // Number of backend types } BACKEND_TYPE; -#ifdef ENABLE_EXPORTING -#include "exporting/exporting_engine.h" -#endif - typedef int (**backend_response_checker_t)(BUFFER *); typedef int (**backend_request_formatter_t)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS); diff --git a/backends/prometheus/backend_prometheus.c b/backends/prometheus/backend_prometheus.c index b3f955e15f..0a7b3a3391 100644 --- a/backends/prometheus/backend_prometheus.c +++ b/backends/prometheus/backend_prometheus.c @@ -44,7 +44,7 @@ static inline time_t prometheus_server_last_access(const char *server, RRDHOST * return 0; } -static inline size_t prometheus_name_copy(char *d, const char *s, size_t usable) { +static inline size_t backends_prometheus_name_copy(char *d, const char *s, size_t usable) { size_t n; for(n = 0; *s && n < usable ; d++, s++, n++) { @@ -58,7 +58,7 @@ static inline size_t prometheus_name_copy(char *d, const char *s, size_t usable) return n; } |