summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-03-14 07:35:49 +0100
committerGitHub <noreply@github.com>2020-03-14 07:35:49 +0100
commit87a0559ec3f885ff64764ab1d48bb1cb52ce0ebe (patch)
tree4aaac1f709b5ab0d2d4b5c999811311d9efe3dfb /aclk
parentc999f89754072128d7efcf5581ad145ad8af5026 (diff)
Improving the ACLK performance - initial changes (#8399)
Add an inspection point for VerneMQ in the local dev env. Remove the bottleneck in sending websocket messages, at the expense of increased CPU-load. Fixed the message encoding. Added support for stress testing - it is still enabled in the main loop so will fire stress-testing payloads when the ACLK is established. Next patch will integrate the socket polling properly to reduce the CPU overhead and remove the stress testing payloads.
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_lws_wss_client.c82
-rw-r--r--aclk/aclk_lws_wss_client.h7
-rw-r--r--aclk/agent_cloud_link.c52
-rw-r--r--aclk/mqtt.c4
-rw-r--r--aclk/mqtt.h2
-rwxr-xr-xaclk/tests/launch-paho.sh4
-rw-r--r--aclk/tests/paho-inspection.py57
-rw-r--r--aclk/tests/paho.Dockerfile14
8 files changed, 191 insertions, 31 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c
index 8734e18573..f9770e8fb2 100644
--- a/aclk/aclk_lws_wss_client.c
+++ b/aclk/aclk_lws_wss_client.c
@@ -10,14 +10,57 @@ struct aclk_lws_wss_perconnect_data {
int todo;
};
-struct lws_wss_packet_buffer {
- unsigned char *data;
- size_t data_size;
- struct lws_wss_packet_buffer *next;
-};
-
static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
+/**
+ * Report how much work is waiting in the websocket transport queues.
+ *
+ * Every output pointer is optional; a NULL pointer skips that statistic and
+ * the three outputs are handled independently of one another. Each non-NULL
+ * output is set to 0 when no engine instance exists.
+ *
+ * @param write_len        receives the number of packets in the write queue.
+ * @param write_len_bytes  receives the sum of data_size over the queued
+ *                         packets (whole payload sizes; bytes of a packet
+ *                         that were already written are not subtracted).
+ * @param read_len         receives the number of elements waiting in the
+ *                         read ring buffer.
+ */
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len)
+{
+    if (write_len != NULL)
+        *write_len = 0;
+    if (write_len_bytes != NULL)
+        *write_len_bytes = 0;
+    if (read_len != NULL)
+        *read_len = 0;
+
+    if (engine_instance == NULL)
+        return;
+
+    if (write_len != NULL || write_len_bytes != NULL) {
+        size_t packets = 0, bytes = 0;
+        struct lws_wss_packet_buffer *pkt;
+
+        // A single walk under the write lock yields both statistics.
+        aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+        for (pkt = engine_instance->write_buffer_head; pkt != NULL; pkt = pkt->next) {
+            packets++;
+            bytes += pkt->data_size;
+        }
+        aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+
+        if (write_len != NULL)
+            *write_len = packets;
+        if (write_len_bytes != NULL)
+            *write_len_bytes = bytes;
+    }
+
+    if (read_len != NULL) {
+        aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
+        *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
+        aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
+    }
+}
+
static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
{
struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
@@ -25,6 +68,7 @@ static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data
new->data = mallocz(LWS_PRE + size);
memcpy(new->data + LWS_PRE, data, size);
new->data_size = size;
+ new->written = 0;
}
return new;
}
@@ -355,7 +399,7 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
{
UNUSED(user);
struct lws_wss_packet_buffer *data;
- int retval = 0;
+ int retval = 0, rc;
// Callback servicing is forced when we are closed from above.
if (engine_instance->upstream_reconnect_request) {
@@ -372,12 +416,24 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
switch (reason) {
case LWS_CALLBACK_CLIENT_WRITEABLE:
aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
- data = lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
+ data = engine_instance->write_buffer_head;
if (likely(data)) {
- lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
- lws_wss_packet_buffer_free(data);
+ size_t bytes_left = data->data_size - data->written;
+ if ( bytes_left > 65536 )
+ bytes_left = 65536;
+ rc = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY);
+ error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
+ data->written += bytes_left;
+ if (data->written == data->data_size)
+ {
+ lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
+ lws_wss_packet_buffer_free(data);
+ }
if (engine_instance->write_buffer_head)
+ {
+ error("Req write");
lws_callback_on_writable(engine_instance->lws_wsi);
+ }
}
aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
return retval;
@@ -487,7 +543,13 @@ abort:
void aclk_lws_wss_service_loop()
{
if (engine_instance)
+ {
+ if (engine_instance->lws_wsi) {
+ lws_cancel_service(engine_instance->lws_context);
+ lws_callback_on_writable(engine_instance->lws_wsi);
+ }
lws_service(engine_instance->lws_context, 0);
+ }
}
// in case the MQTT connection disconnect while lws transport is still operational
diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h
index f4a0f86957..3ab5abbe78 100644
--- a/aclk/aclk_lws_wss_client.h
+++ b/aclk/aclk_lws_wss_client.h
@@ -31,7 +31,11 @@ struct aclk_lws_wss_engine_callbacks {
void (*connection_closed)();
};
-struct lws_wss_packet_buffer;
+struct lws_wss_packet_buffer { // one node of the singly linked queue of outbound packets
+ unsigned char *data; // allocation of LWS_PRE + data_size bytes; the payload starts at data + LWS_PRE
+ size_t data_size, written; // payload length, and how many payload bytes have already been passed to lws_write()
+ struct lws_wss_packet_buffer *next; // following packet in the queue (NULL ends the list)
+};
struct aclk_lws_wss_engine_instance {
//target host/port for connection
@@ -73,6 +77,7 @@ void aclk_lws_wss_mqtt_layer_disconect_notif();
void aclk_lws_connection_established();
void aclk_lws_connection_data_received();
void aclk_lws_connection_closed();
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
#endif
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index d8e3d20e4e..6f2ecd6246 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -1281,7 +1281,7 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
return;
}
if (challenge.result == NULL ) {
- error("Could not retrieve challenge from auth response");
+ error("Could not retrieve challenge from auth response: %s", payload);
return;
}
@@ -1362,6 +1362,7 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
*
* @return It always returns NULL
*/
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;
@@ -1410,9 +1411,10 @@ void *aclk_main(void *ptr)
while (!netdata_exit) {
static int first_init = 0;
-
- info("loop state first_init_%d connected=%d connecting=%d", first_init, aclk_connected, aclk_connecting);
- sleep_usec(USEC_PER_MS * 500);
+ size_t write_q, write_q_bytes, read_q;
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
+ first_init, aclk_connected, aclk_connecting, write_q, read_q);
if (unlikely(!aclk_connected)) {
if (unlikely(!first_init)) {
aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
@@ -1439,7 +1441,10 @@ void *aclk_main(void *ptr)
}
_link_event_loop();
- sleep_usec(USEC_PER_MS * 100);
+ //sleep_usec(USEC_PER_MS * 50);
+ static int stress_counter = 0;
+ if (stress_counter++ % 100 == 0 && write_q==0)
+ aclk_send_stress_test(2000000);
// TODO: Move to on-connect
if (unlikely(!aclk_subscribed)) {
@@ -1498,7 +1503,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id)
}
ACLK_LOCK;
- rc = _link_send_message(final_topic, message, &mid);
+ rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
// TODO: link the msg_id with the mid so we can trace it
ACLK_UNLOCK;
@@ -1603,8 +1608,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
}
-//#define EYE_FRIENDLY
-
/*
* Take a buffer, encode it and rewrite it
*
@@ -1612,10 +1615,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
BUFFER *aclk_encode_response(BUFFER *contents)
{
-#ifdef EYE_FRIENDLY
-
- return contents;
-#else
char *tmp_buffer = mallocz(contents->len * 2);
char *src, *dst;
@@ -1652,7 +1651,6 @@ BUFFER *aclk_encode_response(BUFFER *contents)
freez(tmp_buffer);
return contents;
-#endif
}
/*
@@ -1684,7 +1682,7 @@ void aclk_send_alarm_metadata()
debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len);
buffer_sprintf(local_buffer, "\n}\n}");
- aclk_send_message(ACLK_ALARMS_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
+ aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
freez(msg_id);
@@ -1711,7 +1709,7 @@ int aclk_send_info_metadata()
buffer_sprintf(local_buffer, "\n}\n}");
debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
- aclk_send_message(ACLK_METADATA_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
+ aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
freez(msg_id);
@@ -1719,10 +1717,30 @@ int aclk_send_info_metadata()
return 0;
}
+/*
+ * Send a synthetic message of exactly `size` bytes (terminator included) on
+ * the metadata topic, for stress testing the link.
+ *
+ * The message has the form
+ *     {"type":"stress", "timestamp":<now>,"payload":"xxxx...x"}
+ * where the run of 'x' pads the message up to the requested size.
+ *
+ * size - total buffer size in bytes. It must be large enough for the JSON
+ *        envelope; smaller requests are logged and ignored instead of
+ *        writing past the end of the buffer.
+ */
+void aclk_send_stress_test(size_t size)
+{
+    char header[128];
+    time_t time_created = now_realtime_sec();
+
+    // Everything up to and including the opening quote of the payload string.
+    int header_len = snprintf(header, sizeof(header),
+                              "{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":\"", (long)time_created);
+
+    // Besides the header we need room for the closing quote, the closing brace and the NUL.
+    if (header_len < 0 || (size_t)header_len >= sizeof(header) || size < (size_t)header_len + 3) {
+        error("Stress test size %zu is too small for the message envelope", size);
+        return;
+    }
+
+    char *buffer = mallocz(size);
+    if (buffer == NULL)
+        return;
+
+    memcpy(buffer, header, (size_t)header_len);
+    memset(buffer + header_len, 'x', size - (size_t)header_len - 3);
+    buffer[size - 3] = '"';
+    buffer[size - 2] = '}';
+    buffer[size - 1] = '\0';
+
+    aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
+    error("Sending stress of size %zu at time %ld", size, (long)time_created);
+
+    // Memory obtained from mallocz() is released with freez(), as elsewhere in this file.
+    freez(buffer);
+}
+
// Send info metadata message to the cloud if the link is established
// or on request
int aclk_send_metadata()
{
+
aclk_send_info_metadata();
aclk_send_alarm_metadata();
@@ -1774,7 +1792,7 @@ int aclk_send_single_chart(char *hostname, char *chart)
rrdset2json(st, local_buffer, NULL, NULL, 1);
buffer_sprintf(local_buffer, "\t\n}");
- aclk_send_message(ACLK_CHART_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
+ aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
freez(msg_id);
buffer_free(local_buffer);
@@ -1833,7 +1851,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
buffer_sprintf(local_buffer, "\n}");
- aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, aclk_encode_response(local_buffer)->buffer, 0, 1, ACLK_CMD_ALARM);
+ aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM);
freez(msg_id);
buffer_free(local_buffer);
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index 4f4316a416..59ed46e362 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -275,7 +275,7 @@ int _link_subscribe(char *topic, int qos)
*
*/
-int _link_send_message(char *topic, char *message, int *mid)
+int _link_send_message(char *topic, unsigned char *message, int *mid)
{
int rc;
@@ -285,7 +285,7 @@ int _link_send_message(char *topic, char *message, int *mid)
return rc;
int msg_len = strlen(message);
-
+ error("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
// TODO: Add better handling -- error will flood the logfile here
diff --git a/aclk/mqtt.h b/aclk/mqtt.h
index a794c93b7d..c43346427c 100644
--- a/aclk/mqtt.h
+++ b/aclk/mqtt.h
@@ -14,7 +14,7 @@ int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username,
//int _link_lib_init();
int _mqtt_lib_init();
int _link_subscribe(char *topic, int qos);
-int _link_send_message(char *topic, char *message, int *mid);
+int _link_send_message(char *topic, unsigned char *message, int *mid);
const char *_link_strerror(int rc);
int aclk_handle_cloud_request(char *);
diff --git a/aclk/tests/launch-paho.sh b/aclk/tests/launch-paho.sh
new file mode 100755
index 0000000000..1c2cb5f2c3
--- /dev/null
+++ b/aclk/tests/launch-paho.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+
+docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client
+docker run -it paho-client
diff --git a/aclk/tests/paho-inspection.py b/aclk/tests/paho-inspection.py
new file mode 100644
index 0000000000..b12b62da8c
--- /dev/null
+++ b/aclk/tests/paho-inspection.py
@@ -0,0 +1,57 @@
+import ssl
+import paho.mqtt.client as mqtt
+import json
+import time
+import sys
+
+def on_connect(mqttc, obj, flags, rc):
+ if rc==0:
+ print("Successful connection", flush=True)
+ else :
+ print(f"Connection error rc={rc}", flush=True)
+ mqttc.subscribe("/agent/#",0)
+
+def on_disconnect(mqttc, obj, flags, rc):
+ print("disconnected rc: "+str(rc), flush=True)
+
+def on_message(mqttc, obj, msg):
+ print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True)
+ try:
+ print(f"Trying decode of {msg.payload[:60]}",flush=True)
+ api_msg = json.loads(msg.payload)
+ except Exception as e:
+ print(e,flush=True)
+ return
+ ts = api_msg["timestamp"]
+ mtype = api_msg["type"]
+ print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True)
+ now = time.time()
+ print(f"Current {now} -> Delay {now-ts}", flush=True)
+
+def on_publish(mqttc, obj, mid):
+ print("mid: "+str(mid), flush=True)
+
+def on_subscribe(mqttc, obj, mid, granted_qos):
+ print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True)
+
+def on_log(mqttc, obj, level, string):
+ print(string)
+
+print(f"Starting paho-inspection on {sys.argv[1]}", flush=True)
+mqttc = mqtt.Client(transport='websockets',client_id="paho")
+#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+mqttc.tls_insecure_set(True)
+mqttc.on_message = on_message
+mqttc.on_connect = on_connect
+mqttc.on_disconnect = on_disconnect
+mqttc.on_publish = on_publish
+mqttc.on_subscribe = on_subscribe
+mqttc.username_pw_set("paho","paho")
+mqttc.connect(sys.argv[1], 8443, 60)
+
+#mqttc.publish("/agent/mine","Test1")
+#mqttc.subscribe("$SYS/#", 0)
+print("Connected succesfully, monitoring /agent/#", flush=True)
+mqttc.loop_forever()
diff --git a/aclk/tests/paho.Dockerfile b/aclk/tests/paho.Dockerfile
new file mode 100644
index 0000000000..d67cc4cb0c
--- /dev/null
+++ b/aclk/tests/paho.Dockerfile
@@ -0,0 +1,14 @@
+# Image for the paho-based MQTT inspection client (runs paho-inspection.py).
+FROM archlinux/base:latest
+
+# Refresh the package databases and upgrade the base system, then add pip.
+RUN pacman -Syyu --noconfirm
+RUN pacman --noconfirm --needed -S python-pip
+
+RUN pip install paho-mqtt
+
+RUN mkdir -p /opt/paho
+COPY paho-inspection.py /opt/paho/
+
+WORKDIR /opt/paho
+# HOST_HOSTNAME is a build argument (launch-paho.sh passes it with --build-arg).
+# Its value is written to the file "host" at build time and read back at
+# container start as the single command-line argument of paho-inspection.py.
+ARG HOST_HOSTNAME
+RUN echo $HOST_HOSTNAME >host
+CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"]