diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-10-28 14:36:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-28 14:36:12 +0200 |
commit | 50be9eb132845dddd5533ef52006ffb3bffc6ead (patch) | |
tree | d7acd345a4caa58b73fe1c4ab0417b9eed4e6843 /aclk | |
parent | 7b078bc3aa4eaa0781dbc55589d83ce241f1410d (diff) |
Remove option to use MQTT 3 (#13824)
* remove mqtt3 support
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/README.md | 2 | ||||
-rw-r--r-- | aclk/aclk.c | 35 | ||||
-rw-r--r-- | aclk/aclk.h | 1 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 3 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 16 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 25 |
6 files changed, 18 insertions, 64 deletions
diff --git a/aclk/README.md b/aclk/README.md index 0b63640f24..af0f5fddee 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -60,12 +60,10 @@ You can configure following keys in the `netdata.conf` section `[cloud]`: [cloud] statistics = yes query thread count = 2 - mqtt5 = yes ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). -- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release. ## Disable the ACLK diff --git a/aclk/aclk.c b/aclk/aclk.c index 3af54f8e73..9a0ffc0700 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -32,7 +32,6 @@ int aclk_connection_counter = 0; int disconnect_req = 0; int aclk_connected = 0; -int use_mqtt_5 = 0; int aclk_ctx_based = 0; int aclk_disable_runtime = 0; int aclk_stats_enabled; @@ -459,9 +458,9 @@ static int aclk_block_till_recon_allowed() { */ static int aclk_get_transport_idx(aclk_env_t *env) { for (size_t i = 0; i < env->transport_count; i++) { - // currently we support only MQTT 3 + // currently we support only MQTT 5 // therefore select first transport that matches - if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + if (env->transports[i]->type == ACLK_TRP_MQTT_5) { return i; } } @@ -495,7 +494,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); + error_report("Do not move the cloud base url out of post_conf_load!!"); return -1; } @@ -505,7 +504,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &base_url)) { - error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); + error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); continue; @@ -535,7 +534,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); if (ret) { - error("Failed to Get ACLK environment"); + error_report("Failed to Get ACLK environment"); // delay handled by aclk_block_till_recon_allowed continue; } @@ -549,14 +548,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } if (!aclk_env_has_capa("proto")) { - error ("Can't use encoding=proto without at least \"proto\" capability."); + error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { - error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; } @@ -564,7 +563,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { - error("Error passing Challenge/Response to get OTP"); + error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -573,20 +572,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { - error("Couldn't get LWT topic. Will not send LWT."); + error_report("Couldn't get LWT topic. Will not send LWT."); continue; } // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { - error("Cloud /env endpoint didn't return any transport usable by this Agent."); + error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ - error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; } @@ -672,9 +671,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); - - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -919,7 +916,7 @@ char *aclk_state(void) "ACLK Version: 2\n" "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5); char *agent_id = get_agent_claimid(); if (agent_id == NULL) @@ -1072,7 +1069,7 @@ char *aclk_state_json(void) tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); - tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + tmp = json_object_new_int(5); json_object_object_add(msg, "mqtt-version", tmp); tmp = json_object_new_int(aclk_rcvd_cloud_msgs); @@ -1171,9 +1168,7 @@ void add_aclk_host_labels(void) { break; } - int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); - - rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); #else diff --git a/aclk/aclk.h b/aclk/aclk.h index 9bf7cc3611..6aed548b74 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -14,7 +14,6 @@ #endif /* ENABLE_ACLK */ extern int aclk_connected; -extern int use_mqtt_5; extern int aclk_ctx_based; extern int aclk_disable_runtime; extern int aclk_stats_enabled; diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index a181eb291f..7df51a7b54 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -23,9 +23,6 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) char *payload = generate_alarm_log_entry(&payload_size, log_entry); aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); - - if (!use_mqtt_5) - freez(payload); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 01b20d23f3..9a450571e1 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -111,22 +111,6 @@ void aclk_query_free(aclk_query_t query) freez(query->data.http_api_v2.query); break; - case NODE_STATE_UPDATE: - case REGISTER_NODE: - case CHART_DIMS_UPDATE: - case CHART_CONFIG_UPDATED: - case CHART_RESET: - case RETENTION_UPDATED: - case UPDATE_NODE_INFO: - case ALARM_LOG_HEALTH: - case ALARM_PROVIDE_CFG: - case ALARM_SNAPSHOT: - case UPDATE_NODE_COLLECTORS: - case PROTO_BIN_MESSAGE: - if (!use_mqtt_5) - freez(query->data.bin_payload.payload); - break; - default: break; } diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index e58f5c3661..f7c4c08593 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -35,10 +35,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); - else - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); @@ -64,7 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec uint16_t packet_id; const char *str; char *full_msg = NULL; - int len, rc; + int len; if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); @@ -87,21 +84,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec len += payload_len; } - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); - else { - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - freez(full_msg); - json_object_put(msg); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - return HTTP_RESP_BACKEND_FETCH_FAILED; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - return HTTP_RESP_FORBIDDEN; - } - } + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); @@ -263,8 +246,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - if (!use_mqtt_5) - freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; |