diff options
56 files changed, 442 insertions, 1647 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index ca0ab99946..6c5254b6e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1112,8 +1112,6 @@ endif() -Wl,--wrap=recv -Wl,--wrap=send -Wl,--wrap=connect_to_one_of - -Wl,--wrap=create_main_rusage_chart - -Wl,--wrap=send_main_rusage ${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS} ${KINESIS_LINK_OPTIONS} ${MONGODB_LINK_OPTIONS} diff --git a/Makefile.am b/Makefile.am index be84fc09ac..e4e240cbda 100644 --- a/Makefile.am +++ b/Makefile.am @@ -573,7 +573,6 @@ NETDATA_FILES = \ $(LIBNETDATA_FILES) \ $(API_PLUGIN_FILES) \ $(BACKENDS_PLUGIN_FILES) \ - $(EXPORTING_ENGINE_FILES) \ $(CHECKS_PLUGIN_FILES) \ $(HEALTH_PLUGIN_FILES) \ $(IDLEJITTER_PLUGIN_FILES) \ @@ -609,6 +608,12 @@ if LINUX endif +if ENABLE_EXPORTING + NETDATA_FILES += \ + $(EXPORTING_ENGINE_FILES) \ + $(NULL) +endif + NETDATA_COMMON_LIBS = \ $(OPTIONAL_MATH_LIBS) \ $(OPTIONAL_ZLIB_LIBS) \ @@ -740,13 +745,23 @@ if ENABLE_PLUGIN_SLABINFO $(NULL) endif +if ENABLE_EXPORTING +if ENABLE_BACKEND_KINESIS + netdata_SOURCES += $(KINESIS_EXPORTING_FILES) + netdata_LDADD += $(OPTIONAL_KINESIS_LIBS) +endif +endif + if ENABLE_BACKEND_KINESIS - netdata_SOURCES += $(KINESIS_BACKEND_FILES) $(KINESIS_EXPORTING_FILES) + netdata_SOURCES += $(KINESIS_BACKEND_FILES) netdata_LDADD += $(OPTIONAL_KINESIS_LIBS) endif if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE - netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) +if ENABLE_EXPORTING + netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) +endif + netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) BUILT_SOURCES = \ exporting/prometheus/remote_write/remote_write.pb.cc \ @@ -760,8 +775,15 @@ exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote endif +if ENABLE_EXPORTING +if ENABLE_BACKEND_MONGODB + netdata_SOURCES += $(MONGODB_EXPORTING_FILES) + netdata_LDADD += $(OPTIONAL_MONGOC_LIBS) +endif +endif + if ENABLE_BACKEND_MONGODB - netdata_SOURCES += $(MONGODB_BACKEND_FILES) $(MONGODB_EXPORTING_FILES) + netdata_SOURCES += $(MONGODB_BACKEND_FILES) netdata_LDADD += $(OPTIONAL_MONGOC_LIBS) endif @@ -873,8 +895,6 @@ if ENABLE_UNITTESTS -Wl,--wrap=recv \ -Wl,--wrap=send \ -Wl,--wrap=connect_to_one_of \ - -Wl,--wrap=create_main_rusage_chart \ - -Wl,--wrap=send_main_rusage \ $(TEST_LDFLAGS) \ $(NULL) exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) @@ -1,7 +1,6 @@ <!-- --- title: "Netdata" -date: 2020-04-06 custom_edit_url: https://github.com/netdata/netdata/edit/master/README.md --- --> @@ -34,7 +33,7 @@ granularity. Run this long-term storage autonomously, or integrate Netdata with Netdata is **fast** and **efficient**, designed to permanently run on all systems (**physical** and **virtual** servers, **containers**, **IoT** devices), without disrupting their core function. -Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **macOS**, along with +Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **MacOS**, along with other systems derived from them, such as **Kubernetes** and **Docker**. Netdata is not hosted by the CNCF but is the 3rd most starred open-source project in the [Cloud Native Computing diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index a41d17e7bd..1adaf6bcce 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -23,8 +23,6 @@ static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api -usec_t aclk_session_us = 0; // Used by the mqtt layer -time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; @@ -187,7 +185,7 @@ biofailed: * should be called with * * mode 0 to reset the delay - * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms * */ unsigned long int aclk_reconnect_delay(int mode) @@ -210,6 +208,8 @@ unsigned long int aclk_reconnect_delay(int mode) delay = (delay * 1000) + (random() % 1000); } + // sleep_usec(USEC_PER_MS * delay); + return delay; } @@ -306,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run if (tmp_query->run_after == run_after) { QUERY_UNLOCK; QUERY_THREAD_WAKEUP; - return 0; + return 1; } if (last_query) @@ -750,8 +750,8 @@ int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); + aclk_create_header(local_buffer, "http", this_query->msg_id); + char *encoded_response = aclk_encode_response(w->response.data); buffer_sprintf( @@ -821,6 +821,11 @@ int aclk_process_query() aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); break; + case ACLK_CMD_ALARMS: + debug(D_ACLK, "EXECUTING an alarms update command"); + aclk_send_alarm_metadata(); + break; + case ACLK_CMD_CLOUD: debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); @@ -863,22 +868,18 @@ int aclk_process_queries() static void aclk_query_thread_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; info("cleaning up..."); + COLLECTOR_LOCK; + _reset_collector_list(); freez(collector_list); - // Clean memory for pending queries if any - struct aclk_query *this_query; - - do { - this_query = aclk_queue_pop(); - aclk_query_free(this_query); - } while (this_query); + COLLECTOR_UNLOCK; - freez(static_thread->thread); - freez(static_thread); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } /** @@ -915,7 +916,7 @@ void *aclk_query_main_thread(void *ptr) if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { errno = 0; error("ACLK failed to queue on_connect command"); - aclk_metadata_submitted = ACLK_METADATA_REQUIRED; + aclk_metadata_submitted = 0; } } @@ -938,6 +939,7 @@ void *aclk_query_main_thread(void *ptr) // Thread cleanup static void aclk_main_cleanup(void *ptr) { + char payload[512]; struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; @@ -950,11 +952,24 @@ static void aclk_main_cleanup(void *ptr) // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; // Send a graceful disconnect message - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0); - buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); - aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); - buffer_free(b); + char *msg_id = create_uuid(); + + usec_t time_created_offset_usec = now_realtime_usec(); + time_t time_created = time_created_offset_usec / USEC_PER_SEC; + time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; + + snprintfz( + payload, 511, + "{ \"type\": \"disconnect\"," + " \"msg-id\": \"%s\"," + " \"timestamp\": %ld," + " \"timestamp-offset-usec\": %llu," + " \"version\": %d," + " \"payload\": \"graceful\" }", + msg_id, time_created, time_created_offset_usec, ACLK_VERSION); + + aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id); + freez(msg_id); event_loop_timeout = now_realtime_sec() + 5; write_q = 1; @@ -975,6 +990,7 @@ static void aclk_main_cleanup(void *ptr) } } + info("Disconnected"); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } @@ -1279,6 +1295,7 @@ void *aclk_main(void *ptr) { struct netdata_static_thread *query_thread; + netdata_thread_cleanup_push(aclk_main_cleanup, ptr); if (!netdata_cloud_setting) { info("Killing ACLK thread -> cloud functionality has been disabled"); return NULL; @@ -1318,11 +1335,10 @@ void *aclk_main(void *ptr) sleep_usec(USEC_PER_SEC * 60); } create_publish_base_topic(); + create_private_key(); usec_t reconnect_expiry = 0; // In usecs - netdata_thread_disable_cancelability(); - while (!netdata_exit) { static int first_init = 0; size_t write_q, write_q_bytes, read_q; @@ -1376,8 +1392,7 @@ void *aclk_main(void *ptr) } } // forever exited: - // Wakeup query thread to cleanup - QUERY_THREAD_WAKEUP; + aclk_shutdown(); freez(aclk_username); freez(aclk_password); @@ -1386,7 +1401,7 @@ exited: if (aclk_private_key != NULL) RSA_free(aclk_private_key); - aclk_main_cleanup(ptr); + netdata_thread_cleanup_pop(1); return NULL; } @@ -1499,7 +1514,7 @@ void aclk_shutdown() info("Shutdown complete"); } -inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) { uuid_t uuid; char uuid_str[36 + 1]; @@ -1510,11 +1525,9 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts msg_id = uuid_str; } - if (ts_secs == 0) { - ts_us = now_realtime_usec(); - ts_secs = ts_us / USEC_PER_SEC; - ts_us = ts_us % USEC_PER_SEC; - } + usec_t time_created_offset_usec = now_realtime_usec(); + time_t time_created = time_created_offset_usec / USEC_PER_SEC; + time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; buffer_sprintf( dest, @@ -1522,12 +1535,11 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts "\t\"msg-id\": \"%s\",\n" "\t\"timestamp\": %ld,\n" "\t\"timestamp-offset-usec\": %llu,\n" - "\t\"connect\": %ld,\n" - "\t\"connect-offset-usec\": %llu,\n" - "\t\"version\": %d", - type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION); + "\t\"version\": %d,\n" + "\t\"payload\": ", + type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION); - debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs); + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created); } /* @@ -1587,15 +1599,7 @@ void aclk_send_alarm_metadata() debug(D_ACLK, "Metadata alarms start"); - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (aclk_metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); - else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); + aclk_create_header(local_buffer, "connect_alarms", msg_id); buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); @@ -1631,16 +1635,7 @@ int aclk_send_info_metadata() buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (aclk_metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect", msg_id, 0, 0); - else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - + aclk_create_header(local_buffer, "connect", msg_id); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); @@ -1733,9 +1728,7 @@ int aclk_send_single_chart(char *hostname, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id, 0, 0); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - + aclk_create_header(local_buffer, "chart", msg_id); rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer, "\t\n}"); @@ -1800,8 +1793,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); + aclk_create_header(local_buffer, "status-change", msg_id); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); health_alarm_entry2json_nolock(local_buffer, ae, host); @@ -1871,12 +1863,6 @@ int aclk_handle_cloud_request(char *payload) return 1; } - // Checked to be "http", not needed anymore - if (likely(cloud_to_agent.type_id)) { - freez(cloud_to_agent.type_id); - cloud_to_agent.type_id = NULL; - } - if (unlikely(aclk_submit_request(&cloud_to_agent))) debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload); diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index f147669e5d..faf4932f84 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -44,6 +44,7 @@ typedef enum aclk_cmd { ACLK_CMD_CHART, ACLK_CMD_CHARTDEL, ACLK_CMD_ALARM, + ACLK_CMD_ALARMS, ACLK_CMD_MAX } ACLK_CMD; @@ -73,12 +74,16 @@ void *aclk_main(void *ptr); extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); +//int aclk_init(); +//char *get_base_topic(); + extern char *is_agent_claimed(void); extern void aclk_lws_wss_mqtt_layer_disconect_notif(); char *create_uuid(); // callbacks for agent cloud link int aclk_subscribe(char *topic, int qos); +void aclk_shutdown(); int cloud_to_agent_parse(JSON_ENTRY *e); void aclk_disconnect(); void aclk_connect(); @@ -93,7 +98,7 @@ struct aclk_query * aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id); int aclk_handle_cloud_request(char *payload); int aclk_submit_request(struct aclk_request *); void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index b070f7fb09..dad32b578b 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -5,9 +5,6 @@ #include "mqtt.h" #include "aclk_lws_wss_client.h" -extern usec_t aclk_session_us; -extern time_t aclk_session_sec; - inline const char *_link_strerror(int rc) { return mosquitto_strerror(rc); @@ -52,12 +49,7 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) UNUSED(obj); UNUSED(rc); - if (netdata_exit) - info("Connection to cloud terminated due to agent shutdown"); - else { - errno = 0; - error("Connection to cloud failed"); - } + info("Connection to cloud failed"); aclk_disconnect(); aclk_lws_wss_mqtt_layer_disconect_notif(); @@ -139,11 +131,6 @@ static int _mqtt_create_connection(char *username, char *password) return MOSQ_ERR_UNKNOWN; } - // Record the session start time to allow a nominal LWT timestamp - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - _link_set_lwt("outbound/meta", 2); mosquitto_connect_callback_set(mosq, connect_callback); @@ -272,6 +259,7 @@ int _link_set_lwt(char *sub_topic, int qos) { int rc; char topic[ACLK_MAX_TOPIC + 1]; + char payload[512]; char *final_topic; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); @@ -281,13 +269,25 @@ int _link_set_lwt(char *sub_topic, int qos) return 1; } - usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC); - buffer_strcat(b, ", \"payload\": \"unexpected\" }"); - rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); - buffer_free(b); + usec_t time_created_offset_usec = now_realtime_usec(); + time_t time_created = time_created_offset_usec / USEC_PER_SEC; + time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC; + + char *msg_id = create_uuid(); + + snprintfz( + payload, 511, + "{ \"type\": \"disconnect\"," + " \"msg-id\": \"%s\"," + " \"timestamp\": %ld," + " \"timestamp-offset-usec\": %llu," + " \"version\": %d," + " \"payload\": \"unexpected\" }", + msg_id, time_created, time_created_offset_usec, ACLK_VERSION); + + freez(msg_id); + rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0); return rc; } diff --git a/backends/backends.h b/backends/backends.h index efa88a7f22..212823a078 100644 --- a/backends/backends.h +++ b/backends/backends.h @@ -27,6 +27,10 @@ typedef enum backend_types { BACKEND_TYPE_NUM // Number of backend types } BACKEND_TYPE; +#ifdef ENABLE_EXPORTING +#include "exporting/exporting_engine.h" +#endif + typedef int (**backend_response_checker_t)(BUFFER *); typedef int (**backend_request_formatter_t)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS); diff --git a/backends/prometheus/backend_prometheus.c b/backends/prometheus/backend_prometheus.c index 0a7b3a3391..b3f955e15f 100644 --- a/backends/prometheus/backend_prometheus.c +++ b/backends/prometheus/backend_prometheus.c @@ -44,7 +44,7 @@ static inline time_t prometheus_server_last_access(const char *server, RRDHOST * return 0; } -static inline size_t backends_prometheus_name_copy(char *d, const char *s, size_t usable) { +static inline size_t prometheus_name_copy(char *d, const char *s, size_t usable) { size_t n; for(n = 0; *s && n < usable ; d++, s++, n++) { @@ -58,7 +58,7 @@ static inline size_t backends_prometheus_name_copy(char *d, const char *s, size_ return n; } |