diff options
-rw-r--r-- | CMakeLists.txt | 12 | ||||
-rw-r--r-- | Makefile.am | 25 | ||||
-rw-r--r-- | aclk/Makefile.am | 18 | ||||
-rw-r--r-- | aclk/README.md | 1 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 995 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 103 | ||||
-rw-r--r-- | aclk/mqtt.c | 318 | ||||
-rw-r--r-- | aclk/mqtt.h | 21 | ||||
-rw-r--r-- | claim/claim.c | 46 | ||||
-rw-r--r-- | claim/claim.h | 2 | ||||
-rwxr-xr-x | claim/netdata-claim.sh.in | 4 | ||||
-rw-r--r-- | configure.ac | 16 | ||||
-rw-r--r-- | daemon/common.h | 3 | ||||
-rw-r--r-- | daemon/main.c | 5 | ||||
-rw-r--r-- | database/rrddim.c | 22 | ||||
-rw-r--r-- | database/rrdset.c | 4 | ||||
-rw-r--r-- | health/health_log.c | 1 | ||||
-rw-r--r-- | web/api/web_api_v1.c | 21 | ||||
-rw-r--r-- | web/api/web_api_v1.h | 2 |
19 files changed, 1587 insertions, 32 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f715aeecb4..d43cb8a1f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -339,8 +339,7 @@ set(LIBNETDATA_FILES add_library(libnetdata OBJECT ${LIBNETDATA_FILES}) set(APPS_PLUGIN_FILES - collectors/apps.plugin/apps_plugin.c - ) + collectors/apps.plugin/apps_plugin.c) set(CHECKS_PLUGIN_FILES collectors/checks.plugin/plugin_checks.c @@ -606,6 +605,14 @@ set(CLAIM_PLUGIN_FILES claim/claim.h ) +set(ACLK_PLUGIN_FILES + aclk/agent_cloud_link.c + aclk/agent_cloud_link.h + aclk/agent_cloud_link.c + aclk/mqtt.c + aclk/mqtt.h + ) + set(EXPORTING_ENGINE_FILES exporting/exporting_engine.c exporting/exporting_engine.h @@ -673,6 +680,7 @@ set(NETDATA_FILES ${STREAMING_PLUGIN_FILES} ${WEB_PLUGIN_FILES} ${CLAIM_PLUGIN_FILES} + ${ACLK_PLUGIN_FILES} ) set(NETDATACLI_FILES diff --git a/Makefile.am b/Makefile.am index 30c2eb67b5..483a9c22db 100644 --- a/Makefile.am +++ b/Makefile.am @@ -100,6 +100,7 @@ SUBDIRS += \ streaming \ web \ claim \ + aclk \ $(NULL) @@ -108,6 +109,7 @@ AM_CFLAGS = \ $(OPTIONAL_NFACCT_CFLAGS) \ $(OPTIONAL_ZLIB_CFLAGS) \ $(OPTIONAL_UUID_CFLAGS) \ + $(OPTIONAL_MQTT_CFLAGS) \ $(OPTIONAL_LIBCAP_LIBS) \ $(OPTIONAL_IPMIMONITORING_CFLAGS) \ $(OPTIONAL_CUPS_CFLAGS) \ @@ -456,6 +458,13 @@ CLAIM_PLUGIN_FILES = \ claim/claim.h \ $(NULL) +ACLK_PLUGIN_FILES = \ + aclk/agent_cloud_link.c \ + aclk/agent_cloud_link.h \ + aclk/mqtt.c \ + aclk/mqtt.h \ + $(NULL) + EXPORTING_ENGINE_FILES = \ exporting/exporting_engine.c \ exporting/exporting_engine.h \ @@ -526,6 +535,12 @@ NETDATA_FILES = \ $(CLAIM_PLUGIN_FILES) \ $(NULL) +if ENABLE_ACLK + NETDATA_FILES += \ + $(ACLK_PLUGIN_FILES) \ + $(NULL) +endif + if FREEBSD NETDATA_FILES += \ $(FREEBSD_PLUGIN_FILES) \ @@ -559,6 +574,7 @@ NETDATA_COMMON_LIBS = \ $(OPTIONAL_ZLIB_LIBS) \ $(OPTIONAL_SSL_LIBS) \ $(OPTIONAL_UUID_LIBS) \ + $(OPTIONAL_MQTT_LIBS) \ $(OPTIONAL_UV_LIBS) \ $(OPTIONAL_LZ4_LIBS) \ $(OPTIONAL_JUDY_LIBS) \ @@ -575,9 +591,18 @@ NETDATACLI_FILES = \ sbin_PROGRAMS += netdata netdata_SOURCES = $(NETDATA_FILES) + +if ENABLE_ACLK netdata_LDADD = \ + mosquitto/lib/libmosquitto.a \ $(NETDATA_COMMON_LIBS) \ $(NULL) +else +netdata_LDADD = \ + $(NETDATA_COMMON_LIBS) \ + $(NULL) +endif + if ENABLE_CXX_LINKER netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@ else diff --git a/aclk/Makefile.am b/aclk/Makefile.am new file mode 100644 index 0000000000..ebc2a18d95 --- /dev/null +++ b/aclk/Makefile.am @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +CLEANFILES = \ + $(NULL) + +include $(top_srcdir)/build/subst.inc +SUFFIXES = .in + +sbin_SCRIPTS = \ + $(NULL) + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/aclk/README.md b/aclk/README.md new file mode 100644 index 0000000000..287d3b2606 --- /dev/null +++ b/aclk/README.md @@ -0,0 +1 @@ +This is the agent cloud link (ACLK) information file
\ No newline at end of file diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c new file mode 100644 index 0000000000..dc90c6d14e --- /dev/null +++ b/aclk/agent_cloud_link.c @@ -0,0 +1,995 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" + +// Read from the config file -- new section [agent_cloud_link] +// Defaults are supplied +int aclk_recv_maximum = 0; // default 20 +int aclk_send_maximum = 0; // default 20 + +int aclk_port = 0; // default 1883 +char *aclk_hostname = NULL; //default localhost +int aclk_subscribed = 0; + +int aclk_metadata_submitted = 0; +int waiting_init = 1; +int cmdpause = 0; // Used to pause query processing + +BUFFER *aclk_buffer = NULL; +char *global_base_topic = NULL; + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch(e->type) { + case JSON_OBJECT: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_ARRAY: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_STRING: + if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { + data->topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_URL)) { + data->url = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { + data->version = atol(e->data.string); + break; + } + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +//char *send_http_request(char *host, char *port, char *url, BUFFER *b) +//{ +// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; +// +// buffer_flush(b); +// buffer_sprintf( +// b, +// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n", +// url, host); +// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout); +// +// if (unlikely(sock == -1)) { +// error("Handshake failed"); +// return NULL; +// } +// +// SSL_CTX *ctx = security_initialize_openssl_client(); +// // Certificate chain: not updating the stores - do we need private CA roots? +// // Calls to SSL_CTX_load_verify_locations would go here. +// SSL *ssl = SSL_new(ctx); +// SSL_set_fd(ssl, sock); +// int err = SSL_connect(ssl); +// SSL_write(ssl, b->buffer, b->len); // Timeout options? +// int bytes_read = SSL_read(ssl, b->buffer, b->len); +// SSL_shutdown(ssl); +// close(sock); +//} + +// Set when we have connection up and running from the connection callback +int aclk_connection_initialized = 0; + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define QUERY_LOCK netdata_mutex_lock(&query_mutex) +#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; + +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) + + +struct aclk_query { + time_t created; + time_t run_after; // Delay run until after this time + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + u_int64_t count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + +/* + * Free a query structure when done + */ + +void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + freez(this_query->query); + if (this_query->data) + freez(this_query->data); + if (this_query->msg_id) + freez(this_query->msg_id); + freez(this_query); + return; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Called with locked entries + +struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a lock before calling this +struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query) +{ + struct aclk_query *tmp_query; + + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) { + if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) + return tmp_query; + } + } + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(waiting_init)) + return 0; + + run_after = now_realtime_sec() + run_after; + + QUERY_LOCK; + tmp_query = aclk_query_find(topic, data, msg_id, query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + tmp_query->deleted = 1; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + if (internal) { + new_query->topic = strdupz(topic); + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + if (data) + new_query->data = strdupz(data); + + new_query->next = NULL; + new_query->created = now_realtime_sec(); + new_query->run_after = run_after; + + info("Added query (%s) (%s)", topic, query); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + +// if (likely(aclk_queue.aclk_query_tail)) { +// aclk_queue.aclk_query_tail->next = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// +// if (likely(!aclk_queue.aclk_query_head)) { +// aclk_queue.aclk_query_head = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// QUERY_UNLOCK; +// return 0; +} + +inline int aclk_submit_request(struct aclk_request *request) +{ + return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + QUERY_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + QUERY_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + if (this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + QUERY_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + QUERY_UNLOCK; + return this_query; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called by aclk_init(), to compute the base topic once and have +// it stored internally. +// Need to check if additional logic should be added to make sure that there +// is enough information to determine the base topic at init time + +// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch +// that on the fly + +char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action) +{ + static char *topic = NULL; + + if (unlikely(!is_agent_claimed())) + return NULL; + + ACLK_LOCK; + + if (unlikely(action == PUBLICH_TOPIC_FREE)) { + if (likely(topic)) { + freez(topic); + topic = NULL; + } + + ACLK_UNLOCK; + + return NULL; + } + + if (unlikely(action == PUBLICH_TOPIC_REBUILD)) { + ACLK_UNLOCK; + get_publish_base_topic(PUBLICH_TOPIC_FREE); + return get_publish_base_topic(PUBLICH_TOPIC_GET); + } + + if (unlikely(!topic)) { + char tmp_topic[ACLK_MAX_TOPIC + 1]; + + sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); + topic = strdupz(tmp_topic); + } + + ACLK_UNLOCK; + return topic; +} + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + if (unlikely(!global_base_topic)) + global_base_topic = GET_PUBLISH_BASE_TOPIC; + + if (unlikely(!global_base_topic)) + return sub_topic; + + snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + + return final_topic; +} + +// Wait for ACLK connection to be established +int aclk_wait_for_initialization() +{ + if (unlikely(!aclk_connection_initialized)) { + time_t now = now_realtime_sec(); + + while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) { + sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT); + _link_event_loop(0); + } + + if (unlikely(!aclk_connection_initialized)) { + error("ACLK connection cannot be established"); + return 1; + } + } + return 0; +} + +/* + * This function will fetch the next pending command and process it + * + */ +int aclk_process_query() +{ + struct aclk_query *this_query; + static u_int64_t query_count = 0; + //int rc; + + if (unlikely(cmdpause)) + return 0; + + if (!aclk_connection_initialized) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + //info("No pending queries"); + return 0; + } + + if (unlikely(this_query->deleted)) { + info("Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + + query_count++; + info( + "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query, + (int) (now_realtime_sec() - this_query->created)); + + if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: ignore return code for now + web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + + //TODO: handle bad response perhaps in a different way. For now it does to the payload + //if (rc == HTTP_RESP_OK || 1) { + buffer_flush(aclk_buffer); + + aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + aclk_send_message(this_query->topic, aclk_buffer->buffer); + //} else + // error("Query RESP: %s", w->response.data->buffer); + + buffer_free(w->response.data); + freez(w); + aclk_query_free(this_query); + return 1; + } + + if (strcmp((char *)this_query->topic, "_chart") == 0) { + aclk_send_single_chart(this_query->data, this_query->query); + } + + aclk_query_free(this_query); + + return 1; +} + +// Launch a query processing thread + +/* + * Process all pending queries + * Return 0 if no queries were processed, 1 otherwise + * + */ + +int aclk_process_queries() +{ + if (unlikely(cmdpause)) + return 0; + + // Return if no queries pending + if (likely(!aclk_queue.count)) + return 0; + + info("Processing %d queries", (int ) aclk_queue.count); + + while (aclk_process_query()) { + //rc = _link_event_loop(0); + }; + + return 1; +} + +static void aclk_query_thread_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * MAin query processing thread + * + */ +void *aclk_query_main_thread(void *ptr) +{ + netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + + while (!netdata_exit) { + + QUERY_THREAD_LOCK; + + if (unlikely(!aclk_metadata_submitted)) { + aclk_send_metadata(); + aclk_metadata_submitted = 1; + } + + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + if (likely(aclk_connection_initialized && !netdata_exit)) { + while (aclk_process_queries()) { + // Sleep for a few ms and retry maybe we have something to process + // before going to sleep + // TODO: This needs improvement to avoid missed queries + sleep_usec(USEC_PER_MS * 100); + } + } + + QUERY_THREAD_UNLOCK; + + } // forever + info("Shutting down query processing thread"); + netdata_thread_cleanup_pop(1); + return NULL; +} + +// Thread cleanup +static void aclk_main_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + QUERY_THREAD_WAKEUP; + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + //netdata_thread_t *query_thread; + struct netdata_static_thread query_thread; + + memset(&query_thread, 0, sizeof(query_thread)); + + netdata_thread_cleanup_push(aclk_main_cleanup, ptr); + + if (unlikely(!aclk_buffer)) + aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + assert(aclk_buffer != NULL); + + //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT); + sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT); + + // Ok mark we are ready to accept incoming requests + waiting_init = 0; + + while (!netdata_exit) { + // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds + // TODO: Handle the unclaim command as well -- we may need to shutdown the connection + if (likely(!is_agent_claimed())) { + sleep_usec(USEC_PER_SEC * 60); + info("Checking agent claiming status"); + continue; + } + + if (unlikely(!aclk_connection_initialized)) { + static int initializing = 0; + + if (likely(initializing)) { + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + initializing = 1; + info("Initializing connection"); + //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer); + if (unlikely(aclk_init(ACLK_INIT))) { + // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying. + sleep_usec(USEC_PER_SEC * 60); + continue; + } else { + sleep_usec(USEC_PER_SEC * 1); + } + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); + } + if (unlikely(!query_thread.thread)) { + query_thread.thread = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create( + query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + } + + //TODO: Check if there is a return code + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + + } // forever + aclk_shutdown(); + + netdata_thread_cleanup_pop(1); + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form <base_topic>/<sub_topic> + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message(char *sub_topic, char *message) +{ + int rc; + static int skip_due_to_shutdown = 0; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + if (unlikely(!aclk_connection_initialized)) + return 1; + + ++skip_due_to_shutdown; + if (unlikely(!(skip_due_to_shutdown % 100))) + info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown); + return 1; + } + + if (unlikely(!message)) + return 0; + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_send_message(final_topic, message); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/<sub_topic> + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + //static char *global_base_topic = NULL; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + return 1; + } + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect(void *ptr) +{ + (void) ptr; + info("Connection detected"); + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect(void *ptr) +{ + (void) ptr; + info("Disconnect detected"); + aclk_subscribed = 0; + aclk_metadata_submitted = 0; +} + +void aclk_shutdown() +{ + info("Shutdown initiated"); + aclk_connection_initialized = 0; + _link_shutdown(); + info("Shutdown complete"); +} + +int aclk_init(ACLK_INIT_ACTION action) +{ + (void) action; + + static int init = 0; + int rc; + + if (likely(init)) + return 0; + + aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20); + aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20); + + aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost"); + aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883); + + info("Maximum parallel outgoing messages %d", aclk_send_maximum); + info("Maximum parallel incoming messages %d", aclk_recv_maximum); + + // This will setup the base publish topic internally + //get_publish_base_topic(PUBLICH_TOPIC_GET); + + // initialize the low level link to the cloud + rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + return 1; + } + global_base_topic = GET_PUBLISH_BASE_TOPIC; + init = 1; + + return 0; +} + +// Use this to disable encoding of quotes and newlines so that +// MQTT subscriber can display more readable data on screen + +void aclk_create_header(BUFFER *dest, char *type, char *msg_id) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + buffer_sprintf( + dest, + "\t{\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"version\": %s,\n" + "\t\"payload\": ", + type, msg_id, ACLK_VERSION); +} + +#define |