summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-08-15 11:48:21 +0200
committerGitHub <noreply@github.com>2022-08-15 11:48:21 +0200
commit0cb8e0cee1dba7cfe54a62e2b6ad619707eedd84 (patch)
tree84bf3a4ec7f62ef817dad59d64c1a3d15ef9bcdf /aclk
parentca72254a13c81bddb2bea038d12fbec6069c3cad (diff)
reduce memcpy and memory usage on mqtt5 (#13450)
* reduce memcpy on mqtt5
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_tx_msgs.c45
1 files changed, 11 insertions, 34 deletions
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 822a90fa25..b483e5221d 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -43,28 +43,12 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return packet_id;
}
-/* UNUSED now but can be used soon MVP1?
-static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
+// json_object_put returns int unfortunately :D
+// we need void(*fnc)(void *);
+static void json_object_put_wrapper(void *jsonobj)
{
- if (unlikely(!topic || topic[0] != '/')) {
- error ("Full topic required!");
- return;
- }
-
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
-
- mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published();
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
+ json_object_put(jsonobj);
}
-*/
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
@@ -77,6 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
+ json_object_put(msg);
return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
@@ -87,32 +72,26 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
memcpy(full_msg, str, len);
+ json_object_put(msg);
+ msg = NULL;
memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
memcpy(&full_msg[len], payload, payload_len);
len += payload_len;
}
-/* TODO
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif */
-
if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
else {
rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ freez(full_msg);
+ json_object_put(msg);
if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
error("Timeout sending binpacked message");
- freez(full_msg);
return HTTP_RESP_BACKEND_FETCH_FAILED;
}
if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
error("Message is bigger than allowed maximum");
- freez(full_msg);
return HTTP_RESP_FORBIDDEN;
}
}
@@ -120,7 +99,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
- freez(full_msg);
+
return 0;
}
@@ -203,7 +182,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char
if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
error("Failed to send cancelation message for http reply");
}
- json_object_put(msg);
}
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
@@ -222,7 +200,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_object_add(msg, "http-code", tmp);
int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
- json_object_put(msg);
switch (rc) {
case HTTP_RESP_FORBIDDEN: