summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-01-11 14:33:27 +0100
committerGitHub <noreply@github.com>2022-01-11 14:33:27 +0100
commite102adc6239ef10405bcda18c02adb95da902f64 (patch)
tree73f6e65acc95e7df884c1eb2df24c6a46c065048
parent4919103c4b715a83ebcc677431dab6ecbacabc9c (diff)
ACLK-NG remove 'cmd' switch by message type (#11866)
* remove legacy protocl rx msg switch
-rw-r--r--aclk/aclk.c2
-rw-r--r--aclk/aclk_rx_msgs.c93
-rw-r--r--aclk/aclk_rx_msgs.h2
3 files changed, 39 insertions, 58 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 8a2710aaea..fea83a1f8f 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -222,7 +222,7 @@ static void msg_callback_old_protocol(const char *topic, const void *msg, size_t
return;
}
- aclk_handle_cloud_message(cmsg);
+ aclk_handle_cloud_cmd_message(cmsg);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index ee9085565f..ecb2b4179d 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -119,7 +119,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}\
ACLK_SHARED_STATE_UNLOCK;
-static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
if (!aclk_use_new_cloud_arch) {
HTTP_CHECK_AGENT_INITIALIZED();
@@ -172,73 +172,43 @@ error:
return 1;
}
-typedef struct aclk_incoming_msg_type{
- char *name;
- int(*fnc)(struct aclk_request *, char *);
-}aclk_incoming_msg_type;
-
-aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
- { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = NULL, .fnc = NULL }
-};
-
-struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-
-int aclk_handle_cloud_message(char *payload)
+int aclk_handle_cloud_cmd_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
- errno = 0;
- error("ACLK incoming message is empty");
- goto err_cleanup_nojson;
+ error_report("ACLK incoming 'cmd' message is empty");
+ return 1;
}
- debug(D_ACLK, "ACLK incoming message (%s)", payload);
+ debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
- errno = 0;
- error("Malformed json request (%s)", payload);
+ error_report("Malformed json request (%s)", payload);
goto err_cleanup;
}
if (!cloud_to_agent.type_id) {
- errno = 0;
- error("Cloud message is missing compulsory key \"type\"");
+ error_report("Cloud message is missing compulsory key \"type\"");
goto err_cleanup;
}
-
- for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
- // in case of success handler is supposed to clean up after itself
- // or as in the case of aclk_handle_cloud_request take
- // ownership of the pointers (done to avoid copying)
- // see what `aclk_queue_query` parameter `internal` does
-
- // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
- // msg handlers (namely aclk_handle_version_response)
- // can freely change what aclk_incoming_msg_types points to
- // so either exit or restart this for loop
- freez(cloud_to_agent.type_id);
- return 0;
- }
- goto err_cleanup;
- }
+ // Originally we were expecting to have multiple types of 'cmd' message,
+ // but after the new protocol was designed we will ever only have 'http'
+ if (strcmp(cloud_to_agent.type_id, "http")) {
+ error_report("Only 'http' cmd message is supported");
+ goto err_cleanup;
}
- errno = 0;
- error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+ if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) {
+ // aclk_handle_cloud_request takes ownership of the pointers
+ // (to avoid copying) in case of success
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
err_cleanup:
if (cloud_to_agent.payload)
@@ -250,13 +220,6 @@ err_cleanup:
if (cloud_to_agent.callback_topic)
freez(cloud_to_agent.callback_topic);
-err_cleanup_nojson:
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
-
return 1;
}
@@ -272,7 +235,10 @@ int handle_old_proto_cmd(const char *msg, size_t msg_len)
char *str = mallocz(msg_len+1);
memcpy(str, msg, msg_len);
str[msg_len] = 0;
- aclk_handle_cloud_message(str);
+ if (aclk_handle_cloud_cmd_message(str)) {
+ freez(str);
+ return 1;
+ }
freez(str);
return 0;
}
@@ -507,14 +473,29 @@ void aclk_init_rx_msg_handlers(void)
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
debug(D_ACLK, "Got message named '%s' from cloud", message_type);
if (unlikely(!msg_descriptor)) {
error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
return;
}
if (msg_descriptor->fnc(msg, msg_len)) {
error("Error processing message of type '%s'", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
return;
}
}
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 9e7377e9e1..38243a4c93 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -8,7 +8,7 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
-int aclk_handle_cloud_message(char *payload);
+int aclk_handle_cloud_cmd_message(char *payload);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_init_rx_msg_handlers(void);