From b2b3c182548fe81e6d1c9a599b2571dabfdabcaa Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Thu, 6 Feb 2020 17:58:51 +0200 Subject: ACLK agent 1 (#7894) * - Add initial mqtt support * [WIP] Agent cloud link - Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD) - Send alarms to "netdata/alarm" - Add error checks to handle connection failures - Add params for Broker, port Maximum concurrent sent / recev messages - Dummy function to check claiming status - Generic mqtt_send command to publish message to a base topic , sub topic It will end up in the form base_topic/sub_topic - Add host/port in the connection failure error message * Test libmosquitto libs * connect to broker locally (assume localhost:1883) * subscribe to channel netdata/command * Test try a reload command to trigger health reload * publish alerts to netdata/alarm * - Fix compile issues * - Use sleep_usec instead of usleep * - Delay reconnection on failure due to misconfiguration (high cpu usage) * - Remove the TLS connection config * - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds * - Gather ACLK related code under aclk folder - Add aclk_ functions for abstract layer - Moved low level libs intergration in mqtt.c * - Add README.md file with initial comment * - Clean MQTT v5 * - Code cleanup * - Remove alarm log for now - Remove the heart beat * - Remove message properties for V5 * - Remove message properties for V5 (header) * Fixed the netdata target to use a local static version of libmosquitto. The installer does not yet have steps to pull and build the local library. cd project_root git clone ssh://git@github.com/netdata/mosquitto mosquitto/ (cd mosquitto/lib && make) # Ignore the cpp error This will leave mosquitto/lib/libmosquitto.a for the build process to use. * - Fix compile issues with older < 1.6 libmosquitto lib * - Enable alarm events to check it works - Re arrange includes - Rework topic to be agent/guid/. Actual id will be returned by the is_agent_claimed * - Add initial metadata info - Added helper function in web_api - Added a debug command (info) * Update the claiming state to retrieve the claimed id. * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Remove the alarm log for now - Add code (but disabled) to send charts * - Use dummy anon, anon as username and password for testing purposes * - Use client id anon as well * Testing without TLS * Switching TLS back on to fix docker environment. * - Added query processing An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results back to the "data" topic - Move the above processing from the message callback to the query handle loop - Added helper "pause" , "resume" commands to stop and resume query processing to stress test loading the queue with queries before executing them - Changed the endpoint topics to "meta", and "cmd" (previously metadata and command) * make info message follow protocol * move metadata msg generation into new func * move metadata msg generation into new func * - Add metadata to the responses - Add hook to queue chart changes on creation and dimensions - Changed the queue mechanism to include delay for X seconds - Add delayed submittion of charts to the cloud so that all DIMs are defined to avoid resubmission * - Add additional data info for aclk_queue command * - Use web_clinet_api_request_v1 to handle the incoming request This will handle all requests coming from the cloud * - Cleanup and aclk_query structure - Add msg_id parameter - Enable the incoming JSON request - Enable the outgoing JSON response * - Added new thread to handle query processing - Add lock and cond wait to wakeup thread when queries are submitted - Cleanup on the main init function * - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed. - During the wait time, no queries will be queued * - Send metadata on query thread init - New generic create header function for the JSON response - Pack info and charts into one message - Modified chart to remove entries (test) - Modified charts mod to remove entries e.g alarms and volatile info - Change input to aclk_update_chart (RRDHOST / instead of hostname) * - When a request fails, add to the payload - We may need to handle in a different key - Error check in json parsing * - Add dummy aclk_update_alarm command * - Move incoming request JSON parsing code away from mqtt.c - Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default - Added version in incoming and outgoing JSON dict * - Disable code if ACLK_ENABLE is not defined - Remove references to the mqtt (mosquitto) lib - Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined * - Disable challenge sample code for now * - Remove libmosquitto from makefile * - Fix spaces in Makefile.am - Remove ifdef to avoid warning from LGTM * - Remove for now the code that builds an along log test message to send to the cloud * - Add check for ACLK_ENABLE definition and avoid calling the chart update functions * - Remove commented code * - Move source files to the correct place (ACLK_PLUGIN_FILES) * - Remove include file thats not needed * - Remove include file thats not needed - Add improved checks for load_claiming_state() * - Fix error message. Used error() that also logs errno and message * - Fix some codacy issues * - Fix more codacy issues, code cleanup * - Revert code to address codacy warnings * - Revert spaces added in a previous commit by mistake * clean up if/else nest * print error if fopen fails * minor - error already logs errno * - Fix version formatting * - Cleanup all ACLK related compiler warnings - Re-arrange include files - Removed unused defines * - More compilation warnings fixed - Bug with thread creation fixed * - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable * - Add condition to skip the libmosquitto * - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags - Typo in info message fix Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com> Co-authored-by: Timo <6674623+underhood@users.noreply.github.com> --- CMakeLists.txt | 12 +- Makefile.am | 25 ++ aclk/Makefile.am | 18 + aclk/README.md | 1 + aclk/agent_cloud_link.c | 995 ++++++++++++++++++++++++++++++++++++++++++++++ aclk/agent_cloud_link.h | 103 +++++ aclk/mqtt.c | 318 +++++++++++++++ aclk/mqtt.h | 21 + claim/claim.c | 46 ++- claim/claim.h | 2 +- claim/netdata-claim.sh.in | 4 +- configure.ac | 16 +- daemon/common.h | 3 + daemon/main.c | 5 + database/rrddim.c | 22 +- database/rrdset.c | 4 +- health/health_log.c | 1 - web/api/web_api_v1.c | 21 +- web/api/web_api_v1.h | 2 +- 19 files changed, 1587 insertions(+), 32 deletions(-) create mode 100644 aclk/Makefile.am create mode 100644 aclk/README.md create mode 100644 aclk/agent_cloud_link.c create mode 100644 aclk/agent_cloud_link.h create mode 100644 aclk/mqtt.c create mode 100644 aclk/mqtt.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f715aeecb4..d43cb8a1f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -339,8 +339,7 @@ set(LIBNETDATA_FILES add_library(libnetdata OBJECT ${LIBNETDATA_FILES}) set(APPS_PLUGIN_FILES - collectors/apps.plugin/apps_plugin.c - ) + collectors/apps.plugin/apps_plugin.c) set(CHECKS_PLUGIN_FILES collectors/checks.plugin/plugin_checks.c @@ -606,6 +605,14 @@ set(CLAIM_PLUGIN_FILES claim/claim.h ) +set(ACLK_PLUGIN_FILES + aclk/agent_cloud_link.c + aclk/agent_cloud_link.h + aclk/agent_cloud_link.c + aclk/mqtt.c + aclk/mqtt.h + ) + set(EXPORTING_ENGINE_FILES exporting/exporting_engine.c exporting/exporting_engine.h @@ -673,6 +680,7 @@ set(NETDATA_FILES ${STREAMING_PLUGIN_FILES} ${WEB_PLUGIN_FILES} ${CLAIM_PLUGIN_FILES} + ${ACLK_PLUGIN_FILES} ) set(NETDATACLI_FILES diff --git a/Makefile.am b/Makefile.am index 30c2eb67b5..483a9c22db 100644 --- a/Makefile.am +++ b/Makefile.am @@ -100,6 +100,7 @@ SUBDIRS += \ streaming \ web \ claim \ + aclk \ $(NULL) @@ -108,6 +109,7 @@ AM_CFLAGS = \ $(OPTIONAL_NFACCT_CFLAGS) \ $(OPTIONAL_ZLIB_CFLAGS) \ $(OPTIONAL_UUID_CFLAGS) \ + $(OPTIONAL_MQTT_CFLAGS) \ $(OPTIONAL_LIBCAP_LIBS) \ $(OPTIONAL_IPMIMONITORING_CFLAGS) \ $(OPTIONAL_CUPS_CFLAGS) \ @@ -456,6 +458,13 @@ CLAIM_PLUGIN_FILES = \ claim/claim.h \ $(NULL) +ACLK_PLUGIN_FILES = \ + aclk/agent_cloud_link.c \ + aclk/agent_cloud_link.h \ + aclk/mqtt.c \ + aclk/mqtt.h \ + $(NULL) + EXPORTING_ENGINE_FILES = \ exporting/exporting_engine.c \ exporting/exporting_engine.h \ @@ -526,6 +535,12 @@ NETDATA_FILES = \ $(CLAIM_PLUGIN_FILES) \ $(NULL) +if ENABLE_ACLK + NETDATA_FILES += \ + $(ACLK_PLUGIN_FILES) \ + $(NULL) +endif + if FREEBSD NETDATA_FILES += \ $(FREEBSD_PLUGIN_FILES) \ @@ -559,6 +574,7 @@ NETDATA_COMMON_LIBS = \ $(OPTIONAL_ZLIB_LIBS) \ $(OPTIONAL_SSL_LIBS) \ $(OPTIONAL_UUID_LIBS) \ + $(OPTIONAL_MQTT_LIBS) \ $(OPTIONAL_UV_LIBS) \ $(OPTIONAL_LZ4_LIBS) \ $(OPTIONAL_JUDY_LIBS) \ @@ -575,9 +591,18 @@ NETDATACLI_FILES = \ sbin_PROGRAMS += netdata netdata_SOURCES = $(NETDATA_FILES) + +if ENABLE_ACLK netdata_LDADD = \ + mosquitto/lib/libmosquitto.a \ $(NETDATA_COMMON_LIBS) \ $(NULL) +else +netdata_LDADD = \ + $(NETDATA_COMMON_LIBS) \ + $(NULL) +endif + if ENABLE_CXX_LINKER netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@ else diff --git a/aclk/Makefile.am b/aclk/Makefile.am new file mode 100644 index 0000000000..ebc2a18d95 --- /dev/null +++ b/aclk/Makefile.am @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +CLEANFILES = \ + $(NULL) + +include $(top_srcdir)/build/subst.inc +SUFFIXES = .in + +sbin_SCRIPTS = \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/aclk/README.md b/aclk/README.md new file mode 100644 index 0000000000..287d3b2606 --- /dev/null +++ b/aclk/README.md @@ -0,0 +1 @@ +This is the agent cloud link (ACLK) information file \ No newline at end of file diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c new file mode 100644 index 0000000000..dc90c6d14e --- /dev/null +++ b/aclk/agent_cloud_link.c @@ -0,0 +1,995 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" + +// Read from the config file -- new section [agent_cloud_link] +// Defaults are supplied +int aclk_recv_maximum = 0; // default 20 +int aclk_send_maximum = 0; // default 20 + +int aclk_port = 0; // default 1883 +char *aclk_hostname = NULL; //default localhost +int aclk_subscribed = 0; + +int aclk_metadata_submitted = 0; +int waiting_init = 1; +int cmdpause = 0; // Used to pause query processing + +BUFFER *aclk_buffer = NULL; +char *global_base_topic = NULL; + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch(e->type) { + case JSON_OBJECT: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_ARRAY: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_STRING: + if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { + data->topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_URL)) { + data->url = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { + data->version = atol(e->data.string); + break; + } + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +//char *send_http_request(char *host, char *port, char *url, BUFFER *b) +//{ +// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; +// +// buffer_flush(b); +// buffer_sprintf( +// b, +// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n", +// url, host); +// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout); +// +// if (unlikely(sock == -1)) { +// error("Handshake failed"); +// return NULL; +// } +// +// SSL_CTX *ctx = security_initialize_openssl_client(); +// // Certificate chain: not updating the stores - do we need private CA roots? +// // Calls to SSL_CTX_load_verify_locations would go here. +// SSL *ssl = SSL_new(ctx); +// SSL_set_fd(ssl, sock); +// int err = SSL_connect(ssl); +// SSL_write(ssl, b->buffer, b->len); // Timeout options? +// int bytes_read = SSL_read(ssl, b->buffer, b->len); +// SSL_shutdown(ssl); +// close(sock); +//} + +// Set when we have connection up and running from the connection callback +int aclk_connection_initialized = 0; + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define QUERY_LOCK netdata_mutex_lock(&query_mutex) +#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; + +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) + + +struct aclk_query { + time_t created; + time_t run_after; // Delay run until after this time + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + u_int64_t count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + +/* + * Free a query structure when done + */ + +void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + freez(this_query->query); + if (this_query->data) + freez(this_query->data); + if (this_query->msg_id) + freez(this_query->msg_id); + freez(this_query); + return; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Called with locked entries + +struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a lock before calling this +struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query) +{ + struct aclk_query *tmp_query; + + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) { + if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) + return tmp_query; + } + } + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(waiting_init)) + return 0; + + run_after = now_realtime_sec() + run_after; + + QUERY_LOCK; + tmp_query = aclk_query_find(topic, data, msg_id, query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + tmp_query->deleted = 1; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + if (internal) { + new_query->topic = strdupz(topic); + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + if (data) + new_query->data = strdupz(data); + + new_query->next = NULL; + new_query->created = now_realtime_sec(); + new_query->run_after = run_after; + + info("Added query (%s) (%s)", topic, query); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + +// if (likely(aclk_queue.aclk_query_tail)) { +// aclk_queue.aclk_query_tail->next = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// +// if (likely(!aclk_queue.aclk_query_head)) { +// aclk_queue.aclk_query_head = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// QUERY_UNLOCK; +// return 0; +} + +inline int aclk_submit_request(struct aclk_request *request) +{ + return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + QUERY_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + QUERY_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + if (this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + QUERY_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + QUERY_UNLOCK; + return this_query; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called by aclk_init(), to compute the base topic once and have +// it stored internally. +// Need to check if additional logic should be added to make sure that there +// is enough information to determine the base topic at init time + +// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch +// that on the fly + +char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action) +{ + static char *topic = NULL; + + if (unlikely(!is_agent_claimed())) + return NULL; + + ACLK_LOCK; + + if (unlikely(action == PUBLICH_TOPIC_FREE)) { + if (likely(topic)) { + freez(topic); + topic = NULL; + } + + ACLK_UNLOCK; + + return NULL; + } + + if (unlikely(action == PUBLICH_TOPIC_REBUILD)) { + ACLK_UNLOCK; + get_publish_base_topic(PUBLICH_TOPIC_FREE); + return get_publish_base_topic(PUBLICH_TOPIC_GET); + } + + if (unlikely(!topic)) { + char tmp_topic[ACLK_MAX_TOPIC + 1]; + + sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); + topic = strdupz(tmp_topic); + } + + ACLK_UNLOCK; + return topic; +} + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + if (unlikely(!global_base_topic)) + global_base_topic = GET_PUBLISH_BASE_TOPIC; + + if (unlikely(!global_base_topic)) + return sub_topic; + + snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + + return final_topic; +} + +// Wait for ACLK connection to be established +int aclk_wait_for_initialization() +{ + if (unlikely(!aclk_connection_initialized)) { + time_t now = now_realtime_sec(); + + while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) { + sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT); + _link_event_loop(0); + } + + if (unlikely(!aclk_connection_initialized)) { + error("ACLK connection cannot be established"); + return 1; + } + } + return 0; +} + +/* + * This function will fetch the next pending command and process it + * + */ +int aclk_process_query() +{ + struct aclk_query *this_query; + static u_int64_t query_count = 0; + //int rc; + + if (unlikely(cmdpause)) + return 0; + + if (!aclk_connection_initialized) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + //info("No pending queries"); + return 0; + } + + if (unlikely(this_query->deleted)) { + info("Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + + query_count++; + info( + "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query, + (int) (now_realtime_sec() - this_query->created)); + + if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: ignore return code for now + web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + + //TODO: handle bad response perhaps in a different way. For now it does to the payload + //if (rc == HTTP_RESP_OK || 1) { + buffer_flush(aclk_buffer); + + aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + aclk_send_message(this_query->topic, aclk_buffer->buffer); + //} else + // error("Query RESP: %s", w->response.data->buffer); + + buffer_free(w->response.data); + freez(w); + aclk_query_free(this_query); + return 1; + } + + if (strcmp((char *)this_query->topic, "_chart") == 0) { + aclk_send_single_chart(this_query->data, this_query->query); + } + + aclk_query_free(this_query); + + return 1; +} + +// Launch a query processing thread + +/* + * Process all pending queries + * Return 0 if no queries were processed, 1 otherwise + * + */ + +int aclk_process_queries() +{ + if (unlikely(cmdpause)) + return 0; + + // Return if no queries pending + if (likely(!aclk_queue.count)) + return 0; + + info("Processing %d queries", (int ) aclk_queue.count); + + while (aclk_process_query()) { + //rc = _link_event_loop(0); + }; + + return 1; +} + +static void aclk_query_thread_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * MAin query processing thread + * + */ +void *aclk_query_main_thread(void *ptr) +{ + netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + + while (!netdata_exit) { + + QUERY_THREAD_LOCK; + + if (unlikely(!aclk_metadata_submitted)) { + aclk_send_metadata(); + aclk_metadata_submitted = 1; + } + + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + if (likely(aclk_connection_initialized && !netdata_exit)) { + while (aclk_process_queries()) { + // Sleep for a few ms and retry maybe we have something to process + // before going to sleep + // TODO: This needs improvement to avoid missed queries + sleep_usec(USEC_PER_MS * 100); + } + } + + QUERY_THREAD_UNLOCK; + + } // forever + info("Shutting down query processing thread"); + netdata_thread_cleanup_pop(1); + return NULL; +} + +// Thread cleanup +static void aclk_main_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + QUERY_THREAD_WAKEUP; + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + //netdata_thread_t *query_thread; + struct netdata_static_thread query_thread; + + memset(&query_thread, 0, sizeof(query_thread)); + + netdata_thread_cleanup_push(aclk_main_cleanup, ptr); + + if (unlikely(!aclk_buffer)) + aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + assert(aclk_buffer != NULL); + + //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT); + sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT); + + // Ok mark we are ready to accept incoming requests + waiting_init = 0; + + while (!netdata_exit) { + // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds + // TODO: Handle the unclaim command as well -- we may need to shutdown the connection + if (likely(!is_agent_claimed())) { + sleep_usec(USEC_PER_SEC * 60); + info("Checking agent claiming status"); + continue; + } + + if (unlikely(!aclk_connection_initialized)) { + static int initializing = 0; + + if (likely(initializing)) { + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + initializing = 1; + info("Initializing connection"); + //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer); + if (unlikely(aclk_init(ACLK_INIT))) { + // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying. + sleep_usec(USEC_PER_SEC * 60); + continue; + } else { + sleep_usec(USEC_PER_SEC * 1); + } + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); + } + if (unlikely(!query_thread.thread)) { + query_thread.thread = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create( + query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + } + + //TODO: Check if there is a return code + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + + } // forever + aclk_shutdown(); + + netdata_thread_cleanup_pop(1); + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form / + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message(char *sub_topic, char *message) +{ + int rc; + static int skip_due_to_shutdown = 0; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + if (unlikely(!aclk_connection_initialized)) + return 1; + + ++skip_due_to_shutdown; + if (unlikely(!(skip_due_to_shutdown % 100))) + info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown); + return 1; + } + + if (unlikely(!message)) + return 0; + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_send_message(final_topic, message); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/ + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + //static char *global_base_topic = NULL; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + return 1; + } + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect(void *ptr) +{ + (void) ptr; + info("Connection detected"); + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect(void *ptr) +{ + (void) ptr; + info("Disconnect detected"); + aclk_subscribed = 0; + aclk_metadata_submitted = 0; +} + +void aclk_shutdown() +{ + info("Shutdown initiated"); + aclk_connection_initialized = 0; + _link_shutdown(); + info("Shutdown complete"); +} + +int aclk_init(ACLK_INIT_ACTION action) +{ + (void) action; + + static int init = 0; + int rc; + + if (likely(init)) + return 0; + + aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20); + aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20); + + aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost"); + aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883); + + info("Maximum parallel outgoing messages %d", aclk_send_maximum); + info("Maximum parallel incoming messages %d", aclk_recv_maximum); + + // This will setup the base publish topic internally + //get_publish_base_topic(PUBLICH_TOPIC_GET); + + // initialize the low level link to the cloud + rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + return 1; + } + global_base_topic = GET_PUBLISH_BASE_TOPIC; + init = 1; + + return 0; +} + +// Use this to disable encoding of quotes and newlines so that +// MQTT subscriber can display more readable data on screen + +void aclk_create_header(BUFFER *dest, char *type, char *msg_id) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + buffer_sprintf( + dest, + "\t{\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"version\": %s,\n" + "\t\"payload\": ", + type, msg_id, ACLK_VERSION); +} + +#define EYE_FRIENDLY 1 + +// encapsulate contents into metadata message as per ACLK documentation +void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents) +{ +#ifndef EYE_FRIENDLY + char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *src, *dst; +#endif + + buffer_sprintf( + dest, + "\t{\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"payload\": %s\n\t}", + type, msg_id ? msg_id : "", contents->buffer); + +#ifndef EYE_FRIENDLY + //TODO: this is the initial escaping, It will expanded + src = dest->buffer; + dst = tmp_buffer; + while (*src) { + switch (*src) { + case '0x0a': + case '\n': + *dst++ = '\\'; + *dst++ = 'n'; + break; + case '\"': + *dst++ = '\\'; + *dst++ = '\"'; + break; + case '\'': + *dst++ = '\\'; + *dst++ = '\"'; + break; + default: + *dst++ = *src; + } + src++; + } + *dst = '\0'; + + buffer_flush(dest); + buffer_sprintf(dest, "%s", tmp_buffer); + + freez(tmp_buffer); +#endif + return; +} + +//TODO: this has been changed in the latest specs. We need to pack the data in one MQTT +//message with a payload and has a list of json objects +int aclk_send_alarm_metadata() +{ + //TODO: improve locking on the buffer -- same lock is used for the message send + //improve error handling + ACLK_LOCK; + buffer_flush(aclk_buffer); + // Alarms configuration + aclk_create_header(aclk_buffer, "alarms", NULL); + health_alarms2json(localhost, aclk_buffer, 1); + buffer_sprintf(aclk_buffer,"\n}"); + ACLK_UNLOCK; + aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer); + + // Alarms log + ACLK_LOCK; + buffer_flush(aclk_buffer); + aclk_create_header(aclk_buffer, "alarms_log", NULL); + health_alarm_log2json(localhost, aclk_buffer, 0); + buffer_sprintf(aclk_buffer,"\n}"); + ACLK_UNLOCK; + aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer); + + return 0; +} + + +// Send info metadata message to the cloud if the link is established +// or on request +int aclk_send_metadata() +{ + ACLK_LOCK; + + buffer_flush(aclk_buffer); + + aclk_create_header(aclk_buffer, "connect", NULL); + buffer_sprintf(aclk_buffer,"{\n\t \"info\" : "); + web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer); + buffer_sprintf(aclk_buffer,", \n\t \"charts\" : "); + charts2json(localhost, aclk_buffer); + buffer_sprintf(aclk_buffer,"\n}\n}"); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + + ACLK_UNLOCK; + + aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer); + + aclk_send_alarm_metadata(); + + return 0; +} + +//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) + +int aclk_send_single_chart(char *hostname, char *chart) +{ + RRDHOST *target_host; + ACLK_LOCK; + + buffer_flush(aclk_buffer); + + target_host = rrdhost_find_by_hostname(hostname, 0); + if (!target_host) + return 1; + + RRDSET *st = rrdset_find(target_host, chart); + + if (!st) + st = rrdset_find_byname(target_host, chart); + + if (!st) { + info("FAILED to find chart %s", chart); + return 1; + } + + aclk_buffer->contenttype = CT_APPLICATION_JSON; + + buffer_flush(aclk_buffer); + + aclk_create_header(aclk_buffer, "chart", NULL); + + rrdset2json(st, aclk_buffer, NULL, NULL); + buffer_sprintf(aclk_buffer,"\n}\n}"); + + + ACLK_UNLOCK; + aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name) +{ + (void) host; + (void) chart_name; +#ifndef ENABLE_ACLK + return 0; +#else + if (host != localhost) + return 0; + + aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1); + return 0; +#endif +} + +int aclk_update_alarm(RRDHOST *host, char *alarm_name) +{ + if (host != localhost) + return 0; + + aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1); + return 0; +} + + +//TODO: add and check the incoming type e.g http +int aclk_handle_cloud_request(char *payload) +{ + struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1}; + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(JSON_OK != rc)) { + error("Malformed json request (%s)", payload); + return 1; + } + + if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) { + return 1; + } + + aclk_submit_request(&cloud_to_agent); + + return 0; +} diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h new file mode 100644 index 0000000000..626cdecfaf --- /dev/null +++ b/aclk/agent_cloud_link.h @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_AGENT_CLOUD_LINK_H +#define NETDATA_AGENT_CLOUD_LINK_H + +#include "../daemon/common.h" +#include "mqtt.h" + +#define ACLK_JSON_IN_MSGID "msg-id" +#define ACLK_JSON_IN_TYPE "type" +#define ACLK_JSON_IN_VERSION "version" +#define ACLK_JSON_IN_TOPIC "callback-topic" +#define ACLK_JSON_IN_URL "payload" + + +#define ACLK_MSG_TYPE_CHART "chart" +#define ACLK_ALARMS_TOPIC "alarms" +#define ACLK_METADATA_TOPIC "meta" +#define ACLK_COMMAND_TOPIC "cmd" +#define ACLK_TOPIC_STRUCTURE "/agent/%s" + +#define ACLK_STARTUP_WAIT 30 // Seconds to wait before establishing initialization process +#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) +#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds +#define ACLK_QOS 1 +#define ACLK_PING_INTERVAL 60 +#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop + +#define ACLK_MAX_TOPIC 255 + +#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now +#define ACLK_MAX_RECONNECT_DELAY 120 +#define ACLK_VERSION "1" + +#define CONFIG_SECTION_ACLK "agent_cloud_link" + +struct aclk_request { + char *type_id; + char *msg_id; + char *topic; + char *url; + int version; +}; + + +typedef enum publish_topic_action { + PUBLICH_TOPIC_GET, + PUBLICH_TOPIC_FREE, + PUBLICH_TOPIC_REBUILD +} PUBLISH_TOPIC_ACTION; + +typedef enum aclk_init_action { + ACLK_INIT, + ACLK_REINIT +} ACLK_INIT_ACTION; + + +#define GET_PUBLISH_BASE_TOPIC get_publish_base_topic(0) +#define FREE_PUBLISH_BASE_TOPIC get_publish_base_topic(1) +#define REBUILD_PUBLISH_BASE_TOPIC get_publish_base_topic(2) + +void *aclk_main(void *ptr); + +#define NETDATA_ACLK_HOOK \ + { \ + .name = "AgentCloudLink", \ + .config_section = NULL, \ + .config_name = NULL, \ + .enabled = 1, \ + .thread = NULL, \ + .init_routine = NULL, \ + .start_routine = aclk_main \ + }, + +extern int aclk_send_message(char *sub_topic, char *message); + +int aclk_init(); +char *get_base_topic(); + +extern char *is_agent_claimed(void); + +// callbacks for agent cloud link +int aclk_subscribe(char *topic, int qos); +void aclk_shutdown(); +//void aclk_message_callback(struct mosquitto *moqs, void *obj, const struct mosquitto_message *msg); + +int cloud_to_agent_parse(JSON_ENTRY *e); +void aclk_disconnect(void *conn); +void aclk_connect(void *conn); +void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents); +int aclk_send_metadata(); +int aclk_wait_for_initialization(); +//int aclk_send_charts(RRDHOST *host, BUFFER *wb); +int aclk_send_single_chart(char *host, char *chart); +int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal); +struct aclk_query *aclk_query_find(char *token, char *data, char *msg_id, char *query); +//void aclk_rrdset2json(RRDSET *st, BUFFER *wb, char *hostname, int is_slave); +int aclk_update_chart(RRDHOST *host, char *chart_name); +int aclk_update_alarm(RRDHOST *host, char *alarm_name); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id); +int aclk_handle_cloud_request(char *payload); +int aclk_submit_request(struct aclk_request *); +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/mqtt.c b/aclk/mqtt.c new file mode 100644 index 0000000000..e781ee9f91 --- /dev/null +++ b/aclk/mqtt.c @@ -0,0 +1,318 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "../daemon/common.h" +#include "mqtt.h" + +void (*_on_connect)(void *ptr) = NULL; +void (*_on_disconnect)(void *ptr) = NULL; +extern int cmdpause; + + +#ifndef ENABLE_ACLK + +inline const char *_link_strerror(int rc) +{ + (void) rc; + return "no error"; +} + +int _link_event_loop(int timeout) +{ + (void) timeout; + return 0; +} + +int _link_send_message(char *topic, char *message) +{ + (void) topic; + (void) message; + return 0; +} + +int _link_subscribe(char *topic, int qos) +{ + (void) topic; + (void) qos; + return 0; +} + +void _link_shutdown() +{ + return; +} + +int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) +{ + (void) aclk_hostname; + (void) aclk_port; + (void) on_connect; + (void) on_disconnect; + return 0; +} + +#else +/* + * Just report the library info in the logfile for reference when issues arise + * + */ + +struct mosquitto *mosq = NULL; + +// Get a string description of the error + +inline const char *_link_strerror(int rc) +{ + return mosquitto_strerror(rc); +} + + +void mqtt_message_callback( + struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) +{ + (void) mosq; + (void) obj; + + // TODO: handle commands in a more efficient way, if we have many + + if (strcmp((char *)msg->payload, "pause") == 0) { + cmdpause = 1; + return; + } + + if (strcmp((char *)msg->payload, "resume") == 0) { + cmdpause = 0; + return; + } + + if (strcmp((char *)msg->payload, "reload") == 0) { + error_log_limit_unlimited(); + info("Reloading health configuration"); + health_reload(); + error_log_limit_reset(); + return; + } + + if (strcmp((char *)msg->payload, "info") == 0) { + aclk_send_metadata(); + return; + } + + aclk_handle_cloud_request(msg->payload); + + //info("Received type=[%s], msg-id=[%s], topic=[%s], url=[%s]",cloud_to_agent.type_id, cloud_to_agent.msg_id, cloud_to_agent.topic, cloud_to_agent.url); + +} + +void connect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + (void) obj; + (void) rc; + + info("Connection to cloud estabilished"); + + aclk_connection_initialized = 1; + _on_connect((void *) mosq); + + return; +} + + +void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + (void) obj; + (void) rc; + + info("Connection to cloud failed"); + // TODO: Keep the connection "alive" for now. The library will reconnect. + + //mqtt_connection_initialized = 0; + _on_disconnect((void *) mosq); + //sleep_usec(USEC_PER_SEC * 5); + return; +} + + +void _show_mqtt_info() +{ + int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + + info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision); +} + +int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) +{ + int rc; + int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + char *ca_crt; + char *server_crt; + char *server_key; + + // show library info so can have in in the logfile + libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*"); + server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*"); + server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*"); + + + if (ca_crt[0] == '*') { + freez(ca_crt); + ca_crt = NULL; + } + + if (server_crt[0] == '*') { + freez(server_crt); + server_crt = NULL; + } + + if (server_key[0] == '*') { + freez(server_key); + server_key = NULL; + } + + info( + "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, + libmosq_revision); + + rc = mosquitto_lib_init(); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + error("Failed to initialize MQTT (libmosquitto library)"); + return 1; + } + + mosq = mosquitto_new("anon", true, NULL); + if (unlikely(!mosq)) { + mosquitto_lib_cleanup(); + error("MQTT new structure -- %s", mosquitto_strerror(errno)); + return 1; + } + + _on_connect = on_connect; + _on_disconnect = on_disconnect; + + mosquitto_connect_callback_set(mosq, connect_callback); + mosquitto_disconnect_callback_set(mosq, disconnect_callback); + + mosquitto_username_pw_set(mosq, "anon", "anon"); + + rc = mosquitto_threaded_set(mosq, 1); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc)); + +#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 + rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc)); + + rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1); + info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); +#endif + + rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc)); + + mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL); + + rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Connect %s MQTT status = %d (%s)", aclk_hostname, rc, mosquitto_strerror(rc)); + else + info("Establishing MQTT link to %s", aclk_hostname); + + return rc; +} + +int _link_event_loop(int timeout) +{ + int rc; + + rc = mosquitto_loop(mosq, timeout, 1); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + errno = 0; + error("Loop error code %d (%s)", rc, mosquitto_strerror(rc)); + rc = mosquitto_reconnect(mosq); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + error("Reconnect loop error code %d (%s)", rc, mosquitto_strerror(rc)); + } + // TBD: Using delay + sleep_usec(USEC_PER_SEC * 10); + } + return rc; +} + +void _link_shutdown() +{ + int rc; + + rc = mosquitto_disconnect(mosq); + switch (rc) { + case MOSQ_ERR_SUCCESS: + info("MQTT disconnected from broker"); + break; + default: + info("MQTT invalid structure"); + break; + }; + + mosquitto_destroy(mosq); + mosq = NULL; + return; +} + + +int _link_subscribe(char *topic, int qos) +{ + int rc; + + if (unlikely(!mosq)) + return 1; + + mosquitto_message_callback_set(mosq, mqtt_message_callback); + + rc = mosquitto_subscribe(mosq, NULL, topic, qos); + if (unlikely(rc)) { + errno = 0; + error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc)); + return 1; + } + + return 0; +} + + +/* + * Send a message to the cloud to specific topic + * + */ + +int _link_send_message(char *topic, char *message) +{ + int rc; + + rc = mosquitto_pub_topic_check(topic); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + return rc; + + int msg_len = strlen(message); + + // TODO: handle encoding validation -- the message should be UFT8 encoded by the sender + //rc = mosquitto_validate_utf8(message, msg_len); + //if (unlikely(rc != MOSQ_ERR_SUCCESS)) + // return rc; + + rc = mosquitto_publish(mosq, NULL, topic, msg_len, message, ACLK_QOS, 0); + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + error("MQTT message failed : %s", mosquitto_strerror(rc)); + } + + return rc; +} +#endif \ No newline at end of file diff --git a/aclk/mqtt.h b/aclk/mqtt.h new file mode 100644 index 0000000000..2c7e120361 --- /dev/null +++ b/aclk/mqtt.h @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_MQTT_H +#define NETDATA_MQTT_H + +#ifdef ENABLE_ACLK +#include "mosquitto/lib/mosquitto.h" +#endif + +void _show_mqtt_info(); +int _link_event_loop(int timeout); +void _link_shutdown(); +int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)); +int _link_subscribe(char *topic, int qos); +int _link_send_message(char *topic, char *message); +const char *_link_strerror(int rc); + +extern int aclk_connection_initialized; +int aclk_handle_cloud_request(char *); + +#endif //NETDATA_MQTT_H diff --git a/claim/claim.c b/claim/claim.c index 75f0a437d6..72b6b42914 100644 --- a/claim/claim.c +++ b/claim/claim.c @@ -21,13 +21,12 @@ static char *claiming_errors[] = { "internal server error" // 12 }; -#define AGENT_UNCLAIMED 0 -#define AGENT_CLAIMED 1 -static uint8_t claiming_status = AGENT_UNCLAIMED; -uint8_t is_agent_claimed(void) +static char *claimed_id = NULL; + +char *is_agent_claimed(void) { - return (AGENT_CLAIMED == claiming_status); + return claimed_id; } #define CLAIMING_COMMAND_LENGTH 16384 @@ -64,8 +63,7 @@ void claim_agent(char *claiming_arguments) exit_code = mypclose(fp, command_pid); info("Agent claiming command returned with code %d", exit_code); if (0 == exit_code) { - claiming_status = AGENT_CLAIMED; - info("Agent successfully claimed."); + load_claiming_state(); return; } if (exit_code < 0) { @@ -85,19 +83,37 @@ void claim_agent(char *claiming_arguments) void load_claiming_state(void) { - info("The claiming feature is under development and still subject to change before the next release"); - return; + if (claimed_id != NULL) { + freez(claimed_id); + claimed_id = NULL; + } char filename[FILENAME_MAX + 1]; struct stat statbuf; - snprintfz(filename, FILENAME_MAX, "%s/claim.d/is_claimed", netdata_configured_user_config_dir); + snprintfz(filename, FILENAME_MAX, "%s/claim.d/claimed_id", netdata_configured_user_config_dir); + // check if the file exists if (lstat(filename, &statbuf) != 0) { - info("File '%s' was not found. Setting state to AGENT_UNCLAIMED.", filename); - claiming_status = AGENT_UNCLAIMED; - } else { - info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename); - claiming_status = AGENT_CLAIMED; + info("lstat on File '%s' failed reason=\"%s\". Setting state to AGENT_UNCLAIMED.", filename, strerror(errno)); + return; } + if (unlikely(statbuf.st_size == 0)) { + info("File '%s' has no contents. Setting state to AGENT_UNCLAIMED.", filename); + return; + } + + FILE *f = fopen(filename, "rt"); + if (unlikely(f == NULL)) { + error("File '%s' cannot be opened. Setting state to AGENT_UNCLAIMED.", filename); + return; + } + + claimed_id = callocz(1, statbuf.st_size + 1); + size_t bytes_read = fread(claimed_id, 1, statbuf.st_size, f); + claimed_id[bytes_read] = 0; + info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename); + fclose(f); + + snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir); } diff --git a/claim/claim.h b/claim/claim.h index 3b2b867434..8eda3560b6 100644 --- a/claim/claim.h +++ b/claim/claim.h @@ -8,7 +8,7 @@ extern char *claiming_pending_arguments; void claim_agent(char *claiming_arguments); -uint8_t is_agent_claimed(void); +char *is_agent_claimed(void); void load_claiming_state(void); #endif //NETDATA_CLAIM_H diff --git a/claim/netdata-claim.sh.in b/claim/netdata-claim.sh.in index e565e3de28..42eb99d718 100755 --- a/claim/netdata-claim.sh.in +++ b/claim/netdata-claim.sh.in @@ -211,6 +211,6 @@ if [ "${HTTP_STATUS_CODE}" -ne 200 ] ; then fi rm -f "${CLAIMING_DIR}/tmpout.txt" -touch "${CLAIMING_DIR}/is_claimed" +echo -n "${ID}" >"${CLAIMING_DIR}/claimed_id" rm -f "${CLAIMING_DIR}/token" -echo >&2 "Agent was successfully claimed." \ No newline at end of file +echo >&2 "Agent was successfully claimed." diff --git a/configure.ac b/configure.ac index f407d777fc..174d456787 100644 --- a/configure.ac +++ b/configure.ac @@ -342,7 +342,6 @@ AC_DEFINE([NETDATA_WITH_UUID], [1], [uuid usability]) OPTIONAL_UUID_CFLAGS="${UUID_CFLAGS}" OPTIONAL_UUID_LIBS="${UUID_LIBS}" - # ----------------------------------------------------------------------------- # OpenSSL Cryptography and SSL/TLS Toolkit @@ -408,6 +407,19 @@ fi AC_MSG_RESULT([${enable_https}]) AM_CONDITIONAL([ENABLE_HTTPS], [test "${enable_https}" = "yes"]) +# ----------------------------------------------------------------------------- +# ACLK +AC_MSG_CHECKING([if netdata ACLK should be enabled]) +if test "${ACLK}" = "yes"; then + enable_aclk="yes" + AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK]) + CFLAGS="${CFLAGS} -DENABLE_ACLK" +else + enable_aclk="no" +fi +AC_MSG_RESULT([${enable_aclk}]) +AM_CONDITIONAL([ENABLE_ACLK], [test "${enable_aclk}" = "yes"]) + # ----------------------------------------------------------------------------- # Exporting engine AC_MSG_CHECKING([if netdata exporting engine should be used]) @@ -1146,6 +1158,7 @@ AC_SUBST([OPTIONAL_ZLIB_CFLAGS]) AC_SUBST([OPTIONAL_ZLIB_LIBS]) AC_SUBST([OPTIONAL_UUID_CFLAGS]) AC_SUBST([OPTIONAL_UUID_LIBS]) +AC_SUBST([OPTIONAL_MQTT_LIBS]) AC_SUBST([OPTIONAL_LIBCAP_CFLAGS]) AC_SUBST([OPTIONAL_LIBCAP_LIBS]) AC_SUBST([OPTIONAL_IPMIMONITORING_CFLAGS]) @@ -1284,6 +1297,7 @@ AC_CONFIG_FILES([ web/server/Makefile web/server/static/Makefile claim/Makefile + aclk/Makefile ]) AC_OUTPUT diff --git a/daemon/common.h b/daemon/common.h index 59dbc41a8d..ffe4145b51 100644 --- a/daemon/common.h +++ b/daemon/common.h @@ -63,6 +63,9 @@ // netdata agent claiming #include "claim/claim.h" +// netdata agent cloud link +#include "aclk/agent_cloud_link.h" + // the netdata deamon #include "daemon.h" #include "main.h" diff --git a/daemon/main.c b/daemon/main.c index 7d7ede083a..f019cc6832 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -73,6 +73,10 @@ struct netdata_static_thread static_threads[] = { NETDATA_PLUGIN_HOOK_IDLEJITTER NETDATA_PLUGIN_HOOK_STATSD +#ifdef ENABLE_ACLK + NETDATA_ACLK_HOOK +#endif + // common plugins for all systems {"BACKENDS", NULL, NULL, 1, NULL, NULL, backends_main}, #ifdef ENABLE_EXPORTING @@ -785,6 +789,7 @@ int get_system_info(struct rrdhost_system_info *system_info) { void send_statistics( const char *action, const char *action_result, const char *action_data) { static char *as_script; + if (netdata_anonymous_statistics_enabled == -1) { char *optout_file = mallocz(sizeof(char) * (strlen(netdata_configured_user_config_dir) +strlen(".opt-out-from-anonymous-statistics") + 2)); sprintf(optout_file, "%s/%s", netdata_configured_user_config_dir, ".opt-out-from-anonymous-statistics"); diff --git a/database/rrddim.c b/database/rrddim.c index 0032940ce7..a8b37ea3a2 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -183,6 +183,9 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) { } } } +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif } RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier, collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode) { @@ -424,7 +427,9 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte } } rrdset_unlock(st); - +#ifdef ENABLE_ACLK + aclk_update_chart(host, st->id); +#endif return(rd); } @@ -478,6 +483,9 @@ void rrddim_free(RRDSET *st, RRDDIM *rd) freez(rd); break; } +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif } @@ -496,6 +504,9 @@ int rrddim_hide(RRDSET *st, const char *id) { } rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif return 0; } @@ -510,6 +521,9 @@ int rrddim_unhide(RRDSET *st, const char *id) { } rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif return 0; } @@ -518,12 +532,18 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) { rrddim_flag_set(rd, RRDDIM_FLAG_OBSOLETE); rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS); +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif } inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) { debug(D_RRD_CALLS, "rrddim_isnot_obsolete() for chart %s, dimension %s", st->name, rd->name); rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE); +#ifdef ENABLE_ACLK + aclk_update_chart(st->rrdhost, st->id); +#endif } // ---------------------------------------------------------------------------- diff --git a/database/rrdset.c b/database/rrdset.c index e9e1c1b8d5..32f923a103 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -761,7 +761,9 @@ RRDSET *rrdset_create_custom( rrdhost_cleanup_obsolete_charts(host); rrdhost_unlock(host); - +#ifdef ENABLE_ACLK + aclk_update_chart(host, st->id); +#endif return(st); } diff --git a/health/health_log.c b/health/health_log.c index 24ee538498..dcfb6a8d22 100644 --- a/health/health_log.c +++ b/health/health_log.c @@ -101,7 +101,6 @@ inline void health_label_log_save(RRDHOST *host) { inline void health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) { health_log_rotate(host); - if(likely(host->health_log_fp)) { if(unlikely(fprintf(host->health_log_fp , "%c\t%s" diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index ccb1339927..fd111dc40d 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -793,13 +793,8 @@ inline void host_labels2json(RRDHOST *host, BUFFER *wb, size_t indentation) { rrdhost_unlock(host); } -inline int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, char *url) { - (void)url; - if (!netdata_ready) return HTTP_RESP_BACKEND_FETCH_FAILED; - BUFFER *wb = w->response.data; - buffer_flush(wb); - wb->contenttype = CT_APPLICATION_JSON; - +inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb) +{ buffer_strcat(wb, "{\n"); buffer_sprintf(wb, "\t\"version\": \"%s\",\n", host->program_version); buffer_sprintf(wb, "\t\"uid\": \"%s\",\n", host->machine_guid); @@ -849,6 +844,18 @@ inline int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, c buffer_strcat(wb, "\n\t]\n"); buffer_strcat(wb, "}"); + return 0; +} + +inline int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, char *url) { + (void)url; + if (!netdata_ready) return HTTP_RESP_BACKEND_FETCH_FAILED; + BUFFER *wb = w->response.data; + buffer_flush(wb); + wb->contenttype = CT_APPLICATION_JSON; + + web_client_api_request_v1_info_fill_buffer(host, wb); + buffer_no_cacheable(wb); return HTTP_RESP_OK; } diff --git a/web/api/web_api_v1.h b/web/api/web_api_v1.h index edeea98bed..ca853ceacd 100644 --- a/web/api/web_api_v1.h +++ b/web/api/web_api_v1.h @@ -23,7 +23,7 @@ extern int web_client_api_request_v1_data(RRDHOST *host, struct web_client *w, c extern int web_client_api_request_v1_registry(RRDHOST *host, struct web_client *w, char *url); extern int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, char *url); extern int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *url); - +extern int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb); extern void host_labels2json(RRDHOST *host, BUFFER *wb, size_t indentation); extern void web_client_api_v1_init(void); -- cgit v1.2.3