summary refs log tree commit diff stats
path: root/aclk
diff options
context:
space:
mode:
author: Timotej S <6674623+underhood@users.noreply.github.com> 2020-09-10 13:48:06 +0200
committer: GitHub <noreply@github.com> 2020-09-10 13:48:06 +0200
commit: 54916258aa5b687766b8bcf7bb5a058fada3f855 (patch)
tree: aaf2f79845ff22b48486d6b6a37f962324b02239 /aclk
parent: c1698e87f058c770f087888aba224fbdd018026a (diff)
Implements ACLK v2 http message with compression (#9895)
Allows cloud to use v2 queries which support compression.
Diffstat (limited to 'aclk')
-rw-r--r-- aclk/aclk_common.c 1
-rw-r--r-- aclk/aclk_common.h 16
-rw-r--r-- aclk/aclk_query.c 193
-rw-r--r-- aclk/aclk_query.h 3
-rw-r--r-- aclk/aclk_rx_msgs.c 314
-rw-r--r-- aclk/aclk_rx_msgs.h 13
-rw-r--r-- aclk/agent_cloud_link.c 193
-rw-r--r-- aclk/agent_cloud_link.h 3
-rw-r--r-- aclk/mqtt.c 8
-rw-r--r-- aclk/mqtt.h 2
10 files changed, 525 insertions, 221 deletions
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c
index d172f27c38..188f41a09c 100644
--- a/aclk/aclk_common.c
+++ b/aclk/aclk_common.c
@@ -5,6 +5,7 @@
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_disable_runtime = 0;
+int aclk_kill_link = 0;
struct aclk_shared_state aclk_shared_state = {
.metadata_submitted = ACLK_METADATA_REQUIRED,
diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h
index 0d85a6db9d..819a51e979 100644
--- a/aclk/aclk_common.h
+++ b/aclk/aclk_common.h
@@ -9,8 +9,8 @@ extern netdata_mutex_t aclk_shared_state_mutex;
// minimum and maximum supported version of ACLK
// in this version of agent
-#define ACLK_VERSION_MIN 1
-#define ACLK_VERSION_MAX 1
+#define ACLK_VERSION_MIN 2
+#define ACLK_VERSION_MAX 2
// Version negotiation messages have their own versioning
// this is also used for LWT message as we set that up
@@ -25,6 +25,9 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
#endif
+// Define ACLK Feature Version Boundaries Here
+#define ACLK_V_COMPRESSION 2
+
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
ACLK_CMD_ONCONNECT,
@@ -32,7 +35,7 @@ typedef enum aclk_cmd {
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
- ACLK_CMD_MAX
+ ACLK_CMD_CLOUD_QUERY_2
} ACLK_CMD;
typedef enum aclk_metadata_state {
@@ -64,18 +67,21 @@ typedef enum aclk_proxy_type {
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
+extern int aclk_kill_link; // Tells the agent to tear down the link
+extern int aclk_disable_runtime;
+
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
+
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
int aclk_decode_base_url(char *url, char **aclk_hostname, char **aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
-extern int aclk_disable_runtime;
-
#endif //ACLK_COMMON_H
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index cf9a63ff6a..cd1cfd1e3e 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -1,6 +1,9 @@
#include "aclk_common.h"
#include "aclk_query.h"
#include "aclk_stats.h"
+#include "aclk_rx_msgs.h"
+
+#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
@@ -18,7 +21,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex)
struct aclk_query {
- time_t created;
+ usec_t created;
time_t run_after; // Delay run until after this time
ACLK_CMD cmd; // What command is this
char *topic; // Topic to respond to
@@ -56,7 +59,7 @@ static void aclk_query_free(struct aclk_query *this_query)
freez(this_query->topic);
if (likely(this_query->query))
freez(this_query->query);
- if (likely(this_query->data))
+ if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2)
freez(this_query->data);
if (likely(this_query->msg_id))
freez(this_query->msg_id);
@@ -150,7 +153,7 @@ static struct aclk_query *aclk_query_find_position(time_t time_to_run)
// Need to have a QUERY lock before calling this
static struct aclk_query *
-aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
+aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
{
struct aclk_query *tmp_query, *prev_query;
UNUSED(cmd);
@@ -160,7 +163,7 @@ aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd
while (tmp_query) {
if (likely(!tmp_query->deleted)) {
if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
- if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
+ if ((!data || data == tmp_query->data) &&
(!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
if (likely(last_query))
*last_query = prev_query;
@@ -178,7 +181,7 @@ aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd
* Add a query to execute, the result will be sent to the specified topic
*/
-int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
struct aclk_query *new_query, *tmp_query;
@@ -227,11 +230,9 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
new_query->msg_id = msg_id;
}
- if (data)
- new_query->data = strdupz(data);
-
+ new_query->data = data;
new_query->next = NULL;
- new_query->created = now_realtime_sec();
+ new_query->created = now_realtime_usec();
new_query->run_after = run_after;
debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
@@ -318,6 +319,26 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
#pragma region ACLK_QUERY
#endif
+static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url)
+{
+ usec_t t;
+
+ t = now_monotonic_high_precision_usec();
+ w->response.code = web_client_api_request_v1(host, w, url);
+ t = now_monotonic_high_precision_usec() - t;
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_q_process_total += t;
+ aclk_metrics_per_sample.cloud_q_process_count++;
+ if (aclk_metrics_per_sample.cloud_q_process_max < t)
+ aclk_metrics_per_sample.cloud_q_process_max = t;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return t;
+}
+
static int aclk_execute_query(struct aclk_query *this_query)
{
if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
@@ -340,7 +361,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// TODO: handle bad response perhaps in a different way. For now it does to the payload
- w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
now_realtime_timeval(&w->tv_ready);
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
@@ -375,16 +396,137 @@ static int aclk_execute_query(struct aclk_query *this_query)
return 1;
}
+static int aclk_execute_query_v2(struct aclk_query *this_query)
+{
+ int retval = 0;
+ usec_t t;
+ BUFFER *local_buffer = NULL;
+
+#ifdef NETDATA_WITH_ZLIB
+ int z_ret;
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *start, *end;
+#endif
+
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ *mysep = '\0';
+ } else
+ url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+
+ mysep = strrchr(this_query->query, '/');
+
+ // execute the query
+ t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+
+#ifdef NETDATA_WITH_ZLIB
+ // check if gzip encoding can and should be used
+ if ((start = strstr((char *)this_query->data, WEB_HDR_ACCEPT_ENC))) {
+ start += strlen(WEB_HDR_ACCEPT_ENC);
+ end = strstr(start, "\x0D\x0A");
+ start = strstr(start, "gzip");
+
+ if (start && start < end) {
+ w->response.zstream.zalloc = Z_NULL;
+ w->response.zstream.zfree = Z_NULL;
+ w->response.zstream.opaque = Z_NULL;
+ if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
+ w->response.zinitialized = 1;
+ w->response.zoutput = 1;
+ } else
+ error("Failed to initialize zlib. Proceeding without compression.");
+ }
+ }
+
+ if (w->response.data->len && w->response.zinitialized) {
+ w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
+ w->response.zstream.avail_in = w->response.data->len;
+ do {
+ w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
+ w->response.zstream.next_out = w->response.zbuffer;
+ z_ret = deflate(&w->response.zstream, Z_FINISH);
+ if(z_ret < 0) {
+ if(w->response.zstream.msg)
+ error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
+ else
+ error("Unknown error during zlib compression.");
+ retval = 1;
+ goto cleanup;
+ }
+ int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
+ buffer_need_bytes(z_buffer, bytes_to_cpy);
+ memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
+ z_buffer->len += bytes_to_cpy;
+ } while(z_ret != Z_STREAM_END);
+ // so that web_client_build_http_header
+ // puts correct content length into header
+ buffer_free(w->response.data);
+ w->response.data = z_buffer;
+ z_buffer = NULL;
+ }
+#endif
+
+ now_realtime_timeval(&w->tv_ready);
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
+ buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
+ buffer_strcat(local_buffer, w->response.header_output->buffer);
+
+ if (w->response.data->len) {
+#ifdef NETDATA_WITH_ZLIB
+ if (w->response.zinitialized) {
+ buffer_need_bytes(local_buffer, w->response.data->len);
+ memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
+ local_buffer->len += w->response.data->len;
+ } else {
+#endif
+ buffer_strcat(local_buffer, w->response.data->buffer);
+#ifdef NETDATA_WITH_ZLIB
+ }
+#endif
+ }
+
+ aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
+
+cleanup:
+#ifdef NETDATA_WITH_ZLIB
+ if(w->response.zinitialized)
+ deflateEnd(&w->response.zstream);
+ buffer_free(z_buffer);
+#endif
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ return retval;
+}
+
/*
* This function will fetch the next pending command and process it
*
*/
-static int aclk_process_query(int t_idx)
+static int aclk_process_query(struct aclk_query_thread *t_info)
{
struct aclk_query *this_query;
static long int query_count = 0;
ACLK_METADATA_STATE meta_state;
- usec_t t = 0;
+ RRDHOST *host;
if (!aclk_connected)
return 0;
@@ -401,9 +543,11 @@ static int aclk_process_query(int t_idx)
}
query_count++;
+ host = (RRDHOST*)this_query->data;
+
debug(
- D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
- this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
+ D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
+ this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS);
switch (this_query->cmd) {
case ACLK_CMD_ONCONNECT:
@@ -417,7 +561,9 @@ static int aclk_process_query(int t_idx)
case ACLK_CMD_CHART:
debug(D_ACLK, "EXECUTING a chart update command");
- aclk_send_single_chart(this_query->data, this_query->query);
+ if (!host)
+ fatal("Pointer to host compulsory");
+ aclk_send_single_chart(host->hostname, this_query->query);
break;
case ACLK_CMD_CHARTDEL:
@@ -432,10 +578,12 @@ static int aclk_process_query(int t_idx)
break;
case ACLK_CMD_CLOUD:
- t = now_monotonic_high_precision_usec();
debug(D_ACLK, "EXECUTING a cloud command");
aclk_execute_query(this_query);
- t = now_monotonic_high_precision_usec() - t;
+ break;
+ case ACLK_CMD_CLOUD_QUERY_2:
+ debug(D_ACLK, "EXECUTING Cloud Query v2");
+ aclk_execute_query_v2(this_query);
break;
default:
@@ -446,13 +594,7 @@ static int aclk_process_query(int t_idx)
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
- aclk_queries_per_thread[t_idx]++;
- if(this_query->cmd == ACLK_CMD_CLOUD) {
- aclk_metrics_per_sample.cloud_q_process_total += t;
- aclk_metrics_per_sample.cloud_q_process_count++;
- if(aclk_metrics_per_sample.cloud_q_process_max < t)
- aclk_metrics_per_sample.cloud_q_process_max = t;
- }
+ aclk_queries_per_thread[t_info->idx]++;
ACLK_STATS_UNLOCK;
}
@@ -553,6 +695,7 @@ void *aclk_query_main_thread(void *ptr)
error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
" Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+ aclk_set_rx_handlers(aclk_shared_state.version_neg);
}
if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) {
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
@@ -566,7 +709,7 @@ void *aclk_query_main_thread(void *ptr)
}
ACLK_SHARED_STATE_UNLOCK;
- while (aclk_process_query(info->idx)) {
+ while (aclk_process_query(info)) {
// Process all commands
};
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index 382b97d262..8c7d7cbd07 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -4,6 +4,7 @@
#define NETDATA_ACLK_QUERY_H
#include "libnetdata/libnetdata.h"
+#include "web/server/web_client.h"
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
@@ -25,7 +26,7 @@ struct aclk_query_threads {
};
void *aclk_query_main_thread(void *ptr);
-int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
+int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
void aclk_query_threads_start(struct aclk_query_threads *query_threads);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
new file mode 100644
index 0000000000..b166d13449
--- /dev/null
+++ b/aclk/aclk_rx_msgs.c
@@ -0,0 +1,314 @@
+
+#include "aclk_rx_msgs.h"
+
+#include "aclk_common.h"
+#include "aclk_stats.h"
+#include "aclk_query.h"
+
+static inline int aclk_extract_v2_data(char *payload, char **data)
+{
+ char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
+ if(!ptr)
+ return 1;
+ ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
+ *data = strdupz(ptr);
+ return 0;
+}
+
+static inline int aclk_v2_payload_get_query(const char *payload, struct aclk_request *req)
+{
+ const char *start, *end;
+
+ if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
+ errno = 0;
+ error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
+ return 1;
+ }
+ start = payload + 4;
+
+ if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) {
+ errno = 0;
+ error("Doesn't look like HTTP GET request.");
+ return 1;
+ }
+
+ req->payload = mallocz((end - start) + 1);
+ strncpyz(req->payload, start, end - start);
+
+ return 0;
+}
+
+#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
+ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
+ ACLK_SHARED_STATE_UNLOCK;\
+ return 1;\
+ }\
+ ACLK_SHARED_STATE_UNLOCK;
+
+/*
+ * Parse the incoming payload and queue a command if valid
+ */
+static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ UNUSED(raw_payload);
+ HTTP_CHECK_AGENT_INITIALIZED();
+
+ errno = 0;
+ if (unlikely(cloud_to_agent->version != 1)) {
+ error(
+ "Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
+ cloud_to_agent->version,
+ aclk_shared_state.version_neg);
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->payload)) {
+ error("payload missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("callback_topic missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("msg_id missing");
+ return 1;
+ }
+
+ if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
+ debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+
+ return 0;
+}
+
+/*
+ * Handle an "http" message from the cloud using the v2 (compression capable)
+ * message layout, where the HTTP request text follows the JSON dictionary.
+ *
+ * cloud_to_agent - fields already parsed from the JSON part of the message;
+ *                  on success its payload is set to the request path extracted
+ *                  from the HTTP request line.
+ * raw_payload    - the complete incoming message, JSON dictionary included.
+ *
+ * Returns 0 when the message was accepted, 1 when it was rejected. On a
+ * non-zero return aclk_handle_cloud_message() frees the strings held in
+ * cloud_to_agent, so only the local copy of the HTTP request needs to be
+ * released here.
+ */
+static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+    char *data = NULL;
+
+    HTTP_CHECK_AGENT_INITIALIZED();
+
+    errno = 0;
+    if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
+        error(
+            "This handler cannot reply to request with version older than %d, received %d.",
+            ACLK_V_COMPRESSION,
+            cloud_to_agent->version);
+        return 1;
+    }
+
+    if (unlikely(aclk_extract_v2_data(raw_payload, &data))) {
+        error("Error extracting payload expected after the JSON dictionary.");
+        return 1;
+    }
+
+    // From here on `data` is a heap copy owned by this function, so every
+    // rejection below must release it (the request-line check used to leak it).
+    if (unlikely(aclk_v2_payload_get_query(data, cloud_to_agent)))
+        goto err_free_data;
+
+    if (unlikely(!cloud_to_agent->callback_topic)) {
+        error("Missing callback_topic");
+        goto err_free_data;
+    }
+
+    if (unlikely(!cloud_to_agent->msg_id)) {
+        error("Missing msg_id");
+        goto err_free_data;
+    }
+
+    // aclk_queue_query takes ownership of data pointer
+    // NOTE(review): a queueing failure is only logged and still reported as
+    // success; confirm who releases `data` and the other strings in that case.
+    if (unlikely(aclk_queue_query(
+            cloud_to_agent->callback_topic, data, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
+            ACLK_CMD_CLOUD_QUERY_2)))
+        debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+
+    return 0;
+
+err_free_data:
+    freez(data);
+    return 1;
+}
+
+// This handles `version` message from cloud used to negotiate
+// protocol version we will use
+static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ UNUSED(raw_payload);
+ int version = -1;
+ errno = 0;
+
+ if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
+ error(
+ "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
+ ACLK_VERSION_NEG_VERSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->min_version)) {
+ error("Min version missing or 0");
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->max_version)) {
+ error("Max version missing or 0");
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
+ error(
+ "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
+ cloud_to_agent->min_version);
+ return 1;
+ }
+
+ if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
+ error(
+ "Agent too old for this cloud. Minimum version required by cloud %d."
+ " Maximum version supported by this agent %d.",
+ cloud_to_agent->min_version, ACLK_VERSION_MAX);
+ aclk_kill_link = 1;
+ aclk_disable_runtime = 1;
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
+ error(
+ "Cloud version is too old for this agent. Maximum version supported by cloud %d."
+ " Minimum (oldest) version supported by this agent %d.",
+ cloud_to_agent->max_version, ACLK_VERSION_MIN);
+ aclk_kill_link = 1;
+ return 1;
+ }
+
+ version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ errno = 0;
+ error("The \"version\" message came too late ignoring.");
+ goto err_cleanup;
+ }
+ if (unlikely(aclk_shared_state.version_neg)) {
+ errno = 0;
+ error("Version has already been set to %d", aclk_shared_state.version_neg);
+ goto err_cleanup;
+ }
+ aclk_shared_state.version_neg = version;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ info("Choosing version %d of ACLK", version);
+
+ aclk_set_rx_handlers(version);
+
+ return 0;
+
+err_cleanup:
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+}
+
+typedef struct aclk_incoming_msg_type{
+ char *name;
+ int(*fnc)(struct aclk_request *, char *);
+}aclk_incoming_msg_type;
+
+aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request_v1 },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
+
+aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
+
+struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+
+void aclk_set_rx_handlers(int version)
+{
+ if(version >= ACLK_V_COMPRESSION) {
+ aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+ return;
+ }
+
+ aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+}
+
+int aclk_handle_cloud_message(char *payload)
+{
+ struct aclk_request cloud_to_agent;
+ memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ if (unlikely(!payload)) {
+ errno = 0;
+ error("ACLK incoming message is empty");
+ goto err_cleanup_nojson;
+ }
+
+ debug(D_ACLK, "ACLK incoming message (%s)", payload);
+
+ int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+ if (unlikely(rc != JSON_OK)) {
+ errno = 0;
+ error("Malformed json request (%s)", payload);
+ goto err_cleanup;
+ }
+
+ if (!cloud_to_agent.type_id) {
+ errno = 0;
+ error("Cloud message is missing compulsory key \"type\"");
+ goto err_cleanup;
+ }
+
+ if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
+ error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
+ goto err_cleanup;
+ }
+
+ for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
+ // in case of success handler is supposed to clean up after itself
+ // or as in the case of aclk_handle_cloud_request take
+ // ownership of the pointers (done to avoid copying)
+ // see what `aclk_queue_query` parameter `internal` does
+
+ // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
+ // msg handlers (namely aclk_handle_version_response)
+ // can freely change what aclk_incoming_msg_types points to
+ // so either exit or restart this for loop
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
+ goto err_cleanup;
+ }
+ }
+
+ errno = 0;
+ error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+
+err_cleanup:
+ if (cloud_to_agent.payload)
+ freez(cloud_to_agent.payload);
+ if (cloud_to_agent.type_id)
+ freez(cloud_to_agent.type_id);
+ if (cloud_to_agent.msg_id)
+ freez(cloud_to_agent.msg_id);
+ if (cloud_to_agent.callback_topic)
+ freez(cloud_to_agent.callback_topic);
+
+err_cleanup_nojson:
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return 1;
+}
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
new file mode 100644
index 0000000000..66a30576c2
--- /dev/null
+++ b/aclk/aclk_rx_msgs.h
@@ -0,0 +1,13 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_RX_MSGS_H
+#define NETDATA_ACLK_RX_MSGS_H
+
+#include "../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+int aclk_handle_cloud_message(char *payload);
+void aclk_set_rx_handlers(int version);
+
+
+#endif /* NETDATA_ACLK_RX_MSGS_H */
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index ee64b1faf9..27d95c0002 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -18,7 +18,6 @@ static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_force_reconnect = 0; // Indication from lower layers
-int aclk_kill_link = 0; // Tell the agent to tear down the link
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
@@ -1074,7 +1073,7 @@ exited:
* If base_topic is missing then the global_base_topic will be used (if available)
*
*/
-int aclk_send_message(char *sub_topic, char *message, char *msg_id)
+int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
{
int rc;
int mid;
@@ -1098,7 +1097,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id)
}
ACLK_LOCK;
- rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
+ rc = _link_send_message(final_topic, message, len, &mid);
// TODO: link the msg_id with the mid so we can trace it
ACLK_UNLOCK;
@@ -1110,6 +1109,11 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id)
return rc;
}
+int aclk_send_message(char *sub_topic, char *message, char *msg_id)
+{
+ return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
+}
+
/*
* Subscribe to a topic in the cloud
* The final subscription will be in the form
@@ -1415,7 +1419,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (aclk_popcorn_check_bump())
return 0;
- if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue chart_update command");
@@ -1478,184 +1482,3 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
-
-/*
- * Parse the incoming payload and queue a command if valid
- */
-static int aclk_handle_cloud_request(struct aclk_request *cloud_to_agent)
-{
- errno = 0;
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
- debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
- }
- ACLK_SHARED_STATE_UNLOCK;
-
- if (unlikely(cloud_to_agent->version != aclk_shared_state.version_neg)) {
- error("Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, aclk_shared_state.version_neg);
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->payload)) {
- error("payload missing");
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->callback_topic)) {
- error("callback_topic missing");
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->msg_id)) {
- error("msg_id missing");
- return 1;
- }
-
- if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
- debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
-
- // Note: the payload comes from the callback and it will be automatically freed
- return 0;
-}
-
-// This handles `version` message from cloud used to negotiate
-// protocol version we will use
-static int aclk_handle_version_response(struct aclk_request *cloud_to_agent)
-{
- int version = -1;
- errno = 0;
-
- if(unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
- error("Unsuported version of \"version\" message from cloud. Expected %d, Got %d", ACLK_VERSION_NEG_VERSION, cloud_to_agent->version);
- return 1;
- }
- if(unlikely(!cloud_to_agent->min_version)) {
- error("Min version missing or 0");
- return 1;
- }
- if(unlikely(!cloud_to_agent->max_version)) {
- error("Max version missing or 0");
- return 1;
- }
- if(unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
- error("Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, cloud_to_agent->min_version);
- return 1;
- }
-
- if(unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
- error("Agent too old for this cloud. Minimum version required by cloud %d. Maximum version supported by this agent %d.", cloud_to_agent->min_version, ACLK_VERSION_MAX);
- aclk_kill_link = 1;
- aclk_disable_runtime = 1;
- return 1;
- }
- if(unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
- error("Cloud version is too old for this agent. Maximum version supported by