summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-07-10 15:31:30 +0200
committerGitHub <noreply@github.com>2020-07-10 15:31:30 +0200
commit42aa54eaf6a64348f23cc0c9ed0a071b79bc9cad (patch)
treed8e8cd9b2a695560e08c6db771afe7c6a4c8587e /aclk
parent0be2c726035419bb3c06e7ee8852e5b58ff1b020 (diff)
adds support for multiple ACLK query processing threads (#9355)
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md10
-rw-r--r--aclk/aclk_common.c8
-rw-r--r--aclk/aclk_common.h30
-rw-r--r--aclk/aclk_query.c571
-rw-r--r--aclk/aclk_query.h34
-rw-r--r--aclk/aclk_stats.c108
-rw-r--r--aclk/aclk_stats.h19
-rw-r--r--aclk/agent_cloud_link.c739
-rw-r--r--aclk/agent_cloud_link.h36
9 files changed, 892 insertions, 663 deletions
diff --git a/aclk/README.md b/aclk/README.md
index 38825bb383..5702ec7e41 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -38,6 +38,16 @@ configuration uses two settings:
If your Agent needs to use a proxy to access the internet, you must [set up a proxy for
claiming](/claim/README.md#claim-through-a-proxy).
+You can configure following keys in the `netdata.conf` section `[cloud]`:
+```
+[cloud]
+ statistics = yes
+ query thread count = 2
+```
+
+- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
+- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
+
## Disable the ACLK
You have two options if you prefer to disable the ACLK and not use Netdata Cloud.
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c
index 9f64567b18..4ef5b97617 100644
--- a/aclk/aclk_common.c
+++ b/aclk/aclk_common.c
@@ -2,6 +2,14 @@
#include "../daemon/common.h"
+netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+
+struct aclk_shared_state aclk_shared_state = {
+ .metadata_submitted = ACLK_METADATA_REQUIRED,
+ .agent_state = AGENT_INITIALIZING,
+ .last_popcorn_interrupt = 0
+};
+
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h
index 7bfdf5d7cf..62295fdf41 100644
--- a/aclk/aclk_common.h
+++ b/aclk/aclk_common.h
@@ -3,6 +3,36 @@
#include "libnetdata/libnetdata.h"
+extern netdata_mutex_t aclk_shared_state_mutex;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+typedef enum aclk_cmd {
+ ACLK_CMD_CLOUD,
+ ACLK_CMD_ONCONNECT,
+ ACLK_CMD_INFO,
+ ACLK_CMD_CHART,
+ ACLK_CMD_CHARTDEL,
+ ACLK_CMD_ALARM,
+ ACLK_CMD_MAX
+} ACLK_CMD;
+
+typedef enum aclk_metadata_state {
+ ACLK_METADATA_REQUIRED,
+ ACLK_METADATA_CMD_QUEUED,
+ ACLK_METADATA_SENT
+} ACLK_METADATA_STATE;
+
+typedef enum aclk_agent_state {
+ AGENT_INITIALIZING,
+ AGENT_STABLE
+} ACLK_AGENT_STATE;
+extern struct aclk_shared_state {
+ ACLK_METADATA_STATE metadata_submitted;
+ ACLK_AGENT_STATE agent_state;
+ time_t last_popcorn_interrupt;
+} aclk_shared_state;
+
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
new file mode 100644
index 0000000000..64bf75504c
--- /dev/null
+++ b/aclk/aclk_query.c
@@ -0,0 +1,571 @@
+#include "aclk_common.h"
+#include "aclk_query.h"
+#include "aclk_stats.h"
+
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+
+volatile int aclk_connected = 0;
+
+#ifndef __GNUC__
+#pragma region ACLK_QUEUE
+#endif
+
+static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex)
+#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex)
+
+struct aclk_query {
+ time_t created;
+ time_t run_after; // Delay run until after this time
+ ACLK_CMD cmd; // What command is this
+ char *topic; // Topic to respond to
+ char *data; // Internal data (NULL if request from the cloud)
+ char *msg_id; // msg_id generated by the cloud (NULL if internal)
+ char *query; // The actual query
+ u_char deleted; // Mark deleted for garbage collect
+ struct aclk_query *next;
+};
+
+struct aclk_query_queue {
+ struct aclk_query *aclk_query_head;
+ struct aclk_query *aclk_query_tail;
+ unsigned int count;
+} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
+
+
+unsigned int aclk_query_size()
+{
+ int r;
+ ACLK_QUEUE_LOCK;
+ r = aclk_queue.count;
+ ACLK_QUEUE_UNLOCK;
+ return r;
+}
+
+/*
+ * Free a query structure when done
+ */
+static void aclk_query_free(struct aclk_query *this_query)
+{
+ if (unlikely(!this_query))
+ return;
+
+ freez(this_query->topic);
+ if (likely(this_query->query))
+ freez(this_query->query);
+ if (likely(this_query->data))
+ freez(this_query->data);
+ if (likely(this_query->msg_id))
+ freez(this_query->msg_id);
+ freez(this_query);
+}
+
+/*
+ * Get the next query to process - NULL if nothing there
+ * The caller needs to free memory by calling aclk_query_free()
+ *
+ * topic
+ * query
+ * The structure itself
+ *
+ */
+static struct aclk_query *aclk_queue_pop()
+{
+ struct aclk_query *this_query;
+
+ ACLK_QUEUE_LOCK;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ this_query = aclk_queue.aclk_query_head;
+
+ // Get rid of the deleted entries
+ while (this_query && this_query->deleted) {
+ aclk_queue.count--;
+
+ aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ aclk_queue.aclk_query_tail = NULL;
+ }
+
+ aclk_query_free(this_query);
+
+ this_query = aclk_queue.aclk_query_head;
+ }
+
+ if (likely(!this_query)) {
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
+ info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ aclk_queue.count--;
+ aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ aclk_queue.aclk_query_tail = NULL;
+ }
+
+ ACLK_QUEUE_UNLOCK;
+ return this_query;
+}
+
+// Returns the entry after which we need to create a new entry to run at the specified time
+// If NULL is returned we need to add to HEAD
+// Need to have a QUERY lock before calling this
+
+static struct aclk_query *aclk_query_find_position(time_t time_to_run)
+{
+ struct aclk_query *tmp_query, *last_query;
+
+ // Quick check if we will add to the end
+ if (likely(aclk_queue.aclk_query_tail)) {
+ if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
+ return aclk_queue.aclk_query_tail;
+ }
+
+ last_query = NULL;
+ tmp_query = aclk_queue.aclk_query_head;
+
+ while (tmp_query) {
+ if (tmp_query->run_after > time_to_run)
+ return last_query;
+ last_query = tmp_query;
+ tmp_query = tmp_query->next;
+ }
+ return last_query;
+}
+
+// Need to have a QUERY lock before calling this
+static struct aclk_query *
+aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
+{
+ struct aclk_query *tmp_query, *prev_query;
+ UNUSED(cmd);
+
+ tmp_query = aclk_queue.aclk_query_head;
+ prev_query = NULL;
+ while (tmp_query) {
+ if (likely(!tmp_query->deleted)) {
+ if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
+ if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
+ (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
+ if (likely(last_query))
+ *last_query = prev_query;
+ return tmp_query;
+ }
+ }
+ }
+ prev_query = tmp_query;
+ tmp_query = tmp_query->next;
+ }
+ return NULL;
+}
+
+/*
+ * Add a query to execute, the result will be send to the specified topic
+ */
+
+int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+{
+ struct aclk_query *new_query, *tmp_query;
+
+ // Ignore all commands while we wait for the agent to initialize
+ if (unlikely(!aclk_connected))
+ return 1;
+
+ run_after = now_realtime_sec() + run_after;
+
+ ACLK_QUEUE_LOCK;
+ struct aclk_query *last_query = NULL;
+
+ tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
+ if (unlikely(tmp_query)) {
+ if (tmp_query->run_after == run_after) {
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+ }
+
+ if (last_query)
+ last_query->next = tmp_query->next;
+ else
+ aclk_queue.aclk_query_head = tmp_query->next;
+
+ debug(D_ACLK, "Removing double entry");
+ aclk_query_free(tmp_query);
+ aclk_queue.count--;
+ }
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_queued++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ new_query = callocz(1, sizeof(struct aclk_query));
+ new_query->cmd = aclk_cmd;
+ if (internal) {
+ new_query->topic = strdupz(topic);
+ if (likely(query))
+ new_query->query = strdupz(query);
+ } else {
+ new_query->topic = topic;
+ new_query->query = query;
+ new_query->msg_id = msg_id;
+ }
+
+ if (data)
+ new_query->data = strdupz(data);
+
+ new_query->next = NULL;
+ new_query->created = now_realtime_sec();
+ new_query->run_after = run_after;
+
+ debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
+
+ tmp_query = aclk_query_find_position(run_after);
+
+ if (tmp_query) {
+ new_query->next = tmp_query->next;
+ tmp_query->next = new_query;
+ if (tmp_query == aclk_queue.aclk_query_tail)
+ aclk_queue.aclk_query_tail = new_query;
+ aclk_queue.count++;
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+ }
+
+ new_query->next = aclk_queue.aclk_query_head;
+ aclk_queue.aclk_query_head = new_query;
+ aclk_queue.count++;
+
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region Helper Functions
+#endif
+
+/*
+ * Take a buffer, encode it and rewrite it
+ *
+ */
+
+static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
+{
+ char *tmp_buffer = mallocz(content_size * 2);
+ char *dst = tmp_buffer;
+ while (content_size > 0) {
+ switch (*src) {
+ case '\n':
+ if (keep_newlines)
+ {
+ *dst++ = '\\';
+ *dst++ = 'n';
+ }
+ break;
+ case '\t':
+ break;
+ case 0x01 ... 0x08:
+ case 0x0b ... 0x1F:
+ *dst++ = '\\';
+ *dst++ = 'u';
+ *dst++ = '0';
+ *dst++ = '0';
+ *dst++ = (*src < 0x0F) ? '0' : '1';
+ *dst++ = to_hex(*src);
+ break;
+ case '\"':
+ *dst++ = '\\';
+ *dst++ = *src;
+ break;
+ default:
+ *dst++ = *src;
+ }
+ src++;
+ content_size--;
+ }
+ *dst = '\0';
+
+ return tmp_buffer;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region ACLK_QUERY
+#endif
+
+static int aclk_execute_query(struct aclk_query *this_query)
+{
+ if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
+ *mysep = '\0';
+ } else
+ strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
+
+ mysep = strrchr(this_query->query, '/');
+
+ // TODO: handle bad response perhaps in a different way. For now it does to the payload
+ w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+ now_realtime_timeval(&w->tv_ready);
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+ char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
+ char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
+
+ buffer_sprintf(
+ local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
+ w->response.code, encoded_response, encoded_header);
+
+ buffer_sprintf(local_buffer, "\n}");
+
+ debug(D_ACLK, "Response:%s", encoded_header);
+
+ aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
+
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ freez(encoded_response);
+ freez(encoded_header);
+ return 0;
+ }
+ return 1;
+}
+
+/*
+ * This function will fetch the next pending command and process it
+ *
+ */
+static int aclk_process_query(int t_idx)
+{
+ struct aclk_query *this_query;
+ static long int query_count = 0;
+ ACLK_METADATA_STATE meta_state;
+ usec_t t = 0;
+
+ if (!aclk_connected)
+ return 0;
+
+ this_query = aclk_queue_pop();
+ if (likely(!this_query)) {
+ return 0;
+ }
+
+ if (unlikely(this_query->deleted)) {
+ debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
+ aclk_query_free(this_query);
+ return 1;
+ }
+ query_count++;
+
+ debug(
+ D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
+ this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
+
+ switch (this_query->cmd) {
+ case ACLK_CMD_ONCONNECT:
+ debug(D_ACLK, "EXECUTING on connect metadata command");
+ ACLK_SHARED_STATE_LOCK;
+ meta_state = aclk_shared_state.metadata_submitted;
+ aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT;
+ ACLK_SHARED_STATE_UNLOCK;
+ aclk_send_metadata(meta_state);
+ break;
+
+ case ACLK_CMD_CHART:
+ debug(D_ACLK, "EXECUTING a chart update command");
+ aclk_send_single_chart(this_query->data, this_query->query);
+ break;
+
+ case ACLK_CMD_CHARTDEL:
+ debug(D_ACLK, "EXECUTING a chart delete command");
+ //TODO: This send the info metadata for now
+ aclk_send_info_metadata(ACLK_METADATA_SENT);
+ break;
+
+ case ACLK_CMD_ALARM:
+ debug(D_ACLK, "EXECUTING an alarm update command");
+ aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
+ break;
+
+ case ACLK_CMD_CLOUD:
+ t = now_monotonic_high_precision_usec();
+ debug(D_ACLK, "EXECUTING a cloud command");
+ aclk_execute_query(this_query);
+ t = now_monotonic_high_precision_usec() - t;
+ break;
+
+ default:
+ break;
+ }
+ debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[t_idx]++;
+ if(this_query->cmd == ACLK_CMD_CLOUD) {
+ aclk_metrics_per_sample.cloud_q_process_total += t;
+ aclk_metrics_per_sample.cloud_q_process_count++;
+ if(aclk_metrics_per_sample.cloud_q_process_max < t)
+ aclk_metrics_per_sample.cloud_q_process_max = t;
+ }
+ ACLK_STATS_UNLOCK;
+ }
+
+ aclk_query_free(this_query);
+
+ return 1;
+}
+
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+{
+ if (query_threads && query_threads->thread_list) {
+ for (int i = 0; i < query_threads->count; i++) {
+ netdata_thread_join(query_threads->thread_list[i].thread, NULL);
+ }
+ freez(query_threads->thread_list);
+ }
+
+ struct aclk_query *this_query;
+
+ do {
+ this_query = aclk_queue_pop();
+ aclk_query_free(this_query);
+ } while (this_query);
+}
+
+#define TASK_LEN_MAX 16
+void aclk_query_threads_start(struct aclk_query_threads *query_threads)
+{
+ info("Starting %d query threads.", query_threads->count);
+
+ char thread_name[TASK_LEN_MAX];
+ query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
+ for (int i = 0; i < query_threads->count; i++) {
+ query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+
+ snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i);
+ netdata_thread_create(
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i]);
+ }
+}
+
+/**
+ * Main query processing thread
+ *
+ * On startup wait for the agent collectors to initialize
+ * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
+ * of no new collectors coming in in order to mark the agent
+ * as stable (set agent_state = AGENT_STABLE)
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ struct aclk_query_thread *info = ptr;
+ time_t previous_popcorn_interrupt = 0;
+
+ while (!netdata_exit) {
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
+ ACLK_SHARED_STATE_UNLOCK;
+ break;
+ }
+
+ time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
+
+ if (checkpoint > ACLK_STABLE_TIMEOUT) {
+ aclk_shared_state.agent_state = AGENT_STABLE;
+ ACLK_SHARED_STATE_UNLOCK;
+ info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
+#ifdef ACLK_DEBUG
+ _dump_collector_list();
+#endif
+ break;
+ }
+
+ if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) {
+ info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT);
+ previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ sleep_usec(USEC_PER_SEC * 1);
+ }
+
+ while (!netdata_exit) {
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(!aclk_shared_state.metadata_submitted)) {
+ ACLK_SHARED_STATE_UNLOCK;
+ if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ errno = 0;
+ error("ACLK failed to queue on_connect command");
+ sleep(1);
+ continue;
+ }
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ while (aclk_process_query(info->idx)) {
+ // Process all commands
+ };
+
+ QUERY_THREAD_LOCK;
+
+ // TODO: Need to check if there are queries awaiting already
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+
+ QUERY_THREAD_UNLOCK;
+ }
+
+ return NULL;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
new file mode 100644
index 0000000000..382b97d262
--- /dev/null
+++ b/aclk/aclk_query.h
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_H
+#define NETDATA_ACLK_QUERY_H
+
+#include "libnetdata/libnetdata.h"
+
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+
+extern pthread_cond_t query_cond_wait;
+extern pthread_mutex_t query_lock_wait;
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
+
+extern volatile int aclk_connected;
+
+struct aclk_query_thread {
+ netdata_thread_t thread;
+ int idx;
+};
+
+struct aclk_query_threads {
+ struct aclk_query_thread *thread_list;
+ int count;
+};
+
+void *aclk_query_main_thread(void *ptr);
+int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
+
+void aclk_query_threads_start(struct aclk_query_threads *query_threads);
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+unsigned int aclk_query_size();
+
+#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 2356ccdbc2..59115b55d6 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -4,6 +4,16 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_stats_enabled;
+int query_thread_count;
+
+// data ACLK stats need per query thread
+struct aclk_qt_data {
+ RRDDIM *dim;
+} *aclk_qt_data = NULL;
+
+uint32_t *aclk_queries_per_thread = NULL;
+uint32_t *aclk_queries_per_thread_sample = NULL;
+
struct aclk_metrics aclk_metrics = {
.online = 0,
};
@@ -17,7 +27,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc
if (unlikely(!st_aclkstats)) {
st_aclkstats = rrdset_create_localhost(
- "netdata", "aclk_status", NULL, "aclk_stats", NULL, "ACLK/Cloud connection status",
+ "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
"connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -29,7 +39,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc
rrdset_done(st_aclkstats);
}
-static void aclk_stats_query_thread(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st_query_thread = NULL;
static RRDDIM *rd_queued = NULL;
@@ -37,16 +47,16 @@ static void aclk_stats_query_thread(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st_query_thread)) {
st_query_thread = rrdset_create_localhost(
- "netdata", "aclk_query_per_second", NULL, "aclk_stats", NULL, "ACLK Queries per second", "queries/s",
+ "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
"netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st_query_thread);
rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
- rrddim_set_by_pointer(st_query_thread, rd_dispatched, -per_sample->queries_dispatched);
+ rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
rrdset_done(st_query_thread);
}
@@ -60,7 +70,7 @@ static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_latency_mqtt", NULL, "aclk_stats", NULL, "ACLK Message Publish Latency", "ms",
+ "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms",
"netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -86,7 +96,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_write_q", NULL, "aclk_stats", NULL, "Write Queue Mosq->Libwebsockets", "kB/s",
+ "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s",
"netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -108,7 +118,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_read_q", NULL, "aclk_stats", NULL, "Read Queue Libwebsockets->Mosq", "kB/s",
+ "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s",
"netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -130,7 +140,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_cloud_req", NULL, "aclk_stats", NULL, "Requests received from cloud", "req/s",
+ "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
"netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -144,13 +154,82 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
+#define MAX_DIM_NAME 16
+static void aclk_stats_query_threads(uint32_t *queries_per_thread)
+{
+ static RRDSET *st = NULL;
+
+ char dim_name[MAX_DIM_NAME];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ snprintf(dim_name, MAX_DIM_NAME, "Query %d", i);
+ aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
+ }
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_avg = NULL;
+ static RRDDIM *rd_rq_max = NULL;
+ static RRDDIM *rd_rq_total = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ if(per_sample->cloud_q_process_count)
+ rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
+ else
+ rrddim_set_by_pointer(st, rd_rq_avg, 0);
+ rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max);
+ rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total);
+
+ rrdset_done(st);
+}
+
+void aclk_stats_thread_cleanup()
+{
+ freez(aclk_qt_data);
+ freez(aclk_queries_per_thread);
+ freez(aclk_queries_per_thread_sample);
+}
+
void *aclk_stats_main_thread(void *ptr)
{
- UNUSED(ptr);
+ struct aclk_stats_thread *args = ptr;
+
+ query_thread_count = args->query_thread_count;
+ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+
heartbeat_t hb;
heartbeat_init(&hb);
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
@@ -168,10 +247,13 @@ void *aclk_stats_main_thread(void *ptr)
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ mempcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
+ memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
- aclk_stats_query_thread(&per_sample);
+ aclk_stats_query_queue(&per_sample);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_latency(&per_sample);
#endif
@@ -179,7 +261,11 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_read_q(&per_sample);
aclk_stats_cloud_req(&per_sample);
+ aclk_stats_query_threads(aclk_queries_per_thread_sample);
+
+ aclk_stats_query_time(&per_sample);
}
+
return 0;
}
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index f2a5b1a518..7ef3e337c6 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -5,6 +5,7 @@
#include "../daemon/common.h"
#include "libnetdata/libnetdata.h"
+#include "aclk_common.h"
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
@@ -15,6 +16,11 @@ extern netdata_mutex_t aclk_stats_mutex;