diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-08-15 11:48:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-15 11:48:21 +0200 |
commit | 0cb8e0cee1dba7cfe54a62e2b6ad619707eedd84 (patch) | |
tree | 84bf3a4ec7f62ef817dad59d64c1a3d15ef9bcdf /aclk | |
parent | ca72254a13c81bddb2bea038d12fbec6069c3cad (diff) |
reduce memcpy and memory usage on mqtt5 (#13450)
* reduce memcpy on mqtt5
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_tx_msgs.c | 45 |
1 files changed, 11 insertions, 34 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 822a90fa25..b483e5221d 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -43,28 +43,12 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return packet_id; } -/* UNUSED now but can be used soon MVP1? -static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic) +// json_object_put returns int unfortunately :D +// we need void(*fnc)(void *); +static void json_object_put_wrapper(void *jsonobj) { - if (unlikely(!topic || topic[0] != '/')) { - error ("Full topic required!"); - return; - } - - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - - mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif + json_object_put(jsonobj); } -*/ #define TOPIC_MAX_LEN 512 #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" @@ -77,6 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); + json_object_put(msg); return HTTP_RESP_INTERNAL_SERVER_ERROR; } @@ -87,32 +72,26 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); memcpy(full_msg, str, len); + json_object_put(msg); + msg = NULL; memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); len += strlen(V2_BIN_PAYLOAD_SEPARATOR); memcpy(&full_msg[len], payload, payload_len); len += payload_len; } -/* TODO -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif */ - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); else { rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + freez(full_msg); + json_object_put(msg); if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { error("Timeout sending binpacked message"); - freez(full_msg); return HTTP_RESP_BACKEND_FETCH_FAILED; } if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { error("Message is bigger than allowed maximum"); - freez(full_msg); return HTTP_RESP_FORBIDDEN; } } @@ -120,7 +99,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif - freez(full_msg); + return 0; } @@ -203,7 +182,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { error("Failed to send cancelation message for http reply"); } - json_object_put(msg); } void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) @@ -222,7 +200,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_object_add(msg, "http-code", tmp); int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); - json_object_put(msg); switch (rc) { case HTTP_RESP_FORBIDDEN: |