summaryrefslogtreecommitdiffstats
path: root/aclk/agent_cloud_link.c
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-07-10 15:31:30 +0200
committerGitHub <noreply@github.com>2020-07-10 15:31:30 +0200
commit42aa54eaf6a64348f23cc0c9ed0a071b79bc9cad (patch)
treed8e8cd9b2a695560e08c6db771afe7c6a4c8587e /aclk/agent_cloud_link.c
parent0be2c726035419bb3c06e7ee8852e5b58ff1b020 (diff)
adds support for multiple ACLK query processing threads (#9355)
Diffstat (limited to 'aclk/agent_cloud_link.c')
-rw-r--r--aclk/agent_cloud_link.c739
1 files changed, 119 insertions, 620 deletions
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 5ff03319e5..97ced1c19d 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -3,34 +3,26 @@
#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
#include "aclk_lws_https_client.h"
+#include "aclk_query.h"
#include "aclk_common.h"
#include "aclk_stats.h"
int aclk_shutting_down = 0;
-// State-machine for the on-connect metadata transmission.
-// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
-// agent startup phase.
-static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
-static AGENT_STATE agent_state = AGENT_INITIALIZING;
// Other global state
static int aclk_subscribed = 0;
static int aclk_disable_single_updates = 0;
-static time_t last_init_sequence = 0;
-static int waiting_init = 1;
static char *aclk_username = NULL;
static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
-int aclk_connected = 0; // Exposed in the web-api
int aclk_force_reconnect = 0; // Indication from lower layers
int aclk_kill_link = 0; // Tell the agent to tear down the link
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
-static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
@@ -39,16 +31,6 @@ static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
-#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
-#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
-
-pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
-
-#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
-#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
-#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
-
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void aclk_lws_wss_destroy_context();
/*
@@ -71,24 +53,6 @@ struct _collector {
struct _collector *collector_list = NULL;
-struct aclk_query {
- time_t created;
- time_t run_after; // Delay run until after this time
- ACLK_CMD cmd; // What command is this
- char *topic; // Topic to respond to
- char *data; // Internal data (NULL if request from the cloud)
- char *msg_id; // msg_id generated by the cloud (NULL if internal)
- char *query; // The actual query
- u_char deleted; // Mark deleted for garbage collect
- struct aclk_query *next;
-};
-
-struct aclk_query_queue {
- struct aclk_query *aclk_query_head;
- struct aclk_query *aclk_query_tail;
- uint64_t count;
-} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
-
char *create_uuid()
{
uuid_t uuid;
@@ -219,225 +183,6 @@ unsigned long int aclk_reconnect_delay(int mode)
return delay;
}
-/*
- * Free a query structure when done
- */
-
-void aclk_query_free(struct aclk_query *this_query)
-{
- if (unlikely(!this_query))
- return;
-
- freez(this_query->topic);
- if (likely(this_query->query))
- freez(this_query->query);
- if (likely(this_query->data))
- freez(this_query->data);
- if (likely(this_query->msg_id))
- freez(this_query->msg_id);
- freez(this_query);
-}
-
-// Returns the entry after which we need to create a new entry to run at the specified time
-// If NULL is returned we need to add to HEAD
-// Need to have a QUERY lock before calling this
-
-struct aclk_query *aclk_query_find_position(time_t time_to_run)
-{
- struct aclk_query *tmp_query, *last_query;
-
- // Quick check if we will add to the end
- if (likely(aclk_queue.aclk_query_tail)) {
- if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
- return aclk_queue.aclk_query_tail;
- }
-
- last_query = NULL;
- tmp_query = aclk_queue.aclk_query_head;
-
- while (tmp_query) {
- if (tmp_query->run_after > time_to_run)
- return last_query;
- last_query = tmp_query;
- tmp_query = tmp_query->next;
- }
- return last_query;
-}
-
-// Need to have a QUERY lock before calling this
-struct aclk_query *
-aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
-{
- struct aclk_query *tmp_query, *prev_query;
- UNUSED(cmd);
-
- tmp_query = aclk_queue.aclk_query_head;
- prev_query = NULL;
- while (tmp_query) {
- if (likely(!tmp_query->deleted)) {
- if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
- if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
- (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
- if (likely(last_query))
- *last_query = prev_query;
- return tmp_query;
- }
- }
- }
- prev_query = tmp_query;
- tmp_query = tmp_query->next;
- }
- return NULL;
-}
-
-/*
- * Add a query to execute, the result will be send to the specified topic
- */
-
-int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
-{
- struct aclk_query *new_query, *tmp_query;
-
- // Ignore all commands while we wait for the agent to initialize
- if (unlikely(waiting_init))
- return 1;
-
- run_after = now_realtime_sec() + run_after;
-
- QUERY_LOCK;
- struct aclk_query *last_query = NULL;
-
- tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
- if (unlikely(tmp_query)) {
- if (tmp_query->run_after == run_after) {
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
- }
-
- if (last_query)
- last_query->next = tmp_query->next;
- else
- aclk_queue.aclk_query_head = tmp_query->next;
-
- debug(D_ACLK, "Removing double entry");
- aclk_query_free(tmp_query);
- aclk_queue.count--;
- }
-
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_queued++;
- ACLK_STATS_UNLOCK;
- }
-
- new_query = callocz(1, sizeof(struct aclk_query));
- new_query->cmd = aclk_cmd;
- if (internal) {
- new_query->topic = strdupz(topic);
- if (likely(query))
- new_query->query = strdupz(query);
- } else {
- new_query->topic = topic;
- new_query->query = query;
- new_query->msg_id = msg_id;
- }
-
- if (data)
- new_query->data = strdupz(data);
-
- new_query->next = NULL;
- new_query->created = now_realtime_sec();
- new_query->run_after = run_after;
-
- debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
-
- tmp_query = aclk_query_find_position(run_after);
-
- if (tmp_query) {
- new_query->next = tmp_query->next;
- tmp_query->next = new_query;
- if (tmp_query == aclk_queue.aclk_query_tail)
- aclk_queue.aclk_query_tail = new_query;
- aclk_queue.count++;
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
- }
-
- new_query->next = aclk_queue.aclk_query_head;
- aclk_queue.aclk_query_head = new_query;
- aclk_queue.count++;
-
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
-}
-
-inline int aclk_submit_request(struct aclk_request *request)
-{
- return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
-}
-
-/*
- * Get the next query to process - NULL if nothing there
- * The caller needs to free memory by calling aclk_query_free()
- *
- * topic
- * query
- * The structure itself
- *
- */
-struct aclk_query *aclk_queue_pop()
-{
- struct aclk_query *this_query;
-
- QUERY_LOCK;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- QUERY_UNLOCK;
- return NULL;
- }
-
- this_query = aclk_queue.aclk_query_head;
-
- // Get rid of the deleted entries
- while (this_query && this_query->deleted) {
- aclk_queue.count--;
-
- aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- aclk_queue.aclk_query_tail = NULL;
- }
-
- aclk_query_free(this_query);
-
- this_query = aclk_queue.aclk_query_head;
- }
-
- if (likely(!this_query)) {
- QUERY_UNLOCK;
- return NULL;
- }
-
- if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
- info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
- QUERY_UNLOCK;
- return NULL;
- }
-
- aclk_queue.count--;
- aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- aclk_queue.aclk_query_tail = NULL;
- }
-
- QUERY_UNLOCK;
- return this_query;
-}
-
// This will give the base topic that the agent will publish messages.
// subtopics will be sent under the base topic e.g. base_topic/subtopic
// This is called during the connection, we delete any previous topic
@@ -489,6 +234,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size)
return final_topic;
}
+#ifndef __GNUC__
+#pragma region ACLK Internal Collector Tracking
+#endif
+
/*
* Free a collector structure
*/
@@ -673,6 +422,22 @@ static struct _collector *_add_collector(const char *hostname, const char *plugi
return tmp_collector;
}
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+inline static int aclk_popcorn_check_bump()
+{
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ return 0;
+}
+
/*
* Add a new collector to the list
* If it exists, update the chart count
@@ -693,14 +458,13 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
return;
}
- if (unlikely(agent_state == AGENT_INITIALIZING))
- last_init_sequence = now_realtime_sec();
- else {
- if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
- debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
- }
-
COLLECTOR_UNLOCK;
+
+ if(aclk_popcorn_check_bump())
+ return;
+
+ if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
/*
@@ -733,286 +497,13 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_UNLOCK;
- if (unlikely(agent_state == AGENT_INITIALIZING))
- last_init_sequence = now_realtime_sec();
- else {
- if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
- debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
- }
-
_free_collector(tmp_collector);
-}
-/*
- * Take a buffer, encode it and rewrite it
- *
- */
-
-static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
-{
- char *tmp_buffer = mallocz(content_size * 2);
- char *dst = tmp_buffer;
- while (content_size > 0) {
- switch (*src) {
- case '\n':
- if (keep_newlines)
- {
- *dst++ = '\\';
- *dst++ = 'n';
- }
- break;
- case '\t':
- break;
- case 0x01 ... 0x08:
- case 0x0b ... 0x1F:
- *dst++ = '\\';
- *dst++ = 'u';
- *dst++ = '0';
- *dst++ = '0';
- *dst++ = (*src < 0x0F) ? '0' : '1';
- *dst++ = to_hex(*src);
- break;
- case '\"':
- *dst++ = '\\';
- *dst++ = *src;
- break;
- default:
- *dst++ = *src;
- }
- src++;
- content_size--;
- }
- *dst = '\0';
-
- return tmp_buffer;
-}
-
-int aclk_execute_query(struct aclk_query *this_query)
-{
- if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
- struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
- w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
- w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
- w->acl = 0x1f;
-
- char *mysep = strchr(this_query->query, '?');
- if (mysep) {
- strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
- *mysep = '\0';
- } else
- strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
-
- mysep = strrchr(this_query->query, '/');
-
- // TODO: handle bad response perhaps in a different way. For now it does to the payload
- w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
- now_realtime_timeval(&w->tv_ready);
- w->response.data->date = w->tv_ready.tv_sec;
- web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
- char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
- char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
-
- buffer_sprintf(
- local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
- w->response.code, encoded_response, encoded_header);
-
- buffer_sprintf(local_buffer, "\n}");
-
- debug(D_ACLK, "Response:%s", encoded_header);
-
- aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
-
- buffer_free(w->response.data);
- buffer_free(w->response.header);
- buffer_free(w->response.header_output);
- freez(w);
- buffer_free(local_buffer);
- freez(encoded_response);
- freez(encoded_header);
- return 0;
- }
- return 1;
-}
-
-/*
- * This function will fetch the next pending command and process it
- *
- */
-int aclk_process_query()
-{
- struct aclk_query *this_query;
- static long int query_count = 0;
-
- if (!aclk_connected)
- return 0;
-
- this_query = aclk_queue_pop();
- if (likely(!this_query)) {
- return 0;
- }
-
- if (unlikely(this_query->deleted)) {
- debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
- aclk_query_free(this_query);
- return 1;
- }
- query_count++;
-
- debug(
- D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
- this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
-
- switch (this_query->cmd) {
- case ACLK_CMD_ONCONNECT:
- debug(D_ACLK, "EXECUTING on connect metadata command");
- aclk_send_metadata();
- aclk_metadata_submitted = ACLK_METADATA_SENT;
- break;
-
- case ACLK_CMD_CHART:
- debug(D_ACLK, "EXECUTING a chart update command");
- aclk_send_single_chart(this_query->data, this_query->query);
- break;
-
- case ACLK_CMD_CHARTDEL:
- debug(D_ACLK, "EXECUTING a chart delete command");
- //TODO: This send the info metadata for now
- aclk_send_info_metadata();
- break;
-
- case ACLK_CMD_ALARM:
- debug(D_ACLK, "EXECUTING an alarm update command");
- aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
- break;
-
- case ACLK_CMD_CLOUD:
- debug(D_ACLK, "EXECUTING a cloud command");
- aclk_execute_query(this_query);
- break;
-
- default:
- break;
- }
- debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
-
- aclk_query_free(this_query);
-
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_dispatched++;
- ACLK_STATS_UNLOCK;
- }
-
- return 1;
-}
-
-/*
- * Process all pending queries
- * Return 0 if no queries were processed, 1 otherwise
- *
- */
-
-int aclk_process_queries()
-{
- if (unlikely(netdata_exit || !aclk_connected))
- return 0;
-
- if (likely(!aclk_queue.count))
- return 0;
-
- debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);
-
- //TODO: may consider possible throttling here
- while (aclk_process_query()) {
- // Process all commands
- };
-
- return 1;
-}
-
-static void aclk_query_thread_cleanup(void *ptr)
-{
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
-
- info("cleaning up...");
-
- _reset_collector_list();
- freez(collector_list);
-
- // Clean memory for pending queries if any
- struct aclk_query *this_query;
-
- do {
- this_query = aclk_queue_pop();
- aclk_query_free(this_query);
- } while (this_query);
-
- freez(static_thread->thread);
- freez(static_thread);
-}
-
-/**
- * Main query processing thread
- *
- * On startup wait for the agent collectors to initialize
- * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
- * of no new collectors coming in in order to mark the agent
- * as stable (set agent_state = AGENT_STABLE)
- */
-void *aclk_query_main_thread(void *ptr)
-{
- netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
-
- while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
- time_t checkpoint;
-
- checkpoint = now_realtime_sec() - last_init_sequence;
- if (checkpoint > ACLK_STABLE_TIMEOUT) {
- agent_state = AGENT_STABLE;
- info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
-#ifdef ACLK_DEBUG
- _dump_collector_list();
-#endif
- break;
- }
- info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
- sleep_usec(USEC_PER_SEC * 1);
- }
-
- while (!netdata_exit) {
- if (unlikely(!aclk_metadata_submitted)) {
- aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
- if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
- errno = 0;
- error("ACLK failed to queue on_connect command");
- aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
- }
- }
-
- aclk_process_queries();
-
- QUERY_THREAD_LOCK;
-
- // TODO: Need to check if there are queries awaiting already
- if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
- sleep_usec(USEC_PER_SEC * 1);
-
- QUERY_THREAD_UNLOCK;
+ if (aclk_popcorn_check_bump())
+ return;
- } // forever
- info("Shutting down query processing thread");
- netdata_thread_cleanup_pop(1);
- return NULL;
+ if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
static void aclk_graceful_disconnect()
@@ -1047,23 +538,9 @@ static void aclk_graceful_disconnect()
aclk_shutting_down = 0;
}
-
-// Thread cleanup
-static void aclk_main_cleanup(void *ptr)
-{
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- info("cleaning up...");
-
- char *agent_id = is_agent_claimed();
- if (agent_id && aclk_connected) {
- freez(agent_id);
- // Wakeup thread to cleanup
- QUERY_THREAD_WAKEUP;
- aclk_graceful_disconnect();
- }
-}
+#ifndef __GNUC__
+#pragma region Incoming Msg Parsing
+#endif
struct dictionary_singleton {
char *key;
@@ -1092,6 +569,15 @@ int json_extract_singleton(JSON_ENTRY *e)
return 0;
}
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+
+#ifndef __GNUC__
+#pragma region Challenge Response
+#endif
+
// Base-64 decoder.
// Note: This is non-validating, invalid input will be decoded without an error.
// Challenges are packed into json strings so we don't skip newlines.
@@ -1225,29 +711,6 @@ int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decry
return result;
}
-char *extract_payload(BUFFER *b)
-{
-char *s = b->buffer;
-unsigned int line_len=0;
- for (size_t i=0; i<b->len; i++)
- {
- if (*s == 0 )
- return NULL;
- if (*s == '\n' ) {
- if (line_len==0)
- return s+1;
- line_len = 0;
- }
- else if (*s == '\r') {
- /* don't count */
- }
- else
- line_len ++;
- s++;
- }
- return NULL;
-}
-
void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
{
char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1340,6 +803,10 @@ CLEANUP:
return;
}
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
static void aclk_try_to_connect(char *hostname, char *port, int port_num)
{
if (!aclk_private_key) {
@@ -1359,7 +826,6 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
}
}
-
/**
* Main agent cloud link thread
*
@@ -1373,8 +839,10 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
void *aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- struct netdata_static_thread *query_thread;
- struct netdata_static_thread *stats_thread = NULL;
+ struct aclk_query_threads query_threads;
+ struct aclk_stats_thread *stats_thread = NULL;
+
+ query_threads.thread_list = NULL;
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
@@ -1400,18 +868,25 @@ void *aclk_main(void *ptr)
}
}
- aclk_stats_enabled = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "statistics", CONFIG_BOOLEAN_YES);
+ query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", 2);
+ if(query_threads.count < 1) {
+ error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
+ query_threads.count = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
+ }
+
+ aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started
+
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
if (aclk_stats_enabled) {
- stats_thread = callocz(1, sizeof(struct netdata_static_thread));
+ stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
+ stats_thread->query_thread_count = query_threads.count;
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
stats_thread);
}
- last_init_sequence = now_realtime_sec();
- query_thread = NULL;
-
char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
char *aclk_port = NULL;
uint32_t port_num = 0;
@@ -1508,17 +983,13 @@ void *aclk_main(void *ptr)
aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
}
- if (unlikely(!query_thread)) {
- query_thread = callocz(1, sizeof(struct netdata_static_thread));
- query_thread->thread = mallocz(sizeof(netdata_thread_t));
- netdata_thread_create(
- query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
- query_thread);
+ if (unlikely(!query_threads.thread_list)) {
+ aclk_query_threads_start(&query_threads);
}
} // forever
exited:
// Wakeup query thread to cleanup
- QUERY_THREAD_WAKEUP;
+ QUERY_THREAD_WAKEUP_ALL;
freez(aclk_username);
freez(aclk_password);
@@ -1527,10 +998,24 @@ exited:
if (aclk_private_key != NULL)
RSA_free(aclk_private_key);
- aclk_main_cleanup(ptr);
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id && aclk_connected) {
+ freez(agent_id);
+ // Wakeup thread to cleanup
+ QUERY_THREAD_WAKEUP;
+ aclk_graceful_disconnect();
+ }
+
+ aclk_query_threads_cleanup(&query_threads);
+
+ _reset_collector_list();
+ freez(collector_list);
if(aclk_stats_enabled) {
netdata_thread_join(*stats_thread->thread, NULL);
+ aclk_stats_thread_cleanup();
freez(stats_thread->thread);
freez(stats_thread);
}
@@ -1626,12 +1111,11 @@ int aclk_subscribe(char *sub_topic, int qos)
// This is called from a callback when the link goes up
void aclk_connect()
{
- info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);
+ info("Connection detected (%u queued queries)", aclk_query_size());
aclk_stats_upd_online(1);
aclk_connected = 1;
- waiting_init = 0;
aclk_reconnect_delay(0);
QUERY_THREAD_WAKEUP;
return;
@@ -1641,13 +1125,14 @@ void aclk_connect()
void aclk_disconnect()
{
if (likely(aclk_connected))
- info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
+ info("Disconnect detected (%u queued queries)", aclk_query_size());
aclk_stats_upd_online(0);
aclk_subscribed = 0;
- aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
- waiting_init = 1;
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED;
+ ACLK_SHARED_STATE_UNLOCK;
aclk_connected = 0;
aclk_connecting = 0;
aclk_force_reconnect = 1;
@@ -1692,7 +1177,8 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
* active alarms
*/
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-void aclk_send_alarm_metadata()
+
+void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1706,7 +1192,8 @@ void aclk_send_alarm_metadata()
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
- if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+
+ if (metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
else
aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
@@ -1737,7 +1224,7 @@ void aclk_send_alarm_metadata()
* /api/v1/info
* charts
*/
-int aclk_send_info_metadata()
+int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1751,7 +1238,7 @@ int aclk_send_info_metadata()
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
- if (aclk_metadata_submitted == ACLK_METADATA_SENT)
+ if (metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "update", msg_id, 0, 0);
else
aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
@@ -1794,11 +1281,11 @@ void aclk_send_stress_test(size_t size)
// Send info metadata message to the cloud if the link is established
// or on request
-int aclk_send_metadata()
+int aclk_send_metadata(ACLK_METADATA_STATE state)
{
- aclk_send_info_metadata();
- aclk_send_alarm_metadata();
+ aclk_send_info_metadata(state);
+ aclk_send_alarm_metadata(state);
return 0;
}
@@ -1816,8 +1303,13 @@ void aclk_single_update_enable()
// Trigged by a health reload, sends the alarm metadata
void aclk_alarm_reload()
{
- if (unlikely(agent_state == AGENT_INITIALIZING))
+
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
return;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
@@ -1881,16 +1373,16 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (unlikely(aclk_disable_single_updates))
return 0;
- if (unlikely(agent_state == AGENT_INITIALIZING))
- last_init_sequence = now_realtime_sec();
- else {
- if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
- if (likely(aclk_connected)) {
- errno = 0;
- error("ACLK failed to queue chart_update command");
- }
+ if (aclk_popcorn_check_bump())
+ return 0;
+
+ if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue chart_update command");
}
}
+
return 0;
#endif
}
@@ -1905,8 +1397,12 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
if (host != localhost)
return 0;
- if (unlikely(agent_state == AGENT_INITIALIZING))
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
return 0;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
/*
* Check if individual updates have been disabled
@@ -1959,10 +1455,13 @@ int aclk_handle_cloud_request(char *payload)
ACLK_STATS_UNLOCK;
}
- if (unlikely(agent_state == AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
+ ACLK_SHARED_STATE_UNLOCK;
return 0;
}
+ ACLK_SHARED_STATE_UNLOCK;
if (unlikely(!payload)) {
debug(D_ACLK, "ACLK incoming message is empty");
@@ -2010,7 +1509,7 @@ int aclk_handle_cloud_request(char *payload)
cloud_to_agent.type_id = NULL;
}
- if (unlikely(aclk_submit_request(&cloud_to_agent)))
+ if (unlikely(aclk_queue_query(cloud_to_agent.callback_topic, NULL, cloud_to_agent.msg_id, cloud_to_agent.payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
// Note: the payload comes from the callback and it will be automatically freed