summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-06-06 12:05:32 +0200
committerGitHub <noreply@github.com>2022-06-06 12:05:32 +0200
commit123e923f76ad2cc458dd98b01ba8b29f2306bee2 (patch)
tree3f57835986e0381c6ab973b142ad8c42398f82f5
parente6f1aeb54c5b17f7ad5315fd97c6e1dcdd03207e (diff)
Allow usage of the new MQTT 5 implementation (#12838)
* adds support for new MQTT5 implementation in agent, currently by default disabled as Tech Preview
-rw-r--r--CMakeLists.txt5
-rw-r--r--Makefile.am5
-rw-r--r--aclk/README.md2
-rw-r--r--aclk/aclk.c98
-rw-r--r--aclk/aclk.h3
-rw-r--r--aclk/aclk_alarm_api.c3
-rw-r--r--aclk/aclk_api.c3
-rw-r--r--aclk/aclk_api.h3
-rw-r--r--aclk/aclk_query.c20
-rw-r--r--aclk/aclk_query_queue.c12
-rw-r--r--aclk/aclk_query_queue.h2
-rw-r--r--aclk/aclk_rx_msgs.c33
-rw-r--r--aclk/aclk_tx_msgs.c66
-rw-r--r--aclk/aclk_tx_msgs.h3
-rw-r--r--aclk/schema-wrappers/node_creation.h6
m---------mqtt_websockets38
-rw-r--r--streaming/receiver.c8
17 files changed, 164 insertions, 146 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index cd5d68a8c5..378e8a6832 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -817,6 +817,11 @@ set(ACLK_FILES
mqtt_websockets/src/include/mqtt_wss_log.h
mqtt_websockets/src/ws_client.c
mqtt_websockets/src/include/ws_client.h
+ mqtt_websockets/src/mqtt_ng.c
+ mqtt_websockets/src/include/mqtt_ng.h
+ mqtt_websockets/src/common_public.c
+ mqtt_websockets/src/include/common_public.h
+ mqtt_websockets/src/include/common_internal.h
mqtt_websockets/c-rbuf/src/ringbuffer.c
mqtt_websockets/c-rbuf/include/ringbuffer.h
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h
diff --git a/Makefile.am b/Makefile.am
index 65b5f22e8e..6241fc1028 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -628,6 +628,11 @@ ACLK_FILES = \
mqtt_websockets/src/include/mqtt_wss_log.h \
mqtt_websockets/src/ws_client.c \
mqtt_websockets/src/include/ws_client.h \
+ mqtt_websockets/src/mqtt_ng.c \
+ mqtt_websockets/src/include/mqtt_ng.h \
+ mqtt_websockets/src/common_public.c \
+ mqtt_websockets/src/include/common_public.h \
+ mqtt_websockets/src/include/common_internal.h \
mqtt_websockets/c-rbuf/src/ringbuffer.c \
mqtt_websockets/c-rbuf/include/ringbuffer.h \
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \
diff --git a/aclk/README.md b/aclk/README.md
index 09c0d2920f..f595726e3d 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -50,10 +50,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
[cloud]
statistics = yes
query thread count = 2
+ mqtt5 = no
```
- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
+- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview.
## Disable the ACLK
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 4aff9ef9f8..6426c5b5e7 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -12,6 +12,7 @@
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
#include "https_client.h"
+#include "schema-wrappers/schema_wrappers.h"
#include "aclk_proxy.h"
@@ -172,7 +173,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
case MQTT_WSS_LOG_WARN:
- error("%s", str);
+ error_report("%s", str);
return;
case MQTT_WSS_LOG_INFO:
info("%s", str);
@@ -391,7 +392,7 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+ char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
error("Unable to fetch topic for COMMAND (to subscribe)");
@@ -400,7 +401,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
- topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
@@ -800,10 +801,12 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
+ use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
#else
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
@@ -1041,6 +1044,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
aclk_queue_query(query);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
@@ -1060,28 +1064,40 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = host->hostname,
+ .machine_guid = host->machine_guid
+ };
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
- create_query->data.node_creation.hostname = strdupz(host->hostname);
- create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
aclk_queue_query(create_query);
return;
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = (uint32_t) host->system_info->hops;
+ node_instance_connection_t node_state_update = {
+ .hops = host->system_info->hops,
+ .live = cmd,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = cmd;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
+
+ info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
}
@@ -1096,39 +1112,52 @@ void aclk_send_node_instances()
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .live = list->live,
+ .hops = list->hops,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.node_update.live = list->live;
- query->data.node_update.hops = list->hops;
- query->data.node_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- freez(list->hostname);
- info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
+ info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
aclk_queue_query(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
+ node_instance_creation_t node_instance_creation = {
+ .hops = list->hops,
+ .hostname = list->hostname,
+ };
+ node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = list->hops;
- create_query->data.node_creation.hostname = list->hostname;
- create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
- info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
+ info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
+ freez(node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
+ freez(list->hostname);
list++;
}
freez(list_head);
}
+#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
@@ -1208,7 +1237,7 @@ char *ng_aclk_state(void)
"Protocols Supported: Legacy\n"
#endif
);
- buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
+ buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
@@ -1408,6 +1437,9 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ json_object_object_add(msg, "mqtt-version", tmp);
+
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
json_object_object_add(msg, "received-app-layer-msgs", tmp);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 4d85463141..41c4e05e40 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -43,9 +43,10 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd);
-
void aclk_send_node_instances(void);
+#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index 0db599be20..a181eb291f 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -24,7 +24,8 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
- freez(payload);
+ if (!use_mqtt_5)
+ freez(payload);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 766f78053a..a2e738ab1a 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -16,6 +16,7 @@ int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
+int use_mqtt_5 = 0;
#define ACLK_IMPL_KEY_NAME "aclk implementation"
@@ -68,6 +69,8 @@ struct label *add_aclk_host_labels(struct label *label) {
break;
}
+ int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+ label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index 9958b0e116..557b70d70a 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -21,6 +21,7 @@ extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
extern int aclk_ng;
+extern int use_mqtt_5;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
@@ -36,7 +37,9 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int connect);
+#endif
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 5371673c1a..de970fc3d6 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -292,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_registration(query_thr->client, &query->data.node_creation);
- return 0;
-}
-
-static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_state_update(query_thr->client, &query->data.node_update);
- return 0;
-}
-
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
@@ -324,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update },
+ { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
+ { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
{ .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
{ .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
{ .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 74a8992263..2422b01e12 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -121,16 +121,7 @@ void aclk_query_free(aclk_query_t query)
break;
case NODE_STATE_UPDATE:
- freez((void*)query->data.node_update.claim_id);
- freez((void*)query->data.node_update.node_id);
- break;
-
case REGISTER_NODE:
- freez((void*)query->data.node_creation.claim_id);
- freez((void*)query->data.node_creation.hostname);
- freez((void*)query->data.node_creation.machine_guid);
- break;
-
case CHART_DIMS_UPDATE:
case CHART_CONFIG_UPDATED:
case CHART_RESET:
@@ -139,7 +130,8 @@ void aclk_query_free(aclk_query_t query)
case ALARM_LOG_HEALTH:
case ALARM_PROVIDE_CFG:
case ALARM_SNAPSHOT:
- freez(query->data.bin_payload.payload);
+ if (!use_mqtt_5)
+ freez(query->data.bin_payload.payload);
break;
default:
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index fbc39aee93..0b5ef8faaa 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -77,8 +77,6 @@ struct aclk_query {
struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_query_chart_add_del chart_add_del;
- node_instance_creation_t node_creation;
- node_instance_connection_t node_update;
struct aclk_bin_payload bin_payload;
json_object *alarm_update;
} data;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 00310749bd..27f1bf2dc7 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -277,32 +277,39 @@ int create_node_instance_result(const char *msg, size_t msg_len)
update_node_id(&host_id, &node_id);
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
- rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
- rrdhost_aclk_state_unlock(localhost);
+ node_instance_connection_t node_state_update = {
+ .hops = 1,
+ .live = 0,
+ .queryable = 1,
+ .session_id = aclk_session_newarch,
+ .node_id = res.node_id
+ };
RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
- query->data.node_update.live = 0;
-
if (host) {
// not all host must have RRDHOST struct created for them
// if they never connected during runtime of agent
if (host == localhost) {
- query->data.node_update.live = 1;
- query->data.node_update.hops = 0;
+ node_state_update.live = 1;
+ node_state_update.hops = 0;
} else {
netdata_mutex_lock(&host->receiver_lock);
- query->data.node_update.live = (host->receiver != NULL);
+ node_state_update.live = (host->receiver != NULL);
netdata_mutex_unlock(&host->receiver_lock);
- query->data.node_update.hops = host->system_info->hops;
+ node_state_update.hops = host->system_info->hops;
}
}
- query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+
aclk_queue_query(query);
+ freez(res.node_id);
freez(res.machine_guid);
return 0;
}
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index d7254fe596..3530dccff8 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -49,7 +49,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else
+ mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -125,7 +129,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
- return 500;
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
@@ -149,17 +153,22 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return 503;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return 403;
+ if (use_mqtt_5)
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ else {
+ rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
+ error("Timeout sending binpacked message");
+ freez(full_msg);
+ return HTTP_RESP_BACKEND_FETCH_FAILED;
+ }
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
+ error("Message is bigger than allowed maximum");
+ freez(full_msg);
+ return HTTP_RESP_FORBIDDEN;
+ }
}
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -363,13 +372,13 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_put(msg);
switch (rc) {
- case 403:
+ case HTTP_RESP_FORBIDDEN:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
break;
- case 500:
+ case HTTP_RESP_INTERNAL_SERVER_ERROR:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
break;
- case 503:
+ case HTTP_RESP_BACKEND_FETCH_FAILED:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
break;
}
@@ -490,7 +499,8 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- freez(msg);
+ if (!use_mqtt_5)
+ freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;
@@ -522,30 +532,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
-
-void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
- size_t len;
- char *msg = generate_node_instance_creation(&len, node_creation);
- if (!msg) {
- error("Error generating nodeinstance::create::v1::CreateNodeInstance");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
- freez(msg);
-}
-
-void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
- size_t len;
- char *msg = generate_node_instance_connection(&len, node_connection);
- if (!msg) {
- error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
- return;
- }
-
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
- freez(msg);
-}
#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 402f13fb65..44281eb688 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -28,9 +28,6 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);
-
-void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation);
-void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection);
#endif
#endif
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
index 71e45ef55e..190ccb4d6e 100644
--- a/aclk/schema-wrappers/node_creation.h
+++ b/aclk/schema-wrappers/node_creation.h
@@ -8,9 +8,9 @@ extern "C" {
#endif
typedef struct {
- const char* claim_id;
- const char* machine_guid;
- const char* hostname;
+ char* claim_id;
+ char* machine_guid;
+ char* hostname;
int32_t hops;
} node_instance_creation_t;
diff --git a/mqtt_websockets b/mqtt_websockets
-Subproject 2c7c2eb583abea0137e169ad3646e843d44297e
+Subproject 288d92b36ffab3bd078334a7a87ba2b680b7852
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 4685f6bcd2..d20658e658 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -682,9 +682,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
-#if defined(ENABLE_ACLK)
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
// in case we have cloud connection we inform cloud
- // new slave connected
+ // new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 1);
#endif
@@ -696,9 +696,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
-#if defined(ENABLE_ACLK)
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
// in case we have cloud connection we inform cloud
- // new slave connected
+ // new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 0);
#endif