summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-05-12 19:27:00 +0200
committerGitHub <noreply@github.com>2020-05-12 19:27:00 +0200
commitf20269d836f3eecfd67d566cd781618e8bc2e8ee (patch)
treee6aafc99a3612dd667729f1ee811c54fa481ca05 /aclk
parent26a113a8357e1db70e1f92a1b42fff581b8e9108 (diff)
Fix the latency issue on the ACLK and suppress the diagnostics (#8992)
The on-connect payloads were large enough to trigger a massive increase in latency on the link and prevent chart updates due to head-of-line blocking. The default window detection in libwebsockets was under-reporting the size of the available window in the network. Overwritten with some sensible values. The large volume of ACLK per-message info-logging is not produced unless the agent is compiled with NETDATA_INTERNAL_CHECKS. The logging now includes latency measurements on the link. Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_lws_wss_client.c2
-rw-r--r--aclk/agent_cloud_link.c2
-rw-r--r--aclk/mqtt.c28
3 files changed, 26 insertions, 6 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c
index 97aa337390..9b29188a76 100644
--- a/aclk/aclk_lws_wss_client.c
+++ b/aclk/aclk_lws_wss_client.c
@@ -132,7 +132,7 @@ static inline void aclk_lws_wss_clear_io_buffers()
}
static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback,
- sizeof(struct aclk_lws_wss_perconnect_data), 0, 0, 0, 0 },
+ sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 },
{ NULL, NULL, 0, 0, 0, 0, 0 } };
static void aclk_lws_wss_log_divert(int level, const char *line)
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 4750967987..6f99bf6f15 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -1412,8 +1412,6 @@ void *aclk_main(void *ptr)
aclk_lws_wss_destroy_context();
aclk_force_reconnect = 0;
}
- //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
- // first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
if (unlikely(!first_init)) {
aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index 8beb4b6766..f1ee629742 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -13,6 +13,10 @@ inline const char *_link_strerror(int rc)
return mosquitto_strerror(rc);
}
+#ifdef NETDATA_INTERNAL_CHECKS
+static struct timeval sendTimes[1024];
+#endif
+
static struct mosquitto *mosq = NULL;
@@ -29,8 +33,14 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(mosq);
UNUSED(obj);
UNUSED(rc);
- info("Publish_callback: mid=%d", rc);
- // TODO: link this with a msg_id so it can be traced
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct timeval now, *orig;
+ now_realtime_timeval(&now);
+ orig = &sendTimes[ rc & 0x3ff ];
+ int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
+
+ info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff / 1000);
+#endif
return;
}
@@ -320,6 +330,7 @@ int _link_subscribe(char *topic, int qos)
int _link_send_message(char *topic, unsigned char *message, int *mid)
{
int rc;
+ size_t write_q, write_q_bytes, read_q;
rc = mosquitto_pub_topic_check(topic);
@@ -327,9 +338,20 @@ int _link_send_message(char *topic, unsigned char *message, int *mid)
return rc;
int msg_len = strlen((char*)message);
- info("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
+#ifdef NETDATA_INTERNAL_CHECKS
+ char msg_head[64];
+ memset(msg_head, 0, sizeof(msg_head));
+ strncpy(msg_head, (char*)message, 60);
+ for (size_t i = 0; i < sizeof(msg_head); i++)
+ if(msg_head[i] == '\n') msg_head[i] = ' ';
+ info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", msg_len,
+ *mid, write_q, write_q_bytes, read_q, msg_head);
+ now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
+#endif
+
// TODO: Add better handling -- error will flood the logfile here
if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
errno = 0;