// SPDX-License-Identifier: GPL-3.0-or-later #include "libnetdata/libnetdata.h" #include "agent_cloud_link.h" // Read from the config file -- new section [agent_cloud_link] // Defaults are supplied int aclk_port = ACLK_DEFAULT_PORT; char *aclk_hostname = ACLK_DEFAULT_HOST; int aclk_subscribed = 0; int aclk_disable_single_updates = 0; int aclk_metadata_submitted = 0; int agent_state = 0; time_t last_init_sequence = 0; int waiting_init = 1; char *global_base_topic = NULL; int aclk_connecting = 0; char *create_uuid() { uuid_t uuid; char *uuid_str = mallocz(36 + 1); uuid_generate(uuid); uuid_unparse(uuid, uuid_str); return uuid_str; } int cloud_to_agent_parse(JSON_ENTRY *e) { struct aclk_request *data = e->callback_data; switch(e->type) { case JSON_OBJECT: case JSON_ARRAY: break; case JSON_STRING: if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) { data->msg_id = strdupz(e->data.string); break; } if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) { data->type_id = strdupz(e->data.string); break; } if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) { data->callback_topic = strdupz(e->data.string); break; } if (!strcmp(e->name, ACLK_JSON_IN_URL)) { data->payload = strdupz(e->data.string); break; } break; case JSON_NUMBER: if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) { data->version = atoi(e->original_string); break; } break; case JSON_BOOLEAN: break; case JSON_NULL: break; } return 0; } // Set when we have connection up and running from the connection callback int aclk_connection_initialized = 0; // TODO modify previous comment if this stays this way // con_initialized means library is initialized and ready to be used // acklk_connected means there is actually an established connection int aclk_mqtt_connected = 0; static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) #define QUERY_LOCK netdata_mutex_lock(&query_mutex) #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) /* * Maintain a list of collectors and chart count * If all the charts of a collector are deleted * then a new metadata dataset must be send to the cloud * */ struct _collector { time_t created; u_int32_t count; //chart count u_int32_t hostname_hash; u_int32_t plugin_hash; u_int32_t module_hash; char *hostname; char *plugin_name; char *module_name; struct _collector *next; }; struct _collector *collector_list = NULL; struct aclk_query { time_t created; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this char *topic; // Topic to respond to char *data; // Internal data (NULL if request from the cloud) char *msg_id; // msg_id generated by the cloud (NULL if internal) char *query; // The actual query u_char deleted; // Mark deleted for garbage collect struct aclk_query *next; }; struct aclk_query_queue { struct aclk_query *aclk_query_head; struct aclk_query *aclk_query_tail; u_int64_t count; } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; /* * After a connection failure -- delay in milliseconds * When a connection is established, the delay function * should be called with * * mode 0 to reset the delay * mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms * */ unsigned long int aclk_reconnect_delay(int mode) { static int fail = -1; unsigned long int delay; if (!mode || fail == -1) { srandom(time(NULL)); fail = mode-1; return 0; } delay = (1 << fail); if (delay >= ACLK_MAX_BACKOFF_DELAY) { delay = ACLK_MAX_BACKOFF_DELAY * 1000; } else { fail++; delay = (delay * 1000) + (random() % 1000); } // sleep_usec(USEC_PER_MS * delay); return delay; } /* * Free a query structure when done */ void aclk_query_free(struct aclk_query *this_query) { if (unlikely(!this_query)) return; freez(this_query->topic); if (likely(this_query->query)) freez(this_query->query); if (likely(this_query->data)) freez(this_query->data); if (likely(this_query->msg_id)) freez(this_query->msg_id); freez(this_query); } // Returns the entry after which we need to create a new entry to run at the specified time // If NULL is returned we need to add to HEAD // Need to have a QUERY lock before calling this struct aclk_query *aclk_query_find_position(time_t time_to_run) { struct aclk_query *tmp_query, *last_query; // Quick check if we will add to the end if (likely(aclk_queue.aclk_query_tail)) { if (aclk_queue.aclk_query_tail->run_after <= time_to_run) return aclk_queue.aclk_query_tail; } last_query = NULL; tmp_query = aclk_queue.aclk_query_head; while (tmp_query) { if (tmp_query->run_after > time_to_run) return last_query; last_query = tmp_query; tmp_query = tmp_query->next; } return last_query; } // Need to have a QUERY lock before calling this struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) { struct aclk_query *tmp_query, *prev_query; UNUSED(cmd); tmp_query = aclk_queue.aclk_query_head; prev_query = NULL; while (tmp_query) { if (likely(!tmp_query->deleted)) { if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { if (likely(last_query)) *last_query = prev_query; return tmp_query; } } } prev_query = tmp_query; tmp_query = tmp_query->next; } return NULL; } /* * Add a query to execute, the result will be send to the specified topic */ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; // Ignore all commands while we wait for the agent to initialize if (unlikely(waiting_init)) return 0; // Ignore all commands if agent not stable and reset the last_init_sequence mark if (agent_state == 0) { last_init_sequence = now_realtime_sec(); return 0; } run_after = now_realtime_sec() + run_after; QUERY_LOCK; struct aclk_query *last_query = NULL; //last_query = NULL; tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); if (unlikely(tmp_query)) { if (tmp_query->run_after == run_after) { QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } if (last_query) last_query->next = tmp_query->next; else aclk_queue.aclk_query_head = tmp_query->next; debug(D_ACLK, "Removing double entry"); aclk_query_free(tmp_query); aclk_queue.count--; } new_query = callocz(1, sizeof(struct aclk_query)); new_query->cmd = aclk_cmd; if (internal) { new_query->topic = strdupz(topic); if (likely(query)) new_query->query = strdupz(query); } else { new_query->topic = topic; new_query->query = query; new_query->msg_id = msg_id; } if (data) new_query->data = strdupz(data); new_query->next = NULL; new_query->created = now_realtime_sec(); new_query->run_after = run_after; debug(D_ACLK, "Added query (%s) (%s)", topic, query?query:""); tmp_query = aclk_query_find_position(run_after); if (tmp_query) { new_query->next = tmp_query->next; tmp_query->next = new_query; if (tmp_query == aclk_queue.aclk_query_tail) aclk_queue.aclk_query_tail = new_query; aclk_queue.count++; QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } new_query->next = aclk_queue.aclk_query_head; aclk_queue.aclk_query_head = new_query; aclk_queue.count++; QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } inline int aclk_submit_request(struct aclk_request *request) { return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD); } /* * Get the next query to process - NULL if nothing there * The caller needs to free memory by calling aclk_query_free() * * topic * query * The structure itself * */ struct aclk_query *aclk_queue_pop() { struct aclk_query *this_query; QUERY_LOCK; if (likely(!aclk_queue.aclk_query_head)) { QUERY_UNLOCK; return NULL; } this_query = aclk_queue.aclk_query_head; // Get rid of the deleted entries while (this_query && this_query->deleted) { aclk_queue.count--; aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; if (likely(!aclk_queue.aclk_query_head)) { aclk_queue.aclk_query_tail = NULL; } aclk_query_free(this_query); this_query = aclk_queue.aclk_query_head; } if (likely(!this_query)) { QUERY_UNLOCK; return NULL; } if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); QUERY_UNLOCK; return NULL; } aclk_queue.count--; aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; if (likely(!aclk_queue.aclk_query_head)) { aclk_queue.aclk_query_tail = NULL; } QUERY_UNLOCK; return this_query; } // This will give the base topic that the agent will publish messages. // subtopics will be sent under the base topic e.g. base_topic/subtopic // This is called by aclk_init(), to compute the base topic once and have // it stored internally. // Need to check if additional logic should be added to make sure that there // is enough information to determine the base topic at init time char *create_publish_base_topic() { if (unlikely(!is_agent_claimed())) return NULL; ACLK_LOCK; if (unlikely(!global_base_topic)) { char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed()); tmp = strchr(tmp_topic, '\n'); if (unlikely(tmp)) *tmp = '\0'; global_base_topic = strdupz(tmp_topic); } ACLK_UNLOCK; return global_base_topic; } /* * Build a topic based on sub_topic and final_topic * if the sub topic starts with / assume that is an absolute topic * */ char *get_topic(char *sub_topic, char *final_topic, int max_size) { int rc; if (likely(sub_topic && sub_topic[0] == '/')) return sub_topic; if (unlikely(!global_base_topic)) return sub_topic; rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); if (unlikely(rc >= max_size)) debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); return final_topic; } /* * Free a collector structure */ static void _free_collector(struct _collector *collector) { if (likely(collector->plugin_name)) freez(collector->plugin_name); if (likely(collector->module_name)) freez(collector->module_name); if (likely(collector->hostname)) freez(collector->hostname); freez(collector); } /* * This will report the collector list * */ #ifdef ACLK_DEBUG static void _dump_connector_list() { struct _collector *tmp_collector; COLLECTOR_LOCK; info("DUMPING ALL COLLECTORS"); if (unlikely(!collector_list || !collector_list->next)) { COLLECTOR_UNLOCK; info("DUMPING ALL COLLECTORS -- nothing found"); return; } // Note that the first entry is "dummy" tmp_collector = collector_list->next; while (tmp_collector) { info( "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, tmp_collector->plugin_name ? tmp_collector->plugin_name : "", tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); tmp_collector = tmp_collector->next; } info("DUMPING ALL COLLECTORS DONE"); COLLECTOR_UNLOCK; } #endif /* * This will cleanup the collector list * */ static void _reset_connector_list() { struct _collector *tmp_collector, *next_collector; COLLECTOR_LOCK; if (unlikely(!collector_list || !collector_list->next)) { COLLECTOR_UNLOCK; return; } // Note that the first entry is "dummy" tmp_collector = collector_list->next; collector_list->count = 0; collector_list->next = NULL; // We broke the link; we can unlock COLLECTOR_UNLOCK; while (tmp_collector) { next_collector = tmp_collector->next; _free_collector(tmp_collector); tmp_collector = next_collector; } } /* * Find a collector (if it exists) * Must lock before calling this * If last_collector is not null, it will return the previous collector in the linked * list (used in collector delete) */ static struct _collector *_find_collector(const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) { struct _collector *tmp_collector, *prev_collector; uint32_t plugin_hash; uint32_t module_hash; uint32_t hostname_hash; if (unlikely(!collector_list)) { collector_list = callocz(1, sizeof(struct _collector)); return NULL; } if (unlikely(!collector_list->next)) return NULL; plugin_hash = plugin_name?simple_hash(plugin_name):1; module_hash = module_name?simple_hash(module_name):1; hostname_hash = simple_hash(hostname); // Note that the first entry is "dummy" tmp_collector = collector_list->next; prev_collector = collector_list; while (tmp_collector) { if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { if (unlikely(last_collector)) *last_collector = prev_collector; return tmp_collector; } prev_collector = tmp_collector; tmp_collector = tmp_collector->next; } return tmp_collector; } /* * Called to delete a collector * It will reduce the count (chart_count) and will remove it * from the linked list if the count reaches zero * The structure will be returned to the caller to free * the resources * */ static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector, *prev_collector = NULL; tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); if (likely(tmp_collector)) { --tmp_collector->count; if (unlikely(!tmp_collector->count)) prev_collector->next = tmp_collector->next; } return tmp_collector; } /* * Add a new collector (plugin / module) to the list * If it already exists just update the chart count * * Lock before calling */ static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); if (unlikely(!tmp_collector)) { tmp_collector = callocz(1, sizeof(struct _collector)); tmp_collector->hostname_hash = simple_hash(hostname); tmp_collector->plugin_hash = plugin_name?simple_hash(plugin_name):1; tmp_collector->module_hash = module_name?simple_hash(module_name):1; tmp_collector->hostname = strdupz(hostname); tmp_collector->plugin_name = plugin_name?strdupz(plugin_name):NULL; tmp_collector->module_name = module_name?strdupz(module_name):NULL; tmp_collector->next = collector_list->next; collector_list->next = tmp_collector; } tmp_collector->count++; debug(D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name?plugin_name:"*", module_name?module_name:"*", tmp_collector->count); return tmp_collector; } /* * Add a new collector to the list * If it exists, update the chart count */ void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; COLLECTOR_LOCK; tmp_collector = _add_collector(hostname, plugin_name, module_name); if (unlikely(tmp_collector->count != 1)) { COLLECTOR_UNLOCK; return; } aclk_queue_query("connector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); COLLECTOR_UNLOCK; } /* * Delete a collector from the list * If the chart count reaches zero the collector will be removed * from the list by calling del_collector. * * This function will release the memory used and schedule * a cloud update */ void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; COLLECTOR_LOCK; tmp_collector = _del_collector(hostname, plugin_name, module_name); if (unlikely(!tmp_collector || tmp_collector->count)) { COLLECTOR_UNLOCK; return; } debug(D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name?plugin_name:"*", module_name?module_name:"*", tmp_collector->count); COLLECTOR_UNLOCK; aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); _free_collector(tmp_collector); } // Wait for ACLK connection to be established int aclk_wait_for_initialization() { if (unlikely(!aclk_connection_initialized)) { time_t now = now_realtime_sec(); while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) { sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT); _link_event_loop(0); if (unlikely(!netdata_exit)) return 1; } if (unlikely(!aclk_connection_initialized)) { error("ACLK connection cannot be established"); return 1; } } return 0; } int aclk_execute_query(struct aclk_query *this_query) { if (strncmp(this_query->query, "/api/v1/", 8) == 0) { struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() w->cookie1[0] = 0; // Simulate web_client_create_on_fd() w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; char *mysep = strchr(this_query->query, '?'); if (mysep) { strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); *mysep = '\0'; } else strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. For now it does to the payload int rc = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; aclk_create_header(local_buffer, "http", this_query->msg_id); if (rc != HTTP_RESP_OK || strcmp(mysep?mysep+1:"noop", "badge.svg") == 0) buffer_sprintf(local_buffer, "\"%s\"", aclk_encode_response(w->response.data)->buffer); else buffer_sprintf(local_buffer, "%s", aclk_encode_response(w->response.data)->buffer); buffer_sprintf(local_buffer,"\n}"); aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); buffer_free(w->response.data); freez(w); buffer_free(local_buffer); return 0; } return 1; } /* * This function will fetch the next pending command and process it * */ int aclk_process_query() { struct aclk_query *this_query; static long int query_count = 0; if (!aclk_connection_initialized) return 0; this_query = aclk_queue_pop(); if (likely(!this_query)) { return 0; } if (unlikely(this_query->deleted)) { debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); aclk_query_free(this_query); return 1; } query_count++; debug( D_ACLK, "Query #%ld (%s) size=%ld in queue %d seconds", query_count, this_query->topic, this_query->query?strlen(this_query->query):0, (int)(now_realtime_sec() - this_query->created)); switch (this_query->cmd) { case ACLK_CMD_ONCONNECT: debug(D_ACLK, "EXECUTING on connect metadata command"); aclk_send_metadata(); aclk_metadata_submitted = 2; break; case ACLK_CMD_CHART: debug(D_ACLK, "EXECUTING a chart update command"); aclk_send_single_chart(this_query->data, this_query->query); break; case ACLK_CMD_CHARTDEL: debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now aclk_send_info_metadata(); break; case ACLK_CMD_ALARM: debug(D_ACLK, "EXECUTING an alarm update command"); aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); break; case ACLK_CMD_ALARMS: debug(D_ACLK, "EXECUTING an alarms update command"); aclk_send_alarm_metadata(); break; case ACLK_CMD_CLOUD: debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); break; default: break; } debug( D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); aclk_query_free(this_query); return 1; } /* * Process all pending queries * Return 0 if no queries were processed, 1 otherwise * */ int aclk_process_queries() { if (unlikely(netdata_exit || !aclk_connection_initialized)) return 0; if (likely(!aclk_queue.count)) return 0; debug(D_ACLK, "Processing %d queries", (int ) aclk_queue.count); //TODO: may consider possible throttling here while (aclk_process_query()) { // Process all commands }; return 1; } static void aclk_query_thread_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; info("cleaning up..."); COLLECTOR_LOCK; _reset_connector_list(); freez(collector_list); COLLECTOR_UNLOCK; static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } /** * Main query processing thread * * On startup wait for the agent collectors to initialize * Expect at least a time of ACLK_STABLE_TIMEOUT seconds * of no new collectors coming in in order to mark the agent * as stable (set agent_state = 1) */ void *aclk_query_main_thread(void *ptr) { netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); while (!agent_state && !netdata_exit) { time_t checkpoint; checkpoint = now_realtime_sec() - last_init_sequence; info("Waiting for agent collectors to initialize"); sleep_usec(USEC_PER_SEC * ACLK_STABLE_TIMEOUT); if (checkpoint > ACLK_STABLE_TIMEOUT) { agent_state = 1; info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); #ifdef ACLK_DEBUG _dump_connector_list(); #endif } } while (!netdata_exit) { if (unlikely(!aclk_metadata_submitted)) { aclk_metadata_submitted = 1; aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); } aclk_process_queries(); QUERY_THREAD_LOCK; // TODO: Need to check if there are queries awaiting already if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); QUERY_THREAD_UNLOCK; } // forever info("Shutting down query processing thread"); netdata_thread_cleanup_pop(1); return NULL; } // Thread cleanup static void aclk_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; info("cleaning up..."); // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } /** * Main agent cloud link thread * * This thread will simply call the main event loop that handles * pending requests - both inbound and outbound * * @param ptr is a pointer to the netdata_static_thread structure. * * @return It always returns NULL */ void *aclk_main(void *ptr) { struct netdata_static_thread *query_thread; netdata_thread_cleanup_push(aclk_main_cleanup, ptr); info("Waiting for netdata to be ready"); while (!netdata_ready) { sleep_usec(USEC_PER_MS * 300); } last_init_sequence = now_realtime_sec(); query_thread = NULL; aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", ACLK_DEFAULT_HOST); aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", ACLK_DEFAULT_PORT); // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds // TODO: Handle the unclaim command as well -- we may need to shutdown the connection while(likely(!is_agent_claimed())) { sleep_usec(USEC_PER_SEC * 5); if(netdata_exit) goto exited; } create_publish_base_topic(); usec_t reconnect_expiry = 0; // In usecs while (!netdata_exit) { static int first_init = 0; _link_event_loop(ACLK_LOOP_TIMEOUT * 1000); debug(D_ACLK,"LINK event loop called"); if (unlikely(!aclk_connection_initialized)) { if (unlikely(first_init)) { aclk_try_to_connect(); first_init = 1; } else { if (aclk_connecting == 0) { if (reconnect_expiry == 0) { unsigned long int delay = aclk_reconnect_delay(1); reconnect_expiry = now_realtime_usec() + delay * 1000; info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); } if (now_realtime_usec() >= reconnect_expiry) { reconnect_expiry = 0; aclk_connecting = 1; aclk_try_to_connect(); } sleep_usec(USEC_PER_MS * 100); } } continue; } if (likely(aclk_mqtt_connected)) { if (unlikely(!aclk_subscribed)) { aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); } if (unlikely(!query_thread)) { query_thread = callocz(1, sizeof(struct netdata_static_thread)); query_thread->thread = mallocz(sizeof(netdata_thread_t)); netdata_thread_create( query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, query_thread); } } } // forever exited: aclk_shutdown(); netdata_thread_cleanup_pop(1); return NULL; } /* * Send a message to the cloud, using a base topic and sib_topic * The final topic will be in the form / * If base_topic is missing then the global_base_topic will be used (if available) * */ int aclk_send_message(char *sub_topic, char *message, char *msg_id) { int rc; int mid; char topic[ACLK_MAX_TOPIC + 1]; char *final_topic; UNUSED(msg_id); if (unlikely(aclk_wait_for_initialization())) return 1; if (unlikely(!message)) return 0; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); if (unlikely(!final_topic)) { errno = 0; error("Unable to build outgoing topic; truncated?"); return 1; } ACLK_LOCK; rc = _link_send_message(final_topic, message, &mid); // TODO: link the msg_id with the mid so we can trace it ACLK_UNLOCK; if (unlikely(rc)) { errno = 0; error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); } return rc; } /* * Subscribe to a topic in the cloud * The final subscription will be in the form * /agent/claim_id/ */ int aclk_subscribe(char *sub_topic, int qos) { int rc; char topic[ACLK_MAX_TOPIC + 1]; char *final_topic; if (unlikely(aclk_wait_for_initialization())) return 1; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); if (unlikely(!final_topic)) { errno = 0; error("Unable to build outgoing topic; truncated?"); return 1; } ACLK_LOCK; rc = _link_subscribe(final_topic, qos); ACLK_UNLOCK; // TODO: Add better handling -- error will flood the logfile here if (unlikely(rc)) { errno = 0; error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); } return rc; } // This is called from a callback when the link goes up void aclk_connect(void *ptr) { UNUSED(ptr); info("Connection detected"); aclk_connection_initialized = 1; waiting_init = 0; aclk_reconnect_delay(0); QUERY_THREAD_WAKEUP; return; } // This is called from a callback when the link goes down void aclk_disconnect(void *ptr) { UNUSED(ptr); if (likely(aclk_connection_initialized)) info("Disconnect detected"); aclk_subscribed = 0; aclk_metadata_submitted = 0; waiting_init = 1; aclk_connection_initialized = 0; aclk_connecting = 0; } void aclk_shutdown() { info("Shutdown initiated"); aclk_connection_initialized = 0; _link_shutdown(); info("Shutdown complete"); } void aclk_try_to_connect() { int rc; rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect); if (unlikely(rc)) { error("Failed to initialize the agent cloud link library"); } } inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) { uuid_t uuid; time_t time_created; char uuid_str[36 + 1]; if (unlikely(!msg_id)) { uuid_generate(uuid); uuid_unparse(uuid, uuid_str); msg_id = uuid_str; } time_created = now_realtime_sec(); buffer_sprintf( dest, "\t{\"type\": \"%s\",\n" "\t\"msg-id\": \"%s\",\n" "\t\"timestamp\": %ld,\n" "\t\"version\": %d,\n" "\t\"payload\": ", type, msg_id, time_created, ACLK_VERSION); debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created); } //#define EYE_FRIENDLY /* * Take a buffer, encode it and rewrite it * */ BUFFER *aclk_encode_response(BUFFER *contents) { #ifdef EYE_FRIENDLY return contents; #else char *tmp_buffer = mallocz(contents->len * 2); char *src, *dst; src = contents->buffer; dst = tmp_buffer; while (*src) { switch (*src) { case '\n': *dst++ = '\\'; *dst++ = 'n'; break; case 0x01 ... 0x09: case 0x0b ... 0x1F: *dst++ = '\\'; *dst++ = '0'; *dst++ = '0'; *dst++ = (*src < 0x0F) ? '0' : '1'; *dst++ = to_hex(*src); break; case '\"': case '\'': *dst++ = '\\'; *dst++ = *src; break; default: *dst++ = *src; } src++; } *dst = '\0'; buffer_flush(contents); buffer_sprintf(contents, "%s", tmp_buffer); freez(tmp_buffer); return contents; #endif } /* * This will send the alarms configuration * and */ void aclk_send_alarm_metadata() { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); char *msg_id = create_uuid(); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; debug(D_ACLK,"Metadata alarms start"); aclk_create_header(local_buffer, "connect_alarms", msg_id); buffer_sprintf(local_buffer,"{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); debug(D_ACLK,"Metadata %s with configured alarms has %ld bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer,",\n\t \"alarm-log\" : "); health_alarm_log2json(localhost, local_buffer, 0); debug(D_ACLK,"Metadata %s with alarm_log has %ld bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer,",\n\t \"alarms-active\" : "); health_alarms_values2json(localhost, local_buffer, 0); debug(D_ACLK,"Metadata %s with alarms_active has %ld bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer,"\n}\n}"); aclk_send_message(ACLK_ALARMS_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); debug(D_ACLK,"Metadata %s encoded has %ld bytes", msg_id, local_buffer->len); freez(msg_id); buffer_free(local_buffer); } int aclk_send_info_metadata() { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); debug(D_ACLK,"Metadata /info start"); char *msg_id = create_uuid(); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; aclk_create_header(local_buffer, "connect", msg_id); buffer_sprintf(local_buffer,"{\n\t \"info\" : "); web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); debug(D_ACLK,"Metadata %s with info has %ld bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer,", \n\t \"charts\" : "); charts2json(localhost, local_buffer, 1); buffer_sprintf(local_buffer,"\n}\n}"); debug(D_ACLK,"Metadata %s with chart has %ld bytes", msg_id, local_buffer->len); aclk_send_message(ACLK_METADATA_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); debug(D_ACLK,"Metadata %s encoded has %ld bytes", msg_id, local_buffer->len); freez(msg_id); buffer_free(local_buffer); return 0; } // Send info metadata message to the cloud if the link is established // or on request int aclk_send_metadata() { aclk_send_info_metadata(); aclk_send_alarm_metadata(); return 0; } void aclk_single_update_disable() { aclk_disable_single_updates = 1; } void aclk_single_update_enable() { aclk_disable_single_updates = 0; } // Trigged by a health reload, sends the alarm metadata void aclk_alarm_reload() { if (unlikely(!agent_state)) return; aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); } //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) int aclk_send_single_chart(char *hostname, char *chart) { RRDHOST *target_host; target_host = rrdhost_find_by_hostname(hostname, 0); if (!target_host) return 1; RRDSET *st = rrdset_find(target_host, chart); if (!st) st = rrdset_find_byname(target_host, chart); if (!st) { info("FAILED to find chart %s", chart); return 1; } BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); char *msg_id = create_uuid(); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; aclk_create_header(local_buffer, "chart", msg_id); rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer,"\t\n}"); aclk_send_message(ACLK_CHART_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); freez(msg_id); buffer_free(local_buffer); return 0; } int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) { #ifndef ENABLE_ACLK UNUSED(host); UNUSED(chart_name); return 0; #else if (host != localhost) return 0; if (unlikely(aclk_disable_single_updates)) return 0; aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd); return 0; #endif } int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer = NULL; if (host != localhost) return 0; if (agent_state == 0) return 0; /* * Check if individual updates have been disabled * This will be the case when we do health reload * and all the alarms will be dropped and recreated. * At the end of the health reload the complete alarm metadata * info will be sent */ if (unlikely(aclk_disable_single_updates)) return 0; local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); char *msg_id = create_uuid(); buffer_flush(local_buffer); aclk_create_header(local_buffer, "alarms", msg_id); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); health_alarm_entry2json_nolock(local_buffer, ae, host); netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); buffer_sprintf(local_buffer,"\n}"); aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, aclk_encode_response(local_buffer)->buffer , 0, 1, ACLK_CMD_ALARM); freez(msg_id); buffer_free(local_buffer); return 0; } /* * Parse the incoming payload and queue a command if valid */ int aclk_handle_cloud_request(char *payload) { struct aclk_request cloud_to_agent = { .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0}; if (unlikely(!payload)) { debug(D_ACLK, "ACLK incoming message is empty"); return 0; } debug(D_ACLK, "ACLK incoming message [%s]", payload); int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); if (unlikely( JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id || !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION || strcmp(cloud_to_agent.type_id, "http"))) { if (JSON_OK != rc) error("Malformed json request (%s)", payload); if (cloud_to_agent.version > ACLK_VERSION) error("Unsupported version in JSON request %d", cloud_to_agent.version); if (cloud_to_agent.payload) freez(cloud_to_agent.payload); if (cloud_to_agent.type_id) freez(cloud_to_agent.type_id); if (cloud_to_agent.msg_id) freez(cloud_to_agent.msg_id); if (cloud_to_agent.callback_topic) freez(cloud_to_agent.callback_topic); return 1; } aclk_submit_request(&cloud_to_agent); // Note: the payload comes from the callback and it will be automatically freed return 0; }