summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2020-02-06 17:58:51 +0200
committerGitHub <noreply@github.com>2020-02-06 17:58:51 +0200
commitb2b3c182548fe81e6d1c9a599b2571dabfdabcaa (patch)
treebe40bfd7605ebf2b453114e2d904a0252abdfc21 /aclk
parent9f6c2556ecbbcfeab5049b3299094796457c97b1 (diff)
ACLK agent 1 (#7894)
* - Add initial mqtt support * [WIP] Agent cloud link - Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD) - Send alarms to "netdata/alarm" - Add error checks to handle connection failures - Add params for Broker, port Maximum concurrent sent / recev messages - Dummy function to check claiming status - Generic mqtt_send command to publish message to a base topic , sub topic It will end up in the form base_topic/sub_topic - Add host/port in the connection failure error message * Test libmosquitto libs * connect to broker locally (assume localhost:1883) * subscribe to channel netdata/command * Test try a reload command to trigger health reload * publish alerts to netdata/alarm * - Fix compile issues * - Use sleep_usec instead of usleep * - Delay reconnection on failure due to misconfiguration (high cpu usage) * - Remove the TLS connection config * - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds * - Gather ACLK related code under aclk folder - Add aclk_ functions for abstract layer - Moved low level libs intergration in mqtt.c * - Add README.md file with initial comment * - Clean MQTT v5 * - Code cleanup * - Remove alarm log for now - Remove the heart beat * - Remove message properties for V5 * - Remove message properties for V5 (header) * Fixed the netdata target to use a local static version of libmosquitto. The installer does not yet have steps to pull and build the local library. cd project_root git clone ssh://git@github.com/netdata/mosquitto mosquitto/ (cd mosquitto/lib && make) # Ignore the cpp error This will leave mosquitto/lib/libmosquitto.a for the build process to use. * - Fix compile issues with older < 1.6 libmosquitto lib * - Enable alarm events to check it works - Re arrange includes - Rework topic to be agent/guid/. 
Actual id will be returned by the is_agent_claimed * - Add initial metadata info - Added helper function in web_api - Added a debug command (info) * Update the claiming state to retrieve the claimed id. * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Remove the alarm log for now - Add code (but disabled) to send charts * - Use dummy anon, anon as username and password for testing purposes * - Use client id anon as well * Testing without TLS * Switching TLS back on to fix docker environment. 
* - Added query processing An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results back to the "data" topic - Move the above processing from the message callback to the query handle loop - Added helper "pause" , "resume" commands to stop and resume query processing to stress test loading the queue with queries before executing them - Changed the endpoint topics to "meta", and "cmd" (previously metadata and command) * make info message follow protocol * move metadata msg generation into new func * move metadata msg generation into new func * - Add metadata to the responses - Add hook to queue chart changes on creation and dimensions - Changed the queue mechanism to include delay for X seconds - Add delayed submittion of charts to the cloud so that all DIMs are defined to avoid resubmission * - Add additional data info for aclk_queue command * - Use web_clinet_api_request_v1 to handle the incoming request This will handle all requests coming from the cloud * - Cleanup and aclk_query structure - Add msg_id parameter - Enable the incoming JSON request - Enable the outgoing JSON response * - Added new thread to handle query processing - Add lock and cond wait to wakeup thread when queries are submitted - Cleanup on the main init function * - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed. 
- During the wait time, no queries will be queued * - Send metadata on query thread init - New generic create header function for the JSON response - Pack info and charts into one message - Modified chart to remove entries (test) - Modified charts mod to remove entries e.g alarms and volatile info - Change input to aclk_update_chart (RRDHOST / instead of hostname) * - When a request fails, add to the payload - We may need to handle in a different key - Error check in json parsing * - Add dummy aclk_update_alarm command * - Move incoming request JSON parsing code away from mqtt.c - Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default - Added version in incoming and outgoing JSON dict * - Disable code if ACLK_ENABLE is not defined - Remove references to the mqtt (mosquitto) lib - Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined * - Disable challenge sample code for now * - Remove libmosquitto from makefile * - Fix spaces in Makefile.am - Remove ifdef to avoid warning from LGTM * - Remove for now the code that builds an along log test message to send to the cloud * - Add check for ACLK_ENABLE definition and avoid calling the chart update functions * - Remove commented code * - Move source files to the correct place (ACLK_PLUGIN_FILES) * - Remove include file thats not needed * - Remove include file thats not needed - Add improved checks for load_claiming_state() * - Fix error message. 
Used error() that also logs errno and message * - Fix some codacy issues * - Fix more codacy issues, code cleanup * - Revert code to address codacy warnings * - Revert spaces added in a previous commit by mistake * clean up if/else nest * print error if fopen fails * minor - error already logs errno * - Fix version formatting * - Cleanup all ACLK related compiler warnings - Re-arrange include files - Removed unused defines * - More compilation warnings fixed - Bug with thread creation fixed * - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable * - Add condition to skip the libmosquitto * - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags - Typo in info message fix Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com> Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/Makefile.am18
-rw-r--r--aclk/README.md1
-rw-r--r--aclk/agent_cloud_link.c995
-rw-r--r--aclk/agent_cloud_link.h103
-rw-r--r--aclk/mqtt.c318
-rw-r--r--aclk/mqtt.h21
6 files changed, 1456 insertions, 0 deletions
diff --git a/aclk/Makefile.am b/aclk/Makefile.am
new file mode 100644
index 0000000000..ebc2a18d95
--- /dev/null
+++ b/aclk/Makefile.am
@@ -0,0 +1,18 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+CLEANFILES = \
+ $(NULL)
+
+include $(top_srcdir)/build/subst.inc
+SUFFIXES = .in
+
+sbin_SCRIPTS = \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
diff --git a/aclk/README.md b/aclk/README.md
new file mode 100644
index 0000000000..287d3b2606
--- /dev/null
+++ b/aclk/README.md
@@ -0,0 +1 @@
+This is the agent cloud link (ACLK) information file \ No newline at end of file
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
new file mode 100644
index 0000000000..dc90c6d14e
--- /dev/null
+++ b/aclk/agent_cloud_link.c
@@ -0,0 +1,995 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+#include "agent_cloud_link.h"
+
+// Connection settings. aclk_init() fills these from the CONFIG_SECTION_ACLK
+// section of the configuration, supplying the defaults noted on each line.
+int aclk_recv_maximum = 0; // default 20
+int aclk_send_maximum = 0; // default 20
+
+int aclk_port = 0; // default 1883
+char *aclk_hostname = NULL; //default localhost
+// Non-zero once aclk_main() has subscribed to the command topic; cleared by aclk_disconnect()
+int aclk_subscribed = 0;
+
+// Non-zero once the query thread has sent the metadata; cleared by aclk_disconnect()
+int aclk_metadata_submitted = 0;
+// While non-zero aclk_queue_query() drops every request; aclk_main() clears it after the startup wait
+int waiting_init = 1;
+int cmdpause = 0; // Used to pause query processing
+
+// Shared buffer used to build outgoing messages; created by aclk_main()
+BUFFER *aclk_buffer = NULL;
+// Cached result of GET_PUBLISH_BASE_TOPIC; set by aclk_init() and lazily by get_topic()
+char *global_base_topic = NULL;
+
+/*
+ * JSON walker callback for a request received from the cloud.
+ *
+ * e->callback_data points to the struct aclk_request being filled in.
+ * String members are duplicated with strdupz(); whoever owns the request
+ * is responsible for releasing them.
+ *
+ * Always returns 0.
+ */
+int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+    struct aclk_request *data = e->callback_data;
+
+    switch (e->type) {
+        case JSON_OBJECT:
+        case JSON_ARRAY:
+            // Keep using this callback for nested containers
+            e->callback_function = cloud_to_agent_parse;
+            break;
+
+        case JSON_STRING:
+            if (!strcmp(e->name, ACLK_JSON_IN_MSGID))
+                data->msg_id = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_TYPE))
+                data->type_id = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_TOPIC))
+                data->topic = strdupz(e->data.string);
+            else if (!strcmp(e->name, ACLK_JSON_IN_URL))
+                data->url = strdupz(e->data.string);
+            break;
+
+        case JSON_NUMBER:
+            // A numeric entry does not carry a string: reading data.string here
+            // would reinterpret the number's bytes as a pointer.
+            // NOTE(review): assumes JSON_ENTRY exposes numeric values through
+            // data.number -- confirm against the JSON walker's header.
+            if (!strcmp(e->name, ACLK_JSON_IN_VERSION))
+                data->version = e->data.number;
+            break;
+
+        case JSON_BOOLEAN:
+        case JSON_NULL:
+            break;
+    }
+    return 0;
+}
+
+//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
+//{
+// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
+//
+// buffer_flush(b);
+// buffer_sprintf(
+// b,
+// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
+// url, host);
+// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
+//
+// if (unlikely(sock == -1)) {
+// error("Handshake failed");
+// return NULL;
+// }
+//
+// SSL_CTX *ctx = security_initialize_openssl_client();
+// // Certificate chain: not updating the stores - do we need private CA roots?
+// // Calls to SSL_CTX_load_verify_locations would go here.
+// SSL *ssl = SSL_new(ctx);
+// SSL_set_fd(ssl, sock);
+// int err = SSL_connect(ssl);
+// SSL_write(ssl, b->buffer, b->len); // Timeout options?
+// int bytes_read = SSL_read(ssl, b->buffer, b->len);
+// SSL_shutdown(ssl);
+// close(sock);
+//}
+
+// Non-zero while the connection is up and running. Cleared by aclk_shutdown().
+// NOTE(review): nothing in this file sets it; presumably the low level link
+// layer does so from its connection callback -- confirm.
+int aclk_connection_initialized = 0;
+
+// aclk_mutex  : serializes base topic handling and the low level send/subscribe calls
+// query_mutex : protects the pending query queue (aclk_queue)
+static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
+
+#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
+#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
+
+#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
+#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
+
+// Condition variable (and its mutex) used to wake up the query processing thread
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+
+// All lock macros expand to a single expression without a trailing ';'
+// so that they can be used like ordinary function calls.
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+
+
+// One pending query. Entries form a singly linked list that aclk_queue_query()
+// keeps ordered by run_after; aclk_query_free() releases an entry and its strings.
+struct aclk_query {
+ time_t created; // Time the entry was queued
+ time_t run_after; // Delay run until after this time
+ char *topic; // Topic to respond to
+ char *data; // Internal data (NULL if request from the cloud)
+ char *msg_id; // msg_id generated by the cloud (NULL if internal)
+ char *query; // The actual query
+ u_char deleted; // Mark deleted for garbage collect
+ struct aclk_query *next; // Next entry in the queue (NULL for the last one)
+};
+
+// The queue of pending queries; the single instance aclk_queue starts empty.
+// aclk_queue_query() and aclk_queue_pop() modify it while holding QUERY_LOCK.
+struct aclk_query_queue {
+ struct aclk_query *aclk_query_head; // First entry (next to run), NULL when empty
+ struct aclk_query *aclk_query_tail; // Last entry, NULL when empty
+ u_int64_t count; // Number of entries, including those marked deleted
+} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
+
+/*
+ * Release a query entry together with the strings it owns.
+ * Calling it with NULL does nothing.
+ */
+
+void aclk_query_free(struct aclk_query *this_query)
+{
+    if (likely(this_query)) {
+        freez(this_query->topic);
+        freez(this_query->query);
+
+        // data and msg_id are optional
+        if (this_query->data != NULL)
+            freez(this_query->data);
+
+        if (this_query->msg_id != NULL)
+            freez(this_query->msg_id);
+
+        freez(this_query);
+    }
+}
+
+// Locate the insertion point for an entry that should run at time_to_run:
+// the last queued entry whose run_after is not later than time_to_run.
+// A NULL result means the new entry belongs at the HEAD of the queue.
+// The caller must hold the queue lock.
+
+struct aclk_query *aclk_query_find_position(time_t time_to_run)
+{
+    struct aclk_query *cursor;
+    struct aclk_query *previous = NULL;
+
+    for (cursor = aclk_queue.aclk_query_head; cursor != NULL; cursor = cursor->next) {
+        // First entry scheduled later than us: insert right before it
+        if (cursor->run_after > time_to_run)
+            break;
+
+        previous = cursor;
+    }
+
+    return previous;
+}
+
+// Helper for aclk_query_find(): an optional filter matches when it was not
+// supplied (NULL) or when the stored value exists and is equal to it.
+static int aclk_query_optional_match(const char *wanted, const char *stored)
+{
+    if (!wanted)
+        return 1;
+
+    // The stored value may legitimately be NULL (e.g. cloud requests carry no
+    // data, internal requests carry no msg_id); never hand NULL to strcmp()
+    if (!stored)
+        return 0;
+
+    return strcmp(wanted, stored) == 0;
+}
+
+// Find a pending (not deleted) query with the given topic and query string.
+// data and msg_id are optional filters: when NULL they match any entry.
+// Returns the matching entry or NULL.
+// Need to have a lock before calling this
+struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
+{
+    struct aclk_query *tmp_query;
+
+    for (tmp_query = aclk_queue.aclk_query_head; tmp_query; tmp_query = tmp_query->next) {
+        if (unlikely(tmp_query->deleted))
+            continue;
+
+        if (strcmp(tmp_query->topic, topic) != 0 || strcmp(tmp_query->query, query) != 0)
+            continue;
+
+        if (aclk_query_optional_match(data, tmp_query->data) &&
+            aclk_query_optional_match(msg_id, tmp_query->msg_id))
+            return tmp_query;
+    }
+    return NULL;
+}
+
+/*
+ * Add a query to execute, the result will be sent to the specified topic
+ *
+ * topic     - topic to respond to (required)
+ * data      - optional internal data, always copied
+ * msg_id    - message id generated by the cloud (may be NULL)
+ * query     - the query to run (required)
+ * run_after - delay, in seconds from now, before the query may run
+ * internal  - non-zero: topic and query are copied and msg_id is not stored
+ *             zero: the queue stores the topic, query and msg_id pointers
+ *             as they are and aclk_query_free() releases them later
+ *
+ * An identical pending query with a different run time is marked deleted and
+ * replaced; one with the same run time is left alone.
+ * NOTE(review): in that last case (and while waiting_init is set) a
+ * non-internal caller keeps ownership of its strings -- confirm callers cope.
+ *
+ * Returns 0, or 1 when topic or query is missing.
+ */
+
+int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
+{
+    struct aclk_query *new_query, *tmp_query;
+    time_t run_at;
+
+    // Ignore all commands while we wait for the agent to initialize
+    if (unlikely(waiting_init))
+        return 0;
+
+    // Both are compared with strcmp() and logged below
+    if (unlikely(!topic || !query)) {
+        error("Ignoring ACLK query without a topic or a query string");
+        return 1;
+    }
+
+    // Keep the absolute run time in a time_t; do not squeeze it back into an int
+    run_at = now_realtime_sec() + run_after;
+
+    QUERY_LOCK;
+    tmp_query = aclk_query_find(topic, data, msg_id, query);
+    if (unlikely(tmp_query)) {
+        if (tmp_query->run_after == run_at) {
+            QUERY_UNLOCK;
+            QUERY_THREAD_WAKEUP;
+            return 0;
+        }
+        // Superseded; aclk_process_query() discards entries marked deleted
+        tmp_query->deleted = 1;
+    }
+
+    new_query = callocz(1, sizeof(struct aclk_query));
+    if (internal) {
+        new_query->topic = strdupz(topic);
+        new_query->query = strdupz(query);
+    } else {
+        new_query->topic = topic;
+        new_query->query = query;
+        new_query->msg_id = msg_id;
+    }
+
+    if (data)
+        new_query->data = strdupz(data);
+
+    new_query->created = now_realtime_sec();
+    new_query->run_after = run_at;
+
+    info("Added query (%s) (%s)", topic, query);
+
+    // Keep the list ordered by run time
+    tmp_query = aclk_query_find_position(run_at);
+    if (tmp_query) {
+        new_query->next = tmp_query->next;
+        tmp_query->next = new_query;
+    } else {
+        new_query->next = aclk_queue.aclk_query_head;
+        aclk_queue.aclk_query_head = new_query;
+    }
+
+    // The new entry is the last one: this also covers the first insertion
+    // into an empty queue, where there is no previous tail to compare with
+    if (!new_query->next)
+        aclk_queue.aclk_query_tail = new_query;
+
+    aclk_queue.count++;
+
+    QUERY_UNLOCK;
+    QUERY_THREAD_WAKEUP;
+    return 0;
+}
+
+// Queue a request received from the cloud: no internal data, no delay and
+// not "internal", so the request's own topic, msg_id and url pointers are passed on.
+inline int aclk_submit_request(struct aclk_request *request)
+{
+    int rc = aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
+
+    return rc;
+}
+
+/*
+ * Get the next query to process.
+ *
+ * Returns NULL when the queue is empty or when the entry at the head is not
+ * yet due (its run_after is still in the future).
+ *
+ * The returned entry has been removed from the queue; the caller needs to
+ * free it (topic, query and the structure itself) by calling aclk_query_free()
+ */
+struct aclk_query *aclk_queue_pop()
+{
+    struct aclk_query *this_query;
+    time_t now;
+
+    QUERY_LOCK;
+
+    this_query = aclk_queue.aclk_query_head;
+    if (likely(!this_query)) {
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    now = now_realtime_sec();
+    if (this_query->run_after > now) {
+        // time_t is not guaranteed to be a long; cast to match %ld
+        info("Query %s will run in %ld seconds", this_query->query, (long)(this_query->run_after - now));
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    aclk_queue.count--;
+    aclk_queue.aclk_query_head = this_query->next;
+
+    // Queue drained: there is no tail either
+    if (likely(!aclk_queue.aclk_query_head))
+        aclk_queue.aclk_query_tail = NULL;
+
+    QUERY_UNLOCK;
+    return this_query;
+}
+
+// This will give the base topic that the agent will publish messages.
+// subtopics will be sent under the base topic e.g. base_topic/subtopic
+// The topic is built once from ACLK_TOPIC_STRUCTURE and the value returned by
+// is_agent_claimed(), then kept in a function-local static.
+//
+// action:
+//   PUBLICH_TOPIC_FREE    - release the stored topic, returns NULL
+//   PUBLICH_TOPIC_REBUILD - release the stored topic and build it again
+//   anything else         - return the stored topic, building it if needed
+//
+// Returns NULL when the agent is not claimed. The returned string is owned by
+// this function; callers must not free it.
+
+// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
+// that on the fly
+
+char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
+{
+    static char *topic = NULL;
+
+    if (unlikely(!is_agent_claimed()))
+        return NULL;
+
+    ACLK_LOCK;
+
+    if (unlikely(action == PUBLICH_TOPIC_FREE)) {
+        if (likely(topic)) {
+            freez(topic);
+            topic = NULL;
+        }
+
+        ACLK_UNLOCK;
+
+        return NULL;
+    }
+
+    if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
+        // The nested calls take the lock themselves
+        ACLK_UNLOCK;
+        get_publish_base_topic(PUBLICH_TOPIC_FREE);
+        return get_publish_base_topic(PUBLICH_TOPIC_GET);
+    }
+
+    if (unlikely(!topic)) {
+        char tmp_topic[ACLK_MAX_TOPIC + 1];
+
+        // Bounded formatting: the claim id is external input and must not be
+        // able to overrun the stack buffer
+        snprintfz(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
+        topic = strdupz(tmp_topic);
+    }
+
+    ACLK_UNLOCK;
+    return topic;
+}
+
+// Build "<base topic>/<sub_topic>" into final_topic (at most max_size characters)
+// and return final_topic. When no base topic is available, sub_topic itself
+// is returned and final_topic is left untouched.
+char *get_topic(char *sub_topic, char *final_topic, int max_size)
+{
+    if (unlikely(global_base_topic == NULL)) {
+        // Not cached yet, try to obtain it now
+        global_base_topic = GET_PUBLISH_BASE_TOPIC;
+
+        if (unlikely(global_base_topic == NULL))
+            return sub_topic;
+    }
+
+    snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
+    return final_topic;
+}
+
+// Wait (up to ACLK_INITIALIZATION_WAIT seconds) for the ACLK connection to be
+// established, running the link event loop between sleeps.
+// Returns 0 when the connection is up, 1 when it could not be established.
+int aclk_wait_for_initialization()
+{
+    time_t started;
+
+    if (likely(aclk_connection_initialized))
+        return 0;
+
+    started = now_realtime_sec();
+
+    while (!aclk_connection_initialized && (now_realtime_sec() - started) < ACLK_INITIALIZATION_WAIT) {
+        sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
+        _link_event_loop(0);
+    }
+
+    if (likely(aclk_connection_initialized))
+        return 0;
+
+    error("ACLK connection cannot be established");
+    return 1;
+}
+
+/*
+ * This function will fetch the next pending command and process it
+ *
+ * Returns 0 when nothing was processed (processing paused, connection not
+ * initialized, or aclk_queue_pop() returned nothing) and 1 when an entry was
+ * taken off the queue (executed or garbage collected).
+ */
+int aclk_process_query()
+{
+ struct aclk_query *this_query;
+ static u_int64_t query_count = 0;
+ //int rc;
+
+ if (unlikely(cmdpause))
+ return 0;
+
+ if (!aclk_connection_initialized)
+ return 0;
+
+ this_query = aclk_queue_pop();
+ if (likely(!this_query)) {
+ //info("No pending queries");
+ return 0;
+ }
+
+ // Entries superseded in aclk_queue_query() are only marked; drop them here
+ if (unlikely(this_query->deleted)) {
+ info("Garbage collect query %s:%s", this_query->topic, this_query->query);
+ aclk_query_free(this_query);
+ return 1;
+ }
+
+ query_count++;
+ info(
+ "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
+ (int) (now_realtime_sec() - this_query->created));
+
+ // API requests are executed through the web API using a temporary web client
+ if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ // Copy everything from the '?' onwards as the query string and cut the
+ // query at that point; without a '?' the whole query is copied
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
+ *mysep = '\0';
+ } else
+ strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
+
+ // What follows the last '/' is passed on as the request and reused as the message type
+ mysep = strrchr(this_query->query, '/');
+
+ // TODO: ignore return code for now
+ web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+
+ //TODO: handle bad response perhaps in a different way. For now it does to the payload
+ //if (rc == HTTP_RESP_OK || 1) {
+ buffer_flush(aclk_buffer);
+
+ // Wrap the response and send it to the topic stored with the query
+ aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
+ aclk_buffer->contenttype = CT_APPLICATION_JSON;
+ aclk_send_message(this_query->topic, aclk_buffer->buffer);
+ //} else
+ // error("Query RESP: %s", w->response.data->buffer);
+
+ buffer_free(w->response.data);
+ freez(w);
+ aclk_query_free(this_query);
+ return 1;
+ }
+
+ // Queries on the "_chart" topic carry their arguments in data and query
+ if (strcmp((char *)this_query->topic, "_chart") == 0) {
+ aclk_send_single_chart(this_query->data, this_query->query);
+ }
+
+ aclk_query_free(this_query);
+
+ return 1;
+}
+
+// Launch a query processing thread
+
+/*
+ * Drain the query queue.
+ *
+ * Returns 0 when processing is paused or the queue is empty; otherwise
+ * calls aclk_process_query() until it reports nothing more to do and
+ * returns 1.
+ */
+
+int aclk_process_queries()
+{
+    // Paused, or nothing pending
+    if (unlikely(cmdpause) || likely(!aclk_queue.count))
+        return 0;
+
+    info("Processing %d queries", (int ) aclk_queue.count);
+
+    for (;;) {
+        if (!aclk_process_query())
+            break;
+    }
+
+    return 1;
+}
+
+// Cleanup handler of the query processing thread: walks the thread
+// descriptor passed in ptr through EXITING to EXITED.
+static void aclk_query_thread_cleanup(void *ptr)
+{
+    struct netdata_static_thread *self = ptr;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITING;
+    info("cleaning up...");
+    self->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Main query processing thread
+ *
+ * Sends the metadata whenever aclk_metadata_submitted is 0, then sleeps on
+ * query_cond_wait until woken up (QUERY_THREAD_WAKEUP) and processes the
+ * pending queries.
+ *
+ * @param ptr is handed to aclk_query_thread_cleanup() when the thread ends.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
+
+ while (!netdata_exit) {
+
+ QUERY_THREAD_LOCK;
+
+ if (unlikely(!aclk_metadata_submitted)) {
+ aclk_send_metadata();
+ aclk_metadata_submitted = 1;
+ }
+
+ // A non-zero return means the wait failed; back off for a second
+ // NOTE(review): there is no predicate around the wait, so a wakeup
+ // signalled while this thread is not waiting is lost (see TODO below)
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+
+ if (likely(aclk_connection_initialized && !netdata_exit)) {
+ // aclk_process_queries() returns 1 for as long as the queue holds
+ // entries, including ones that are not due yet
+ while (aclk_process_queries()) {
+ // Sleep for a few ms and retry maybe we have something to process
+ // before going to sleep
+ // TODO: This needs improvement to avoid missed queries
+ sleep_usec(USEC_PER_MS * 100);
+ }
+ }
+
+ QUERY_THREAD_UNLOCK;
+
+ } // forever
+ info("Shutting down query processing thread");
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+// Cleanup handler of the main ACLK thread: flags the thread as exiting,
+// wakes up the query processing thread and then flags the thread as exited.
+static void aclk_main_cleanup(void *ptr)
+{
+    struct netdata_static_thread *self = ptr;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITING;
+    info("cleaning up...");
+
+    // The query thread may be asleep on its condition variable
+    QUERY_THREAD_WAKEUP;
+
+    self->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * Waits for the agent to be ready, then loops until netdata exits:
+ * waits for the agent to be claimed, initializes the link, subscribes to
+ * the command topic, starts the query processing thread (once) and runs
+ * the link event loop that handles pending requests - both inbound and
+ * outbound.
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+    // Static storage on purpose: the query thread receives a pointer to this
+    // structure and its cleanup handler writes to it, which may happen after
+    // this function has returned.
+    static struct netdata_static_thread query_thread;
+
+    memset(&query_thread, 0, sizeof(query_thread));
+
+    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
+
+    if (unlikely(!aclk_buffer))
+        aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+
+    assert(aclk_buffer != NULL);
+
+    info("Waiting for netdata to be ready");
+    while (!netdata_ready) {
+        sleep_usec(USEC_PER_MS * 300);
+    }
+    info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
+    sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);
+
+    // Ok mark we are ready to accept incoming requests
+    waiting_init = 0;
+
+    while (!netdata_exit) {
+        // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
+        // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
+        if (likely(!is_agent_claimed())) {
+            sleep_usec(USEC_PER_SEC * 60);
+            info("Checking agent claiming status");
+            continue;
+        }
+
+        if (unlikely(!aclk_connection_initialized)) {
+            static int initializing = 0;
+
+            // Link library is set up; keep running the event loop until the
+            // connection is reported as initialized
+            if (likely(initializing)) {
+                _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+                continue;
+            }
+            initializing = 1;
+            info("Initializing connection");
+            if (unlikely(aclk_init(ACLK_INIT))) {
+                // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
+                // Clear the flag, otherwise the next pass would skip
+                // aclk_init() and never retry
+                initializing = 0;
+                sleep_usec(USEC_PER_SEC * 60);
+                continue;
+            }
+            sleep_usec(USEC_PER_SEC * 1);
+            _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+            continue;
+        }
+
+        // aclk_subscribe() returns 0 on success
+        if (unlikely(!aclk_subscribed)) {
+            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
+        }
+
+        // Start the query processing thread the first time we get here
+        if (unlikely(!query_thread.thread)) {
+            query_thread.thread = mallocz(sizeof(netdata_thread_t));
+            netdata_thread_create(
+                query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
+        }
+
+        //TODO: Check if there is a return code
+        _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+
+    } // forever
+    aclk_shutdown();
+
+    netdata_thread_cleanup_pop(1);
+    return NULL;
+}
+
+/*
+ * Send a message to the cloud under <base_topic>/<sub_topic>.
+ * The base topic is the cached global_base_topic; when none is available the
+ * sub_topic is used on its own (see get_topic()).
+ *
+ * Returns 0 when the message was sent, or when there was nothing to do
+ * (connection not initialized, NULL message); 1 when netdata is shutting
+ * down or the connection could not be established; otherwise the error code
+ * of the low level send.
+ */
+int aclk_send_message(char *sub_topic, char *message)
+{
+    int rc;
+    static int skip_due_to_shutdown = 0;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    // The connection is known to be initialized from here on
+    if (unlikely(netdata_exit)) {
+        // Log only every 100th skipped message
+        ++skip_due_to_shutdown;
+        if (unlikely(!(skip_due_to_shutdown % 100)))
+            info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
+        return 1;
+    }
+
+    if (unlikely(!message))
+        return 0;
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_send_message(final_topic, message);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
+
+    return rc;
+}
+
+/*
+ * Subscribe to a topic in the cloud
+ * The final subscription will be in the form
+ * /agent/claim_id/<sub_topic>
+ *
+ * qos is passed unchanged to the low level subscribe call.
+ *
+ * Returns 0 on success, 1 when netdata is shutting down or the connection
+ * could not be established, otherwise the error code of the low level call.
+ * NOTE(review): 0 is also returned, without subscribing, when the connection
+ * is not initialized -- callers treating 0 as "subscribed" must check
+ * aclk_connection_initialized first (aclk_main() does).
+ */
+int aclk_subscribe(char *sub_topic, int qos)
+{
+    int rc;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    if (unlikely(netdata_exit)) {
+        return 1;
+    }
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_subscribe(final_topic, qos);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to subscribe to topic %s, error code %d (%s)", final_topic, rc, _link_strerror(rc));
+
+    return rc;
+}
+
+// Link-up notification, registered with the low level link library by aclk_init().
+// Only logs the event; ptr is not used.
+void aclk_connect(void *ptr)
+{
+    (void) ptr;
+
+    info("Connection detected");
+}
+
+// Link-down notification, registered with the low level link library by aclk_init().
+// Resets the subscription and metadata flags so that aclk_main() subscribes again
+// and the query thread sends the metadata again; ptr is not used.
+void aclk_disconnect(void *ptr)
+{
+    (void) ptr;
+
+    info("Disconnect detected");
+
+    aclk_subscribed = 0;
+    aclk_metadata_submitted = 0;
+}
+
+// Mark the connection as down and shut down the low level link.
+void aclk_shutdown()
+{
+    info("Shutdown initiated");
+
+    // Clear the flag first so that senders stop using the link
+    aclk_connection_initialized = 0;
+    _link_shutdown();
+
+    info("Shutdown complete");
+}
+
+// One-time initialization of the agent cloud link: reads the settings from the
+// CONFIG_SECTION_ACLK section, initializes the low level link library and
+// caches the base publish topic.
+// action is currently unused.
+// Returns 0 on success (or when already initialized), 1 on failure; a failed
+// attempt is not remembered, so the function may be called again.
+int aclk_init(ACLK_INIT_ACTION action)
+{
+    static int already_initialized = 0;
+
+    (void) action;
+
+    if (likely(already_initialized))
+        return 0;
+
+    aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
+    aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
+
+    aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
+    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
+
+    info("Maximum parallel outgoing messages %d", aclk_send_maximum);
+    info("Maximum parallel incoming messages %d", aclk_recv_maximum);
+
+    // initialize the low level link to the cloud
+    if (unlikely(_link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect))) {
+        error("Failed to initialize the agent cloud link library");
+        return 1;
+    }
+
+    // Compute and cache the base publish topic
+    global_base_topic = GET_PUBLISH_BASE_TOPIC;
+    already_initialized = 1;
+
+    return 0;
+}
+
+// Defining EYE_FRIENDLY (see below) disables the encoding of quotes and newlines
+// so that an MQTT subscriber can display more readable data on screen
+
+// Append the opening part of an outgoing message to dest: type, msg-id and
+// version, followed by the "payload" key. The caller appends the payload and
+// the closing brace.
+// When msg_id is NULL a freshly generated UUID is used instead.
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
+{
+    char generated_id[36 + 1];
+
+    if (unlikely(msg_id == NULL)) {
+        uuid_t raw_uuid;
+
+        uuid_generate(raw_uuid);
+        uuid_unparse(raw_uuid, generated_id);
+        msg_id = generated_id;
+    }
+
+    buffer_sprintf(
+        dest,
+        "\t{\"type\": \"%s\",\n"
+        "\t\"msg-id\": \"%s\",\n"
+        "\t\"version\": %s,\n"
+        "\t\"payload\": ",
+        type, msg_id, ACLK_VERSION);
+}
+
+// While EYE_FRIENDLY is defined the message built by
+// aclk_create_metadata_message() is left as it is; without it, newlines and
+// quotes in the message are escaped.
+#define EYE_FRIENDLY 1
+
+// encapsulate contents into metadata message as per ACLK documentation
+//
+// Appends to dest an object holding type, msg-id (empty string when msg_id is
+// NULL) and the contents buffer as the payload.
+void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents)
+{
+    buffer_sprintf(
+        dest,
+        "\t{\"type\": \"%s\",\n"
+        "\t\"msg-id\": \"%s\",\n"
+        "\t\"payload\": %s\n\t}",
+        type, msg_id ? msg_id : "", contents->buffer);
+
+#ifndef EYE_FRIENDLY
+    //TODO: this is the initial escaping, It will expanded
+    // Every input character produces at most two output characters, so size
+    // the scratch buffer from the actual message instead of a fixed constant
+    size_t src_len = strlen(dest->buffer);
+    char *tmp_buffer = mallocz(src_len * 2 + 1);
+    char *src = dest->buffer;
+    char *dst = tmp_buffer;
+
+    while (*src) {
+        switch (*src) {
+            case '\n':
+                *dst++ = '\\';
+                *dst++ = 'n';
+                break;
+            case '\"':
+                *dst++ = '\\';
+                *dst++ = '\"';
+                break;
+            case '\'':
+                // NOTE(review): a single quote is turned into an escaped
+                // double quote -- confirm this is intended
+                *dst++ = '\\';
+                *dst++ = '\"';
+                break;
+            default:
+                *dst++ = *src;
+        }
+        src++;
+    }
+    *dst = '\0';
+
+    buffer_flush(dest);
+    buffer_sprintf(dest, "%s", tmp_buffer);
+
+    freez(tmp_buffer);
+#endif
+}
+
+//TODO: this has been changed in the latest specs. We need to pack the data in one MQTT
+//message with a payload and has a list of json objects
+int aclk_send_alarm_metadata()
+{
+ //TODO: improve locking on the buffer -- same lock is used for the message send
+ //improve error handling
+ ACLK_LOCK;
+ buffer_flush(aclk_buffer);
+ // Alarms configuration
+ aclk_create_header(aclk_buffer, "alarms", NULL);
+ health_alarms2json(localhost, aclk_buffer, 1);
+ buffer_sprintf(aclk_buffer,"\n}");
+ ACLK_UNLOCK;
+ aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
+
+ // Alarms log
+