// SPDX-License-Identifier: GPL-3.0-or-later #include "libnetdata/libnetdata.h" #include "agent_cloud_link.h" #include "aclk_lws_https_client.h" #include "aclk_common.h" #include "aclk_stats.h" int aclk_shutting_down = 0; // State-machine for the on-connect metadata transmission. // TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial // agent startup phase. static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED; static AGENT_STATE agent_state = AGENT_INITIALIZING; // Other global state static int aclk_subscribed = 0; static int aclk_disable_single_updates = 0; static time_t last_init_sequence = 0; static int waiting_init = 1; static char *aclk_username = NULL; static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api int aclk_force_reconnect = 0; // Indication from lower layers int aclk_kill_link = 0; // Tell the agent to tear down the link usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) #define QUERY_LOCK netdata_mutex_lock(&query_mutex) #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) void 
lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); void aclk_lws_wss_destroy_context(); /* * Maintain a list of collectors and chart count * If all the charts of a collector are deleted * then a new metadata dataset must be send to the cloud * */ struct _collector { time_t created; uint32_t count; //chart count uint32_t hostname_hash; uint32_t plugin_hash; uint32_t module_hash; char *hostname; char *plugin_name; char *module_name; struct _collector *next; }; struct _collector *collector_list = NULL; struct aclk_query { time_t created; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this char *topic; // Topic to respond to char *data; // Internal data (NULL if request from the cloud) char *msg_id; // msg_id generated by the cloud (NULL if internal) char *query; // The actual query u_char deleted; // Mark deleted for garbage collect struct aclk_query *next; }; struct aclk_query_queue { struct aclk_query *aclk_query_head; struct aclk_query *aclk_query_tail; uint64_t count; } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; char *create_uuid() { uuid_t uuid; char *uuid_str = mallocz(36 + 1); uuid_generate(uuid); uuid_unparse(uuid, uuid_str); return uuid_str; } int cloud_to_agent_parse(JSON_ENTRY *e) { struct aclk_request *data = e->callback_data; switch (e->type) { case JSON_OBJECT: case JSON_ARRAY: break; case JSON_STRING: if (!strcmp(e->name, "msg-id")) { data->msg_id = strdupz(e->data.string); break; } if (!strcmp(e->name, "type")) { data->type_id = strdupz(e->data.string); break; } if (!strcmp(e->name, "callback-topic")) { data->callback_topic = strdupz(e->data.string); break; } if (!strcmp(e->name, "payload")) { if (likely(e->data.string)) { size_t len = strlen(e->data.string); data->payload = mallocz(len+1); if (!url_decode_r(data->payload, e->data.string, len + 1)) strcpy(data->payload, e->data.string); } break; } break; case JSON_NUMBER: if (!strcmp(e->name, 
"version")) { data->version = atoi(e->original_string); break; } break; case JSON_BOOLEAN: break; case JSON_NULL: break; } return 0; } static RSA *aclk_private_key = NULL; static int create_private_key() { if (aclk_private_key != NULL) RSA_free(aclk_private_key); aclk_private_key = NULL; char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); long bytes_read; char *private_key = read_by_filename(filename, &bytes_read); if (!private_key) { error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); return 1; } debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); BIO *key_bio = BIO_new_mem_buf(private_key, -1); if (key_bio==NULL) { error("Claimed agent cannot establish ACLK - failed to create BIO for key"); goto biofailed; } aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); BIO_free(key_bio); if (aclk_private_key!=NULL) { freez(private_key); return 0; } char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); biofailed: freez(private_key); return 1; } /* * After a connection failure -- delay in milliseconds * When a connection is established, the delay function * should be called with * * mode 0 to reset the delay * mode 1 to calculate sleep time [0 .. 
ACLK_MAX_BACKOFF_DELAY * 1000] ms * */ unsigned long int aclk_reconnect_delay(int mode) { static int fail = -1; unsigned long int delay; if (!mode || fail == -1) { srandom(time(NULL)); fail = mode - 1; return 0; } delay = (1 << fail); if (delay >= ACLK_MAX_BACKOFF_DELAY) { delay = ACLK_MAX_BACKOFF_DELAY * 1000; } else { fail++; delay = (delay * 1000) + (random() % 1000); } return delay; } /* * Free a query structure when done */ void aclk_query_free(struct aclk_query *this_query) { if (unlikely(!this_query)) return; freez(this_query->topic); if (likely(this_query->query)) freez(this_query->query); if (likely(this_query->data)) freez(this_query->data); if (likely(this_query->msg_id)) freez(this_query->msg_id); freez(this_query); } // Returns the entry after which we need to create a new entry to run at the specified time // If NULL is returned we need to add to HEAD // Need to have a QUERY lock before calling this struct aclk_query *aclk_query_find_position(time_t time_to_run) { struct aclk_query *tmp_query, *last_query; // Quick check if we will add to the end if (likely(aclk_queue.aclk_query_tail)) { if (aclk_queue.aclk_query_tail->run_after <= time_to_run) return aclk_queue.aclk_query_tail; } last_query = NULL; tmp_query = aclk_queue.aclk_query_head; while (tmp_query) { if (tmp_query->run_after > time_to_run) return last_query; last_query = tmp_query; tmp_query = tmp_query->next; } return last_query; } // Need to have a QUERY lock before calling this struct aclk_query * aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) { struct aclk_query *tmp_query, *prev_query; UNUSED(cmd); tmp_query = aclk_queue.aclk_query_head; prev_query = NULL; while (tmp_query) { if (likely(!tmp_query->deleted)) { if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) 
== 0))) { if (likely(last_query)) *last_query = prev_query; return tmp_query; } } } prev_query = tmp_query; tmp_query = tmp_query->next; } return NULL; } /* * Add a query to execute, the result will be send to the specified topic */ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; // Ignore all commands while we wait for the agent to initialize if (unlikely(waiting_init)) return 1; run_after = now_realtime_sec() + run_after; QUERY_LOCK; struct aclk_query *last_query = NULL; tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); if (unlikely(tmp_query)) { if (tmp_query->run_after == run_after) { QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } if (last_query) last_query->next = tmp_query->next; else aclk_queue.aclk_query_head = tmp_query->next; debug(D_ACLK, "Removing double entry"); aclk_query_free(tmp_query); aclk_queue.count--; } if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_queued++; ACLK_STATS_UNLOCK; } new_query = callocz(1, sizeof(struct aclk_query)); new_query->cmd = aclk_cmd; if (internal) { new_query->topic = strdupz(topic); if (likely(query)) new_query->query = strdupz(query); } else { new_query->topic = topic; new_query->query = query; new_query->msg_id = msg_id; } if (data) new_query->data = strdupz(data); new_query->next = NULL; new_query->created = now_realtime_sec(); new_query->run_after = run_after; debug(D_ACLK, "Added query (%s) (%s)", topic, query ? 
query : ""); tmp_query = aclk_query_find_position(run_after); if (tmp_query) { new_query->next = tmp_query->next; tmp_query->next = new_query; if (tmp_query == aclk_queue.aclk_query_tail) aclk_queue.aclk_query_tail = new_query; aclk_queue.count++; QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } new_query->next = aclk_queue.aclk_query_head; aclk_queue.aclk_query_head = new_query; aclk_queue.count++; QUERY_UNLOCK; QUERY_THREAD_WAKEUP; return 0; } inline int aclk_submit_request(struct aclk_request *request) { return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD); } /* * Get the next query to process - NULL if nothing there * The caller needs to free memory by calling aclk_query_free() * * topic * query * The structure itself * */ struct aclk_query *aclk_queue_pop() { struct aclk_query *this_query; QUERY_LOCK; if (likely(!aclk_queue.aclk_query_head)) { QUERY_UNLOCK; return NULL; } this_query = aclk_queue.aclk_query_head; // Get rid of the deleted entries while (this_query && this_query->deleted) { aclk_queue.count--; aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; if (likely(!aclk_queue.aclk_query_head)) { aclk_queue.aclk_query_tail = NULL; } aclk_query_free(this_query); this_query = aclk_queue.aclk_query_head; } if (likely(!this_query)) { QUERY_UNLOCK; return NULL; } if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); QUERY_UNLOCK; return NULL; } aclk_queue.count--; aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; if (likely(!aclk_queue.aclk_query_head)) { aclk_queue.aclk_query_tail = NULL; } QUERY_UNLOCK; return this_query; } // This will give the base topic that the agent will publish messages. // subtopics will be sent under the base topic e.g. 
base_topic/subtopic // This is called during the connection, we delete any previous topic // in-case the user has changed the agent id and reclaimed. char *create_publish_base_topic() { char *agent_id = is_agent_claimed(); if (unlikely(!agent_id)) return NULL; ACLK_LOCK; if (global_base_topic) freez(global_base_topic); char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); tmp = strchr(tmp_topic, '\n'); if (unlikely(tmp)) *tmp = '\0'; global_base_topic = strdupz(tmp_topic); ACLK_UNLOCK; freez(agent_id); return global_base_topic; } /* * Build a topic based on sub_topic and final_topic * if the sub topic starts with / assume that is an absolute topic * */ char *get_topic(char *sub_topic, char *final_topic, int max_size) { int rc; if (likely(sub_topic && sub_topic[0] == '/')) return sub_topic; if (unlikely(!global_base_topic)) return sub_topic; rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); if (unlikely(rc >= max_size)) debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); return final_topic; } /* * Free a collector structure */ static void _free_collector(struct _collector *collector) { if (likely(collector->plugin_name)) freez(collector->plugin_name); if (likely(collector->module_name)) freez(collector->module_name); if (likely(collector->hostname)) freez(collector->hostname); freez(collector); } /* * This will report the collector list * */ #ifdef ACLK_DEBUG static void _dump_collector_list() { struct _collector *tmp_collector; COLLECTOR_LOCK; info("DUMPING ALL COLLECTORS"); if (unlikely(!collector_list || !collector_list->next)) { COLLECTOR_UNLOCK; info("DUMPING ALL COLLECTORS -- nothing found"); return; } // Note that the first entry is "dummy" tmp_collector = collector_list->next; while (tmp_collector) { info( "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, tmp_collector->plugin_name ? 
tmp_collector->plugin_name : "", tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); tmp_collector = tmp_collector->next; } info("DUMPING ALL COLLECTORS DONE"); COLLECTOR_UNLOCK; } #endif /* * This will cleanup the collector list * */ static void _reset_collector_list() { struct _collector *tmp_collector, *next_collector; COLLECTOR_LOCK; if (unlikely(!collector_list || !collector_list->next)) { COLLECTOR_UNLOCK; return; } // Note that the first entry is "dummy" tmp_collector = collector_list->next; collector_list->count = 0; collector_list->next = NULL; // We broke the link; we can unlock COLLECTOR_UNLOCK; while (tmp_collector) { next_collector = tmp_collector->next; _free_collector(tmp_collector); tmp_collector = next_collector; } } /* * Find a collector (if it exists) * Must lock before calling this * If last_collector is not null, it will return the previous collector in the linked * list (used in collector delete) */ static struct _collector *_find_collector( const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) { struct _collector *tmp_collector, *prev_collector; uint32_t plugin_hash; uint32_t module_hash; uint32_t hostname_hash; if (unlikely(!collector_list)) { collector_list = callocz(1, sizeof(struct _collector)); return NULL; } if (unlikely(!collector_list->next)) return NULL; plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; module_hash = module_name ? 
simple_hash(module_name) : 1; hostname_hash = simple_hash(hostname); // Note that the first entry is "dummy" tmp_collector = collector_list->next; prev_collector = collector_list; while (tmp_collector) { if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { if (unlikely(last_collector)) *last_collector = prev_collector; return tmp_collector; } prev_collector = tmp_collector; tmp_collector = tmp_collector->next; } return tmp_collector; } /* * Called to delete a collector * It will reduce the count (chart_count) and will remove it * from the linked list if the count reaches zero * The structure will be returned to the caller to free * the resources * */ static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector, *prev_collector = NULL; tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); if (likely(tmp_collector)) { --tmp_collector->count; if (unlikely(!tmp_collector->count)) prev_collector->next = tmp_collector->next; } return tmp_collector; } /* * Add a new collector (plugin / module) to the list * If it already exists just update the chart count * * Lock before calling */ static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); if (unlikely(!tmp_collector)) { tmp_collector = callocz(1, sizeof(struct _collector)); tmp_collector->hostname_hash = simple_hash(hostname); tmp_collector->plugin_hash = plugin_name ? 
simple_hash(plugin_name) : 1; tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; tmp_collector->hostname = strdupz(hostname); tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; tmp_collector->next = collector_list->next; collector_list->next = tmp_collector; } tmp_collector->count++; debug( D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", module_name ? module_name : "*", tmp_collector->count); return tmp_collector; } /* * Add a new collector to the list * If it exists, update the chart count */ void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; COLLECTOR_LOCK; tmp_collector = _add_collector(hostname, plugin_name, module_name); if (unlikely(tmp_collector->count != 1)) { COLLECTOR_UNLOCK; return; } if (unlikely(agent_state == AGENT_INITIALIZING)) last_init_sequence = now_realtime_sec(); else { if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } COLLECTOR_UNLOCK; } /* * Delete a collector from the list * If the chart count reaches zero the collector will be removed * from the list by calling del_collector. * * This function will release the memory used and schedule * a cloud update */ void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; COLLECTOR_LOCK; tmp_collector = _del_collector(hostname, plugin_name, module_name); if (unlikely(!tmp_collector || tmp_collector->count)) { COLLECTOR_UNLOCK; return; } debug( D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? 
module_name : "*", tmp_collector->count); COLLECTOR_UNLOCK; if (unlikely(agent_state == AGENT_INITIALIZING)) last_init_sequence = now_realtime_sec(); else { if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } _free_collector(tmp_collector); } /* * Take a buffer, encode it and rewrite it * */ static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) { char *tmp_buffer = mallocz(content_size * 2); char *dst = tmp_buffer; while (content_size > 0) { switch (*src) { case '\n': if (keep_newlines) { *dst++ = '\\'; *dst++ = 'n'; } break; case '\t': break; case 0x01 ... 0x08: case 0x0b ... 0x1F: *dst++ = '\\'; *dst++ = 'u'; *dst++ = '0'; *dst++ = '0'; *dst++ = (*src < 0x0F) ? '0' : '1'; *dst++ = to_hex(*src); break; case '\"': *dst++ = '\\'; *dst++ = *src; break; default: *dst++ = *src; } src++; content_size--; } *dst = '\0'; return tmp_buffer; } int aclk_execute_query(struct aclk_query *this_query) { if (strncmp(this_query->query, "/api/v1/", 8) == 0) { struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() w->cookie1[0] = 0; // Simulate web_client_create_on_fd() w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; char *mysep = strchr(this_query->query, '?'); if (mysep) { strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); *mysep = '\0'; } else strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. 
For now it does to the payload w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); buffer_sprintf( local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", w->response.code, encoded_response, encoded_header); buffer_sprintf(local_buffer, "\n}"); debug(D_ACLK, "Response:%s", encoded_header); aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); buffer_free(w->response.data); buffer_free(w->response.header); buffer_free(w->response.header_output); freez(w); buffer_free(local_buffer); freez(encoded_response); freez(encoded_header); return 0; } return 1; } /* * This function will fetch the next pending command and process it * */ int aclk_process_query() { struct aclk_query *this_query; static long int query_count = 0; if (!aclk_connected) return 0; this_query = aclk_queue_pop(); if (likely(!this_query)) { return 0; } if (unlikely(this_query->deleted)) { debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); aclk_query_free(this_query); return 1; } query_count++; debug( D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic, this_query->query ? 
strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created)); switch (this_query->cmd) { case ACLK_CMD_ONCONNECT: debug(D_ACLK, "EXECUTING on connect metadata command"); aclk_send_metadata(); aclk_metadata_submitted = ACLK_METADATA_SENT; break; case ACLK_CMD_CHART: debug(D_ACLK, "EXECUTING a chart update command"); aclk_send_single_chart(this_query->data, this_query->query); break; case ACLK_CMD_CHARTDEL: debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now aclk_send_info_metadata(); break; case ACLK_CMD_ALARM: debug(D_ACLK, "EXECUTING an alarm update command"); aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); break; case ACLK_CMD_CLOUD: debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); break; default: break; } debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); aclk_query_free(this_query); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; ACLK_STATS_UNLOCK; } return 1; } /* * Process all pending queries * Return 0 if no queries were processed, 1 otherwise * */ int aclk_process_queries() { if (unlikely(netdata_exit || !aclk_connected)) return 0; if (likely(!aclk_queue.count)) return 0; debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count); //TODO: may consider possible throttling here while (aclk_process_query()) { // Process all commands }; return 1; } static void aclk_query_thread_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; info("cleaning up..."); _reset_collector_list(); freez(collector_list); // Clean memory for pending queries if any struct aclk_query *this_query; do { this_query = aclk_queue_pop(); aclk_query_free(this_query); } while (this_query); freez(static_thread->thread); freez(static_thread); } /** * Main query processing thread * * On startup wait for the agent collectors to initialize * Expect at least a time of 
ACLK_STABLE_TIMEOUT seconds * of no new collectors coming in in order to mark the agent * as stable (set agent_state = AGENT_STABLE) */ void *aclk_query_main_thread(void *ptr) { netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); while (agent_state == AGENT_INITIALIZING && !netdata_exit) { time_t checkpoint; checkpoint = now_realtime_sec() - last_init_sequence; if (checkpoint > ACLK_STABLE_TIMEOUT) { agent_state = AGENT_STABLE; info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); #ifdef ACLK_DEBUG _dump_collector_list(); #endif break; } info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint); sleep_usec(USEC_PER_SEC * 1); } while (!netdata_exit) { if (unlikely(!aclk_metadata_submitted)) { aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED; if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { errno = 0; error("ACLK failed to queue on_connect command"); aclk_metadata_submitted = ACLK_METADATA_REQUIRED; } } aclk_process_queries(); QUERY_THREAD_LOCK; // TODO: Need to check if there are queries awaiting already if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); QUERY_THREAD_UNLOCK; } // forever info("Shutting down query processing thread"); netdata_thread_cleanup_pop(1); return NULL; } static void aclk_graceful_disconnect() { size_t write_q, write_q_bytes, read_q; time_t event_loop_timeout; // Send a graceful disconnect message BUFFER *b = buffer_create(512); aclk_create_header(b, "disconnect", NULL, 0, 0); buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); buffer_free(b); event_loop_timeout = now_realtime_sec() + 5; write_q = 1; while (write_q && event_loop_timeout > now_realtime_sec()) { _link_event_loop(); lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); } aclk_shutting_down = 1; _link_shutdown(); 
aclk_lws_wss_mqtt_layer_disconect_notif(); write_q = 1; event_loop_timeout = now_realtime_sec() + 5; while (write_q && event_loop_timeout > now_realtime_sec()) { _link_event_loop(); lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); } aclk_shutting_down = 0; } // Thread cleanup static void aclk_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; info("cleaning up..."); char *agent_id = is_agent_claimed(); if (agent_id && aclk_connected) { freez(agent_id); // Wakeup thread to cleanup QUERY_THREAD_WAKEUP; aclk_graceful_disconnect(); } static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } struct dictionary_singleton { char *key; char *result; }; int json_extract_singleton(JSON_ENTRY *e) { struct dictionary_singleton *data = e->callback_data; switch (e->type) { case JSON_OBJECT: case JSON_ARRAY: break; case JSON_STRING: if (!strcmp(e->name, data->key)) { data->result = strdupz(e->data.string); break; } break; case JSON_NUMBER: case JSON_BOOLEAN: case JSON_NULL: break; } return 0; } // Base-64 decoder. // Note: This is non-validating, invalid input will be decoded without an error. // Challenges are packed into json strings so we don't skip newlines. // Size errors (i.e. invalid input size or insufficient output space) are caught. 
/*
 * Map one character of the base-64 alphabet to its 6-bit value.
 * Characters outside the alphabet map to 0 (the decoder is non-validating).
 */
