diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-09-10 13:48:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 13:48:06 +0200 |
commit | 54916258aa5b687766b8bcf7bb5a058fada3f855 (patch) | |
tree | aaf2f79845ff22b48486d6b6a37f962324b02239 /aclk | |
parent | c1698e87f058c770f087888aba224fbdd018026a (diff) |
Implements ACLK v2 http message with compression (#9895)
Allows cloud to use v2 queries which support compression.
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_common.c | 1 | ||||
-rw-r--r-- | aclk/aclk_common.h | 16 | ||||
-rw-r--r-- | aclk/aclk_query.c | 193 | ||||
-rw-r--r-- | aclk/aclk_query.h | 3 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 314 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 13 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 193 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 3 | ||||
-rw-r--r-- | aclk/mqtt.c | 8 | ||||
-rw-r--r-- | aclk/mqtt.h | 2 |
10 files changed, 525 insertions, 221 deletions
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c index d172f27c38..188f41a09c 100644 --- a/aclk/aclk_common.c +++ b/aclk/aclk_common.c @@ -5,6 +5,7 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; int aclk_disable_runtime = 0; +int aclk_kill_link = 0; struct aclk_shared_state aclk_shared_state = { .metadata_submitted = ACLK_METADATA_REQUIRED, diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h index 0d85a6db9d..819a51e979 100644 --- a/aclk/aclk_common.h +++ b/aclk/aclk_common.h @@ -9,8 +9,8 @@ extern netdata_mutex_t aclk_shared_state_mutex; // minimum and maximum supported version of ACLK // in this version of agent -#define ACLK_VERSION_MIN 1 -#define ACLK_VERSION_MAX 1 +#define ACLK_VERSION_MIN 2 +#define ACLK_VERSION_MAX 2 // Version negotiation messages have they own versioning // this is also used for LWT message as we set that up @@ -25,6 +25,9 @@ extern netdata_mutex_t aclk_shared_state_mutex; #error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" #endif +// Define ACLK Feature Version Boundaries Here +#define ACLK_V_COMPRESSION 2 + typedef enum aclk_cmd { ACLK_CMD_CLOUD, ACLK_CMD_ONCONNECT, @@ -32,7 +35,7 @@ typedef enum aclk_cmd { ACLK_CMD_CHART, ACLK_CMD_CHARTDEL, ACLK_CMD_ALARM, - ACLK_CMD_MAX + ACLK_CMD_CLOUD_QUERY_2 } ACLK_CMD; typedef enum aclk_metadata_state { @@ -64,18 +67,21 @@ typedef enum aclk_proxy_type { PROXY_NOT_SET, } ACLK_PROXY_TYPE; +extern int aclk_kill_link; // Tells the agent to tear down the link +extern int aclk_disable_runtime; + const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); #define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" #define ACLK_PROXY_ENV "env" #define ACLK_PROXY_CONFIG_VAR "proxy" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/" + ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); void safe_log_proxy_censor(char *proxy); int aclk_decode_base_url(char *url, char **aclk_hostname, char **aclk_port); const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); -extern int aclk_disable_runtime; - #endif //ACLK_COMMON_H diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index cf9a63ff6a..cd1cfd1e3e 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -1,6 +1,9 @@ #include "aclk_common.h" #include "aclk_query.h" #include "aclk_stats.h" +#include "aclk_rx_msgs.h" + +#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; @@ -18,7 +21,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) struct aclk_query { - time_t created; + usec_t created; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this char *topic; // Topic to respond to @@ -56,7 +59,7 @@ static void aclk_query_free(struct aclk_query *this_query) freez(this_query->topic); if (likely(this_query->query)) freez(this_query->query); - if (likely(this_query->data)) + if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) freez(this_query->data); if (likely(this_query->msg_id)) freez(this_query->msg_id); @@ -150,7 +153,7 @@ static struct aclk_query *aclk_query_find_position(time_t time_to_run) // Need to have a QUERY lock before calling this static struct aclk_query * -aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) +aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) { struct aclk_query *tmp_query, *prev_query; UNUSED(cmd); @@ -160,7 +163,7 @@ aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd while (tmp_query) { if (likely(!tmp_query->deleted)) { if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { - if ((!data || (data && strcmp(data, tmp_query->data) == 0)) && + if ((!data || data == tmp_query->data) && (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { if (likely(last_query)) *last_query = prev_query; @@ -178,7 +181,7 @@ aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd * Add a query to execute, the result will be send to the specified topic */ -int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; @@ -227,11 +230,9 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run new_query->msg_id = msg_id; } - if (data) - new_query->data = strdupz(data); - + new_query->data = data; new_query->next = NULL; - new_query->created = now_realtime_sec(); + new_query->created = now_realtime_usec(); new_query->run_after = run_after; debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); @@ -318,6 +319,26 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli #pragma region ACLK_QUERY #endif +static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url) +{ + usec_t t; + + t = now_monotonic_high_precision_usec(); + w->response.code = web_client_api_request_v1(host, w, url); + t = now_monotonic_high_precision_usec() - t; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_q_process_total += t; + aclk_metrics_per_sample.cloud_q_process_count++; + if (aclk_metrics_per_sample.cloud_q_process_max < t) + aclk_metrics_per_sample.cloud_q_process_max = t; + ACLK_STATS_UNLOCK; + } + + return t; +} + static int aclk_execute_query(struct aclk_query *this_query) { if (strncmp(this_query->query, "/api/v1/", 8) == 0) { @@ -340,7 +361,7 @@ static int aclk_execute_query(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. For now it does to the payload - w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready @@ -375,16 +396,137 @@ static int aclk_execute_query(struct aclk_query *this_query) return 1; } +static int aclk_execute_query_v2(struct aclk_query *this_query) +{ + int retval = 0; + usec_t t; + BUFFER *local_buffer = NULL; + +#ifdef NETDATA_WITH_ZLIB + int z_ret; + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *start, *end; +#endif + + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); + *mysep = '\0'; + } else + url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + + mysep = strrchr(this_query->query, '/'); + + // execute the query + t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + +#ifdef NETDATA_WITH_ZLIB + // check if gzip encoding can and should be used + if ((start = strstr((char *)this_query->data, WEB_HDR_ACCEPT_ENC))) { + start += strlen(WEB_HDR_ACCEPT_ENC); + end = strstr(start, "\x0D\x0A"); + start = strstr(start, "gzip"); + + if (start && start < end) { + w->response.zstream.zalloc = Z_NULL; + w->response.zstream.zfree = Z_NULL; + w->response.zstream.opaque = Z_NULL; + if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { + w->response.zinitialized = 1; + w->response.zoutput = 1; + } else + error("Failed to initialize zlib. Proceeding without compression."); + } + } + + if (w->response.data->len && w->response.zinitialized) { + w->response.zstream.next_in = (Bytef *)w->response.data->buffer; + w->response.zstream.avail_in = w->response.data->len; + do { + w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; + w->response.zstream.next_out = w->response.zbuffer; + z_ret = deflate(&w->response.zstream, Z_FINISH); + if(z_ret < 0) { + if(w->response.zstream.msg) + error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + else + error("Unknown error during zlib compression."); + retval = 1; + goto cleanup; + } + int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; + buffer_need_bytes(z_buffer, bytes_to_cpy); + memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); + z_buffer->len += bytes_to_cpy; + } while(z_ret != Z_STREAM_END); + // so that web_client_build_http_header + // puts correct content lenght into header + buffer_free(w->response.data); + w->response.data = z_buffer; + z_buffer = NULL; + } +#endif + + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); + buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); + buffer_strcat(local_buffer, w->response.header_output->buffer); + + if (w->response.data->len) { +#ifdef NETDATA_WITH_ZLIB + if (w->response.zinitialized) { + buffer_need_bytes(local_buffer, w->response.data->len); + memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); + local_buffer->len += w->response.data->len; + } else { +#endif + buffer_strcat(local_buffer, w->response.data->buffer); +#ifdef NETDATA_WITH_ZLIB + } +#endif + } + + aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); + +cleanup: +#ifdef NETDATA_WITH_ZLIB + if(w->response.zinitialized) + deflateEnd(&w->response.zstream); + buffer_free(z_buffer); +#endif + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + return retval; +} + /* * This function will fetch the next pending command and process it * */ -static int aclk_process_query(int t_idx) +static int aclk_process_query(struct aclk_query_thread *t_info) { struct aclk_query *this_query; static long int query_count = 0; ACLK_METADATA_STATE meta_state; - usec_t t = 0; + RRDHOST *host; if (!aclk_connected) return 0; @@ -401,9 +543,11 @@ static int aclk_process_query(int t_idx) } query_count++; + host = (RRDHOST*)this_query->data; + debug( - D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic, - this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created)); + D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, + this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS); switch (this_query->cmd) { case ACLK_CMD_ONCONNECT: @@ -417,7 +561,9 @@ static int aclk_process_query(int t_idx) case ACLK_CMD_CHART: debug(D_ACLK, "EXECUTING a chart update command"); - aclk_send_single_chart(this_query->data, this_query->query); + if (!host) + fatal("Pointer to host compulsory"); + aclk_send_single_chart(host->hostname, this_query->query); break; case ACLK_CMD_CHARTDEL: @@ -432,10 +578,12 @@ static int aclk_process_query(int t_idx) break; case ACLK_CMD_CLOUD: - t = now_monotonic_high_precision_usec(); debug(D_ACLK, "EXECUTING a cloud command"); aclk_execute_query(this_query); - t = now_monotonic_high_precision_usec() - t; + break; + case ACLK_CMD_CLOUD_QUERY_2: + debug(D_ACLK, "EXECUTING Cloud Query v2"); + aclk_execute_query_v2(this_query); break; default: @@ -446,13 +594,7 @@ static int aclk_process_query(int t_idx) if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[t_idx]++; - if(this_query->cmd == ACLK_CMD_CLOUD) { - aclk_metrics_per_sample.cloud_q_process_total += t; - aclk_metrics_per_sample.cloud_q_process_count++; - if(aclk_metrics_per_sample.cloud_q_process_max < t) - aclk_metrics_per_sample.cloud_q_process_max = t; - } + aclk_queries_per_thread[t_info->idx]++; ACLK_STATS_UNLOCK; } @@ -553,6 +695,7 @@ void *aclk_query_main_thread(void *ptr) error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); aclk_shared_state.version_neg = ACLK_VERSION_MIN; + aclk_set_rx_handlers(aclk_shared_state.version_neg); } if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) { if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { @@ -566,7 +709,7 @@ void *aclk_query_main_thread(void *ptr) } ACLK_SHARED_STATE_UNLOCK; - while (aclk_process_query(info->idx)) { + while (aclk_process_query(info)) { // Process all commands }; diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index 382b97d262..8c7d7cbd07 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -4,6 +4,7 @@ #define NETDATA_ACLK_QUERY_H #include "libnetdata/libnetdata.h" +#include "web/server/web_client.h" #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable @@ -25,7 +26,7 @@ struct aclk_query_threads { }; void *aclk_query_main_thread(void *ptr); -int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); +int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); void aclk_query_threads_start(struct aclk_query_threads *query_threads); void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c new file mode 100644 index 0000000000..b166d13449 --- /dev/null +++ b/aclk/aclk_rx_msgs.c @@ -0,0 +1,314 @@ + +#include "aclk_rx_msgs.h" + +#include "aclk_common.h" +#include "aclk_stats.h" +#include "aclk_query.h" + +static inline int aclk_extract_v2_data(char *payload, char **data) +{ + char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); + if(!ptr) + return 1; + ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); + *data = strdupz(ptr); + return 0; +} + +static inline int aclk_v2_payload_get_query(const char *payload, struct aclk_request *req) +{ + const char *start, *end; + + if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { + errno = 0; + error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + return 1; + } + start = payload + 4; + + if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) { + errno = 0; + error("Doesn't look like HTTP GET request."); + return 1; + } + + req->payload = mallocz((end - start) + 1); + strncpyz(req->payload, start, end - start); + + return 0; +} + +#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\ + debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ + ACLK_SHARED_STATE_UNLOCK;\ + return 1;\ + }\ + ACLK_SHARED_STATE_UNLOCK; + +/* + * Parse the incoming payload and queue a command if valid + */ +static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + HTTP_CHECK_AGENT_INITIALIZED(); + + errno = 0; + if (unlikely(cloud_to_agent->version != 1)) { + error( + "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", + cloud_to_agent->version, + aclk_shared_state.version_neg); + return 1; + } + + if (unlikely(!cloud_to_agent->payload)) { + error("payload missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("callback_topic missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("msg_id missing"); + return 1; + } + + if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + + return 0; +} + +static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + HTTP_CHECK_AGENT_INITIALIZED(); + + char *data; + + errno = 0; + if (cloud_to_agent->version < ACLK_V_COMPRESSION) { + error( + "This handler cannot reply to request with version older than %d, received %d.", + ACLK_V_COMPRESSION, + cloud_to_agent->version); + return 1; + } + + if (unlikely(aclk_extract_v2_data(raw_payload, &data))) { + error("Error extracting payload expected after the JSON dictionary."); + return 1; + } + + if (unlikely(aclk_v2_payload_get_query(data, cloud_to_agent))) + return 1; + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("Missing callback_topic"); + freez(data); + return 1; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("Missing msg_id"); + freez(data); + return 1; + } + + // aclk_queue_query takes ownership of data pointer + if (unlikely(aclk_queue_query( + cloud_to_agent->callback_topic, data, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, + ACLK_CMD_CLOUD_QUERY_2))) + debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + + UNUSED(cloud_to_agent); + return 0; +} + +// This handles `version` message from cloud used to negotiate +// protocol version we will use +static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + int version = -1; + errno = 0; + + if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { + error( + "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", + ACLK_VERSION_NEG_VERSION, + cloud_to_agent->version); + return 1; + } + if (unlikely(!cloud_to_agent->min_version)) { + error("Min version missing or 0"); + return 1; + } + if (unlikely(!cloud_to_agent->max_version)) { + error("Max version missing or 0"); + return 1; + } + if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { + error( + "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, + cloud_to_agent->min_version); + return 1; + } + + if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { + error( + "Agent too old for this cloud. Minimum version required by cloud %d." + " Maximum version supported by this agent %d.", + cloud_to_agent->min_version, ACLK_VERSION_MAX); + aclk_kill_link = 1; + aclk_disable_runtime = 1; + return 1; + } + if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { + error( + "Cloud version is too old for this agent. Maximum version supported by cloud %d." + " Minimum (oldest) version supported by this agent %d.", + cloud_to_agent->max_version, ACLK_VERSION_MIN); + aclk_kill_link = 1; + return 1; + } + + version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); + + ACLK_SHARED_STATE_LOCK; + if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + errno = 0; + error("The \"version\" message came too late ignoring."); + goto err_cleanup; + } + if (unlikely(aclk_shared_state.version_neg)) { + errno = 0; + error("Version has already been set to %d", aclk_shared_state.version_neg); + goto err_cleanup; + } + aclk_shared_state.version_neg = version; + ACLK_SHARED_STATE_UNLOCK; + + info("Choosing version %d of ACLK", version); + + aclk_set_rx_handlers(version); + + return 0; + +err_cleanup: + ACLK_SHARED_STATE_UNLOCK; + return 1; +} + +typedef struct aclk_incoming_msg_type{ + char *name; + int(*fnc)(struct aclk_request *, char *); +}aclk_incoming_msg_type; + +aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + +void aclk_set_rx_handlers(int version) +{ + if(version >= ACLK_V_COMPRESSION) { + aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + return; + } + + aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +} + +int aclk_handle_cloud_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + + if (unlikely(!payload)) { + errno = 0; + error("ACLK incoming message is empty"); + goto err_cleanup_nojson; + } + + debug(D_ACLK, "ACLK incoming message (%s)", payload); + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + errno = 0; + error("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + errno = 0; + error("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); + goto err_cleanup; + } + + for (int i = 0; aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { + if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + // in case of success handler is supposed to clean up after itself + // or as in the case of aclk_handle_cloud_request take + // ownership of the pointers (done to avoid copying) + // see what `aclk_queue_query` parameter `internal` does + + // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! + // msg handlers (namely aclk_handle_version_responce) + // can freely change what aclk_incoming_msg_types points to + // so either exit or restart this for loop + freez(cloud_to_agent.type_id); + return 0; + } + goto err_cleanup; + } + } + + errno = 0; + error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + +err_cleanup_nojson: + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + + return 1; +} diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h new file mode 100644 index 0000000000..66a30576c2 --- /dev/null +++ b/aclk/aclk_rx_msgs.h @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_RX_MSGS_H +#define NETDATA_ACLK_RX_MSGS_H + +#include "../daemon/common.h" +#include "libnetdata/libnetdata.h" + +int aclk_handle_cloud_message(char *payload); +void aclk_set_rx_handlers(int version); + + +#endif /* NETDATA_ACLK_RX_MSGS_H */ diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index ee64b1faf9..27d95c0002 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -18,7 +18,6 @@ static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_force_reconnect = 0; // Indication from lower layers -int aclk_kill_link = 0; // Tell the agent to tear down the link usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer @@ -1074,7 +1073,7 @@ exited: * If base_topic is missing then the global_base_topic will be used (if available) * */ -int aclk_send_message(char *sub_topic, char *message, char *msg_id) +int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id) { int rc; int mid; @@ -1098,7 +1097,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id) } ACLK_LOCK; - rc = _link_send_message(final_topic, (unsigned char *)message, &mid); + rc = _link_send_message(final_topic, message, len, &mid); // TODO: link the msg_id with the mid so we can trace it ACLK_UNLOCK; @@ -1110,6 +1109,11 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id) return rc; } +int aclk_send_message(char *sub_topic, char *message, char *msg_id) +{ + return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id); +} + /* * Subscribe to a topic in the cloud * The final subscription will be in the form @@ -1415,7 +1419,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (aclk_popcorn_check_bump()) return 0; - if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) { + if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue chart_update command"); @@ -1478,184 +1482,3 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } - -/* - * Parse the incoming payload and queue a command if valid - */ -static int aclk_handle_cloud_request(struct aclk_request *cloud_to_agent) -{ - errno = 0; - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { - debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state"); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - - if (unlikely(cloud_to_agent->version != aclk_shared_state.version_neg)) { - error("Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, aclk_shared_state.version_neg); - return 1; - } - - if (unlikely(!cloud_to_agent->payload)) { - error("payload missing"); - return 1; - } - - if (unlikely(!cloud_to_agent->callback_topic)) { - error("callback_topic missing"); - return 1; - } - - if (unlikely(!cloud_to_agent->msg_id)) { - error("msg_id missing"); - return 1; - } - - if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) - debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); - - // Note: the payload comes from the callback and it will be automatically freed - return 0; -} - -// This handles `version` message from cloud used to negotiate -// protocol version we will use -static int aclk_handle_version_response(struct aclk_request *cloud_to_agent) -{ - int version = -1; - errno = 0; - - if(unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { - error("Unsuported version of \"version\" message from cloud. Expected %d, Got %d", ACLK_VERSION_NEG_VERSION, cloud_to_agent->version); - return 1; - } - if(unlikely(!cloud_to_agent->min_version)) { - error("Min version missing or 0"); - return 1; - } - if(unlikely(!cloud_to_agent->max_version)) { - error("Max version missing or 0"); - return 1; - } - if(unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { - error("Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, cloud_to_agent->min_version); - return 1; - } - - if(unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { - error("Agent too old for this cloud. Minimum version required by cloud %d. Maximum version supported by this agent %d.", cloud_to_agent->min_version, ACLK_VERSION_MAX); - aclk_kill_link = 1; - aclk_disable_runtime = 1; - return 1; - } - if(unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { - error("Cloud version is too old for this agent. Maximum version supported by |