summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-10-28 14:36:12 +0200
committerGitHub <noreply@github.com>2022-10-28 14:36:12 +0200
commit50be9eb132845dddd5533ef52006ffb3bffc6ead (patch)
treed7acd345a4caa58b73fe1c4ab0417b9eed4e6843 /aclk
parent7b078bc3aa4eaa0781dbc55589d83ce241f1410d (diff)
Remove option to use MQTT 3 (#13824)
* remove mqtt3 support
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md2
-rw-r--r--aclk/aclk.c35
-rw-r--r--aclk/aclk.h1
-rw-r--r--aclk/aclk_alarm_api.c3
-rw-r--r--aclk/aclk_query_queue.c16
-rw-r--r--aclk/aclk_tx_msgs.c25
6 files changed, 18 insertions, 64 deletions
diff --git a/aclk/README.md b/aclk/README.md
index 0b63640f24..af0f5fddee 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -60,12 +60,10 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
[cloud]
statistics = yes
query thread count = 2
- mqtt5 = yes
```
- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
-- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release.
## Disable the ACLK
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 3af54f8e73..9a0ffc0700 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -32,7 +32,6 @@ int aclk_connection_counter = 0;
int disconnect_req = 0;
int aclk_connected = 0;
-int use_mqtt_5 = 0;
int aclk_ctx_based = 0;
int aclk_disable_runtime = 0;
int aclk_stats_enabled;
@@ -459,9 +458,9 @@ static int aclk_block_till_recon_allowed() {
*/
static int aclk_get_transport_idx(aclk_env_t *env) {
for (size_t i = 0; i < env->transport_count; i++) {
- // currently we support only MQTT 3
+ // currently we support only MQTT 5
// therefore select first transport that matches
- if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
return i;
}
}
@@ -495,7 +494,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
- error("Do not move the cloud base url out of post_conf_load!!");
+ error_report("Do not move the cloud base url out of post_conf_load!!");
return -1;
}
@@ -505,7 +504,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
info("Attempting connection now");
memset(&base_url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &base_url)) {
- error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
url_t_destroy(&base_url);
continue;
@@ -535,7 +534,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
url_t_destroy(&base_url);
if (ret) {
- error("Failed to Get ACLK environment");
+ error_report("Failed to Get ACLK environment");
// delay handled by aclk_block_till_recon_allowed
continue;
}
@@ -549,14 +548,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
if (!aclk_env_has_capa("proto")) {
- error ("Can't use encoding=proto without at least \"proto\" capability.");
+ error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
- error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
url_t_destroy(&auth_url);
continue;
}
@@ -564,7 +563,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
url_t_destroy(&auth_url);
if (ret) {
- error("Error passing Challenge/Response to get OTP");
+ error_report("Error passing Challenge/Response to get OTP");
continue;
}
@@ -573,20 +572,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
- error("Couldn't get LWT topic. Will not send LWT.");
+ error_report("Couldn't get LWT topic. Will not send LWT.");
continue;
}
// Do the MQTT connection
ret = aclk_get_transport_idx(aclk_env);
if (ret < 0) {
- error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
continue;
}
memset(&mqtt_url, 0, sizeof(url_t));
if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
- error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
url_t_destroy(&mqtt_url);
continue;
}
@@ -672,9 +671,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -919,7 +916,7 @@ char *aclk_state(void)
"ACLK Version: 2\n"
"Protocols Supported: Protobuf\n"
);
- buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
char *agent_id = get_agent_claimid();
if (agent_id == NULL)
@@ -1072,7 +1069,7 @@ char *aclk_state_json(void)
tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
- tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ tmp = json_object_new_int(5);
json_object_object_add(msg, "mqtt-version", tmp);
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
@@ -1171,9 +1168,7 @@ void add_aclk_host_labels(void) {
break;
}
- int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
- rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#else
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 9bf7cc3611..6aed548b74 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -14,7 +14,6 @@
#endif /* ENABLE_ACLK */
extern int aclk_connected;
-extern int use_mqtt_5;
extern int aclk_ctx_based;
extern int aclk_disable_runtime;
extern int aclk_stats_enabled;
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index a181eb291f..7df51a7b54 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -23,9 +23,6 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
char *payload = generate_alarm_log_entry(&payload_size, log_entry);
aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
-
- if (!use_mqtt_5)
- freez(payload);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 01b20d23f3..9a450571e1 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -111,22 +111,6 @@ void aclk_query_free(aclk_query_t query)
freez(query->data.http_api_v2.query);
break;
- case NODE_STATE_UPDATE:
- case REGISTER_NODE:
- case CHART_DIMS_UPDATE:
- case CHART_CONFIG_UPDATED:
- case CHART_RESET:
- case RETENTION_UPDATED:
- case UPDATE_NODE_INFO:
- case ALARM_LOG_HEALTH:
- case ALARM_PROVIDE_CFG:
- case ALARM_SNAPSHOT:
- case UPDATE_NODE_COLLECTORS:
- case PROTO_BIN_MESSAGE:
- if (!use_mqtt_5)
- freez(query->data.bin_payload.payload);
- break;
-
default:
break;
}
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index e58f5c3661..f7c4c08593 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -35,10 +35,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
- else
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
@@ -64,7 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
uint16_t packet_id;
const char *str;
char *full_msg = NULL;
- int len, rc;
+ int len;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
@@ -87,21 +84,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
len += payload_len;
}
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
- else {
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- freez(full_msg);
- json_object_put(msg);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- return HTTP_RESP_BACKEND_FETCH_FAILED;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- return HTTP_RESP_FORBIDDEN;
- }
- }
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
@@ -263,8 +246,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- if (!use_mqtt_5)
- freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;