diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-06-06 12:05:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-06 12:05:32 +0200 |
commit | 123e923f76ad2cc458dd98b01ba8b29f2306bee2 (patch) | |
tree | 3f57835986e0381c6ab973b142ad8c42398f82f5 /aclk | |
parent | e6f1aeb54c5b17f7ad5315fd97c6e1dcdd03207e (diff) |
Allow usage of the new MQTT 5 implementation (#12838)
* adds support for new MQTT5 implementation in agent, currently by default disabled as Tech Preview
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/README.md | 2 | ||||
-rw-r--r-- | aclk/aclk.c | 98 | ||||
-rw-r--r-- | aclk/aclk.h | 3 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 3 | ||||
-rw-r--r-- | aclk/aclk_api.c | 3 | ||||
-rw-r--r-- | aclk/aclk_api.h | 3 | ||||
-rw-r--r-- | aclk/aclk_query.c | 20 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 12 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 2 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 33 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 66 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_creation.h | 6 |
13 files changed, 130 insertions, 124 deletions
diff --git a/aclk/README.md b/aclk/README.md index 09c0d2920f..f595726e3d 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -50,10 +50,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`: [cloud] statistics = yes query thread count = 2 + mqtt5 = no ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). +- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview. ## Disable the ACLK diff --git a/aclk/aclk.c b/aclk/aclk.c index 4aff9ef9f8..6426c5b5e7 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -12,6 +12,7 @@ #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" #include "https_client.h" +#include "schema-wrappers/schema_wrappers.h" #include "aclk_proxy.h" @@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) case MQTT_WSS_LOG_ERROR: case MQTT_WSS_LOG_FATAL: case MQTT_WSS_LOG_WARN: - error("%s", str); + error_report("%s", str); return; case MQTT_WSS_LOG_INFO: info("%s", str); @@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); + char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) error("Unable to fetch topic for COMMAND (to subscribe)"); @@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) { - topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); else @@ -800,10 +801,12 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; + use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + #ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { #else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { #endif error("Couldn't initialize MQTT_WSS network library"); goto exit; @@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu aclk_queue_query(query); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; @@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd) aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = host->hostname, + .machine_guid = host->machine_guid + }; + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = (uint32_t) host->system_info->hops; - create_query->data.node_creation.hostname = strdupz(host->hostname); - create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); aclk_queue_query(create_query); return; } aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = (uint32_t) host->system_info->hops; + node_instance_connection_t node_state_update = { + .hops = host->system_info->hops, + .live = cmd, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = cmd; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd, + + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } @@ -1096,39 +1112,52 @@ void aclk_send_node_instances() while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .live = list->live, + .hops = list->hops, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.node_update.live = list->live; - query->data.node_update.hops = list->hops; - query->data.node_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - freez(list->hostname); - info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; aclk_queue_query(query); } else { aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); + node_instance_creation_t node_instance_creation = { + .hops = list->hops, + .hostname = list->hostname, + }; + node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.node_creation.hops = list->hops; - create_query->data.node_creation.hostname = list->hostname; - create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); - uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); - info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid, + info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); + freez(node_instance_creation.machine_guid); aclk_queue_query(create_query); } + freez(list->hostname); list++; } freez(list_head); } +#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { @@ -1208,7 +1237,7 @@ char *ng_aclk_state(void) "Protocols Supported: Legacy\n" #endif ); - buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + json_object_object_add(msg, "mqtt-version", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); json_object_object_add(msg, "received-app-layer-msgs", tmp); diff --git a/aclk/aclk.h b/aclk/aclk.h index 4d85463141..41c4e05e40 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -43,9 +43,10 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd); - void aclk_send_node_instances(void); +#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index 0db599be20..a181eb291f 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -24,7 +24,8 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); - freez(payload); + if (!use_mqtt_5) + freez(payload); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 766f78053a..a2e738ab1a 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -16,6 +16,7 @@ int aclk_disable_runtime = 0; int aclk_disable_single_updates = 0; int aclk_stats_enabled; +int use_mqtt_5 = 0; #define ACLK_IMPL_KEY_NAME "aclk implementation" @@ -68,6 +69,8 @@ struct label *add_aclk_host_labels(struct label *label) { break; } + int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO); label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); #ifdef ENABLE_NEW_CLOUD_PROTOCOL diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index 9958b0e116..557b70d70a 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -21,6 +21,7 @@ extern int aclk_stats_enabled; extern int aclk_alert_reloaded; extern int aclk_ng; +extern int use_mqtt_5; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); @@ -36,7 +37,9 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int connect); +#endif #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 5371673c1a..de970fc3d6 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -292,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu } #ifdef ENABLE_NEW_CLOUD_PROTOCOL -static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_registration(query_thr->client, &query->data.node_creation); - return 0; -} - -static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_state_update(query_thr->client, &query->data.node_update); - return 0; -} - static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed @@ -324,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = { { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, #ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update }, + { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, + { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 74a8992263..2422b01e12 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -121,16 +121,7 @@ void aclk_query_free(aclk_query_t query) break; case NODE_STATE_UPDATE: - freez((void*)query->data.node_update.claim_id); - freez((void*)query->data.node_update.node_id); - break; - case REGISTER_NODE: - freez((void*)query->data.node_creation.claim_id); - freez((void*)query->data.node_creation.hostname); - freez((void*)query->data.node_creation.machine_guid); - break; - case CHART_DIMS_UPDATE: case CHART_CONFIG_UPDATED: case CHART_RESET: @@ -139,7 +130,8 @@ void aclk_query_free(aclk_query_t query) case ALARM_LOG_HEALTH: case ALARM_PROVIDE_CFG: case ALARM_SNAPSHOT: - freez(query->data.bin_payload.payload); + if (!use_mqtt_5) + freez(query->data.bin_payload.payload); break; default: diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index fbc39aee93..0b5ef8faaa 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -77,8 +77,6 @@ struct aclk_query { struct aclk_query_metadata metadata_alarms; struct aclk_query_http_api_v2 http_api_v2; struct aclk_query_chart_add_del chart_add_del; - node_instance_creation_t node_creation; - node_instance_connection_t node_update; struct aclk_bin_payload bin_payload; json_object *alarm_update; } data; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 00310749bd..27f1bf2dc7 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -277,32 +277,39 @@ int create_node_instance_result(const char *msg, size_t msg_len) update_node_id(&host_id, &node_id); aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded - rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); - rrdhost_aclk_state_unlock(localhost); + node_instance_connection_t node_state_update = { + .hops = 1, + .live = 0, + .queryable = 1, + .session_id = aclk_session_newarch, + .node_id = res.node_id + }; RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); - query->data.node_update.live = 0; - if (host) { // not all host must have RRDHOST struct created for them // if they never connected during runtime of agent if (host == localhost) { - query->data.node_update.live = 1; - query->data.node_update.hops = 0; + node_state_update.live = 1; + node_state_update.hops = 0; } else { netdata_mutex_lock(&host->receiver_lock); - query->data.node_update.live = (host->receiver != NULL); + node_state_update.live = (host->receiver != NULL); netdata_mutex_unlock(&host->receiver_lock); - query->data.node_update.hops = host->system_info->hops; + node_state_update.hops = host->system_info->hops; } } - query->data.node_update.node_id = res.node_id; // aclk_query_free will free it - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; + rrdhost_aclk_state_lock(localhost); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + rrdhost_aclk_state_unlock(localhost); + + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); + freez(res.node_id); freez(res.machine_guid); return 0; } diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index d7254fe596..3530dccff8 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + else + mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); - return 500; + return HTTP_RESP_INTERNAL_SERVER_ERROR; } str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); @@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); #endif */ - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - freez(full_msg); - return 503; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - freez(full_msg); - return 403; + if (use_mqtt_5) + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); + else { + rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { + error("Timeout sending binpacked message"); + freez(full_msg); + return HTTP_RESP_BACKEND_FETCH_FAILED; + } + if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { + error("Message is bigger than allowed maximum"); + freez(full_msg); + return HTTP_RESP_FORBIDDEN; + } } + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif @@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_put(msg); switch (rc) { - case 403: + case HTTP_RESP_FORBIDDEN: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); break; - case 500: + case HTTP_RESP_INTERNAL_SERVER_ERROR: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); break; - case 503: + case HTTP_RESP_BACKEND_FETCH_FAILED: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); break; } @@ -490,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - freez(msg); + if (!use_mqtt_5) + freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; @@ -522,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } - -void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) { - size_t len; - char *msg = generate_node_instance_creation(&len, node_creation); - if (!msg) { - error("Error generating nodeinstance::create::v1::CreateNodeInstance"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); - freez(msg); -} - -void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) { - size_t len; - char *msg = generate_node_instance_connection(&len, node_connection); - if (!msg) { - error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection"); - return; - } - - aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); - freez(msg); -} #endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 402f13fb65..44281eb688 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -28,9 +28,6 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); // new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); char *aclk_generate_lwt(size_t *size); - -void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation); -void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection); #endif #endif diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h index 71e45ef55e..190ccb4d6e 100644 --- a/aclk/schema-wrappers/node_creation.h +++ b/aclk/schema-wrappers/node_creation.h @@ -8,9 +8,9 @@ extern "C" { #endif typedef struct { - const char* claim_id; - const char* machine_guid; - const char* hostname; + char* claim_id; + char* machine_guid; + char* hostname; int32_t hops; } node_instance_creation_t; |