diff options
Diffstat (limited to 'aclk/agent_cloud_link.c')
-rw-r--r-- | aclk/agent_cloud_link.c | 995 |
1 files changed, 995 insertions, 0 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c new file mode 100644 index 0000000000..dc90c6d14e --- /dev/null +++ b/aclk/agent_cloud_link.c @@ -0,0 +1,995 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" + +// Read from the config file -- new section [agent_cloud_link] +// Defaults are supplied +int aclk_recv_maximum = 0; // default 20 +int aclk_send_maximum = 0; // default 20 + +int aclk_port = 0; // default 1883 +char *aclk_hostname = NULL; //default localhost +int aclk_subscribed = 0; + +int aclk_metadata_submitted = 0; +int waiting_init = 1; +int cmdpause = 0; // Used to pause query processing + +BUFFER *aclk_buffer = NULL; +char *global_base_topic = NULL; + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch(e->type) { + case JSON_OBJECT: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_ARRAY: + e->callback_function = cloud_to_agent_parse; + break; + case JSON_STRING: + if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { + data->topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, ACLK_JSON_IN_URL)) { + data->url = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { + data->version = atol(e->data.string); + break; + } + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +//char *send_http_request(char *host, char *port, char *url, BUFFER *b) +//{ +// struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; +// +// buffer_flush(b); +// buffer_sprintf( +// b, +// "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n", +// url, host); +// int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout); +// +// if (unlikely(sock == -1)) { +// error("Handshake failed"); +// return NULL; +// } +// +// SSL_CTX *ctx = security_initialize_openssl_client(); +// // Certificate chain: not updating the stores - do we need private CA roots? +// // Calls to SSL_CTX_load_verify_locations would go here. +// SSL *ssl = SSL_new(ctx); +// SSL_set_fd(ssl, sock); +// int err = SSL_connect(ssl); +// SSL_write(ssl, b->buffer, b->len); // Timeout options? +// int bytes_read = SSL_read(ssl, b->buffer, b->len); +// SSL_shutdown(ssl); +// close(sock); +//} + +// Set when we have connection up and running from the connection callback +int aclk_connection_initialized = 0; + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define QUERY_LOCK netdata_mutex_lock(&query_mutex) +#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; + +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) + + +struct aclk_query { + time_t created; + time_t run_after; // Delay run until after this time + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + u_int64_t count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + +/* + * Free a query structure when done + */ + +void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + freez(this_query->query); + if (this_query->data) + freez(this_query->data); + if (this_query->msg_id) + freez(this_query->msg_id); + freez(this_query); + return; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Called with locked entries + +struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a lock before calling this +struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query) +{ + struct aclk_query *tmp_query; + + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) { + if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) + return tmp_query; + } + } + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(waiting_init)) + return 0; + + run_after = now_realtime_sec() + run_after; + + QUERY_LOCK; + tmp_query = aclk_query_find(topic, data, msg_id, query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + tmp_query->deleted = 1; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + if (internal) { + new_query->topic = strdupz(topic); + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + if (data) + new_query->data = strdupz(data); + + new_query->next = NULL; + new_query->created = now_realtime_sec(); + new_query->run_after = run_after; + + info("Added query (%s) (%s)", topic, query); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + QUERY_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + +// if (likely(aclk_queue.aclk_query_tail)) { +// aclk_queue.aclk_query_tail->next = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// +// if (likely(!aclk_queue.aclk_query_head)) { +// aclk_queue.aclk_query_head = new_query; +// aclk_queue.aclk_query_tail = new_query; +// aclk_queue.count++; +// QUERY_UNLOCK; +// return 0; +// } +// QUERY_UNLOCK; +// return 0; +} + +inline int aclk_submit_request(struct aclk_request *request) +{ + return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + QUERY_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + QUERY_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + if (this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + QUERY_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + QUERY_UNLOCK; + return this_query; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called by aclk_init(), to compute the base topic once and have +// it stored internally. +// Need to check if additional logic should be added to make sure that there +// is enough information to determine the base topic at init time + +// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch +// that on the fly + +char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action) +{ + static char *topic = NULL; + + if (unlikely(!is_agent_claimed())) + return NULL; + + ACLK_LOCK; + + if (unlikely(action == PUBLICH_TOPIC_FREE)) { + if (likely(topic)) { + freez(topic); + topic = NULL; + } + + ACLK_UNLOCK; + + return NULL; + } + + if (unlikely(action == PUBLICH_TOPIC_REBUILD)) { + ACLK_UNLOCK; + get_publish_base_topic(PUBLICH_TOPIC_FREE); + return get_publish_base_topic(PUBLICH_TOPIC_GET); + } + + if (unlikely(!topic)) { + char tmp_topic[ACLK_MAX_TOPIC + 1]; + + sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); + topic = strdupz(tmp_topic); + } + + ACLK_UNLOCK; + return topic; +} + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + if (unlikely(!global_base_topic)) + global_base_topic = GET_PUBLISH_BASE_TOPIC; + + if (unlikely(!global_base_topic)) + return sub_topic; + + snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + + return final_topic; +} + +// Wait for ACLK connection to be established +int aclk_wait_for_initialization() +{ + if (unlikely(!aclk_connection_initialized)) { + time_t now = now_realtime_sec(); + + while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) { + sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT); + _link_event_loop(0); + } + + if (unlikely(!aclk_connection_initialized)) { + error("ACLK connection cannot be established"); + return 1; + } + } + return 0; +} + +/* + * This function will fetch the next pending command and process it + * + */ +int aclk_process_query() +{ + struct aclk_query *this_query; + static u_int64_t query_count = 0; + //int rc; + + if (unlikely(cmdpause)) + return 0; + + if (!aclk_connection_initialized) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + //info("No pending queries"); + return 0; + } + + if (unlikely(this_query->deleted)) { + info("Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + + query_count++; + info( + "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query, + (int) (now_realtime_sec() - this_query->created)); + + if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: ignore return code for now + web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + + //TODO: handle bad response perhaps in a different way. For now it does to the payload + //if (rc == HTTP_RESP_OK || 1) { + buffer_flush(aclk_buffer); + + aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + aclk_send_message(this_query->topic, aclk_buffer->buffer); + //} else + // error("Query RESP: %s", w->response.data->buffer); + + buffer_free(w->response.data); + freez(w); + aclk_query_free(this_query); + return 1; + } + + if (strcmp((char *)this_query->topic, "_chart") == 0) { + aclk_send_single_chart(this_query->data, this_query->query); + } + + aclk_query_free(this_query); + + return 1; +} + +// Launch a query processing thread + +/* + * Process all pending queries + * Return 0 if no queries were processed, 1 otherwise + * + */ + +int aclk_process_queries() +{ + if (unlikely(cmdpause)) + return 0; + + // Return if no queries pending + if (likely(!aclk_queue.count)) + return 0; + + info("Processing %d queries", (int ) aclk_queue.count); + + while (aclk_process_query()) { + //rc = _link_event_loop(0); + }; + + return 1; +} + +static void aclk_query_thread_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * MAin query processing thread + * + */ +void *aclk_query_main_thread(void *ptr) +{ + netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + + while (!netdata_exit) { + + QUERY_THREAD_LOCK; + + if (unlikely(!aclk_metadata_submitted)) { + aclk_send_metadata(); + aclk_metadata_submitted = 1; + } + + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + if (likely(aclk_connection_initialized && !netdata_exit)) { + while (aclk_process_queries()) { + // Sleep for a few ms and retry maybe we have something to process + // before going to sleep + // TODO: This needs improvement to avoid missed queries + sleep_usec(USEC_PER_MS * 100); + } + } + + QUERY_THREAD_UNLOCK; + + } // forever + info("Shutting down query processing thread"); + netdata_thread_cleanup_pop(1); + return NULL; +} + +// Thread cleanup +static void aclk_main_cleanup(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + info("cleaning up..."); + + QUERY_THREAD_WAKEUP; + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + //netdata_thread_t *query_thread; + struct netdata_static_thread query_thread; + + memset(&query_thread, 0, sizeof(query_thread)); + + netdata_thread_cleanup_push(aclk_main_cleanup, ptr); + + if (unlikely(!aclk_buffer)) + aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + assert(aclk_buffer != NULL); + + //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); + //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT); + sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT); + + // Ok mark we are ready to accept incoming requests + waiting_init = 0; + + while (!netdata_exit) { + // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds + // TODO: Handle the unclaim command as well -- we may need to shutdown the connection + if (likely(!is_agent_claimed())) { + sleep_usec(USEC_PER_SEC * 60); + info("Checking agent claiming status"); + continue; + } + + if (unlikely(!aclk_connection_initialized)) { + static int initializing = 0; + + if (likely(initializing)) { + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + initializing = 1; + info("Initializing connection"); + //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer); + if (unlikely(aclk_init(ACLK_INIT))) { + // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying. + sleep_usec(USEC_PER_SEC * 60); + continue; + } else { + sleep_usec(USEC_PER_SEC * 1); + } + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + continue; + } + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); + } + if (unlikely(!query_thread.thread)) { + query_thread.thread = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create( + query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); + } + + //TODO: Check if there is a return code + _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); + + } // forever + aclk_shutdown(); + + netdata_thread_cleanup_pop(1); + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form <base_topic>/<sub_topic> + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message(char *sub_topic, char *message) +{ + int rc; + static int skip_due_to_shutdown = 0; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + if (unlikely(!aclk_connection_initialized)) + return 1; + + ++skip_due_to_shutdown; + if (unlikely(!(skip_due_to_shutdown % 100))) + info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown); + return 1; + } + + if (unlikely(!message)) + return 0; + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_send_message(final_topic, message); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/<sub_topic> + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + //static char *global_base_topic = NULL; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + if (!aclk_connection_initialized) + return 0; + + if (unlikely(netdata_exit)) { + return 1; + } + + if (unlikely(aclk_wait_for_initialization())) + return 1; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect(void *ptr) +{ + (void) ptr; + info("Connection detected"); + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect(void *ptr) +{ + (void) ptr; + info("Disconnect detected"); + aclk_subscribed = 0; + aclk_metadata_submitted = 0; +} + +void aclk_shutdown() +{ + info("Shutdown initiated"); + aclk_connection_initialized = 0; + _link_shutdown(); + info("Shutdown complete"); +} + +int aclk_init(ACLK_INIT_ACTION action) +{ + (void) action; + + static int init = 0; + int rc; + + if (likely(init)) + return 0; + + aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20); + aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20); + + aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost"); + aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883); + + info("Maximum parallel outgoing messages %d", aclk_send_maximum); + info("Maximum parallel incoming messages %d", aclk_recv_maximum); + + // This will setup the base publish topic internally + //get_publish_base_topic(PUBLICH_TOPIC_GET); + + // initialize the low level link to the cloud + rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + return 1; + } + global_base_topic = GET_PUBLISH_BASE_TOPIC; + init = 1; + + return 0; +} + +// Use this to disable encoding of quotes and newlines so that +// MQTT subscriber can display more readable data on screen + +void aclk_create_header(BUFFER *dest, char *type, char *msg_id) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + buffer_sprintf( + dest, + "\t{\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"version\": %s,\n" + "\t\"payload\": ", + type, msg_id, ACLK_VERSION); +} + +#define EYE_FRIENDLY 1 + +// encapsulate contents into metadata message as per ACLK documentation +void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents) +{ +#ifndef EYE_FRIENDLY + char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *src, *dst; +#endif + + buffer_sprintf( + dest, + "\t{\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"payload\": %s\n\t}", + type, msg_id ? msg_id : "", contents->buffer); + +#ifndef EYE_FRIENDLY + //TODO: this is the initial escaping, It will expanded + src = dest->buffer; + dst = tmp_buffer; + while (*src) { + switch (*src) { + case '0x0a': + case '\n': + *dst++ = '\\'; + *dst++ = 'n'; + break; + case '\"': + *dst++ = '\\'; + *dst++ = '\"'; + break; + case '\'': + *dst++ = '\\'; + *dst++ = '\"'; + break; + default: + *dst++ = *src; + } + src++; + } + *dst = '\0'; + + buffer_flush(dest); + buffer_sprintf(dest, "%s", tmp_buffer); + + freez(tmp_buffer); +#endif + return; +} + +//TODO: this has been changed in the latest specs. We need to pack the data in one MQTT +//message with a payload and has a list of json objects +int aclk_send_alarm_metadata() +{ + //TODO: improve locking on the buffer -- same lock is used for the message send + //improve error handling + ACLK_LOCK; + buffer_flush(aclk_buffer); + // Alarms configuration + aclk_create_header(aclk_buffer, "alarms", NULL); + health_alarms2json(localhost, aclk_buffer, 1); + buffer_sprintf(aclk_buffer,"\n}"); + ACLK_UNLOCK; + aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer); + + // Alarms log + ACLK_LOCK; + buffer_flush(aclk_buffer); + aclk_create_header(aclk_buffer, "alarms_log", NULL); + health_alarm_log2json(localhost, aclk_buffer, 0); + buffer_sprintf(aclk_buffer,"\n}"); + ACLK_UNLOCK; + aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer); + + return 0; +} + + +// Send info metadata message to the cloud if the link is established +// or on request +int aclk_send_metadata() +{ + ACLK_LOCK; + + buffer_flush(aclk_buffer); + + aclk_create_header(aclk_buffer, "connect", NULL); + buffer_sprintf(aclk_buffer,"{\n\t \"info\" : "); + web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer); + buffer_sprintf(aclk_buffer,", \n\t \"charts\" : "); + charts2json(localhost, aclk_buffer); + buffer_sprintf(aclk_buffer,"\n}\n}"); + aclk_buffer->contenttype = CT_APPLICATION_JSON; + + ACLK_UNLOCK; + + aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer); + + aclk_send_alarm_metadata(); + + return 0; +} + +//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) + +int aclk_send_single_chart(char *hostname, char *chart) +{ + RRDHOST *target_host; + ACLK_LOCK; + + buffer_flush(aclk_buffer); + + target_host = rrdhost_find_by_hostname(hostname, 0); + if (!target_host) + return 1; + + RRDSET *st = rrdset_find(target_host, chart); + + if (!st) + st = rrdset_find_byname(target_host, chart); + + if (!st) { + info("FAILED to find chart %s", chart); + return 1; + } + + aclk_buffer->contenttype = CT_APPLICATION_JSON; + + buffer_flush(aclk_buffer); + + aclk_create_header(aclk_buffer, "chart", NULL); + + rrdset2json(st, aclk_buffer, NULL, NULL); + buffer_sprintf(aclk_buffer,"\n}\n}"); + + + ACLK_UNLOCK; + aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name) +{ + (void) host; + (void) chart_name; +#ifndef ENABLE_ACLK + return 0; +#else + if (host != localhost) + return 0; + + aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1); + return 0; +#endif +} + +int aclk_update_alarm(RRDHOST *host, char *alarm_name) +{ + if (host != localhost) + return 0; + + aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1); + return 0; +} + + +//TODO: add and check the incoming type e.g http +int aclk_handle_cloud_request(char *payload) +{ + struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1}; + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(JSON_OK != rc)) { + error("Malformed json request (%s)", payload); + return 1; + } + + if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) { + return 1; + } + + aclk_submit_request(&cloud_to_agent); + + return 0; +} |