summaryrefslogtreecommitdiffstats
path: root/aclk/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/mqtt.c')
-rw-r--r--aclk/mqtt.c28
1 files changed, 25 insertions, 3 deletions
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index 8beb4b6766..f1ee629742 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -13,6 +13,10 @@ inline const char *_link_strerror(int rc)
return mosquitto_strerror(rc);
}
+#ifdef NETDATA_INTERNAL_CHECKS
+static struct timeval sendTimes[1024];
+#endif
+
static struct mosquitto *mosq = NULL;
@@ -29,8 +33,14 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(mosq);
UNUSED(obj);
UNUSED(rc);
- info("Publish_callback: mid=%d", rc);
- // TODO: link this with a msg_id so it can be traced
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct timeval now, *orig;
+ now_realtime_timeval(&now);
+ orig = &sendTimes[ rc & 0x3ff ];
+ int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
+
+ info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff / 1000);
+#endif
return;
}
@@ -320,6 +330,7 @@ int _link_subscribe(char *topic, int qos)
int _link_send_message(char *topic, unsigned char *message, int *mid)
{
int rc;
+ size_t write_q, write_q_bytes, read_q;
rc = mosquitto_pub_topic_check(topic);
@@ -327,9 +338,20 @@ int _link_send_message(char *topic, unsigned char *message, int *mid)
return rc;
int msg_len = strlen((char*)message);
- info("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
+#ifdef NETDATA_INTERNAL_CHECKS
+ char msg_head[64];
+ memset(msg_head, 0, sizeof(msg_head));
+ strncpy(msg_head, (char*)message, 60);
+ for (size_t i = 0; i < sizeof(msg_head); i++)
+ if(msg_head[i] == '\n') msg_head[i] = ' ';
+ info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", msg_len,
+ *mid, write_q, write_q_bytes, read_q, msg_head);
+ now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
+#endif
+
// TODO: Add better handling -- error will flood the logfile here
if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
errno = 0;