diff options
author | Timotej Šiškovič <6674623+underhood@users.noreply.github.com> | 2020-02-14 10:54:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-14 10:54:26 +0100 |
commit | f4e1012f5ffe1231c25e22f7350d2928b443c69f (patch) | |
tree | 1df86a02ed77533854e08d3dc78b389e93a225c5 /aclk/aclk_lws_wss_client.c | |
parent | ffbfdc44043e5920ee40fc9f2d9b17f8df518cbc (diff) |
initial MQTT over Secure Websockets support for ACLK (#7988)
* add aclk_lws_wss_client
* shorten the thread name in case more threads are necessary
* Draft libmosquitto<->libwebsockets integration
* use ringbuffer for recvd data
* Some code cleanup
* if mqtt connection fails close lws connection and reconect
* clear buffers on connection closed
* work on better loop integration
* move mosquitto read out of loop
* remove useless code when using websockets
* LWS - make host and port configurable
* make default port 9002 as we use MQTT over WSS now
* wait for link up before subscribing
start query thread after connection has been made
* cleanup - remove useless var
* if there is anything to write send it immediatelly
* cleanup: move buffers into engine instace
* allow MQTT IO from multiple threads (although preffered is MQTT IO to be done by single thread)
* add warning to future self
* add some comments for whoever reviews
* add destroy fnc - start work on cleanup
* minor - add mosquitto to .gitignore
* fix codacy errors
* do not reconnect automatically by default
* minor - remove outdated comment
* tab -> spaces
Co-Authored-By: Konstantinos Natsakis <5933427+knatsakis@users.noreply.github.com>
* address thiagoftsm valid comments
* add usefull logs in case of trouble
* fix -Wall -Wextra -Wformat-signedness warnings
* log error when connection fails
* update .gitignore to match new installer
* Fwd LWS logs to Netdata logs
* minor - tabulation fixes
* fix comments from thiago
* force SSL
* move UNUSED to libnetdata.h
@thiago correctly pointed out it might be usefull for others
* minor - rename function for clarity
* minor - remove commented out code
Co-authored-by: Konstantinos Natsakis <5933427+knatsakis@users.noreply.github.com>
Diffstat (limited to 'aclk/aclk_lws_wss_client.c')
-rw-r--r-- | aclk/aclk_lws_wss_client.c | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c new file mode 100644 index 0000000000..a4a21f45bf --- /dev/null +++ b/aclk/aclk_lws_wss_client.c @@ -0,0 +1,337 @@ +#include "aclk_lws_wss_client.h" + +#include "libnetdata/libnetdata.h" + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +struct aclk_lws_wss_perconnect_data { + int todo; +}; + +struct lws_wss_packet_buffer { + unsigned char* data; + size_t data_size; + struct lws_wss_packet_buffer *next; +}; + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void* data, size_t size) +{ + struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); + if(data) { + new->data = mallocz(LWS_PRE+size); + memcpy(new->data+LWS_PRE, data, size); + new->data_size = size; + } + return new; +} + +static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) +{ + struct lws_wss_packet_buffer *tail = *list; + if(!*list) { + *list = item; + return; + } + while(tail->next) { + tail = tail->next; + } + tail->next = item; +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *ret = *list; + if(ret != NULL) + *list = ret->next; + + return ret; +} + +static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) +{ + freez(item->data); + freez(item); +} + +static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) +{ + size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); + if(elems > 0) + lws_ring_consume(ringbuffer, NULL, NULL, elems); +} + +static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *i; + while((i = lws_wss_packet_buffer_pop(list)) != NULL) { + lws_wss_packet_buffer_free(i); + } + *list = NULL; +} + +static inline void aclk_lws_wss_clear_io_buffers(struct aclk_lws_wss_engine_instance *inst) +{ + aclk_lws_mutex_lock(&inst->read_buf_mutex); + _aclk_lws_wss_read_buffer_clear(inst->read_ringbuffer); + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + aclk_lws_mutex_lock(&inst->write_buf_mutex); + _aclk_lws_wss_write_buffer_clear(&inst->write_buffer_head); + aclk_lws_mutex_unlock(&inst->write_buf_mutex); +} + +static const struct lws_protocols protocols[] = { + { + "aclk-wss", + aclk_lws_wss_callback, + sizeof(struct aclk_lws_wss_perconnect_data), + 0, 0, 0, 0 + }, + { NULL, NULL, 0, 0, 0, 0, 0 } +}; + +static void aclk_lws_wss_log_divert(int level, const char *line) { + switch(level){ + case LLL_ERR: + error("Libwebsockets Error: %s", line); + break; + case LLL_WARN: + debug(D_ACLK, "Libwebsockets Warn: %s", line); + break; + default: + error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); + } +} + +struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port) { + static int lws_logging_initialized = 0; + struct lws_context_creation_info info; + struct aclk_lws_wss_engine_instance *inst; + + if(unlikely(!lws_logging_initialized)) { + lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); + lws_logging_initialized = 1; + } + + if(!callbacks || !target_hostname) + return NULL; + + inst = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); + + inst->host = target_hostname; + inst->port = target_port; + + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + info.user = inst; + + inst->lws_context = lws_create_context(&info); + if(!inst->lws_context) + goto failure_cleanup_2; + + inst->callbacks = *callbacks; + + aclk_lws_mutex_init(&inst->write_buf_mutex); + aclk_lws_mutex_init(&inst->read_buf_mutex); + + inst->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); + if(!inst->read_ringbuffer) + goto failure_cleanup; + + return inst; + +failure_cleanup: + lws_context_destroy(inst->lws_context); +failure_cleanup_2: + freez(inst); + return NULL; +} + +void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) { + lws_context_destroy(inst->lws_context); + inst->lws_context = NULL; + inst->lws_wsi = NULL; + + aclk_lws_wss_clear_io_buffers(inst); + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + pthread_mutex_destroy(&inst->write_buf_mutex); + pthread_mutex_destroy(&inst->read_buf_mutex); +#endif +} + +void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){ + struct lws_client_connect_info i; + + memset(&i, 0, sizeof(i)); + i.context = inst->lws_context; + i.port = inst->port; + i.address = inst->host; + i.path = "/mqtt"; + i.host = inst->host; + i.protocol = "mqtt"; +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif + lws_client_connect_via_info(&i); +} + +static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, size_t len) { + if( lws_ring_insert(buffer, data, len) != len ) { + error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); + return 0; + } + return 1; +} + +static int +aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + UNUSED(user); + struct aclk_lws_wss_engine_instance *inst = lws_context_user(lws_get_context(wsi)); + struct lws_wss_packet_buffer *data; + int retval = 0; + + if( !inst ) { + error("Callback received without any aclk_lws_wss_engine_instance!"); + return -1; + } + + if( inst->upstream_reconnect_request ) { + error("Closing lws connectino due to libmosquitto error."); + char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; + lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char*)upstream_connection_error, strlen(upstream_connection_error)); + retval = -1; + inst->upstream_reconnect_request = 0; + } + + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&inst->write_buf_mutex); + data = lws_wss_packet_buffer_pop(&inst->write_buffer_head); + if(likely(data)) { + lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); + lws_wss_packet_buffer_free(data); + if(inst->write_buffer_head) + lws_callback_on_writable(inst->lws_wsi); + } + aclk_lws_mutex_unlock(&inst->write_buf_mutex); + break; + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&inst->read_buf_mutex); + if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + + if(likely(inst->callbacks.data_rcvd_callback)) + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + inst->callbacks.data_rcvd_callback(); + else + inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read + break; + case LWS_CALLBACK_PROTOCOL_INIT: + //initial connection here + //later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT + //in case this connection fails or drops + _aclk_wss_connect(inst); + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + //TODO if already active make some error noise + //currently we expect only one connection per netdata + inst->lws_wsi = wsi; + break; +#ifdef AUTO_RECONNECT_ON_LWS_LAYER + case LWS_CALLBACK_USER: + inst->reconnect_timeout_running = 0; + _aclk_wss_connect(inst); + break; +#endif + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given")); + /* FALLTHRU */ + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: +#ifdef AUTO_RECONNECT_ON_LWS_LAYER + if(!inst->reconnect_timeout_running) { + lws_timed_callback_vh_protocol(lws_get_vhost(wsi), + lws_get_protocol(wsi), + LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT); + inst->reconnect_timeout_running = 1; + } + /* FALLTHRU */ +#endif + //no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY + case LWS_CALLBACK_WSI_DESTROY: + aclk_lws_wss_clear_io_buffers(inst); + inst->lws_wsi = NULL; + inst->websocket_connection_up = 0; + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + inst->websocket_connection_up = 1; + if(inst->callbacks.connection_established_callback) + inst->callbacks.connection_established_callback(); + break; + default: + break; + } + return retval; //0-OK, other connection should be closed! +} + +int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) +{ + if(inst && inst->lws_wsi && inst->websocket_connection_up) + { + aclk_lws_mutex_lock(&inst->write_buf_mutex); + lws_wss_packet_buffer_append(&inst->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); + aclk_lws_mutex_unlock(&inst->write_buf_mutex); + + lws_callback_on_writable(inst->lws_wsi); + return count; + } + return 0; +} + +int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) +{ + size_t data_to_be_read = count; + + aclk_lws_mutex_lock(&inst->read_buf_mutex); + size_t readable_byte_count = lws_ring_get_count_waiting_elements(inst->read_ringbuffer, NULL); + if(unlikely(readable_byte_count == 0)) { + errno = EAGAIN; + data_to_be_read = -1; + goto abort; + } + + if( readable_byte_count < data_to_be_read ) + data_to_be_read = readable_byte_count; + + data_to_be_read = lws_ring_consume(inst->read_ringbuffer, NULL, buf, data_to_be_read); + if(data_to_be_read == readable_byte_count) + inst->data_to_read = 0; + +abort: + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + return data_to_be_read; +} + +int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst) +{ + return lws_service(inst->lws_context, 0); +} + +// in case the MQTT connection disconnect while lws transport is still operational +// we should drop connection and reconnect +// this function should be called when that happens to notify lws of that situation +void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst) +{ + if(inst->lws_wsi && inst->websocket_connection_up) { + inst->upstream_reconnect_request = 1; + lws_callback_on_writable(inst->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. + } +}
\ No newline at end of file |