diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-07-10 15:31:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-10 15:31:30 +0200 |
commit | 42aa54eaf6a64348f23cc0c9ed0a071b79bc9cad (patch) | |
tree | d8e8cd9b2a695560e08c6db771afe7c6a4c8587e /aclk/agent_cloud_link.c | |
parent | 0be2c726035419bb3c06e7ee8852e5b58ff1b020 (diff) |
adds support for multiple ACLK query processing threads (#9355)
Diffstat (limited to 'aclk/agent_cloud_link.c')
-rw-r--r-- | aclk/agent_cloud_link.c | 739 |
1 files changed, 119 insertions, 620 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 5ff03319e5..97ced1c19d 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -3,34 +3,26 @@ #include "libnetdata/libnetdata.h" #include "agent_cloud_link.h" #include "aclk_lws_https_client.h" +#include "aclk_query.h" #include "aclk_common.h" #include "aclk_stats.h" int aclk_shutting_down = 0; -// State-machine for the on-connect metadata transmission. -// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial -// agent startup phase. -static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED; -static AGENT_STATE agent_state = AGENT_INITIALIZING; // Other global state static int aclk_subscribed = 0; static int aclk_disable_single_updates = 0; -static time_t last_init_sequence = 0; -static int waiting_init = 1; static char *aclk_username = NULL; static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; -int aclk_connected = 0; // Exposed in the web-api int aclk_force_reconnect = 0; // Indication from lower layers int aclk_kill_link = 0; // Tell the agent to tear down the link usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; -static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) @@ -39,16 +31,6 @@ static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) -#define QUERY_LOCK netdata_mutex_lock(&query_mutex) -#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex) - -pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; -pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; - -#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait); -#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) -#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) - void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); void aclk_lws_wss_destroy_context(); /* @@ -71,24 +53,6 @@ struct _collector { struct _collector *collector_list = NULL; -struct aclk_query { - time_t created; - time_t run_after; // Delay run until after this time - ACLK_CMD cmd; // What command is this - char *topic; // Topic to respond to - char *data; // Internal data (NULL if request from the cloud) - char *msg_id; // msg_id generated by the cloud (NULL if internal) - char *query; // The actual query - u_char deleted; // Mark deleted for garbage collect - struct aclk_query *next; -}; - -struct aclk_query_queue { - struct aclk_query *aclk_query_head; - struct aclk_query *aclk_query_tail; - uint64_t count; -} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; - char *create_uuid() { uuid_t uuid; @@ -219,225 +183,6 @@ unsigned long int aclk_reconnect_delay(int mode) return delay; } -/* - * Free a query structure when done - */ - -void aclk_query_free(struct aclk_query *this_query) -{ - if (unlikely(!this_query)) - return; - - freez(this_query->topic); - if (likely(this_query->query)) - freez(this_query->query); - if (likely(this_query->data)) - freez(this_query->data); - if (likely(this_query->msg_id)) - freez(this_query->msg_id); - freez(this_query); -} - -// Returns the entry after which we need to create a new entry to run at the specified time -// If NULL is returned we need to add to HEAD -// Need to have a QUERY lock before calling this - -struct aclk_query *aclk_query_find_position(time_t time_to_run) -{ - struct aclk_query *tmp_query, *last_query; - - // Quick check if we will add to the end - if (likely(aclk_queue.aclk_query_tail)) { - if (aclk_queue.aclk_query_tail->run_after <= time_to_run) - return aclk_queue.aclk_query_tail; - } - - last_query = NULL; - tmp_query = aclk_queue.aclk_query_head; - - while (tmp_query) { - if (tmp_query->run_after > time_to_run) - return last_query; - last_query = tmp_query; - tmp_query = tmp_query->next; - } - return last_query; -} - -// Need to have a QUERY lock before calling this -struct aclk_query * -aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) -{ - struct aclk_query *tmp_query, *prev_query; - UNUSED(cmd); - - tmp_query = aclk_queue.aclk_query_head; - prev_query = NULL; - while (tmp_query) { - if (likely(!tmp_query->deleted)) { - if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { - if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && - (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { - if (likely(last_query)) - *last_query = prev_query; - return tmp_query; - } - } - } - prev_query = tmp_query; - tmp_query = tmp_query->next; - } - return NULL; -} - -/* - * Add a query to execute, the result will be send to the specified topic - */ - -int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) -{ - struct aclk_query *new_query, *tmp_query; - - // Ignore all commands while we wait for the agent to initialize - if (unlikely(waiting_init)) - return 1; - - run_after = now_realtime_sec() + run_after; - - QUERY_LOCK; - struct aclk_query *last_query = NULL; - - tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); - if (unlikely(tmp_query)) { - if (tmp_query->run_after == run_after) { - QUERY_UNLOCK; - QUERY_THREAD_WAKEUP; - return 0; - } - - if (last_query) - last_query->next = tmp_query->next; - else - aclk_queue.aclk_query_head = tmp_query->next; - - debug(D_ACLK, "Removing double entry"); - aclk_query_free(tmp_query); - aclk_queue.count--; - } - - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_queued++; - ACLK_STATS_UNLOCK; - } - - new_query = callocz(1, sizeof(struct aclk_query)); - new_query->cmd = aclk_cmd; - if (internal) { - new_query->topic = strdupz(topic); - if (likely(query)) - new_query->query = strdupz(query); - } else { - new_query->topic = topic; - new_query->query = query; - new_query->msg_id = msg_id; - } - - if (data) - new_query->data = strdupz(data); - - new_query->next = NULL; - new_query->created = now_realtime_sec(); - new_query->run_after = run_after; - - debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); - - tmp_query = aclk_query_find_position(run_after); - - if (tmp_query) { - new_query->next = tmp_query->next; - tmp_query->next = new_query; - if (tmp_query == aclk_queue.aclk_query_tail) - aclk_queue.aclk_query_tail = new_query; - aclk_queue.count++; - QUERY_UNLOCK; - QUERY_THREAD_WAKEUP; - return 0; - } - - new_query->next = aclk_queue.aclk_query_head; - aclk_queue.aclk_query_head = new_query; - aclk_queue.count++; - - QUERY_UNLOCK; - QUERY_THREAD_WAKEUP; - return 0; -} - -inline int aclk_submit_request(struct aclk_request *request) -{ - return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD); -} - -/* - * Get the next query to process - NULL if nothing there - * The caller needs to free memory by calling aclk_query_free() - * - * topic - * query - * The structure itself - * - */ -struct aclk_query *aclk_queue_pop() -{ - struct aclk_query *this_query; - - QUERY_LOCK; - - if (likely(!aclk_queue.aclk_query_head)) { - QUERY_UNLOCK; - return NULL; - } - - this_query = aclk_queue.aclk_query_head; - - // Get rid of the deleted entries - while (this_query && this_query->deleted) { - aclk_queue.count--; - - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - aclk_query_free(this_query); - - this_query = aclk_queue.aclk_query_head; - } - - if (likely(!this_query)) { - QUERY_UNLOCK; - return NULL; - } - - if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { - info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); - QUERY_UNLOCK; - return NULL; - } - - aclk_queue.count--; - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - QUERY_UNLOCK; - return this_query; -} - // This will give the base topic that the agent will publish messages. // subtopics will be sent under the base topic e.g. base_topic/subtopic // This is called during the connection, we delete any previous topic @@ -489,6 +234,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size) return final_topic; } +#ifndef __GNUC__ +#pragma region ACLK Internal Collector Tracking +#endif + /* * Free a collector structure */ @@ -673,6 +422,22 @@ static struct _collector *_add_collector(const char *hostname, const char *plugi return tmp_collector; } +#ifndef __GNUC__ +#pragma endregion +#endif + +inline static int aclk_popcorn_check_bump() +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); + ACLK_SHARED_STATE_UNLOCK; + return 1; + } + ACLK_SHARED_STATE_UNLOCK; + return 0; +} + /* * Add a new collector to the list * If it exists, update the chart count @@ -693,14 +458,13 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha return; } - if (unlikely(agent_state == AGENT_INITIALIZING)) - last_init_sequence = now_realtime_sec(); - else { - if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) - debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); - } - COLLECTOR_UNLOCK; + + if(aclk_popcorn_check_bump()) + return; + + if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } /* @@ -733,286 +497,13 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha COLLECTOR_UNLOCK; - if (unlikely(agent_state == AGENT_INITIALIZING)) - last_init_sequence = now_realtime_sec(); - else { - if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) - debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); - } - _free_collector(tmp_collector); -} -/* - * Take a buffer, encode it and rewrite it - * - */ - -static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) -{ - char *tmp_buffer = mallocz(content_size * 2); - char *dst = tmp_buffer; - while (content_size > 0) { - switch (*src) { - case '\n': - if (keep_newlines) - { - *dst++ = '\\'; - *dst++ = 'n'; - } - break; - case '\t': - break; - case 0x01 ... 0x08: - case 0x0b ... 0x1F: - *dst++ = '\\'; - *dst++ = 'u'; - *dst++ = '0'; - *dst++ = '0'; - *dst++ = (*src < 0x0F) ? '0' : '1'; - *dst++ = to_hex(*src); - break; - case '\"': - *dst++ = '\\'; - *dst++ = *src; - break; - default: - *dst++ = *src; - } - src++; - content_size--; - } - *dst = '\0'; - - return tmp_buffer; -} - -int aclk_execute_query(struct aclk_query *this_query) -{ - if (strncmp(this_query->query, "/api/v1/", 8) == 0) { - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; - - char *mysep = strchr(this_query->query, '?'); - if (mysep) { - strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); - *mysep = '\0'; - } else - strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); - - mysep = strrchr(this_query->query, '/'); - - // TODO: handle bad response perhaps in a different way. For now it does to the payload - w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); - now_realtime_timeval(&w->tv_ready); - w->response.data->date = w->tv_ready.tv_sec; - web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); - char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); - - buffer_sprintf( - local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", - w->response.code, encoded_response, encoded_header); - - buffer_sprintf(local_buffer, "\n}"); - - debug(D_ACLK, "Response:%s", encoded_header); - - aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); - - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); - buffer_free(local_buffer); - freez(encoded_response); - freez(encoded_header); - return 0; - } - return 1; -} - -/* - * This function will fetch the next pending command and process it - * - */ -int aclk_process_query() -{ - struct aclk_query *this_query; - static long int query_count = 0; - - if (!aclk_connected) - return 0; - - this_query = aclk_queue_pop(); - if (likely(!this_query)) { - return 0; - } - - if (unlikely(this_query->deleted)) { - debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); - aclk_query_free(this_query); - return 1; - } - query_count++; - - debug( - D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic, - this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created)); - - switch (this_query->cmd) { - case ACLK_CMD_ONCONNECT: - debug(D_ACLK, "EXECUTING on connect metadata command"); - aclk_send_metadata(); - aclk_metadata_submitted = ACLK_METADATA_SENT; - break; - - case ACLK_CMD_CHART: - debug(D_ACLK, "EXECUTING a chart update command"); - aclk_send_single_chart(this_query->data, this_query->query); - break; - - case ACLK_CMD_CHARTDEL: - debug(D_ACLK, "EXECUTING a chart delete command"); - //TODO: This send the info metadata for now - aclk_send_info_metadata(); - break; - - case ACLK_CMD_ALARM: - debug(D_ACLK, "EXECUTING an alarm update command"); - aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); - break; - - case ACLK_CMD_CLOUD: - debug(D_ACLK, "EXECUTING a cloud command"); - aclk_execute_query(this_query); - break; - - default: - break; - } - debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); - - aclk_query_free(this_query); - - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - ACLK_STATS_UNLOCK; - } - - return 1; -} - -/* - * Process all pending queries - * Return 0 if no queries were processed, 1 otherwise - * - */ - -int aclk_process_queries() -{ - if (unlikely(netdata_exit || !aclk_connected)) - return 0; - - if (likely(!aclk_queue.count)) - return 0; - - debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count); - - //TODO: may consider possible throttling here - while (aclk_process_query()) { - // Process all commands - }; - - return 1; -} - -static void aclk_query_thread_cleanup(void *ptr) -{ - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - - info("cleaning up..."); - - _reset_collector_list(); - freez(collector_list); - - // Clean memory for pending queries if any - struct aclk_query *this_query; - - do { - this_query = aclk_queue_pop(); - aclk_query_free(this_query); - } while (this_query); - - freez(static_thread->thread); - freez(static_thread); -} - -/** - * Main query processing thread - * - * On startup wait for the agent collectors to initialize - * Expect at least a time of ACLK_STABLE_TIMEOUT seconds - * of no new collectors coming in in order to mark the agent - * as stable (set agent_state = AGENT_STABLE) - */ -void *aclk_query_main_thread(void *ptr) -{ - netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr); - - while (agent_state == AGENT_INITIALIZING && !netdata_exit) { - time_t checkpoint; - - checkpoint = now_realtime_sec() - last_init_sequence; - if (checkpoint > ACLK_STABLE_TIMEOUT) { - agent_state = AGENT_STABLE; - info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); -#ifdef ACLK_DEBUG - _dump_collector_list(); -#endif - break; - } - info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint); - sleep_usec(USEC_PER_SEC * 1); - } - - while (!netdata_exit) { - if (unlikely(!aclk_metadata_submitted)) { - aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED; - if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { - errno = 0; - error("ACLK failed to queue on_connect command"); - aclk_metadata_submitted = ACLK_METADATA_REQUIRED; - } - } - - aclk_process_queries(); - - QUERY_THREAD_LOCK; - - // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) - sleep_usec(USEC_PER_SEC * 1); - - QUERY_THREAD_UNLOCK; + if (aclk_popcorn_check_bump()) + return; - } // forever - info("Shutting down query processing thread"); - netdata_thread_cleanup_pop(1); - return NULL; + if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } static void aclk_graceful_disconnect() @@ -1047,23 +538,9 @@ static void aclk_graceful_disconnect() aclk_shutting_down = 0; } - -// Thread cleanup -static void aclk_main_cleanup(void *ptr) -{ - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - info("cleaning up..."); - - char *agent_id = is_agent_claimed(); - if (agent_id && aclk_connected) { - freez(agent_id); - // Wakeup thread to cleanup - QUERY_THREAD_WAKEUP; - aclk_graceful_disconnect(); - } -} +#ifndef __GNUC__ +#pragma region Incoming Msg Parsing +#endif struct dictionary_singleton { char *key; @@ -1092,6 +569,15 @@ int json_extract_singleton(JSON_ENTRY *e) return 0; } +#ifndef __GNUC__ +#pragma endregion +#endif + + +#ifndef __GNUC__ +#pragma region Challenge Response +#endif + // Base-64 decoder. // Note: This is non-validating, invalid input will be decoded without an error. // Challenges are packed into json strings so we don't skip newlines. @@ -1225,29 +711,6 @@ int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decry return result; } -char *extract_payload(BUFFER *b) -{ -char *s = b->buffer; -unsigned int line_len=0; - for (size_t i=0; i<b->len; i++) - { - if (*s == 0 ) - return NULL; - if (*s == '\n' ) { - if (line_len==0) - return s+1; - line_len = 0; - } - else if (*s == '\r') { - /* don't count */ - } - else - line_len ++; - s++; - } - return NULL; -} - void aclk_get_challenge(char *aclk_hostname, char *aclk_port) { char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1340,6 +803,10 @@ CLEANUP: return; } +#ifndef __GNUC__ +#pragma endregion +#endif + static void aclk_try_to_connect(char *hostname, char *port, int port_num) { if (!aclk_private_key) { @@ -1359,7 +826,6 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num) } } - /** * Main agent cloud link thread * @@ -1373,8 +839,10 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num) void *aclk_main(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - struct netdata_static_thread *query_thread; - struct netdata_static_thread *stats_thread = NULL; + struct aclk_query_threads query_threads; + struct aclk_stats_thread *stats_thread = NULL; + + query_threads.thread_list = NULL; // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -1400,18 +868,25 @@ void *aclk_main(void *ptr) } } - aclk_stats_enabled = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "statistics", CONFIG_BOOLEAN_YES); + query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", 2); + if(query_threads.count < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count); + query_threads.count = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + } + + aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); if (aclk_stats_enabled) { - stats_thread = callocz(1, sizeof(struct netdata_static_thread)); + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); } - last_init_sequence = now_realtime_sec(); - query_thread = NULL; - char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. char *aclk_port = NULL; uint32_t port_num = 0; @@ -1508,17 +983,13 @@ void *aclk_main(void *ptr) aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); } - if (unlikely(!query_thread)) { - query_thread = callocz(1, sizeof(struct netdata_static_thread)); - query_thread->thread = mallocz(sizeof(netdata_thread_t)); - netdata_thread_create( - query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, - query_thread); + if (unlikely(!query_threads.thread_list)) { + aclk_query_threads_start(&query_threads); } } // forever exited: // Wakeup query thread to cleanup - QUERY_THREAD_WAKEUP; + QUERY_THREAD_WAKEUP_ALL; freez(aclk_username); freez(aclk_password); @@ -1527,10 +998,24 @@ exited: if (aclk_private_key != NULL) RSA_free(aclk_private_key); - aclk_main_cleanup(ptr); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); + // Wakeup thread to cleanup + QUERY_THREAD_WAKEUP; + aclk_graceful_disconnect(); + } + + aclk_query_threads_cleanup(&query_threads); + + _reset_collector_list(); + freez(collector_list); if(aclk_stats_enabled) { netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); freez(stats_thread->thread); freez(stats_thread); } @@ -1626,12 +1111,11 @@ int aclk_subscribe(char *sub_topic, int qos) // This is called from a callback when the link goes up void aclk_connect() { - info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count); + info("Connection detected (%u queued queries)", aclk_query_size()); aclk_stats_upd_online(1); aclk_connected = 1; - waiting_init = 0; aclk_reconnect_delay(0); QUERY_THREAD_WAKEUP; return; @@ -1641,13 +1125,14 @@ void aclk_connect() void aclk_disconnect() { if (likely(aclk_connected)) - info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count); + info("Disconnect detected (%u queued queries)", aclk_query_size()); aclk_stats_upd_online(0); aclk_subscribed = 0; - aclk_metadata_submitted = ACLK_METADATA_REQUIRED; - waiting_init = 1; + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED; + ACLK_SHARED_STATE_UNLOCK; aclk_connected = 0; aclk_connecting = 0; aclk_force_reconnect = 1; @@ -1692,7 +1177,8 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts * active alarms */ void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); -void aclk_send_alarm_metadata() + +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1706,7 +1192,8 @@ void aclk_send_alarm_metadata() // use the session time as the fake timestamp to indicate that it starts the session. If it is // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. - if (aclk_metadata_submitted == ACLK_METADATA_SENT) + + if (metadata_submitted == ACLK_METADATA_SENT) aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); else aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); @@ -1737,7 +1224,7 @@ void aclk_send_alarm_metadata() * /api/v1/info * charts */ -int aclk_send_info_metadata() +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1751,7 +1238,7 @@ int aclk_send_info_metadata() // use the session time as the fake timestamp to indicate that it starts the session. If it is // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. - if (aclk_metadata_submitted == ACLK_METADATA_SENT) + if (metadata_submitted == ACLK_METADATA_SENT) aclk_create_header(local_buffer, "update", msg_id, 0, 0); else aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); @@ -1794,11 +1281,11 @@ void aclk_send_stress_test(size_t size) // Send info metadata message to the cloud if the link is established // or on request -int aclk_send_metadata() +int aclk_send_metadata(ACLK_METADATA_STATE state) { - aclk_send_info_metadata(); - aclk_send_alarm_metadata(); + aclk_send_info_metadata(state); + aclk_send_alarm_metadata(state); return 0; } @@ -1816,8 +1303,13 @@ void aclk_single_update_enable() // Trigged by a health reload, sends the alarm metadata void aclk_alarm_reload() { - if (unlikely(agent_state == AGENT_INITIALIZING)) + + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; return; + } + ACLK_SHARED_STATE_UNLOCK; if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { if (likely(aclk_connected)) { @@ -1881,16 +1373,16 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (unlikely(aclk_disable_single_updates)) return 0; - if (unlikely(agent_state == AGENT_INITIALIZING)) - last_init_sequence = now_realtime_sec(); - else { - if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) { - if (likely(aclk_connected)) { - errno = 0; - error("ACLK failed to queue chart_update command"); - } + if (aclk_popcorn_check_bump()) + return 0; + + if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue chart_update command"); } } + return 0; #endif } @@ -1905,8 +1397,12 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) if (host != localhost) return 0; - if (unlikely(agent_state == AGENT_INITIALIZING)) + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; return 0; + } + ACLK_SHARED_STATE_UNLOCK; /* * Check if individual updates have been disabled @@ -1959,10 +1455,13 @@ int aclk_handle_cloud_request(char *payload) ACLK_STATS_UNLOCK; } - if (unlikely(agent_state == AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { debug(D_ACLK, "Ignoring cloud request; agent not in stable state"); + ACLK_SHARED_STATE_UNLOCK; return 0; } + ACLK_SHARED_STATE_UNLOCK; if (unlikely(!payload)) { debug(D_ACLK, "ACLK incoming message is empty"); @@ -2010,7 +1509,7 @@ int aclk_handle_cloud_request(char *payload) cloud_to_agent.type_id = NULL; } - if (unlikely(aclk_submit_request(&cloud_to_agent))) + if (unlikely(aclk_queue_query(cloud_to_agent.callback_topic, NULL, cloud_to_agent.msg_id, cloud_to_agent.payload, 0, 0, ACLK_CMD_CLOUD))) debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload); // Note: the payload comes from the callback and it will be automatically freed |