summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt12
-rw-r--r--Makefile.am25
-rw-r--r--aclk/Makefile.am18
-rw-r--r--aclk/README.md1
-rw-r--r--aclk/agent_cloud_link.c995
-rw-r--r--aclk/agent_cloud_link.h103
-rw-r--r--aclk/mqtt.c318
-rw-r--r--aclk/mqtt.h21
-rw-r--r--claim/claim.c46
-rw-r--r--claim/claim.h2
-rwxr-xr-xclaim/netdata-claim.sh.in4
-rw-r--r--configure.ac16
-rw-r--r--daemon/common.h3
-rw-r--r--daemon/main.c5
-rw-r--r--database/rrddim.c22
-rw-r--r--database/rrdset.c4
-rw-r--r--health/health_log.c1
-rw-r--r--web/api/web_api_v1.c21
-rw-r--r--web/api/web_api_v1.h2
19 files changed, 1587 insertions, 32 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f715aeecb4..d43cb8a1f9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -339,8 +339,7 @@ set(LIBNETDATA_FILES
add_library(libnetdata OBJECT ${LIBNETDATA_FILES})
set(APPS_PLUGIN_FILES
- collectors/apps.plugin/apps_plugin.c
- )
+ collectors/apps.plugin/apps_plugin.c)
set(CHECKS_PLUGIN_FILES
collectors/checks.plugin/plugin_checks.c
@@ -606,6 +605,14 @@ set(CLAIM_PLUGIN_FILES
claim/claim.h
)
+# Agent cloud link (ACLK) sources -- each file is listed exactly once
+set(ACLK_PLUGIN_FILES
+        aclk/agent_cloud_link.c
+        aclk/agent_cloud_link.h
+        aclk/mqtt.c
+        aclk/mqtt.h
+        )
+
set(EXPORTING_ENGINE_FILES
exporting/exporting_engine.c
exporting/exporting_engine.h
@@ -673,6 +680,7 @@ set(NETDATA_FILES
${STREAMING_PLUGIN_FILES}
${WEB_PLUGIN_FILES}
${CLAIM_PLUGIN_FILES}
+ ${ACLK_PLUGIN_FILES}
)
set(NETDATACLI_FILES
diff --git a/Makefile.am b/Makefile.am
index 30c2eb67b5..483a9c22db 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -100,6 +100,7 @@ SUBDIRS += \
streaming \
web \
claim \
+ aclk \
$(NULL)
@@ -108,6 +109,7 @@ AM_CFLAGS = \
$(OPTIONAL_NFACCT_CFLAGS) \
$(OPTIONAL_ZLIB_CFLAGS) \
$(OPTIONAL_UUID_CFLAGS) \
+ $(OPTIONAL_MQTT_CFLAGS) \
$(OPTIONAL_LIBCAP_LIBS) \
$(OPTIONAL_IPMIMONITORING_CFLAGS) \
$(OPTIONAL_CUPS_CFLAGS) \
@@ -456,6 +458,13 @@ CLAIM_PLUGIN_FILES = \
claim/claim.h \
$(NULL)
+ACLK_PLUGIN_FILES = \
+ aclk/agent_cloud_link.c \
+ aclk/agent_cloud_link.h \
+ aclk/mqtt.c \
+ aclk/mqtt.h \
+ $(NULL)
+
EXPORTING_ENGINE_FILES = \
exporting/exporting_engine.c \
exporting/exporting_engine.h \
@@ -526,6 +535,12 @@ NETDATA_FILES = \
$(CLAIM_PLUGIN_FILES) \
$(NULL)
+if ENABLE_ACLK
+ NETDATA_FILES += \
+ $(ACLK_PLUGIN_FILES) \
+ $(NULL)
+endif
+
if FREEBSD
NETDATA_FILES += \
$(FREEBSD_PLUGIN_FILES) \
@@ -559,6 +574,7 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_ZLIB_LIBS) \
$(OPTIONAL_SSL_LIBS) \
$(OPTIONAL_UUID_LIBS) \
+ $(OPTIONAL_MQTT_LIBS) \
$(OPTIONAL_UV_LIBS) \
$(OPTIONAL_LZ4_LIBS) \
$(OPTIONAL_JUDY_LIBS) \
@@ -575,9 +591,18 @@ NETDATACLI_FILES = \
sbin_PROGRAMS += netdata
netdata_SOURCES = $(NETDATA_FILES)
+
+if ENABLE_ACLK
netdata_LDADD = \
+ mosquitto/lib/libmosquitto.a \
$(NETDATA_COMMON_LIBS) \
$(NULL)
+else
+netdata_LDADD = \
+ $(NETDATA_COMMON_LIBS) \
+ $(NULL)
+endif
+
if ENABLE_CXX_LINKER
netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@
else
diff --git a/aclk/Makefile.am b/aclk/Makefile.am
new file mode 100644
index 0000000000..ebc2a18d95
--- /dev/null
+++ b/aclk/Makefile.am
@@ -0,0 +1,18 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+CLEANFILES = \
+ $(NULL)
+
+include $(top_srcdir)/build/subst.inc
+SUFFIXES = .in
+
+sbin_SCRIPTS = \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
diff --git a/aclk/README.md b/aclk/README.md
new file mode 100644
index 0000000000..287d3b2606
--- /dev/null
+++ b/aclk/README.md
@@ -0,0 +1 @@
+This is the agent cloud link (ACLK) information file \ No newline at end of file
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
new file mode 100644
index 0000000000..dc90c6d14e
--- /dev/null
+++ b/aclk/agent_cloud_link.c
@@ -0,0 +1,995 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+#include "agent_cloud_link.h"
+
+// Read from the config file -- new section [agent_cloud_link]
+// The zero/NULL values below are placeholders; aclk_init() loads the configured values (or the listed defaults)
+int aclk_recv_maximum = 0; // default 20
+int aclk_send_maximum = 0; // default 20
+
+int aclk_port = 0; // default 1883
+char *aclk_hostname = NULL; // default localhost
+int aclk_subscribed = 0; // set by aclk_main() after a successful aclk_subscribe(), cleared by aclk_disconnect()
+
+int aclk_metadata_submitted = 0; // set by the query thread after aclk_send_metadata(), cleared by aclk_disconnect()
+int waiting_init = 1; // while non-zero aclk_queue_query() drops every request; cleared by aclk_main()
+int cmdpause = 0; // Used to pause query processing
+
+BUFFER *aclk_buffer = NULL; // response buffer, created by aclk_main()
+char *global_base_topic = NULL; // cached base topic, filled in by aclk_init() or get_topic()
+
+/*
+ * JSON parser callback for messages arriving from the cloud.
+ *
+ * e->callback_data is treated as a struct aclk_request. Recognised string
+ * members (msg id, type, topic, url) are duplicated into it and the version
+ * member is converted with atol(). Objects and arrays register this same
+ * function as their callback.
+ *
+ * Always returns 0.
+ */
+int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+    struct aclk_request *request = e->callback_data;
+
+    switch (e->type) {
+        case JSON_OBJECT:
+        case JSON_ARRAY:
+            e->callback_function = cloud_to_agent_parse;
+            break;
+
+        case JSON_STRING:
+            if (!strcmp(e->name, ACLK_JSON_IN_MSGID))
+                request->msg_id = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_TYPE))
+                request->type_id = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_TOPIC))
+                request->topic = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_URL))
+                request->url = strdupz(e->data.string);
+            break;
+
+        case JSON_NUMBER:
+            if (!strcmp(e->name, ACLK_JSON_IN_VERSION))
+                request->version = atol(e->data.string);
+            break;
+
+        case JSON_BOOLEAN:
+        case JSON_NULL:
+            // nothing to pick up from these value types
+            break;
+    }
+
+    return 0;
+}
+
+//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
+//{
+// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
+//
+// buffer_flush(b);
+// buffer_sprintf(
+// b,
+// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
+// url, host);
+// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
+//
+// if (unlikely(sock == -1)) {
+// error("Handshake failed");
+// return NULL;
+// }
+//
+// SSL_CTX *ctx = security_initialize_openssl_client();
+// // Certificate chain: not updating the stores - do we need private CA roots?
+// // Calls to SSL_CTX_load_verify_locations would go here.
+// SSL *ssl = SSL_new(ctx);
+// SSL_set_fd(ssl, sock);
+// int err = SSL_connect(ssl);
+// SSL_write(ssl, b->buffer, b->len); // Timeout options?
+// int bytes_read = SSL_read(ssl, b->buffer, b->len);
+// SSL_shutdown(ssl);
+// close(sock);
+//}
+
+// Set when we have connection up and running from the connection callback
+int aclk_connection_initialized = 0;
+
+// aclk_mutex is held around calls into the link layer and while the cached base topic is read or rebuilt;
+// query_mutex is held while the pending query queue is inspected or modified
+static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
+
+#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
+#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
+
+#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
+#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
+
+// Condition variable (and its mutex) the query thread sleeps on until new work is signalled
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+
+// None of these macros carries a trailing semicolon: the call site supplies it
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+
+
+struct aclk_query { // One queued request; entries are linked in the order chosen by aclk_query_find_position()
+ time_t created; // Time the entry was queued
+ time_t run_after; // Delay run until after this time
+ char *topic; // Topic to respond to
+ char *data; // Internal data (NULL if request from the cloud)
+ char *msg_id; // msg_id generated by the cloud (NULL if internal)
+ char *query; // The actual query
+ u_char deleted; // Mark deleted for garbage collect (the entry is freed when it is popped)
+ struct aclk_query *next; // Next entry in the queue, NULL at the end
+};
+
+struct aclk_query_queue { // The single pending-query list (instance: aclk_queue), accessed under QUERY_LOCK
+ struct aclk_query *aclk_query_head; // Next entry to be popped, NULL when the queue is empty
+ struct aclk_query *aclk_query_tail; // Last entry; reset to NULL by aclk_queue_pop() when the queue drains
+ u_int64_t count; // Entries currently linked, including ones marked deleted
+} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
+
+/*
+ * Release a query together with every string it owns.
+ * A NULL pointer is accepted and ignored.
+ */
+
+void aclk_query_free(struct aclk_query *this_query)
+{
+    if (likely(this_query)) {
+        freez(this_query->topic);
+        freez(this_query->query);
+
+        // data and msg_id are optional members
+        if (this_query->data)
+            freez(this_query->data);
+        if (this_query->msg_id)
+            freez(this_query->msg_id);
+
+        freez(this_query);
+    }
+}
+
+// Find where a query scheduled for time_to_run has to be linked in.
+// The result is the last entry that is not scheduled later than time_to_run; the new
+// entry goes right after it. NULL means the new entry becomes the HEAD.
+// The caller must already hold the query lock.
+
+struct aclk_query *aclk_query_find_position(time_t time_to_run)
+{
+    struct aclk_query *previous = NULL;
+    struct aclk_query *cursor;
+
+    for (cursor = aclk_queue.aclk_query_head; cursor; cursor = cursor->next) {
+        if (cursor->run_after > time_to_run)
+            break;
+        previous = cursor;
+    }
+
+    return previous;
+}
+
+// Find a queued query that is not marked deleted and has the same topic and query string.
+// data and msg_id are optional filters: NULL matches any entry, otherwise the entry
+// must carry an equal string.
+// Returns the first matching entry or NULL.
+// Need to have a lock before calling this
+struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
+{
+    struct aclk_query *tmp_query;
+
+    for (tmp_query = aclk_queue.aclk_query_head; tmp_query; tmp_query = tmp_query->next) {
+        if (unlikely(tmp_query->deleted))
+            continue;
+
+        if (strcmp(tmp_query->topic, topic) != 0 || strcmp(tmp_query->query, query) != 0)
+            continue;
+
+        // An entry may have no data (cloud request) or no msg_id (internal request).
+        // Such an entry cannot equal a non-NULL filter and must never be handed to strcmp().
+        if (data && (!tmp_query->data || strcmp(data, tmp_query->data) != 0))
+            continue;
+        if (msg_id && (!tmp_query->msg_id || strcmp(msg_id, tmp_query->msg_id) != 0))
+            continue;
+
+        return tmp_query;
+    }
+
+    return NULL;
+}
+
+/*
+ * Add a query to execute, the result will be sent to the specified topic.
+ *
+ *   topic     - topic the response is published to
+ *   data      - optional internal payload; copied when present
+ *   msg_id    - message id generated by the cloud (NULL for internal queries)
+ *   query     - the query to run
+ *   run_after - delay, in seconds from now, before the query may run
+ *   internal  - non-zero: topic and query are copied and msg_id is not stored;
+ *               zero: the topic, query and msg_id pointers themselves are stored
+ *               and are released later by aclk_query_free()
+ *
+ * If a matching entry is already queued for the same second nothing is added;
+ * otherwise the old entry is marked deleted and a new one is queued.
+ * Requests are silently dropped while the agent is still initializing.
+ * Always returns 0.
+ *
+ * NOTE(review): on both early returns the pointers of a non-internal request
+ * are not stored here -- confirm the caller releases them in that case.
+ */
+
+int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
+{
+    struct aclk_query *new_query, *tmp_query;
+    time_t run_at;
+
+    // Ignore all commands while we wait for the agent to initialize
+    if (unlikely(waiting_init))
+        return 0;
+
+    // Keep the absolute time in a time_t rather than squeezing it back into the int parameter
+    run_at = now_realtime_sec() + run_after;
+
+    QUERY_LOCK;
+    tmp_query = aclk_query_find(topic, data, msg_id, query);
+    if (unlikely(tmp_query)) {
+        if (tmp_query->run_after == run_at) {
+            QUERY_UNLOCK;
+            QUERY_THREAD_WAKEUP;
+            return 0;
+        }
+        // Superseded: the entry stays linked and is freed once it is popped
+        tmp_query->deleted = 1;
+    }
+
+    new_query = callocz(1, sizeof(struct aclk_query));
+    if (internal) {
+        new_query->topic = strdupz(topic);
+        new_query->query = strdupz(query);
+    } else {
+        new_query->topic = topic;
+        new_query->query = query;
+        new_query->msg_id = msg_id;
+    }
+
+    if (data)
+        new_query->data = strdupz(data);
+
+    new_query->created = now_realtime_sec();
+    new_query->run_after = run_at;
+
+    info("Added query (%s) (%s)", topic, query);
+
+    tmp_query = aclk_query_find_position(run_at);
+    if (tmp_query) {
+        // Link in after the last entry that is due no later than the new one
+        new_query->next = tmp_query->next;
+        tmp_query->next = new_query;
+    } else {
+        // Nothing is due earlier: the new entry becomes the head
+        new_query->next = aclk_queue.aclk_query_head;
+        aclk_queue.aclk_query_head = new_query;
+    }
+
+    // Whenever nothing follows the new entry it is the tail; this also covers
+    // the first entry placed into an empty queue
+    if (!new_query->next)
+        aclk_queue.aclk_query_tail = new_query;
+
+    aclk_queue.count++;
+
+    QUERY_UNLOCK;
+    QUERY_THREAD_WAKEUP;
+    return 0;
+}
+
+// Queue a parsed request: its topic, msg_id and url are passed to aclk_queue_query()
+// with no data, no delay (run_after 0) and internal set to 0.
+// Returns whatever aclk_queue_query() returns.
+inline int aclk_submit_request(struct aclk_request *request)
+{
+ return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
+}
+
+/*
+ * Get the next query to process.
+ *
+ * Returns NULL when the queue is empty or when the entry at the head is not
+ * due yet. Otherwise the head entry is unlinked and returned.
+ *
+ * The caller needs to free the returned entry (topic, query and the structure
+ * itself) by calling aclk_query_free().
+ */
+struct aclk_query *aclk_queue_pop()
+{
+    struct aclk_query *this_query;
+    time_t now;
+
+    QUERY_LOCK;
+
+    this_query = aclk_queue.aclk_query_head;
+    if (likely(!this_query)) {
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    // One clock reading for both the comparison and the log line
+    now = now_realtime_sec();
+    if (this_query->run_after > now) {
+        // time_t has no printf conversion of its own, so cast explicitly to match %ld
+        info("Query %s will run in %ld seconds", this_query->query, (long)(this_query->run_after - now));
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    aclk_queue.count--;
+    aclk_queue.aclk_query_head = this_query->next;
+
+    if (likely(!aclk_queue.aclk_query_head))
+        aclk_queue.aclk_query_tail = NULL;
+
+    // The entry has left the list; do not hand out a pointer into it
+    this_query->next = NULL;
+
+    QUERY_UNLOCK;
+    return this_query;
+}
+
+// This will give the base topic that the agent will publish messages.
+// subtopics will be sent under the base topic e.g. base_topic/subtopic
+// The topic is built once from ACLK_TOPIC_STRUCTURE and the value returned by
+// is_agent_claimed(), then cached in a function-local static.
+//
+// action selects the operation:
+//   PUBLICH_TOPIC_FREE    - drop the cached topic, returns NULL
+//   PUBLICH_TOPIC_REBUILD - drop the cached topic and build it again
+//   any other value       - return the cached topic, building it if needed
+//
+// Returns NULL when the agent is not claimed. The returned string is owned by
+// this function; callers must not free it.
+//
+// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
+// that on the fly
+
+char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
+{
+    static char *topic = NULL;
+
+    if (unlikely(!is_agent_claimed()))
+        return NULL;
+
+    ACLK_LOCK;
+
+    if (unlikely(action == PUBLICH_TOPIC_FREE)) {
+        if (likely(topic)) {
+            freez(topic);
+            topic = NULL;
+        }
+
+        ACLK_UNLOCK;
+
+        return NULL;
+    }
+
+    if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
+        // The recursive calls take the lock themselves, so release it first
+        ACLK_UNLOCK;
+        get_publish_base_topic(PUBLICH_TOPIC_FREE);
+        return get_publish_base_topic(PUBLICH_TOPIC_GET);
+    }
+
+    if (unlikely(!topic)) {
+        char tmp_topic[ACLK_MAX_TOPIC + 1];
+
+        // Bounded formatting, same buffer/limit pairing as the other topic buffers in this file
+        snprintfz(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
+        topic = strdupz(tmp_topic);
+    }
+
+    ACLK_UNLOCK;
+    return topic;
+}
+
+// Compose "<base topic>/<sub_topic>" into final_topic; max_size is handed to
+// snprintfz() as the size limit.
+// The base topic is looked up and cached in global_base_topic on first use.
+// Returns final_topic, or sub_topic itself when no base topic is available.
+char *get_topic(char *sub_topic, char *final_topic, int max_size)
+{
+    if (unlikely(!global_base_topic)) {
+        global_base_topic = GET_PUBLISH_BASE_TOPIC;
+
+        if (unlikely(!global_base_topic))
+            return sub_topic;
+    }
+
+    snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
+    return final_topic;
+}
+
+// Wait for the ACLK connection to be established.
+// Returns 0 immediately when it already is. Otherwise sleeps and runs the link
+// event loop repeatedly for up to ACLK_INITIALIZATION_WAIT seconds.
+// Returns 0 once the connection is up, 1 (after logging an error) if it is not.
+int aclk_wait_for_initialization()
+{
+    time_t started;
+
+    if (likely(aclk_connection_initialized))
+        return 0;
+
+    started = now_realtime_sec();
+
+    while (!aclk_connection_initialized && (now_realtime_sec() - started) < ACLK_INITIALIZATION_WAIT) {
+        sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
+        _link_event_loop(0);
+    }
+
+    if (likely(aclk_connection_initialized))
+        return 0;
+
+    error("ACLK connection cannot be established");
+    return 1;
+}
+
+/*
+ * This function will fetch the next pending command and process it
+ *
+ * Returns 0 when nothing was taken from the queue (processing paused, link not
+ * initialized, queue empty or head not due yet) and 1 when an entry was
+ * consumed, whether it was executed or only garbage collected.
+ *
+ * Queries starting with "/api/v1/" are executed through web_client_api_request_v1()
+ * on a temporary web_client and the response is published to the query's topic.
+ * Entries whose topic is "_chart" are passed to aclk_send_single_chart().
+ * Any other entry is just freed.
+ */
+int aclk_process_query()
+{
+ struct aclk_query *this_query;
+ static u_int64_t query_count = 0; // running number of executed queries, used only in the log line
+ //int rc;
+
+ if (unlikely(cmdpause))
+ return 0;
+
+ if (!aclk_connection_initialized)
+ return 0;
+
+ this_query = aclk_queue_pop();
+ if (likely(!this_query)) {
+ //info("No pending queries");
+ return 0;
+ }
+
+ // Entries superseded by a newer request are only released here
+ if (unlikely(this_query->deleted)) {
+ info("Garbage collect query %s:%s", this_query->topic, this_query->query);
+ aclk_query_free(this_query);
+ return 1;
+ }
+
+ query_count++;
+ info(
+ "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
+ (int) (now_realtime_sec() - this_query->created));
+
+ if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f; // NOTE(review): presumably grants every access class to this internal client -- confirm against the ACL flags
+
+ // Split at '?': the query string (copied starting at the '?' itself) goes to the
+ // web client and the query is cut there, leaving only the path
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
+ *mysep = '\0';
+ } else
+ strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
+
+ // The text after the last '/' of the path is what gets passed to the API dispatcher
+ mysep = strrchr(this_query->query, '/');
+
+ // TODO: ignore return code for now
+ web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+
+ //TODO: handle bad response perhaps in a different way. For now it goes to the payload
+ //if (rc == HTTP_RESP_OK || 1) {
+ buffer_flush(aclk_buffer);
+
+ // Wrap the API response in an ACLK message and publish it to the query's topic
+ aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
+ aclk_buffer->contenttype = CT_APPLICATION_JSON;
+ aclk_send_message(this_query->topic, aclk_buffer->buffer);
+ //} else
+ // error("Query RESP: %s", w->response.data->buffer);
+
+ buffer_free(w->response.data);
+ freez(w);
+ aclk_query_free(this_query);
+ return 1;
+ }
+
+ if (strcmp((char *)this_query->topic, "_chart") == 0) {
+ aclk_send_single_chart(this_query->data, this_query->query);
+ }
+
+ aclk_query_free(this_query);
+
+ return 1;
+}
+
+// Launch a query processing thread
+
+/*
+ * Process all pending queries
+ * Returns 0 when processing is paused or the queue counter is zero.
+ * Otherwise calls aclk_process_query() until it reports nothing more to do
+ * and returns 1.
+ */
+
+int aclk_process_queries()
+{
+    // Nothing to do while paused or when no queries are pending
+    if (unlikely(cmdpause) || likely(!aclk_queue.count))
+        return 0;
+
+    info("Processing %d queries", (int ) aclk_queue.count);
+
+    while (aclk_process_query())
+        ; // keep going until a call consumes nothing
+
+    return 1;
+}
+
+// Cleanup handler of the query thread: walks the thread's netdata_static_thread
+// entry through EXITING to EXITED.
+static void aclk_query_thread_cleanup(void *ptr)
+{
+    struct netdata_static_thread *self = ptr;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITING;
+    info("cleaning up...");
+    self->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Main query processing thread
+ *
+ * Until netdata_exit is set, each iteration takes query_lock_wait, sends the
+ * metadata if it has not been submitted yet, blocks on query_cond_wait and,
+ * once woken, drains the query queue.
+ *
+ * @param ptr is passed to aclk_query_thread_cleanup() when the thread ends.
+ *
+ * @return It always returns NULL
+ *
+ * NOTE(review): the wait has no predicate, so a QUERY_THREAD_WAKEUP issued while
+ * this thread is not blocked in pthread_cond_wait() appears to be lost -- confirm
+ * (see the TODO below).
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
+
+ while (!netdata_exit) {
+
+ QUERY_THREAD_LOCK;
+
+ // aclk_metadata_submitted is cleared again by aclk_disconnect()
+ if (unlikely(!aclk_metadata_submitted)) {
+ aclk_send_metadata();
+ aclk_metadata_submitted = 1;
+ }
+
+ // Sleep until signalled; if the wait itself fails, back off for a second
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+
+ if (likely(aclk_connection_initialized && !netdata_exit)) {
+ while (aclk_process_queries()) {
+ // Sleep for a few ms and retry maybe we have something to process
+ // before going to sleep
+ // TODO: This needs improvement to avoid missed queries
+ sleep_usec(USEC_PER_MS * 100);
+ }
+ }
+
+ QUERY_THREAD_UNLOCK;
+
+ } // forever
+ info("Shutting down query processing thread");
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+// Cleanup handler of the main ACLK thread: marks the thread as exiting, signals
+// the query thread's condition variable, then marks the thread as exited.
+static void aclk_main_cleanup(void *ptr)
+{
+    struct netdata_static_thread *self = ptr;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITING;
+    info("cleaning up...");
+
+    QUERY_THREAD_WAKEUP;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * Before entering the loop it waits for netdata_ready and then for
+ * ACLK_STARTUP_WAIT seconds. Inside the loop it waits for the agent to be
+ * claimed, initializes the link (retrying after a failure), subscribes to the
+ * command topic and starts the query thread once.
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ *
+ * NOTE(review): query_thread is a local of this function and is handed to the
+ * query thread, and the memory allocated for query_thread.thread is not released
+ * here -- confirm the intended lifetime of both.
+ */
+void *aclk_main(void *ptr)
+{
+    struct netdata_static_thread query_thread;
+
+    memset(&query_thread, 0, sizeof(query_thread));
+
+    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
+
+    if (unlikely(!aclk_buffer))
+        aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+
+    assert(aclk_buffer != NULL);
+
+    info("Waiting for netdata to be ready");
+    while (!netdata_ready) {
+        sleep_usec(USEC_PER_MS * 300);
+    }
+    info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
+    sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);
+
+    // Ok mark we are ready to accept incoming requests
+    waiting_init = 0;
+
+    while (!netdata_exit) {
+        // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
+        // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
+        if (likely(!is_agent_claimed())) {
+            sleep_usec(USEC_PER_SEC * 60);
+            info("Checking agent claiming status");
+            continue;
+        }
+
+        if (unlikely(!aclk_connection_initialized)) {
+            // Non-zero once aclk_init() has succeeded and we only wait for the connection to come up
+            static int initializing = 0;
+
+            if (likely(initializing)) {
+                _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+                continue;
+            }
+
+            info("Initializing connection");
+            if (unlikely(aclk_init(ACLK_INIT))) {
+                // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
+                // `initializing` stays 0 here, so the next pass calls aclk_init() again
+                sleep_usec(USEC_PER_SEC * 60);
+                continue;
+            }
+
+            initializing = 1;
+            sleep_usec(USEC_PER_SEC * 1);
+            _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+            continue;
+        }
+
+        if (unlikely(!aclk_subscribed)) {
+            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
+        }
+
+        // Start the query processing thread the first time the connection is up
+        if (unlikely(!query_thread.thread)) {
+            query_thread.thread = mallocz(sizeof(netdata_thread_t));
+            netdata_thread_create(
+                query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
+        }
+
+        //TODO: Check if there is a return code
+        _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+
+    } // forever
+    aclk_shutdown();
+
+    netdata_thread_cleanup_pop(1);
+    return NULL;
+}
+
+/*
+ * Send a message to the cloud, using a base topic and sub_topic
+ * The final topic will be in the form <base_topic>/<sub_topic>
+ * If no base topic is available the sub_topic is used as the topic
+ *
+ * Returns:
+ *   0  nothing was sent because the connection is not initialized or message
+ *      is NULL, or the link layer reported success
+ *   1  shutdown is in progress, or waiting for the connection failed
+ *   otherwise the non-zero code returned by _link_send_message()
+ */
+int aclk_send_message(char *sub_topic, char *message)
+{
+    int rc;
+    static int skip_due_to_shutdown = 0;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    if (unlikely(netdata_exit)) {
+        // Count the dropped messages and report only every 100th to keep the log quiet
+        ++skip_due_to_shutdown;
+        if (unlikely(!(skip_due_to_shutdown % 100)))
+            info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
+        return 1;
+    }
+
+    if (unlikely(!message))
+        return 0;
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_send_message(final_topic, message);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
+
+    return rc;
+}
+
+/*
+ * Subscribe to a topic in the cloud
+ * The final subscription will be in the form
+ * <base_topic>/<sub_topic>
+ *
+ * qos is passed through to _link_subscribe().
+ *
+ * Returns 0 on success, 1 when shutting down or when waiting for the
+ * connection failed, otherwise the non-zero code from _link_subscribe().
+ *
+ * NOTE(review): 0 is also returned, without subscribing, when the connection is
+ * not initialized -- confirm callers do not take that as "subscribed".
+ */
+int aclk_subscribe(char *sub_topic, int qos)
+{
+    int rc;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    if (unlikely(netdata_exit)) {
+        return 1;
+    }
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_subscribe(final_topic, qos);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to subscribe to topic, error code %d (%s)", rc, _link_strerror(rc));
+
+    return rc;
+}
+
+// Link-up callback (registered through _link_lib_init() in aclk_init()).
+// Only logs the event; ptr is unused.
+void aclk_connect(void *ptr)
+{
+    (void) ptr;
+
+    info("Connection detected");
+}
+
+// Link-down callback (registered through _link_lib_init() in aclk_init()).
+// Logs the event and clears the "subscribed" and "metadata submitted" flags; ptr is unused.
+void aclk_disconnect(void *ptr)
+{
+    (void) ptr;
+
+    info("Disconnect detected");
+
+    aclk_metadata_submitted = 0;
+    aclk_subscribed = 0;
+}
+
+// Tear the link down: mark the connection as not initialized, then shut down the link layer.
+void aclk_shutdown()
+{
+ info("Shutdown initiated");
+ aclk_connection_initialized = 0; // cleared first: the send/subscribe entry points return early from here on
+ _link_shutdown();
+ info("Shutdown complete");
+}
+
+// One-time initialization of the agent cloud link.
+// Loads the ACLK settings from the configuration, initializes the link layer with
+// the connect/disconnect callbacks and caches the base publish topic.
+// action is currently unused.
+// Returns 0 on success (or when already initialized), 1 when the link layer fails to initialize.
+int aclk_init(ACLK_INIT_ACTION action)
+{
+    static int init = 0;
+
+    (void) action;
+
+    if (likely(init))
+        return 0;
+
+    aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
+    aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
+
+    aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
+    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
+
+    info("Maximum parallel outgoing messages %d", aclk_send_maximum);
+    info("Maximum parallel incoming messages %d", aclk_recv_maximum);
+
+    // initialize the low level link to the cloud
+    if (unlikely(_link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect))) {
+        error("Failed to initialize the agent cloud link library");
+        return 1;
+    }
+
+    // Remember the base topic and do not run this again
+    global_base_topic = GET_PUBLISH_BASE_TOPIC;
+    init = 1;
+
+    return 0;
+}
+
+// Use this to disable encoding of quotes and newlines so that
+// MQTT subscriber can display more readable data on screen
+
+// Append the opening part of an ACLK message to dest: type, msg-id, version and
+// the "payload" key. A fresh UUID is generated for the msg-id when msg_id is NULL.
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
+{
+    char generated_id[36 + 1];
+
+    if (unlikely(!msg_id)) {
+        uuid_t uuid;
+
+        uuid_generate(uuid);
+        uuid_unparse(uuid, generated_id);
+        msg_id = generated_id;
+    }
+
+    buffer_sprintf(
+        dest,
+        "\t{\"type\": \"%s\",\n"
+        "\t\"msg-id\": \"%s\",\n"
+        "\t\"version\": %s,\n"
+        "\t\"payload\": ",
+        type, msg_id, ACLK_VERSION);
+}
+
+#define