diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-05-12 19:27:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-12 19:27:00 +0200 |
commit | f20269d836f3eecfd67d566cd781618e8bc2e8ee (patch) | |
tree | e6aafc99a3612dd667729f1ee811c54fa481ca05 /aclk | |
parent | 26a113a8357e1db70e1f92a1b42fff581b8e9108 (diff) |
Fix the latency issue on the ACLK and suppress the diagnostics (#8992)
The on-connect payloads were large enough to trigger a massive increase in latency on the link and to prevent chart updates due to head-of-line blocking. The default window detection in libwebsockets was under-reporting the size of the available window in the network, so the defaults for the `aclk-wss` protocol are now overridden with explicit, more sensible values (32768*4 bytes for both the receive buffer size and the transmit packet size).
The large volume of ACLK per-message info-logging is no longer produced unless the agent is compiled with NETDATA_INTERNAL_CHECKS. When that logging is enabled, it now also includes latency measurements on the link.
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_lws_wss_client.c | 2 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 2 | ||||
-rw-r--r-- | aclk/mqtt.c | 28 |
3 files changed, 26 insertions, 6 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c index 97aa337390..9b29188a76 100644 --- a/aclk/aclk_lws_wss_client.c +++ b/aclk/aclk_lws_wss_client.c @@ -132,7 +132,7 @@ static inline void aclk_lws_wss_clear_io_buffers() } static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback, - sizeof(struct aclk_lws_wss_perconnect_data), 0, 0, 0, 0 }, + sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 }, { NULL, NULL, 0, 0, 0, 0, 0 } }; static void aclk_lws_wss_log_divert(int level, const char *line) diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 4750967987..6f99bf6f15 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -1412,8 +1412,6 @@ void *aclk_main(void *ptr) aclk_lws_wss_destroy_context(); aclk_force_reconnect = 0; } - //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu", - // first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q); if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { if (unlikely(!first_init)) { aclk_try_to_connect(aclk_hostname, aclk_port, port_num); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index 8beb4b6766..f1ee629742 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -13,6 +13,10 @@ inline const char *_link_strerror(int rc) return mosquitto_strerror(rc); } +#ifdef NETDATA_INTERNAL_CHECKS +static struct timeval sendTimes[1024]; +#endif + static struct mosquitto *mosq = NULL; @@ -29,8 +33,14 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc) UNUSED(mosq); UNUSED(obj); UNUSED(rc); - info("Publish_callback: mid=%d", rc); - // TODO: link this with a msg_id so it can be traced +#ifdef NETDATA_INTERNAL_CHECKS + struct timeval now, *orig; + now_realtime_timeval(&now); + orig = &sendTimes[ rc & 0x3ff ]; + int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec); + + info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff / 
1000); +#endif return; } @@ -320,6 +330,7 @@ int _link_subscribe(char *topic, int qos) int _link_send_message(char *topic, unsigned char *message, int *mid) { int rc; + size_t write_q, write_q_bytes, read_q; rc = mosquitto_pub_topic_check(topic); @@ -327,9 +338,20 @@ int _link_send_message(char *topic, unsigned char *message, int *mid) return rc; int msg_len = strlen((char*)message); - info("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0); +#ifdef NETDATA_INTERNAL_CHECKS + char msg_head[64]; + memset(msg_head, 0, sizeof(msg_head)); + strncpy(msg_head, (char*)message, 60); + for (size_t i = 0; i < sizeof(msg_head); i++) + if(msg_head[i] == '\n') msg_head[i] = ' '; + info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", msg_len, + *mid, write_q, write_q_bytes, read_q, msg_head); + now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]); +#endif + // TODO: Add better handling -- error will flood the logfile here if (unlikely(rc != MOSQ_ERR_SUCCESS)) { errno = 0; |