summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am2
-rw-r--r--aclk/aclk_lws_wss_client.c337
-rw-r--r--aclk/aclk_lws_wss_client.h81
-rw-r--r--aclk/agent_cloud_link.c10
-rw-r--r--aclk/agent_cloud_link.h2
-rw-r--r--aclk/mqtt.c142
-rw-r--r--aclk/mqtt.h4
-rw-r--r--libnetdata/libnetdata.h2
-rw-r--r--libnetdata/log/log.h1
9 files changed, 569 insertions, 12 deletions
diff --git a/Makefile.am b/Makefile.am
index e3384b6f2a..985c85bd6b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -463,6 +463,8 @@ ACLK_PLUGIN_FILES = \
aclk/agent_cloud_link.h \
aclk/mqtt.c \
aclk/mqtt.h \
+ aclk/aclk_lws_wss_client.c \
+ aclk/aclk_lws_wss_client.h \
$(NULL)
EXPORTING_ENGINE_FILES = \
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c
new file mode 100644
index 0000000000..a4a21f45bf
--- /dev/null
+++ b/aclk/aclk_lws_wss_client.c
@@ -0,0 +1,337 @@
+#include "aclk_lws_wss_client.h"
+
+#include "libnetdata/libnetdata.h"
+
+static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
+
+struct aclk_lws_wss_perconnect_data {
+ int todo;
+};
+
+struct lws_wss_packet_buffer {
+ unsigned char* data;
+ size_t data_size;
+ struct lws_wss_packet_buffer *next;
+};
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void* data, size_t size)
+{
+ struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
+ if(data) {
+ new->data = mallocz(LWS_PRE+size);
+ memcpy(new->data+LWS_PRE, data, size);
+ new->data_size = size;
+ }
+ return new;
+}
+
+static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item)
+{
+ struct lws_wss_packet_buffer *tail = *list;
+ if(!*list) {
+ *list = item;
+ return;
+ }
+ while(tail->next) {
+ tail = tail->next;
+ }
+ tail->next = item;
+}
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list)
+{
+ struct lws_wss_packet_buffer *ret = *list;
+ if(ret != NULL)
+ *list = ret->next;
+
+ return ret;
+}
+
+static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item)
+{
+ freez(item->data);
+ freez(item);
+}
+
+static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer)
+{
+ size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL);
+ if(elems > 0)
+ lws_ring_consume(ringbuffer, NULL, NULL, elems);
+}
+
+static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list)
+{
+ struct lws_wss_packet_buffer *i;
+ while((i = lws_wss_packet_buffer_pop(list)) != NULL) {
+ lws_wss_packet_buffer_free(i);
+ }
+ *list = NULL;
+}
+
+static inline void aclk_lws_wss_clear_io_buffers(struct aclk_lws_wss_engine_instance *inst)
+{
+ aclk_lws_mutex_lock(&inst->read_buf_mutex);
+ _aclk_lws_wss_read_buffer_clear(inst->read_ringbuffer);
+ aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+ aclk_lws_mutex_lock(&inst->write_buf_mutex);
+ _aclk_lws_wss_write_buffer_clear(&inst->write_buffer_head);
+ aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+}
+
+static const struct lws_protocols protocols[] = {
+ {
+ "aclk-wss",
+ aclk_lws_wss_callback,
+ sizeof(struct aclk_lws_wss_perconnect_data),
+ 0, 0, 0, 0
+ },
+ { NULL, NULL, 0, 0, 0, 0, 0 }
+};
+
+static void aclk_lws_wss_log_divert(int level, const char *line) {
+ switch(level){
+ case LLL_ERR:
+ error("Libwebsockets Error: %s", line);
+ break;
+ case LLL_WARN:
+ debug(D_ACLK, "Libwebsockets Warn: %s", line);
+ break;
+ default:
+ error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line);
+ }
+}
+
+struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port) {
+ static int lws_logging_initialized = 0;
+ struct lws_context_creation_info info;
+ struct aclk_lws_wss_engine_instance *inst;
+
+ if(unlikely(!lws_logging_initialized)) {
+ lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
+ lws_logging_initialized = 1;
+ }
+
+ if(!callbacks || !target_hostname)
+ return NULL;
+
+ inst = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
+
+ inst->host = target_hostname;
+ inst->port = target_port;
+
+ memset(&info, 0, sizeof(struct lws_context_creation_info));
+ info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.port = CONTEXT_PORT_NO_LISTEN;
+ info.protocols = protocols;
+ info.user = inst;
+
+ inst->lws_context = lws_create_context(&info);
+ if(!inst->lws_context)
+ goto failure_cleanup_2;
+
+ inst->callbacks = *callbacks;
+
+ aclk_lws_mutex_init(&inst->write_buf_mutex);
+ aclk_lws_mutex_init(&inst->read_buf_mutex);
+
+ inst->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
+ if(!inst->read_ringbuffer)
+ goto failure_cleanup;
+
+ return inst;
+
+failure_cleanup:
+ lws_context_destroy(inst->lws_context);
+failure_cleanup_2:
+ freez(inst);
+ return NULL;
+}
+
+void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) {
+ lws_context_destroy(inst->lws_context);
+ inst->lws_context = NULL;
+ inst->lws_wsi = NULL;
+
+ aclk_lws_wss_clear_io_buffers(inst);
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+ pthread_mutex_destroy(&inst->write_buf_mutex);
+ pthread_mutex_destroy(&inst->read_buf_mutex);
+#endif
+}
+
+void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){
+ struct lws_client_connect_info i;
+
+ memset(&i, 0, sizeof(i));
+ i.context = inst->lws_context;
+ i.port = inst->port;
+ i.address = inst->host;
+ i.path = "/mqtt";
+ i.host = inst->host;
+ i.protocol = "mqtt";
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+ i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
+#else
+ i.ssl_connection = LCCSCF_USE_SSL;
+#endif
+ lws_client_connect_via_info(&i);
+}
+
+static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, size_t len) {
+ if( lws_ring_insert(buffer, data, len) != len ) {
+ error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
+ return 0;
+ }
+ return 1;
+}
+
+static int
+aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len)
+{
+ UNUSED(user);
+ struct aclk_lws_wss_engine_instance *inst = lws_context_user(lws_get_context(wsi));
+ struct lws_wss_packet_buffer *data;
+ int retval = 0;
+
+ if( !inst ) {
+ error("Callback received without any aclk_lws_wss_engine_instance!");
+ return -1;
+ }
+
+ if( inst->upstream_reconnect_request ) {
+ error("Closing lws connectino due to libmosquitto error.");
+ char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
+ lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char*)upstream_connection_error, strlen(upstream_connection_error));
+ retval = -1;
+ inst->upstream_reconnect_request = 0;
+ }
+
+ switch (reason) {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ aclk_lws_mutex_lock(&inst->write_buf_mutex);
+ data = lws_wss_packet_buffer_pop(&inst->write_buffer_head);
+ if(likely(data)) {
+ lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
+ lws_wss_packet_buffer_free(data);
+ if(inst->write_buffer_head)
+ lws_callback_on_writable(inst->lws_wsi);
+ }
+ aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+ break;
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ aclk_lws_mutex_lock(&inst->read_buf_mutex);
+ if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len))
+ retval = 1;
+ aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+
+ if(likely(inst->callbacks.data_rcvd_callback))
+ // to future myself -> do not call this while read lock is active as it will eventually
+ // want to acquire same lock later in aclk_lws_wss_client_read() function
+ inst->callbacks.data_rcvd_callback();
+ else
+ inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read
+ break;
+ case LWS_CALLBACK_PROTOCOL_INIT:
+ //initial connection here
+ //later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT
+ //in case this connection fails or drops
+ _aclk_wss_connect(inst);
+ break;
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ //TODO if already active make some error noise
+ //currently we expect only one connection per netdata
+ inst->lws_wsi = wsi;
+ break;
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+ case LWS_CALLBACK_USER:
+ inst->reconnect_timeout_running = 0;
+ _aclk_wss_connect(inst);
+ break;
+#endif
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given"));
+ /* FALLTHRU */
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+ if(!inst->reconnect_timeout_running) {
+ lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
+ lws_get_protocol(wsi),
+ LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT);
+ inst->reconnect_timeout_running = 1;
+ }
+ /* FALLTHRU */
+#endif
+ //no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY
+ case LWS_CALLBACK_WSI_DESTROY:
+ aclk_lws_wss_clear_io_buffers(inst);
+ inst->lws_wsi = NULL;
+ inst->websocket_connection_up = 0;
+ break;
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ inst->websocket_connection_up = 1;
+ if(inst->callbacks.connection_established_callback)
+ inst->callbacks.connection_established_callback();
+ break;
+ default:
+ break;
+ }
+ return retval; //0-OK, other connection should be closed!
+}
+
+int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count)
+{
+ if(inst && inst->lws_wsi && inst->websocket_connection_up)
+ {
+ aclk_lws_mutex_lock(&inst->write_buf_mutex);
+ lws_wss_packet_buffer_append(&inst->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
+ aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+
+ lws_callback_on_writable(inst->lws_wsi);
+ return count;
+ }
+ return 0;
+}
+
+int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count)
+{
+ size_t data_to_be_read = count;
+
+ aclk_lws_mutex_lock(&inst->read_buf_mutex);
+ size_t readable_byte_count = lws_ring_get_count_waiting_elements(inst->read_ringbuffer, NULL);
+ if(unlikely(readable_byte_count == 0)) {
+ errno = EAGAIN;
+ data_to_be_read = -1;
+ goto abort;
+ }
+
+ if( readable_byte_count < data_to_be_read )
+ data_to_be_read = readable_byte_count;
+
+ data_to_be_read = lws_ring_consume(inst->read_ringbuffer, NULL, buf, data_to_be_read);
+ if(data_to_be_read == readable_byte_count)
+ inst->data_to_read = 0;
+
+abort:
+ aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+ return data_to_be_read;
+}
+
+int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst)
+{
+ return lws_service(inst->lws_context, 0);
+}
+
+// in case the MQTT connection disconnect while lws transport is still operational
+// we should drop connection and reconnect
+// this function should be called when that happens to notify lws of that situation
+void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst)
+{
+ if(inst->lws_wsi && inst->websocket_connection_up) {
+ inst->upstream_reconnect_request = 1;
+ lws_callback_on_writable(inst->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
+ }
+} \ No newline at end of file
diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h
new file mode 100644
index 0000000000..00d51c3985
--- /dev/null
+++ b/aclk/aclk_lws_wss_client.h
@@ -0,0 +1,81 @@
+#ifndef ACLK_LWS_WSS_CLIENT_H
+#define ACLK_LWS_WSS_CLIENT_H
+
+#include <libwebsockets.h>
+
+#include "libnetdata/libnetdata.h"
+
+#define ACLK_LWS_WSS_RECONNECT_TIMEOUT 5
+
+// This is as define because ideally the ACLK at high level
+// can do mosqitto writes and reads only from one thread
+// which is cleaner implementation IMHO
+// in such case this mutexes are not necessarry and life
+// is simpler
+#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
+
+#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES 128*1024
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+ #define aclk_lws_mutex_init(x) netdata_mutex_init(x)
+ #define aclk_lws_mutex_lock(x) netdata_mutex_lock(x)
+ #define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x)
+#else
+ #define aclk_lws_mutex_init(x)
+ #define aclk_lws_mutex_lock(x)
+ #define aclk_lws_mutex_unlock(x)
+#endif
+
+struct aclk_lws_wss_engine_callbacks {
+ void (*connection_established_callback)();
+ void (*data_rcvd_callback)();
+ void (*data_writable_callback)();
+};
+
+struct lws_wss_packet_buffer;
+
+struct aclk_lws_wss_engine_instance {
+ //target host/port for connection
+ const char *host;
+ int port;
+
+ //internal data
+ struct lws_context *lws_context;
+ struct lws *lws_wsi;
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+ netdata_mutex_t write_buf_mutex;
+ netdata_mutex_t read_buf_mutex;
+#endif
+
+ struct lws_wss_packet_buffer *write_buffer_head;
+ struct lws_ring *read_ringbuffer;
+
+ struct aclk_lws_wss_engine_callbacks callbacks;
+
+ //flags to be readed by engine user
+ int websocket_connection_up;
+
+// currently this is by default disabled
+// as decision has been made that reconnection
+// will have to be done from top layer
+// (after getting the new MQTT auth data)
+// for now i keep it here as it is usefull for
+// some of my internall testing
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+ int reconnect_timeout_running;
+#endif
+ int data_to_read;
+ int upstream_reconnect_request;
+};
+
+struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port);
+void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst);
+
+int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
+int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
+int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst);
+
+void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst);
+
+#endif \ No newline at end of file
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index dc90c6d14e..2f75a65009 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -94,6 +94,10 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
// Set when we have connection up and running from the connection callback
int aclk_connection_initialized = 0;
+// TODO modify previous comment if this stays this way
+// con_initialized means library is initialized and ready to be used
+// acklk_connected means there is actually an established connection
+int aclk_mqtt_connected = 0;
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -625,10 +629,10 @@ void *aclk_main(void *ptr)
continue;
}
- if (unlikely(!aclk_subscribed)) {
+ if (unlikely(!aclk_subscribed) && aclk_mqtt_connected) {
aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
}
- if (unlikely(!query_thread.thread)) {
+ if (unlikely(!query_thread.thread && aclk_mqtt_connected)) {
query_thread.thread = mallocz(sizeof(netdata_thread_t));
netdata_thread_create(
query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
@@ -763,7 +767,7 @@ int aclk_init(ACLK_INIT_ACTION action)
aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
- aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
+ aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 9002);
info("Maximum parallel outgoing messages %d", aclk_send_maximum);
info("Maximum parallel incoming messages %d", aclk_recv_maximum);
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index 626cdecfaf..1e4eb10872 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -63,7 +63,7 @@ void *aclk_main(void *ptr);
#define NETDATA_ACLK_HOOK \
{ \
- .name = "AgentCloudLink", \
+ .name = "ACLK_Main", \
.config_section = NULL, \
.config_name = NULL, \
.enabled = 1, \
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index e781ee9f91..dc034455bd 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -3,6 +3,7 @@
#include <libnetdata/json/json.h>
#include "../daemon/common.h"
#include "mqtt.h"
+#include "aclk_lws_wss_client.h"
void (*_on_connect)(void *ptr) = NULL;
void (*_on_disconnect)(void *ptr) = NULL;
@@ -104,6 +105,14 @@ void mqtt_message_callback(
}
+int lws_wss_client_initialized = 0;
+
+// This is not define because in future we might want to try plain
+// MQTT as fallback ?
+// e.g. try 1st MQTT-WSS, 2nd MQTT plain, 3rd https fallback...
+int mqtt_over_websockets = 1;
+struct aclk_lws_wss_engine_instance *lws_engine_instance = NULL;
+
void connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
(void) obj;
@@ -112,6 +121,7 @@ void connect_callback(struct mosquitto *mosq, void *obj, int rc)
info("Connection to cloud estabilished");
aclk_connection_initialized = 1;
+ aclk_mqtt_connected = 1;
_on_connect((void *) mosq);
return;
@@ -127,7 +137,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
// TODO: Keep the connection "alive" for now. The library will reconnect.
//mqtt_connection_initialized = 0;
+ aclk_mqtt_connected = 0;
_on_disconnect((void *) mosq);
+
+ if(mqtt_over_websockets && lws_engine_instance)
+ aclk_lws_wss_mqtt_layer_disconect_notif(lws_engine_instance);
+
//sleep_usec(USEC_PER_SEC * 5);
return;
}
@@ -141,7 +156,17 @@ void _show_mqtt_info()
info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision);
}
-int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+size_t _mqtt_external_write_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_write(lws_engine_instance, buf, count);
+}
+
+size_t _mqtt_external_read_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_read(lws_engine_instance, buf, count);
+}
+
+int _mqtt_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
{
int rc;
int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
@@ -194,7 +219,7 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_disconnect_callback_set(mosq, disconnect_callback);
- mosquitto_username_pw_set(mosq, "anon", "anon");
+ mosquitto_username_pw_set(mosq, NULL, NULL);
rc = mosquitto_threaded_set(mosq, 1);
if (unlikely(rc != MOSQ_ERR_SUCCESS))
@@ -209,12 +234,21 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
#endif
- rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
+ if(!mqtt_over_websockets) {
+ rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+
+ mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+ }
+
+ return rc;
+}
- mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
+{
+ int rc;
rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
@@ -226,7 +260,83 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
return rc;
}
-int _link_event_loop(int timeout)
+static inline void _link_mosquitto_write()
+{
+ int rc;
+
+ if(!mqtt_over_websockets)
+ return;
+
+ rc = mosquitto_loop_misc(mosq);
+ if(unlikely( rc != MOSQ_ERR_SUCCESS ))
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
+
+ if(likely( mosquitto_want_write(mosq) )) {
+ rc = mosquitto_loop_write(mosq, 1);
+ if( rc != MOSQ_ERR_SUCCESS )
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
+ }
+}
+
+void aclk_lws_connect_notif_callback(){
+ //the connection is done by LWS so this parameters dont matter
+ //ig MQTT over LWS is used
+ _link_mqtt_connect("doesntmatter", 12345);
+ _link_mosquitto_write();
+}
+
+void aclk_lws_data_received_callback(){
+ int rc = mosquitto_loop_read(mosq, 1);
+ if(rc != MOSQ_ERR_SUCCESS)
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));
+}
+
+static const struct aclk_lws_wss_engine_callbacks aclk_lws_engine_callbacks = {
+ .connection_established_callback = aclk_lws_connect_notif_callback,
+ .data_rcvd_callback = aclk_lws_data_received_callback,
+ .data_writable_callback = NULL
+};
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+ int rc;
+
+ if(mqtt_over_websockets) {
+ // we will connect when WebSocket connection is up
+ // based on callback
+ if(!lws_wss_client_initialized) {
+ lws_engine_instance = aclk_lws_wss_client_init(&aclk_lws_engine_callbacks, aclk_hostname, aclk_port);
+ aclk_lws_wss_service_loop(lws_engine_instance);
+ lws_wss_client_initialized = 1;
+ }
+ }
+
+ rc = _mqtt_lib_init(aclk_hostname, aclk_port, on_connect, on_disconnect);
+ if(rc != MOSQ_ERR_SUCCESS)
+ return rc;
+
+ if(mqtt_over_websockets) {
+ mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
+ return MOSQ_ERR_SUCCESS;
+ } else {
+ // if direct mqtt connection is used
+ // connect immediatelly
+ return _link_mqtt_connect(aclk_hostname, aclk_port);
+ }
+}
+
+static inline int _link_event_loop_wss()
+{
+ if(lws_engine_instance && lws_engine_instance->websocket_connection_up)
+ _link_mosquitto_write();
+
+ aclk_lws_wss_service_loop(lws_engine_instance);
+ // this is because if use LWS we don't want
+ // mqtt to reconnect by itself
+ return MOSQ_ERR_SUCCESS;
+}
+
+static inline int _link_event_loop_plain_mqtt(int timeout)
{
int rc;
@@ -245,6 +355,14 @@ int _link_event_loop(int timeout)
return rc;
}
+int _link_event_loop(int timeout)
+{
+ if(mqtt_over_websockets)
+ return _link_event_loop_wss();
+
+ return _link_event_loop_plain_mqtt(timeout);
+}
+
void _link_shutdown()
{
int rc;
@@ -261,6 +379,12 @@ void _link_shutdown()
mosquitto_destroy(mosq);
mosq = NULL;
+
+ if(lws_engine_instance) {
+ aclk_lws_wss_client_destroy(lws_engine_instance);
+ lws_engine_instance = NULL;
+ }
+
return;
}
@@ -281,6 +405,8 @@ int _link_subscribe(char *topic, int qos)
return 1;
}
+ _link_mosquitto_write();
+
return 0;
}
@@ -313,6 +439,8 @@ int _link_send_message(char *topic, char *message)
error("MQTT message failed : %s", mosquitto_strerror(rc));
}
+ _link_mosquitto_write();
+
return rc;
}
#endif \ No newline at end of file
diff --git a/aclk/mqtt.h b/aclk/mqtt.h
index 4ba3a30d90..fd51c2af70 100644
--- a/aclk/mqtt.h
+++ b/aclk/mqtt.h
@@ -15,7 +15,9 @@ int _link_subscribe(char *topic, int qos);
int _link_send_message(char *topic, char *message);
const char *_link_strerror(int rc);
-extern int aclk_connection_initialized;
int aclk_handle_cloud_request(char *);
+extern int aclk_connection_initialized;
+extern int aclk_mqtt_connected;
+
#endif //NETDATA_MQTT_H
diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h
index 27f67eafa1..7b04ca7cef 100644
--- a/libnetdata/libnetdata.h
+++ b/libnetdata/libnetdata.h
@@ -288,6 +288,8 @@ extern void recursive_config_double_dir_load(
#define BITS_IN_A_KILOBIT 1000
+/* misc. */
+#define UNUSED(x) (void)(x)
extern void netdata_cleanup_and_exit(int ret) NORETURN;
extern void send_statistics(const char *action, const char *action_result, const char *action_data);
diff --git a/libnetdata/log/log.h b/libnetdata/log/log.h
index 1522ef9aa8..582ebafe03 100644
--- a/libnetdata/log/log.h
+++ b/libnetdata/log/log.h
@@ -37,6 +37,7 @@
#define D_POLLFD 0x0000000020000000
#define D_STREAM 0x0000000040000000
#define D_RRDENGINE 0x0000000100000000
+#define D_ACLK 0x0000000200000000
#define D_SYSTEM 0x8000000000000000
//#define DEBUG (D_WEB_CLIENT_ACCESS|D_LISTENER|D_RRD_STATS)