summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2023-05-18 13:56:20 +0200
committerGitHub <noreply@github.com>2023-05-18 13:56:20 +0200
commit5827732aa706db68bf50cdeb1b53fecc3b14f2f9 (patch)
treedf486ece7af388482ce47a3e05bba4a3ae9ca118 /aclk
parentb20abcbc1e78acdf50537af281c3dfe7621c13fd (diff)
Honor maximum message size limit of MQTT server (#15009)
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_query.c2
-rw-r--r--aclk/aclk_tx_msgs.c12
-rw-r--r--aclk/aclk_tx_msgs.h2
3 files changed, 10 insertions, 6 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 4c2e918a3a..0698c2d606 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -216,7 +216,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// send msg.
- aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
+ w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
struct timeval tv;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 86ee818edc..d11e96cfba 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -83,7 +83,10 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
memcpy(&full_msg[len], payload, payload_len);
}
- mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
+ if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER)
+ return HTTP_RESP_FORBIDDEN;
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
@@ -169,11 +172,11 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char
json_object_object_add(msg, "error-description", tmp);
if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
- error("Failed to send cancelation message for http reply");
+ error("Failed to send cancellation message for http reply %zu %s", payload_len, payload);
}
}
-void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
+int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -192,7 +195,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
switch (rc) {
case HTTP_RESP_FORBIDDEN:
- aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, NULL, 0);
break;
case HTTP_RESP_INTERNAL_SERVER_ERROR:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
@@ -201,6 +204,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
break;
}
+ return rc ? rc : http_code;
}
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 31e5924100..9e7d890772 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -12,7 +12,7 @@
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
-void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
+int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);