static inline uint32_t base64_decode_value(unsigned char c)
{
    if (c >= 'A' && c <= 'Z')
        return (uint32_t)(c - 'A');
    if (c >= 'a' && c <= 'z')
        return (uint32_t)(c - 'a') + 26;
    if (c >= '0' && c <= '9')
        return (uint32_t)(c - '0') + 52;
    if (c == '+')
        return 62;
    if (c == '/')
        return 63;
    return 0;
}

/*
 * Decode input_size base-64 characters from input into output.
 *
 * input_size must be a multiple of 4 and output must have room for
 * (input_size / 4) * 3 bytes, even if the last quantum is padded.
 * '=' padding is recognised only in the last quantum.
 *
 * Returns the number of bytes written to output; 0 for empty input or on a size
 * error (which is logged).
 */
size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
{
    // Nothing to decode; also keeps the "all but the last quantum" loop bound sane
    if (input_size == 0)
        return 0;

    if ((input_size & 3) != 0) {
        error("Can't decode base-64 input length %zu", input_size);
        return 0;
    }

    size_t unpadded_size = (input_size / 4) * 3;
    if (unpadded_size > output_size) {
        error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
        return 0;
    }

    // Don't check padding within full quantums
    for (size_t i = 0; i + 4 < input_size; i += 4) {
        uint32_t value = (base64_decode_value(input[0]) << 18) + (base64_decode_value(input[1]) << 12) +
                         (base64_decode_value(input[2]) << 6) + base64_decode_value(input[3]);
        output[0] = (unsigned char)(value >> 16);
        output[1] = (unsigned char)(value >> 8);
        output[2] = (unsigned char)value;
        output += 3;
        input += 4;
    }

    // Handle padding only in last quantum
    if (input[2] == '=') {
        // "xx==" -> one byte
        uint32_t value = (base64_decode_value(input[0]) << 6) + base64_decode_value(input[1]);
        output[0] = (unsigned char)(value >> 4);
        return unpadded_size - 2;
    }

    if (input[3] == '=') {
        // "xxx=" -> two bytes
        uint32_t value = (base64_decode_value(input[0]) << 12) + (base64_decode_value(input[1]) << 6) +
                         base64_decode_value(input[2]);
        output[0] = (unsigned char)(value >> 10);
        output[1] = (unsigned char)(value >> 2);
        return unpadded_size - 1;
    }

    // Unpadded last quantum: decode it through the alphabet like every other quantum
    uint32_t value = (base64_decode_value(input[0]) << 18) + (base64_decode_value(input[1]) << 12) +
                     (base64_decode_value(input[2]) << 6) + base64_decode_value(input[3]);
    output[0] = (unsigned char)(value >> 16);
    output[1] = (unsigned char)(value >> 8);
    output[2] = (unsigned char)value;
    return unpadded_size;
}
size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) { uint32_t value; static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789+/"; if ((input_size/3+1)*4 >= output_size) { error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); return 0; } size_t count = 0; while (input_size>3) { value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; output[0] = lookup[value >> 18]; output[1] = lookup[(value >> 12) & 0x3f]; output[2] = lookup[(value >> 6) & 0x3f]; output[3] = lookup[value & 0x3f]; //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); output += 4; input += 3; input_size -= 3; count += 4; } switch (input_size) { case 2: value = (input[0] << 10) + (input[1] << 2); output[0] = lookup[(value >> 12) & 0x3f]; output[1] = lookup[(value >> 6) & 0x3f]; output[2] = lookup[value & 0x3f]; output[3] = '='; //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); count += 4; break; case 1: value = input[0] << 4; output[0] = lookup[(value >> 6) & 0x3f]; output[1] = lookup[value & 0x3f]; output[2] = '='; output[3] = '='; //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); count += 4; break; case 0: break; } return count; } int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted) { int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING); if (result == -1) { char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); error("Decryption of the challenge failed: %s", err); } return result; } char *extract_payload(BUFFER *b) { char *s = b->buffer; unsigned int line_len=0; for (size_t i=0; ilen; i++) { if (*s == 0 ) return NULL; if (*s == '\n' ) { if (line_len==0) return s+1; line_len = 0; } else if (*s 
== '\r') { /* don't count */ } else line_len ++; s++; } return NULL; } void aclk_get_challenge(char *aclk_hostname, char *aclk_port) { char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); debug(D_ACLK, "Performing challenge-response sequence"); if (aclk_password != NULL) { freez(aclk_password); aclk_password = NULL; } // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge // TODO - target host? char *agent_id = is_agent_claimed(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); goto CLEANUP; } char url[1024]; sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url); if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) { error("Challenge failed: %s", data_buffer); goto CLEANUP; } struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) { freez(challenge.result); error("Could not parse the json response with the challenge: %s", data_buffer); goto CLEANUP; } if (challenge.result == NULL ) { error("Could not retrieve challenge from auth response: %s", data_buffer); goto CLEANUP; } size_t challenge_len = strlen(challenge.result); unsigned char decoded[512]; size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); unsigned char plaintext[4096]={}; int decrypted_length = private_decrypt(decoded, decoded_len, plaintext); freez(challenge.result); char encoded[512]; size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); encoded[encoded_len] = 0; debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); char response_json[4096]={}; 
sprintf(response_json, "{\"response\":\"%s\"}", encoded); debug(D_ACLK, "Password phase: %s",response_json); // TODO - host sprintf(url, "/api/v1/auth/node/%s/password", agent_id); if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) { error("Challenge-response failed: %s", data_buffer); goto CLEANUP; } debug(D_ACLK, "Password response from cloud: %s", data_buffer); struct dictionary_singleton password = { .key = "password", .result = NULL }; if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) { freez(password.result); error("Could not parse the json response with the password: %s", data_buffer); goto CLEANUP; } if (password.result == NULL ) { error("Could not retrieve password from auth response"); goto CLEANUP; } if (aclk_password != NULL ) freez(aclk_password); aclk_password = password.result; if (aclk_username != NULL) freez(aclk_username); aclk_username = agent_id; agent_id = NULL; CLEANUP: if (agent_id != NULL) freez(agent_id); freez(data_buffer); return; } static void aclk_try_to_connect(char *hostname, char *port, int port_num) { if (!aclk_private_key) { error("Cannot try to establish the agent cloud link - no private key available!"); return; } info("Attempting to establish the agent cloud link"); aclk_get_challenge(hostname, port); if (aclk_password == NULL) return; int rc; aclk_connecting = 1; create_publish_base_topic(); rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password); if (unlikely(rc)) { error("Failed to initialize the agent cloud link library"); } } /** * Main agent cloud link thread * * This thread will simply call the main event loop that handles * pending requests - both inbound and outbound * * @param ptr is a pointer to the netdata_static_thread structure. 
* * @return It always returns NULL */ void *aclk_main(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct netdata_static_thread *query_thread; struct netdata_static_thread *stats_thread = NULL; // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. netdata_thread_disable_cancelability(); #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) info("Killing ACLK thread -> cloud functionality has been disabled"); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif info("Waiting for netdata to be ready"); while (!netdata_ready) { sleep_usec(USEC_PER_MS * 300); } info("Waiting for Cloud to be enabled"); while (!netdata_cloud_setting) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) { static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } } aclk_stats_enabled = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "statistics", CONFIG_BOOLEAN_YES); if (aclk_stats_enabled) { stats_thread = callocz(1, sizeof(struct netdata_static_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); } last_init_sequence = now_realtime_sec(); query_thread = NULL; char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. char *aclk_port = NULL; uint32_t port_num = 0; info("Waiting for netdata to be claimed"); while(1) { char *agent_id = is_agent_claimed(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) goto exited; agent_id = is_agent_claimed(); } freez(agent_id); // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. 
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { error("Do not move the cloud base url out of post_conf_load!!"); goto exited; } if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { error("Agent is claimed but the configuration is invalid, please fix"); } else { port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value if (!create_private_key() && !_mqtt_lib_init()) break; } for (int i=0; i<60; i++) { if (netdata_exit) goto exited; sleep_usec(USEC_PER_SEC * 1); } } usec_t reconnect_expiry = 0; // In usecs while (!netdata_exit) { static int first_init = 0; /* size_t write_q, write_q_bytes, read_q; lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ if (aclk_kill_link) { // User has reloaded the claiming state aclk_kill_link = 0; aclk_graceful_disconnect(); create_private_key(); continue; } if (aclk_force_reconnect) { aclk_lws_wss_destroy_context(); aclk_force_reconnect = 0; } if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { if (unlikely(!first_init)) { aclk_try_to_connect(aclk_hostname, aclk_port, port_num); first_init = 1; } else { if (aclk_connecting == 0) { if (reconnect_expiry == 0) { unsigned long int delay = aclk_reconnect_delay(1); reconnect_expiry = now_realtime_usec() + delay * 1000; info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); } if (now_realtime_usec() >= reconnect_expiry) { reconnect_expiry = 0; aclk_try_to_connect(aclk_hostname, aclk_port, port_num); } sleep_usec(USEC_PER_MS * 100); } } if (aclk_connecting) { _link_event_loop(); sleep_usec(USEC_PER_MS * 100); } continue; } _link_event_loop(); if (unlikely(!aclk_connected || aclk_force_reconnect)) continue; /*static int stress_counter = 0; if (write_q_bytes==0 && stress_counter ++ >5) { 
aclk_send_stress_test(8000000); stress_counter = 0; }*/ // TODO: Move to on-connect if (unlikely(!aclk_subscribed)) { aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); } if (unlikely(!query_thread)) { query_thread = callocz(1, sizeof(struct netdata_static_thread)); query_thread->thread = mallocz(sizeof(netdata_thread_t)); netdata_thread_create( query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, query_thread); } } // forever exited: // Wakeup query thread to cleanup QUERY_THREAD_WAKEUP; freez(aclk_username); freez(aclk_password); freez(aclk_hostname); freez(aclk_port); if (aclk_private_key != NULL) RSA_free(aclk_private_key); aclk_main_cleanup(ptr); if(aclk_stats_enabled) { netdata_thread_join(*stats_thread->thread, NULL); freez(stats_thread->thread); freez(stats_thread); } return NULL; } /* * Send a message to the cloud, using a base topic and sib_topic * The final topic will be in the form / * If base_topic is missing then the global_base_topic will be used (if available) * */ int aclk_send_message(char *sub_topic, char *message, char *msg_id) { int rc; int mid; char topic[ACLK_MAX_TOPIC + 1]; char *final_topic; UNUSED(msg_id); if (!aclk_connected) return 0; if (unlikely(!message)) return 0; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); if (unlikely(!final_topic)) { errno = 0; error("Unable to build outgoing topic; truncated?"); return 1; } ACLK_LOCK; rc = _link_send_message(final_topic, (unsigned char *)message, &mid); // TODO: link the msg_id with the mid so we can trace it ACLK_UNLOCK; if (unlikely(rc)) { errno = 0; error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); } return rc; } /* * Subscribe to a topic in the cloud * The final subscription will be in the form * /agent/claim_id/ */ int aclk_subscribe(char *sub_topic, int qos) { int rc; char topic[ACLK_MAX_TOPIC + 1]; char *final_topic; final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); if 
(unlikely(!final_topic)) { errno = 0; error("Unable to build outgoing topic; truncated?"); return 1; } if (!aclk_connected) { error("Cannot subscribe to %s - not connected!", topic); return 1; } ACLK_LOCK; rc = _link_subscribe(final_topic, qos); ACLK_UNLOCK; // TODO: Add better handling -- error will flood the logfile here if (unlikely(rc)) { errno = 0; error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); } return rc; } // This is called from a callback when the link goes up void aclk_connect() { info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count); aclk_stats_upd_online(1); aclk_connected = 1; waiting_init = 0; aclk_reconnect_delay(0); QUERY_THREAD_WAKEUP; return; } // This is called from a callback when the link goes down void aclk_disconnect() { if (likely(aclk_connected)) info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count); aclk_stats_upd_online(0); aclk_subscribed = 0; aclk_metadata_submitted = ACLK_METADATA_REQUIRED; waiting_init = 1; aclk_connected = 0; aclk_connecting = 0; aclk_force_reconnect = 1; } inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) { uuid_t uuid; char uuid_str[36 + 1]; if (unlikely(!msg_id)) { uuid_generate(uuid); uuid_unparse(uuid, uuid_str); msg_id = uuid_str; } if (ts_secs == 0) { ts_us = now_realtime_usec(); ts_secs = ts_us / USEC_PER_SEC; ts_us = ts_us % USEC_PER_SEC; } buffer_sprintf( dest, "\t{\"type\": \"%s\",\n" "\t\"msg-id\": \"%s\",\n" "\t\"timestamp\": %ld,\n" "\t\"timestamp-offset-usec\": %llu,\n" "\t\"connect\": %ld,\n" "\t\"connect-offset-usec\": %llu,\n" "\t\"version\": %d", type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION); debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs); } /* * This will send alarm information which includes * configured alarms * alarm_log * active alarms */ void health_active_log_alarms_2json(RRDHOST *host, BUFFER 
*wb); void aclk_send_alarm_metadata() { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); char *msg_id = create_uuid(); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; debug(D_ACLK, "Metadata alarms start"); // on_connect messages are sent on a health reload, if the on_connect message is real then we // use the session time as the fake timestamp to indicate that it starts the session. If it is // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (aclk_metadata_submitted == ACLK_METADATA_SENT) aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); else aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); // health_alarm_log2json(localhost, local_buffer, 0); // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); health_active_log_alarms_2json(localhost, local_buffer); //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); buffer_sprintf(local_buffer, "\n}\n}"); aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); freez(msg_id); buffer_free(local_buffer); } /* * This will send the agent metadata * /api/v1/info * charts */ int aclk_send_info_metadata() { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); debug(D_ACLK, "Metadata /info start"); char *msg_id = create_uuid(); buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; // on_connect messages are sent on a health reload, if the on_connect message is real then we // use the session time 
as the fake timestamp to indicate that it starts the session. If it is // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (aclk_metadata_submitted == ACLK_METADATA_SENT) aclk_create_header(local_buffer, "update", msg_id, 0, 0); else aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); charts2json(localhost, local_buffer, 1); buffer_sprintf(local_buffer, "\n}\n}"); debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); freez(msg_id); buffer_free(local_buffer); return 0; } void aclk_send_stress_test(size_t size) { char *buffer = mallocz(size); if (buffer != NULL) { for(size_t i=0; icontenttype = CT_APPLICATION_JSON; aclk_create_header(local_buffer, "chart", msg_id, 0, 0); buffer_strcat(local_buffer, ",\n\t\"payload\": "); rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer, "\t\n}"); aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); freez(msg_id); buffer_free(local_buffer); return 0; } int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) { #ifndef ENABLE_ACLK UNUSED(host); UNUSED(chart_name); return 0; #else if (!netdata_cloud_setting) return 0; if (host != localhost) return 0; if (unlikely(aclk_disable_single_updates)) return 0; if (unlikely(agent_state == AGENT_INITIALIZING)) last_init_sequence = now_realtime_sec(); else { if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue chart_update command"); } } } return 
0; #endif } int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer = NULL; if (host != localhost) return 0; if (unlikely(agent_state == AGENT_INITIALIZING)) return 0; /* * Check if individual updates have been disabled * This will be the case when we do health reload * and all the alarms will be dropped and recreated. * At the end of the health reload the complete alarm metadata * info will be sent */ if (unlikely(aclk_disable_single_updates)) return 0; local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); char *msg_id = create_uuid(); buffer_flush(local_buffer); aclk_create_header(local_buffer, "status-change", msg_id, 0, 0); buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); health_alarm_entry2json_nolock(local_buffer, ae, host); netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); buffer_sprintf(local_buffer, "\n}"); if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue alarm_command on alarm_update"); } } freez(msg_id); buffer_free(local_buffer); return 0; } /* * Parse the incoming payload and queue a command if valid */ int aclk_handle_cloud_request(char *payload) { struct aclk_request cloud_to_agent = { .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0 }; if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.cloud_req_recvd++; ACLK_STATS_UNLOCK; } if (unlikely(agent_state == AGENT_INITIALIZING)) { debug(D_ACLK, "Ignoring cloud request; agent not in stable state"); return 0; } if (unlikely(!payload)) { debug(D_ACLK, "ACLK incoming message is empty"); return 0; } debug(D_ACLK, "ACLK incoming message (%s)", payload); int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); if (unlikely( JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || 
!cloud_to_agent.msg_id || !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION || strcmp(cloud_to_agent.type_id, "http"))) { if (JSON_OK != rc) error("Malformed json request (%s)", payload); if (cloud_to_agent.version > ACLK_VERSION) error("Unsupported version in JSON request %d", cloud_to_agent.version); if (cloud_to_agent.payload) freez(cloud_to_agent.payload); if (cloud_to_agent.type_id) freez(cloud_to_agent.type_id); if (cloud_to_agent.msg_id) freez(cloud_to_agent.msg_id); if (cloud_to_agent.callback_topic) freez(cloud_to_agent.callback_topic); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.cloud_req_err++; ACLK_STATS_UNLOCK; } return 1; } // Checked to be "http", not needed anymore if (likely(cloud_to_agent.type_id)) { freez(cloud_to_agent.type_id); cloud_to_agent.type_id = NULL; } if (unlikely(aclk_submit_request(&cloud_to_agent))) debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload); // Note: the payload comes from the callback and it will be automatically freed return 0; }