diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-07-10 15:31:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-10 15:31:30 +0200 |
commit | 42aa54eaf6a64348f23cc0c9ed0a071b79bc9cad (patch) | |
tree | d8e8cd9b2a695560e08c6db771afe7c6a4c8587e | |
parent | 0be2c726035419bb3c06e7ee8852e5b58ff1b020 (diff) |
adds support for multiple ACLK query processing threads (#9355)
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | aclk/README.md | 10 | ||||
-rw-r--r-- | aclk/aclk_common.c | 8 | ||||
-rw-r--r-- | aclk/aclk_common.h | 30 | ||||
-rw-r--r-- | aclk/aclk_query.c | 571 | ||||
-rw-r--r-- | aclk/aclk_query.h | 34 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 108 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 19 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 739 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 36 |
11 files changed, 896 insertions, 663 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5de9047b14..6b895d77e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -665,6 +665,8 @@ set(ACLK_PLUGIN_FILES aclk/aclk_common.h aclk/agent_cloud_link.c aclk/agent_cloud_link.h + aclk/aclk_query.c + aclk/aclk_query.h aclk/aclk_lws_wss_client.c aclk/aclk_lws_wss_client.h aclk/aclk_lws_https_client.c diff --git a/Makefile.am b/Makefile.am index 0d93cdb9b0..e2d2502304 100644 --- a/Makefile.am +++ b/Makefile.am @@ -518,6 +518,8 @@ if ENABLE_ACLK ACLK_FILES += \ aclk/agent_cloud_link.c \ aclk/agent_cloud_link.h \ + aclk/aclk_query.c \ + aclk/aclk_query.h \ aclk/mqtt.c \ aclk/mqtt.h \ aclk/aclk_lws_wss_client.c \ diff --git a/aclk/README.md b/aclk/README.md index 38825bb383..5702ec7e41 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -38,6 +38,16 @@ configuration uses two settings: If your Agent needs to use a proxy to access the internet, you must [set up a proxy for claiming](/claim/README.md#claim-through-a-proxy). +You can configure following keys in the `netdata.conf` section `[cloud]`: +``` +[cloud] + statistics = yes + query thread count = 2 +``` + +- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. +- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). + ## Disable the ACLK You have two options if you prefer to disable the ACLK and not use Netdata Cloud. diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c index 9f64567b18..4ef5b97617 100644 --- a/aclk/aclk_common.c +++ b/aclk/aclk_common.c @@ -2,6 +2,14 @@ #include "../daemon/common.h" +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; + +struct aclk_shared_state aclk_shared_state = { + .metadata_submitted = ACLK_METADATA_REQUIRED, + .agent_state = AGENT_INITIALIZING, + .last_popcorn_interrupt = 0 +}; + struct { ACLK_PROXY_TYPE type; const char *url_str; diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h index 7bfdf5d7cf..62295fdf41 100644 --- a/aclk/aclk_common.h +++ b/aclk/aclk_common.h @@ -3,6 +3,36 @@ #include "libnetdata/libnetdata.h" +extern netdata_mutex_t aclk_shared_state_mutex; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +typedef enum aclk_cmd { + ACLK_CMD_CLOUD, + ACLK_CMD_ONCONNECT, + ACLK_CMD_INFO, + ACLK_CMD_CHART, + ACLK_CMD_CHARTDEL, + ACLK_CMD_ALARM, + ACLK_CMD_MAX +} ACLK_CMD; + +typedef enum aclk_metadata_state { + ACLK_METADATA_REQUIRED, + ACLK_METADATA_CMD_QUEUED, + ACLK_METADATA_SENT +} ACLK_METADATA_STATE; + +typedef enum aclk_agent_state { + AGENT_INITIALIZING, + AGENT_STABLE +} ACLK_AGENT_STATE; +extern struct aclk_shared_state { + ACLK_METADATA_STATE metadata_submitted; + ACLK_AGENT_STATE agent_state; + time_t last_popcorn_interrupt; +} aclk_shared_state; + typedef enum aclk_proxy_type { PROXY_TYPE_UNKNOWN = 0, PROXY_TYPE_SOCKS5, diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c new file mode 100644 index 0000000000..64bf75504c --- /dev/null +++ b/aclk/aclk_query.c @@ -0,0 +1,571 @@ +#include "aclk_common.h" +#include "aclk_query.h" +#include "aclk_stats.h" + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) + +volatile int aclk_connected = 0; + +#ifndef __GNUC__ +#pragma region ACLK_QUEUE +#endif + +static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) + +struct aclk_query { + time_t created; + time_t run_after; // Delay run until after this time + ACLK_CMD cmd; // What command is this + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + unsigned int count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + + +unsigned int aclk_query_size() +{ + int r; + ACLK_QUEUE_LOCK; + r = aclk_queue.count; + ACLK_QUEUE_UNLOCK; + return r; +} + +/* + * Free a query structure when done + */ +static void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + if (likely(this_query->query)) + freez(this_query->query); + if (likely(this_query->data)) + freez(this_query->data); + if (likely(this_query->msg_id)) + freez(this_query->msg_id); + freez(this_query); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +static struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + ACLK_QUEUE_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + // Get rid of the deleted entries + while (this_query && this_query->deleted) { + aclk_queue.count--; + + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + aclk_query_free(this_query); + + this_query = aclk_queue.aclk_query_head; + } + + if (likely(!this_query)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + ACLK_QUEUE_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + ACLK_QUEUE_UNLOCK; + return this_query; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Need to have a QUERY lock before calling this + +static struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + // Quick check if we will add to the end + if (likely(aclk_queue.aclk_query_tail)) { + if (aclk_queue.aclk_query_tail->run_after <= time_to_run) + return aclk_queue.aclk_query_tail; + } + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a QUERY lock before calling this +static struct aclk_query * +aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) +{ + struct aclk_query *tmp_query, *prev_query; + UNUSED(cmd); + + tmp_query = aclk_queue.aclk_query_head; + prev_query = NULL; + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { + if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { + if (likely(last_query)) + *last_query = prev_query; + return tmp_query; + } + } + } + prev_query = tmp_query; + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(!aclk_connected)) + return 1; + + run_after = now_realtime_sec() + run_after; + + ACLK_QUEUE_LOCK; + struct aclk_query *last_query = NULL; + + tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + if (last_query) + last_query->next = tmp_query->next; + else + aclk_queue.aclk_query_head = tmp_query->next; + + debug(D_ACLK, "Removing double entry"); + aclk_query_free(tmp_query); + aclk_queue.count--; + } + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + new_query->cmd = aclk_cmd; + if (internal) { + new_query->topic = strdupz(topic); + if (likely(query)) + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + if (data) + new_query->data = strdupz(data); + + new_query->next = NULL; + new_query->created = now_realtime_sec(); + new_query->run_after = run_after; + + debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region Helper Functions +#endif + +/* + * Take a buffer, encode it and rewrite it + * + */ + +static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) +{ + char *tmp_buffer = mallocz(content_size * 2); + char *dst = tmp_buffer; + while (content_size > 0) { + switch (*src) { + case '\n': + if (keep_newlines) + { + *dst++ = '\\'; + *dst++ = 'n'; + } + break; + case '\t': + break; + case 0x01 ... 0x08: + case 0x0b ... 0x1F: + *dst++ = '\\'; + *dst++ = 'u'; + *dst++ = '0'; + *dst++ = '0'; + *dst++ = (*src < 0x0F) ? '0' : '1'; + *dst++ = to_hex(*src); + break; + case '\"': + *dst++ = '\\'; + *dst++ = *src; + break; + default: + *dst++ = *src; + } + src++; + content_size--; + } + *dst = '\0'; + + return tmp_buffer; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region ACLK_QUERY +#endif + +static int aclk_execute_query(struct aclk_query *this_query) +{ + if (strncmp(this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: handle bad response perhaps in a different way. For now it does to the payload + w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); + char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); + + buffer_sprintf( + local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", + w->response.code, encoded_response, encoded_header); + + buffer_sprintf(local_buffer, "\n}"); + + debug(D_ACLK, "Response:%s", encoded_header); + + aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); + + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + freez(encoded_response); + freez(encoded_header); + return 0; + } + return 1; +} + +/* + * This function will fetch the next pending command and process it + * + */ +static int aclk_process_query(int t_idx) +{ + struct aclk_query *this_query; + static long int query_count = 0; + ACLK_METADATA_STATE meta_state; + usec_t t = 0; + + if (!aclk_connected) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + return 0; + } + + if (unlikely(this_query->deleted)) { + debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + query_count++; + + debug( + D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic, + this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created)); + + switch (this_query->cmd) { + case ACLK_CMD_ONCONNECT: + debug(D_ACLK, "EXECUTING on connect metadata command"); + ACLK_SHARED_STATE_LOCK; + meta_state = aclk_shared_state.metadata_submitted; + aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT; + ACLK_SHARED_STATE_UNLOCK; + aclk_send_metadata(meta_state); + break; + + case ACLK_CMD_CHART: + debug(D_ACLK, "EXECUTING a chart update command"); + aclk_send_single_chart(this_query->data, this_query->query); + break; + + case ACLK_CMD_CHARTDEL: + debug(D_ACLK, "EXECUTING a chart delete command"); + //TODO: This send the info metadata for now + aclk_send_info_metadata(ACLK_METADATA_SENT); + break; + + case ACLK_CMD_ALARM: + debug(D_ACLK, "EXECUTING an alarm update command"); + aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); + break; + + case ACLK_CMD_CLOUD: + t = now_monotonic_high_precision_usec(); + debug(D_ACLK, "EXECUTING a cloud command"); + aclk_execute_query(this_query); + t = now_monotonic_high_precision_usec() - t; + break; + + default: + break; + } + debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[t_idx]++; + if(this_query->cmd == ACLK_CMD_CLOUD) { + aclk_metrics_per_sample.cloud_q_process_total += t; + aclk_metrics_per_sample.cloud_q_process_count++; + if(aclk_metrics_per_sample.cloud_q_process_max < t) + aclk_metrics_per_sample.cloud_q_process_max = t; + } + ACLK_STATS_UNLOCK; + } + + aclk_query_free(this_query); + + return 1; +} + +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +{ + if (query_threads && query_threads->thread_list) { + for (int i = 0; i < query_threads->count; i++) { + netdata_thread_join(query_threads->thread_list[i].thread, NULL); + } + freez(query_threads->thread_list); + } + + struct aclk_query *this_query; + + do { + this_query = aclk_queue_pop(); + aclk_query_free(this_query); + } while (this_query); +} + +#define TASK_LEN_MAX 16 +void aclk_query_threads_start(struct aclk_query_threads *query_threads) +{ + info("Starting %d query threads.", query_threads->count); + + char thread_name[TASK_LEN_MAX]; + query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); + for (int i = 0; i < query_threads->count; i++) { + query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + + snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i); + netdata_thread_create( + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i]); + } +} + +/** + * Main query processing thread + * + * On startup wait for the agent collectors to initialize + * Expect at least a time of ACLK_STABLE_TIMEOUT seconds + * of no new collectors coming in in order to mark the agent + * as stable (set agent_state = AGENT_STABLE) + */ +void *aclk_query_main_thread(void *ptr) +{ + struct aclk_query_thread *info = ptr; + time_t previous_popcorn_interrupt = 0; + + while (!netdata_exit) { + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { + ACLK_SHARED_STATE_UNLOCK; + break; + } + + time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; + + if (checkpoint > ACLK_STABLE_TIMEOUT) { + aclk_shared_state.agent_state = AGENT_STABLE; + ACLK_SHARED_STATE_UNLOCK; + info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); +#ifdef ACLK_DEBUG + _dump_collector_list(); +#endif + break; + } + + if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) { + info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT); + previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt; + } + ACLK_SHARED_STATE_UNLOCK; + sleep_usec(USEC_PER_SEC * 1); + } + + while (!netdata_exit) { + ACLK_SHARED_STATE_LOCK; + if (unlikely(!aclk_shared_state.metadata_submitted)) { + ACLK_SHARED_STATE_UNLOCK; + if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + errno = 0; + error("ACLK failed to queue on_connect command"); + sleep(1); + continue; + } + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED; + } + ACLK_SHARED_STATE_UNLOCK; + + while (aclk_process_query(info->idx)) { + // Process all commands + }; + + QUERY_THREAD_LOCK; + + // TODO: Need to check if there are queries awaiting already + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + QUERY_THREAD_UNLOCK; + } + + return NULL; +} + +#ifndef __GNUC__ +#pragma endregion +#endif diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h new file mode 100644 index 0000000000..382b97d262 --- /dev/null +++ b/aclk/aclk_query.h @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_H +#define NETDATA_ACLK_QUERY_H + +#include "libnetdata/libnetdata.h" + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +extern pthread_cond_t query_cond_wait; +extern pthread_mutex_t query_lock_wait; +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) + +extern volatile int aclk_connected; + +struct aclk_query_thread { + netdata_thread_t thread; + int idx; +}; + +struct aclk_query_threads { + struct aclk_query_thread *thread_list; + int count; +}; + +void *aclk_query_main_thread(void *ptr); +int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); + +void aclk_query_threads_start(struct aclk_query_threads *query_threads); +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +unsigned int aclk_query_size(); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 2356ccdbc2..59115b55d6 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -4,6 +4,16 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; int aclk_stats_enabled; +int query_thread_count; + +// data ACLK stats need per query thread +struct aclk_qt_data { + RRDDIM *dim; +} *aclk_qt_data = NULL; + +uint32_t *aclk_queries_per_thread = NULL; +uint32_t *aclk_queries_per_thread_sample = NULL; + struct aclk_metrics aclk_metrics = { .online = 0, }; @@ -17,7 +27,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc if (unlikely(!st_aclkstats)) { st_aclkstats = rrdset_create_localhost( - "netdata", "aclk_status", NULL, "aclk_stats", NULL, "ACLK/Cloud connection status", + "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -29,7 +39,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc rrdset_done(st_aclkstats); } -static void aclk_stats_query_thread(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) { static RRDSET *st_query_thread = NULL; static RRDDIM *rd_queued = NULL; @@ -37,16 +47,16 @@ static void aclk_stats_query_thread(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st_query_thread)) { st_query_thread = rrdset_create_localhost( - "netdata", "aclk_query_per_second", NULL, "aclk_stats", NULL, "ACLK Queries per second", "queries/s", + "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } else rrdset_next(st_query_thread); rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); - rrddim_set_by_pointer(st_query_thread, rd_dispatched, -per_sample->queries_dispatched); + rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); rrdset_done(st_query_thread); } @@ -60,7 +70,7 @@ static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_latency_mqtt", NULL, "aclk_stats", NULL, "ACLK Message Publish Latency", "ms", + "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms", "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE); rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -86,7 +96,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_write_q", NULL, "aclk_stats", NULL, "Write Queue Mosq->Libwebsockets", "kB/s", + "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s", "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA); rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -108,7 +118,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_read_q", NULL, "aclk_stats", NULL, "Read Queue Libwebsockets->Mosq", "kB/s", + "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s", "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA); rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -130,7 +140,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) if (unlikely(!st)) { st = rrdset_create_localhost( - "netdata", "aclk_cloud_req", NULL, "aclk_stats", NULL, "Requests received from cloud", "req/s", + "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -144,13 +154,82 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } +#define MAX_DIM_NAME 16 +static void aclk_stats_query_threads(uint32_t *queries_per_thread) +{ + static RRDSET *st = NULL; + + char dim_name[MAX_DIM_NAME]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < query_thread_count; i++) { + snprintf(dim_name, MAX_DIM_NAME, "Query %d", i); + aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (int i = 0; i < query_thread_count; i++) { + rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + } + + rrdset_done(st); +} + +static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_avg = NULL; + static RRDDIM *rd_rq_max = NULL; + static RRDDIM *rd_rq_total = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us", + "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + if(per_sample->cloud_q_process_count) + rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count)); + else + rrddim_set_by_pointer(st, rd_rq_avg, 0); + rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max); + rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total); + + rrdset_done(st); +} + +void aclk_stats_thread_cleanup() +{ + freez(aclk_qt_data); + freez(aclk_queries_per_thread); + freez(aclk_queries_per_thread_sample); +} + void *aclk_stats_main_thread(void *ptr) { - UNUSED(ptr); + struct aclk_stats_thread *args = ptr; + + query_thread_count = args->query_thread_count; + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + heartbeat_t hb; heartbeat_init(&hb); usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + struct aclk_metrics_per_sample per_sample; struct aclk_metrics permanent; @@ -168,10 +247,13 @@ void *aclk_stats_main_thread(void *ptr) memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_p |