diff options
-rw-r--r-- | .codacy.yml | 1 | ||||
-rw-r--r-- | aclk/aclk_lws_wss_client.c | 82 | ||||
-rw-r--r-- | aclk/aclk_lws_wss_client.h | 7 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 52 | ||||
-rw-r--r-- | aclk/mqtt.c | 4 | ||||
-rw-r--r-- | aclk/mqtt.h | 2 | ||||
-rwxr-xr-x | aclk/tests/launch-paho.sh | 4 | ||||
-rw-r--r-- | aclk/tests/paho-inspection.py | 57 | ||||
-rw-r--r-- | aclk/tests/paho.Dockerfile | 14 |
9 files changed, 192 insertions, 31 deletions
diff --git a/.codacy.yml b/.codacy.yml index fb16c24122..3d88a253ca 100644 --- a/.codacy.yml +++ b/.codacy.yml @@ -13,3 +13,4 @@ exclude_paths: - web/gui/src/** - web/gui/main.js - tests/** + - aclk/tests/** diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c index 8734e18573..f9770e8fb2 100644 --- a/aclk/aclk_lws_wss_client.c +++ b/aclk/aclk_lws_wss_client.c @@ -10,14 +10,57 @@ struct aclk_lws_wss_perconnect_data { int todo; }; -struct lws_wss_packet_buffer { - unsigned char *data; - size_t data_size; - struct lws_wss_packet_buffer *next; -}; - static struct aclk_lws_wss_engine_instance *engine_instance = NULL; +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len) +{ + if (write_len != NULL && write_len_bytes != NULL) + { + *write_len = 0; + *write_len_bytes = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w,wb; + for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + { + w++; + wb += write_b->data_size; + } + *write_len = w; + *write_len_bytes = wb; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + else if (write_len != NULL) + { + *write_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w; + for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + w++; + *write_len = w; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + if (read_len != NULL) + { + *read_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + } + } +} + static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) { struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); @@ -25,6 +68,7 @@ static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data new->data = mallocz(LWS_PRE + size); memcpy(new->data + LWS_PRE, data, size); new->data_size = size; + new->written = 0; } return new; } @@ -355,7 +399,7 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas { UNUSED(user); struct lws_wss_packet_buffer *data; - int retval = 0; + int retval = 0, rc; // Callback servicing is forced when we are closed from above. if (engine_instance->upstream_reconnect_request) { @@ -372,12 +416,24 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas switch (reason) { case LWS_CALLBACK_CLIENT_WRITEABLE: aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - data = lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); + data = engine_instance->write_buffer_head; if (likely(data)) { - lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); - lws_wss_packet_buffer_free(data); + size_t bytes_left = data->data_size - data->written; + if ( bytes_left > 65536 ) + bytes_left = 65536; + rc = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY); + error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); + data->written += bytes_left; + if (data->written == data->data_size) + { + lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); + lws_wss_packet_buffer_free(data); + } if (engine_instance->write_buffer_head) + { + error("Req write"); lws_callback_on_writable(engine_instance->lws_wsi); + } } aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); return retval; @@ -487,7 +543,13 @@ abort: void aclk_lws_wss_service_loop() { if (engine_instance) + { + if (engine_instance->lws_wsi) { + lws_cancel_service(engine_instance->lws_context); + lws_callback_on_writable(engine_instance->lws_wsi); + } lws_service(engine_instance->lws_context, 0); + } } // in case the MQTT connection disconnect while lws transport is still operational diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h index f4a0f86957..3ab5abbe78 100644 --- a/aclk/aclk_lws_wss_client.h +++ b/aclk/aclk_lws_wss_client.h @@ -31,7 +31,11 @@ struct aclk_lws_wss_engine_callbacks { void (*connection_closed)(); }; -struct lws_wss_packet_buffer; +struct lws_wss_packet_buffer { + unsigned char *data; + size_t data_size, written; + struct lws_wss_packet_buffer *next; +}; struct aclk_lws_wss_engine_instance { //target host/port for connection @@ -73,6 +77,7 @@ void aclk_lws_wss_mqtt_layer_disconect_notif(); void aclk_lws_connection_established(); void aclk_lws_connection_data_received(); void aclk_lws_connection_closed(); +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); #endif diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index d8e3d20e4e..6f2ecd6246 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -1281,7 +1281,7 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port) return; } if (challenge.result == NULL ) { - error("Could not retrieve challenge from auth response"); + error("Could not retrieve challenge from auth response: %s", payload); return; } @@ -1362,6 +1362,7 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num) * * @return It always returns NULL */ +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); void *aclk_main(void *ptr) { struct netdata_static_thread *query_thread; @@ -1410,9 +1411,10 @@ void *aclk_main(void *ptr) while (!netdata_exit) { static int first_init = 0; - - info("loop state first_init_%d connected=%d connecting=%d", first_init, aclk_connected, aclk_connecting); - sleep_usec(USEC_PER_MS * 500); + size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu", + first_init, aclk_connected, aclk_connecting, write_q, read_q); if (unlikely(!aclk_connected)) { if (unlikely(!first_init)) { aclk_try_to_connect(aclk_hostname, aclk_port, port_num); @@ -1439,7 +1441,10 @@ void *aclk_main(void *ptr) } _link_event_loop(); - sleep_usec(USEC_PER_MS * 100); + //sleep_usec(USEC_PER_MS * 50); + static int stress_counter = 0; + if (stress_counter++ % 100 == 0 && write_q==0) + aclk_send_stress_test(2000000); // TODO: Move to on-connect if (unlikely(!aclk_subscribed)) { @@ -1498,7 +1503,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id) } ACLK_LOCK; - rc = _link_send_message(final_topic, message, &mid); + rc = _link_send_message(final_topic, (unsigned char *)message, &mid); // TODO: link the msg_id with the mid so we can trace it ACLK_UNLOCK; @@ -1603,8 +1608,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created); } -//#define EYE_FRIENDLY - /* * Take a buffer, encode it and rewrite it * @@ -1612,10 +1615,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id) BUFFER *aclk_encode_response(BUFFER *contents) { -#ifdef EYE_FRIENDLY - - return contents; -#else char *tmp_buffer = mallocz(contents->len * 2); char *src, *dst; @@ -1652,7 +1651,6 @@ BUFFER *aclk_encode_response(BUFFER *contents) freez(tmp_buffer); return contents; -#endif } /* @@ -1684,7 +1682,7 @@ void aclk_send_alarm_metadata() debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer, "\n}\n}"); - aclk_send_message(ACLK_ALARMS_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); + aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len); freez(msg_id); @@ -1711,7 +1709,7 @@ int aclk_send_info_metadata() buffer_sprintf(local_buffer, "\n}\n}"); debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); - aclk_send_message(ACLK_METADATA_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len); freez(msg_id); @@ -1719,10 +1717,30 @@ int aclk_send_info_metadata() return 0; } +void aclk_send_stress_test(size_t size) +{ + char *buffer = mallocz(size); + if (buffer != NULL) + { + for(size_t i=0; i<size; i++) + buffer[i] = 'x'; + buffer[size-1] = 0; + time_t time_created = now_realtime_sec(); + sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created); + buffer[strlen(buffer)] = '"'; + buffer[size-2] = '}'; + buffer[size-3] = '"'; + aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL); + error("Sending stress of size %zu at time %ld", size, time_created); + } + free(buffer); +} + // Send info metadata message to the cloud if the link is established // or on request int aclk_send_metadata() { + aclk_send_info_metadata(); aclk_send_alarm_metadata(); @@ -1774,7 +1792,7 @@ int aclk_send_single_chart(char *hostname, char *chart) rrdset2json(st, local_buffer, NULL, NULL, 1); buffer_sprintf(local_buffer, "\t\n}"); - aclk_send_message(ACLK_CHART_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id); + aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); freez(msg_id); buffer_free(local_buffer); @@ -1833,7 +1851,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); buffer_sprintf(local_buffer, "\n}"); - aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, aclk_encode_response(local_buffer)->buffer, 0, 1, ACLK_CMD_ALARM); + aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM); freez(msg_id); buffer_free(local_buffer); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index 4f4316a416..59ed46e362 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -275,7 +275,7 @@ int _link_subscribe(char *topic, int qos) * */ -int _link_send_message(char *topic, char *message, int *mid) +int _link_send_message(char *topic, unsigned char *message, int *mid) { int rc; @@ -285,7 +285,7 @@ int _link_send_message(char *topic, char *message, int *mid) return rc; int msg_len = strlen(message); - + error("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]); rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0); // TODO: Add better handling -- error will flood the logfile here diff --git a/aclk/mqtt.h b/aclk/mqtt.h index a794c93b7d..c43346427c 100644 --- a/aclk/mqtt.h +++ b/aclk/mqtt.h @@ -14,7 +14,7 @@ int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, //int _link_lib_init(); int _mqtt_lib_init(); int _link_subscribe(char *topic, int qos); -int _link_send_message(char *topic, char *message, int *mid); +int _link_send_message(char *topic, unsigned char *message, int *mid); const char *_link_strerror(int rc); int aclk_handle_cloud_request(char *); diff --git a/aclk/tests/launch-paho.sh b/aclk/tests/launch-paho.sh new file mode 100755 index 0000000000..1c2cb5f2c3 --- /dev/null +++ b/aclk/tests/launch-paho.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client +docker run -it paho-client diff --git a/aclk/tests/paho-inspection.py b/aclk/tests/paho-inspection.py new file mode 100644 index 0000000000..b12b62da8c --- /dev/null +++ b/aclk/tests/paho-inspection.py @@ -0,0 +1,57 @@ +import ssl +import paho.mqtt.client as mqtt +import json +import time +import sys + +def on_connect(mqttc, obj, flags, rc): + if rc==0: + print("Successful connection", flush=True) + else : + print(f"Connection error rc={rc}", flush=True) + mqttc.subscribe("/agent/#",0) + +def on_disconnect(mqttc, obj, flags, rc): + print("disconnected rc: "+str(rc), flush=True) + +def on_message(mqttc, obj, msg): + print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True) + try: + print(f"Trying decode of {msg.payload[:60]}",flush=True) + api_msg = json.loads(msg.payload) + except Exception as e: + print(e,flush=True) + return + ts = api_msg["timestamp"] + mtype = api_msg["type"] + print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True) + now = time.time() + print(f"Current {now} -> Delay {now-ts}", flush=True) + +def on_publish(mqttc, obj, mid): + print("mid: "+str(mid), flush=True) + +def on_subscribe(mqttc, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True) + +def on_log(mqttc, obj, level, string): + print(string) + +print(f"Starting paho-inspection on {sys.argv[1]}", flush=True) +mqttc = mqtt.Client(transport='websockets',client_id="paho") +#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_insecure_set(True) +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_disconnect = on_disconnect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +mqttc.username_pw_set("paho","paho") +mqttc.connect(sys.argv[1], 8443, 60) + +#mqttc.publish("/agent/mine","Test1") +#mqttc.subscribe("$SYS/#", 0) +print("Connected succesfully, monitoring /agent/#", flush=True) +mqttc.loop_forever() diff --git a/aclk/tests/paho.Dockerfile b/aclk/tests/paho.Dockerfile new file mode 100644 index 0000000000..d67cc4cb0c --- /dev/null +++ b/aclk/tests/paho.Dockerfile @@ -0,0 +1,14 @@ +FROM archlinux/base:latest + +RUN pacman -Syyu --noconfirm +RUN pacman --noconfirm --needed -S python-pip + +RUN pip install paho-mqtt + +RUN mkdir -p /opt/paho +COPY paho-inspection.py /opt/paho/ + +WORKDIR /opt/paho +ARG HOST_HOSTNAME +RUN echo $HOST_HOSTNAME >host +CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"] |