diff options
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | aclk/aclk_lws_wss_client.c | 337 | ||||
-rw-r--r-- | aclk/aclk_lws_wss_client.h | 81 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 10 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 2 | ||||
-rw-r--r-- | aclk/mqtt.c | 142 | ||||
-rw-r--r-- | aclk/mqtt.h | 4 | ||||
-rw-r--r-- | libnetdata/libnetdata.h | 2 | ||||
-rw-r--r-- | libnetdata/log/log.h | 1 |
9 files changed, 569 insertions, 12 deletions
diff --git a/Makefile.am b/Makefile.am index e3384b6f2a..985c85bd6b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -463,6 +463,8 @@ ACLK_PLUGIN_FILES = \ aclk/agent_cloud_link.h \ aclk/mqtt.c \ aclk/mqtt.h \ + aclk/aclk_lws_wss_client.c \ + aclk/aclk_lws_wss_client.h \ $(NULL) EXPORTING_ENGINE_FILES = \ diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c new file mode 100644 index 0000000000..a4a21f45bf --- /dev/null +++ b/aclk/aclk_lws_wss_client.c @@ -0,0 +1,337 @@ +#include "aclk_lws_wss_client.h" + +#include "libnetdata/libnetdata.h" + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +struct aclk_lws_wss_perconnect_data { + int todo; +}; + +struct lws_wss_packet_buffer { + unsigned char* data; + size_t data_size; + struct lws_wss_packet_buffer *next; +}; + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void* data, size_t size) +{ + struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); + if(data) { + new->data = mallocz(LWS_PRE+size); + memcpy(new->data+LWS_PRE, data, size); + new->data_size = size; + } + return new; +} + +static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) +{ + struct lws_wss_packet_buffer *tail = *list; + if(!*list) { + *list = item; + return; + } + while(tail->next) { + tail = tail->next; + } + tail->next = item; +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *ret = *list; + if(ret != NULL) + *list = ret->next; + + return ret; +} + +static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) +{ + freez(item->data); + freez(item); +} + +static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) +{ + size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); + if(elems > 0) + lws_ring_consume(ringbuffer, NULL, NULL, elems); +} + +static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *i; + while((i = lws_wss_packet_buffer_pop(list)) != NULL) { + lws_wss_packet_buffer_free(i); + } + *list = NULL; +} + +static inline void aclk_lws_wss_clear_io_buffers(struct aclk_lws_wss_engine_instance *inst) +{ + aclk_lws_mutex_lock(&inst->read_buf_mutex); + _aclk_lws_wss_read_buffer_clear(inst->read_ringbuffer); + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + aclk_lws_mutex_lock(&inst->write_buf_mutex); + _aclk_lws_wss_write_buffer_clear(&inst->write_buffer_head); + aclk_lws_mutex_unlock(&inst->write_buf_mutex); +} + +static const struct lws_protocols protocols[] = { + { + "aclk-wss", + aclk_lws_wss_callback, + sizeof(struct aclk_lws_wss_perconnect_data), + 0, 0, 0, 0 + }, + { NULL, NULL, 0, 0, 0, 0, 0 } +}; + +static void aclk_lws_wss_log_divert(int level, const char *line) { + switch(level){ + case LLL_ERR: + error("Libwebsockets Error: %s", line); + break; + case LLL_WARN: + debug(D_ACLK, "Libwebsockets Warn: %s", line); + break; + default: + error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); + } +} + +struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port) { + static int lws_logging_initialized = 0; + struct lws_context_creation_info info; + struct aclk_lws_wss_engine_instance *inst; + + if(unlikely(!lws_logging_initialized)) { + lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); + lws_logging_initialized = 1; + } + + if(!callbacks || !target_hostname) + return NULL; + + inst = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); + + inst->host = target_hostname; + inst->port = target_port; + + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + info.user = inst; + + inst->lws_context = lws_create_context(&info); + if(!inst->lws_context) + goto failure_cleanup_2; + + inst->callbacks = *callbacks; + + aclk_lws_mutex_init(&inst->write_buf_mutex); + aclk_lws_mutex_init(&inst->read_buf_mutex); + + inst->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); + if(!inst->read_ringbuffer) + goto failure_cleanup; + + return inst; + +failure_cleanup: + lws_context_destroy(inst->lws_context); +failure_cleanup_2: + freez(inst); + return NULL; +} + +void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) { + lws_context_destroy(inst->lws_context); + inst->lws_context = NULL; + inst->lws_wsi = NULL; + + aclk_lws_wss_clear_io_buffers(inst); + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + pthread_mutex_destroy(&inst->write_buf_mutex); + pthread_mutex_destroy(&inst->read_buf_mutex); +#endif +} + +void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){ + struct lws_client_connect_info i; + + memset(&i, 0, sizeof(i)); + i.context = inst->lws_context; + i.port = inst->port; + i.address = inst->host; + i.path = "/mqtt"; + i.host = inst->host; + i.protocol = "mqtt"; +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif + lws_client_connect_via_info(&i); +} + +static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, size_t len) { + if( lws_ring_insert(buffer, data, len) != len ) { + error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); + return 0; + } + return 1; +} + +static int +aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + UNUSED(user); + struct aclk_lws_wss_engine_instance *inst = lws_context_user(lws_get_context(wsi)); + struct lws_wss_packet_buffer *data; + int retval = 0; + + if( !inst ) { + error("Callback received without any aclk_lws_wss_engine_instance!"); + return -1; + } + + if( inst->upstream_reconnect_request ) { + error("Closing lws connectino due to libmosquitto error."); + char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; + lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char*)upstream_connection_error, strlen(upstream_connection_error)); + retval = -1; + inst->upstream_reconnect_request = 0; + } + + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&inst->write_buf_mutex); + data = lws_wss_packet_buffer_pop(&inst->write_buffer_head); + if(likely(data)) { + lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); + lws_wss_packet_buffer_free(data); + if(inst->write_buffer_head) + lws_callback_on_writable(inst->lws_wsi); + } + aclk_lws_mutex_unlock(&inst->write_buf_mutex); + break; + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&inst->read_buf_mutex); + if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + + if(likely(inst->callbacks.data_rcvd_callback)) + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + inst->callbacks.data_rcvd_callback(); + else + inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read + break; + case LWS_CALLBACK_PROTOCOL_INIT: + //initial connection here + //later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT + //in case this connection fails or drops + _aclk_wss_connect(inst); + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + //TODO if already active make some error noise + //currently we expect only one connection per netdata + inst->lws_wsi = wsi; + break; +#ifdef AUTO_RECONNECT_ON_LWS_LAYER + case LWS_CALLBACK_USER: + inst->reconnect_timeout_running = 0; + _aclk_wss_connect(inst); + break; +#endif + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given")); + /* FALLTHRU */ + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: +#ifdef AUTO_RECONNECT_ON_LWS_LAYER + if(!inst->reconnect_timeout_running) { + lws_timed_callback_vh_protocol(lws_get_vhost(wsi), + lws_get_protocol(wsi), + LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT); + inst->reconnect_timeout_running = 1; + } + /* FALLTHRU */ +#endif + //no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY + case LWS_CALLBACK_WSI_DESTROY: + aclk_lws_wss_clear_io_buffers(inst); + inst->lws_wsi = NULL; + inst->websocket_connection_up = 0; + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + inst->websocket_connection_up = 1; + if(inst->callbacks.connection_established_callback) + inst->callbacks.connection_established_callback(); + break; + default: + break; + } + return retval; //0-OK, other connection should be closed! +} + +int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) +{ + if(inst && inst->lws_wsi && inst->websocket_connection_up) + { + aclk_lws_mutex_lock(&inst->write_buf_mutex); + lws_wss_packet_buffer_append(&inst->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); + aclk_lws_mutex_unlock(&inst->write_buf_mutex); + + lws_callback_on_writable(inst->lws_wsi); + return count; + } + return 0; +} + +int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) +{ + size_t data_to_be_read = count; + + aclk_lws_mutex_lock(&inst->read_buf_mutex); + size_t readable_byte_count = lws_ring_get_count_waiting_elements(inst->read_ringbuffer, NULL); + if(unlikely(readable_byte_count == 0)) { + errno = EAGAIN; + data_to_be_read = -1; + goto abort; + } + + if( readable_byte_count < data_to_be_read ) + data_to_be_read = readable_byte_count; + + data_to_be_read = lws_ring_consume(inst->read_ringbuffer, NULL, buf, data_to_be_read); + if(data_to_be_read == readable_byte_count) + inst->data_to_read = 0; + +abort: + aclk_lws_mutex_unlock(&inst->read_buf_mutex); + return data_to_be_read; +} + +int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst) +{ + return lws_service(inst->lws_context, 0); +} + +// in case the MQTT connection disconnect while lws transport is still operational +// we should drop connection and reconnect +// this function should be called when that happens to notify lws of that situation +void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst) +{ + if(inst->lws_wsi && inst->websocket_connection_up) { + inst->upstream_reconnect_request = 1; + lws_callback_on_writable(inst->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. + } +}
\ No newline at end of file diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h new file mode 100644 index 0000000000..00d51c3985 --- /dev/null +++ b/aclk/aclk_lws_wss_client.h @@ -0,0 +1,81 @@ +#ifndef ACLK_LWS_WSS_CLIENT_H +#define ACLK_LWS_WSS_CLIENT_H + +#include <libwebsockets.h> + +#include "libnetdata/libnetdata.h" + +#define ACLK_LWS_WSS_RECONNECT_TIMEOUT 5 + +// This is as define because ideally the ACLK at high level +// can do mosqitto writes and reads only from one thread +// which is cleaner implementation IMHO +// in such case this mutexes are not necessarry and life +// is simpler +#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 + +#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES 128*1024 + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + #define aclk_lws_mutex_init(x) netdata_mutex_init(x) + #define aclk_lws_mutex_lock(x) netdata_mutex_lock(x) + #define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x) +#else + #define aclk_lws_mutex_init(x) + #define aclk_lws_mutex_lock(x) + #define aclk_lws_mutex_unlock(x) +#endif + +struct aclk_lws_wss_engine_callbacks { + void (*connection_established_callback)(); + void (*data_rcvd_callback)(); + void (*data_writable_callback)(); +}; + +struct lws_wss_packet_buffer; + +struct aclk_lws_wss_engine_instance { + //target host/port for connection + const char *host; + int port; + + //internal data + struct lws_context *lws_context; + struct lws *lws_wsi; + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + netdata_mutex_t write_buf_mutex; + netdata_mutex_t read_buf_mutex; +#endif + + struct lws_wss_packet_buffer *write_buffer_head; + struct lws_ring *read_ringbuffer; + + struct aclk_lws_wss_engine_callbacks callbacks; + + //flags to be readed by engine user + int websocket_connection_up; + +// currently this is by default disabled +// as decision has been made that reconnection +// will have to be done from top layer +// (after getting the new MQTT auth data) +// for now i keep it here as it is usefull for +// some of my internall testing +#ifdef AUTO_RECONNECT_ON_LWS_LAYER + int reconnect_timeout_running; +#endif + int data_to_read; + int upstream_reconnect_request; +}; + +struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port); +void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst); + +int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); +int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); +int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst); + +void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst); + +#endif
\ No newline at end of file diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index dc90c6d14e..2f75a65009 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -94,6 +94,10 @@ int cloud_to_agent_parse(JSON_ENTRY *e) // Set when we have connection up and running from the connection callback int aclk_connection_initialized = 0; +// TODO modify previous comment if this stays this way +// con_initialized means library is initialized and ready to be used +// acklk_connected means there is actually an established connection +int aclk_mqtt_connected = 0; static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER; @@ -625,10 +629,10 @@ void *aclk_main(void *ptr) continue; } - if (unlikely(!aclk_subscribed)) { + if (unlikely(!aclk_subscribed) && aclk_mqtt_connected) { aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2); } - if (unlikely(!query_thread.thread)) { + if (unlikely(!query_thread.thread && aclk_mqtt_connected)) { query_thread.thread = mallocz(sizeof(netdata_thread_t)); netdata_thread_create( query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread); @@ -763,7 +767,7 @@ int aclk_init(ACLK_INIT_ACTION action) aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20); aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost"); - aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883); + aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 9002); info("Maximum parallel outgoing messages %d", aclk_send_maximum); info("Maximum parallel incoming messages %d", aclk_recv_maximum); diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index 626cdecfaf..1e4eb10872 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -63,7 +63,7 @@ void *aclk_main(void *ptr); #define NETDATA_ACLK_HOOK \ { \ - .name = "AgentCloudLink", \ + .name = "ACLK_Main", \ .config_section = NULL, \ .config_name = NULL, \ .enabled = 1, \ diff --git a/aclk/mqtt.c b/aclk/mqtt.c index e781ee9f91..dc034455bd 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -3,6 +3,7 @@ #include <libnetdata/json/json.h> #include "../daemon/common.h" #include "mqtt.h" +#include "aclk_lws_wss_client.h" void (*_on_connect)(void *ptr) = NULL; void (*_on_disconnect)(void *ptr) = NULL; @@ -104,6 +105,14 @@ void mqtt_message_callback( } +int lws_wss_client_initialized = 0; + +// This is not define because in future we might want to try plain +// MQTT as fallback ? +// e.g. try 1st MQTT-WSS, 2nd MQTT plain, 3rd https fallback... +int mqtt_over_websockets = 1; +struct aclk_lws_wss_engine_instance *lws_engine_instance = NULL; + void connect_callback(struct mosquitto *mosq, void *obj, int rc) { (void) obj; @@ -112,6 +121,7 @@ void connect_callback(struct mosquitto *mosq, void *obj, int rc) info("Connection to cloud estabilished"); aclk_connection_initialized = 1; + aclk_mqtt_connected = 1; _on_connect((void *) mosq); return; @@ -127,7 +137,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) // TODO: Keep the connection "alive" for now. The library will reconnect. //mqtt_connection_initialized = 0; + aclk_mqtt_connected = 0; _on_disconnect((void *) mosq); + + if(mqtt_over_websockets && lws_engine_instance) + aclk_lws_wss_mqtt_layer_disconect_notif(lws_engine_instance); + //sleep_usec(USEC_PER_SEC * 5); return; } @@ -141,7 +156,17 @@ void _show_mqtt_info() info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision); } -int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) +size_t _mqtt_external_write_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_write(lws_engine_instance, buf, count); +} + +size_t _mqtt_external_read_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_read(lws_engine_instance, buf, count); +} + +int _mqtt_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) { int rc; int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; @@ -194,7 +219,7 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void * mosquitto_connect_callback_set(mosq, connect_callback); mosquitto_disconnect_callback_set(mosq, disconnect_callback); - mosquitto_username_pw_set(mosq, "anon", "anon"); + mosquitto_username_pw_set(mosq, NULL, NULL); rc = mosquitto_threaded_set(mosq, 1); if (unlikely(rc != MOSQ_ERR_SUCCESS)) @@ -209,12 +234,21 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void * info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); #endif - rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1); + if(!mqtt_over_websockets) { + rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc)); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc)); + + mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL); + } + + return rc; +} - mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL); +int _link_mqtt_connect(char *aclk_hostname, int aclk_port) +{ + int rc; rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); @@ -226,7 +260,83 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void * return rc; } -int _link_event_loop(int timeout) +static inline void _link_mosquitto_write() +{ + int rc; + + if(!mqtt_over_websockets) + return; + + rc = mosquitto_loop_misc(mosq); + if(unlikely( rc != MOSQ_ERR_SUCCESS )) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); + + if(likely( mosquitto_want_write(mosq) )) { + rc = mosquitto_loop_write(mosq, 1); + if( rc != MOSQ_ERR_SUCCESS ) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc)); + } +} + +void aclk_lws_connect_notif_callback(){ + //the connection is done by LWS so this parameters dont matter + //ig MQTT over LWS is used + _link_mqtt_connect("doesntmatter", 12345); + _link_mosquitto_write(); +} + +void aclk_lws_data_received_callback(){ + int rc = mosquitto_loop_read(mosq, 1); + if(rc != MOSQ_ERR_SUCCESS) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc)); +} + +static const struct aclk_lws_wss_engine_callbacks aclk_lws_engine_callbacks = { + .connection_established_callback = aclk_lws_connect_notif_callback, + .data_rcvd_callback = aclk_lws_data_received_callback, + .data_writable_callback = NULL +}; + +int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) +{ + int rc; + + if(mqtt_over_websockets) { + // we will connect when WebSocket connection is up + // based on callback + if(!lws_wss_client_initialized) { + lws_engine_instance = aclk_lws_wss_client_init(&aclk_lws_engine_callbacks, aclk_hostname, aclk_port); + aclk_lws_wss_service_loop(lws_engine_instance); + lws_wss_client_initialized = 1; + } + } + + rc = _mqtt_lib_init(aclk_hostname, aclk_port, on_connect, on_disconnect); + if(rc != MOSQ_ERR_SUCCESS) + return rc; + + if(mqtt_over_websockets) { + mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); + return MOSQ_ERR_SUCCESS; + } else { + // if direct mqtt connection is used + // connect immediatelly + return _link_mqtt_connect(aclk_hostname, aclk_port); + } +} + +static inline int _link_event_loop_wss() +{ + if(lws_engine_instance && lws_engine_instance->websocket_connection_up) + _link_mosquitto_write(); + + aclk_lws_wss_service_loop(lws_engine_instance); + // this is because if use LWS we don't want + // mqtt to reconnect by itself + return MOSQ_ERR_SUCCESS; +} + +static inline int _link_event_loop_plain_mqtt(int timeout) { int rc; @@ -245,6 +355,14 @@ int _link_event_loop(int timeout) return rc; } +int _link_event_loop(int timeout) +{ + if(mqtt_over_websockets) + return _link_event_loop_wss(); + + return _link_event_loop_plain_mqtt(timeout); +} + void _link_shutdown() { int rc; @@ -261,6 +379,12 @@ void _link_shutdown() mosquitto_destroy(mosq); mosq = NULL; + + if(lws_engine_instance) { + aclk_lws_wss_client_destroy(lws_engine_instance); + lws_engine_instance = NULL; + } + return; } @@ -281,6 +405,8 @@ int _link_subscribe(char *topic, int qos) return 1; } + _link_mosquitto_write(); + return 0; } @@ -313,6 +439,8 @@ int _link_send_message(char *topic, char *message) error("MQTT message failed : %s", mosquitto_strerror(rc)); } + _link_mosquitto_write(); + return rc; } #endif
\ No newline at end of file diff --git a/aclk/mqtt.h b/aclk/mqtt.h index 4ba3a30d90..fd51c2af70 100644 --- a/aclk/mqtt.h +++ b/aclk/mqtt.h @@ -15,7 +15,9 @@ int _link_subscribe(char *topic, int qos); int _link_send_message(char *topic, char *message); const char *_link_strerror(int rc); -extern int aclk_connection_initialized; int aclk_handle_cloud_request(char *); +extern int aclk_connection_initialized; +extern int aclk_mqtt_connected; + #endif //NETDATA_MQTT_H diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index 27f67eafa1..7b04ca7cef 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -288,6 +288,8 @@ extern void recursive_config_double_dir_load( #define BITS_IN_A_KILOBIT 1000 +/* misc. */ +#define UNUSED(x) (void)(x) extern void netdata_cleanup_and_exit(int ret) NORETURN; extern void send_statistics(const char *action, const char *action_result, const char *action_data); diff --git a/libnetdata/log/log.h b/libnetdata/log/log.h index 1522ef9aa8..582ebafe03 100644 --- a/libnetdata/log/log.h +++ b/libnetdata/log/log.h @@ -37,6 +37,7 @@ #define D_POLLFD 0x0000000020000000 #define D_STREAM 0x0000000040000000 #define D_RRDENGINE 0x0000000100000000 +#define D_ACLK 0x0000000200000000 #define D_SYSTEM 0x8000000000000000 //#define DEBUG (D_WEB_CLIENT_ACCESS|D_LISTENER|D_RRD_STATS) |