summaryrefslogtreecommitdiffstats
path: root/aclk/mqtt.c
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2020-02-06 17:58:51 +0200
committerGitHub <noreply@github.com>2020-02-06 17:58:51 +0200
commitb2b3c182548fe81e6d1c9a599b2571dabfdabcaa (patch)
treebe40bfd7605ebf2b453114e2d904a0252abdfc21 /aclk/mqtt.c
parent9f6c2556ecbbcfeab5049b3299094796457c97b1 (diff)
ACLK agent 1 (#7894)
* - Add initial mqtt support * [WIP] Agent cloud link - Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD) - Send alarms to "netdata/alarm" - Add error checks to handle connection failures - Add params for Broker, port Maximum concurrent sent / recev messages - Dummy function to check claiming status - Generic mqtt_send command to publish message to a base topic , sub topic It will end up in the form base_topic/sub_topic - Add host/port in the connection failure error message * Test libmosquitto libs * connect to broker locally (assume localhost:1883) * subscribe to channel netdata/command * Test try a reload command to trigger health reload * publish alerts to netdata/alarm * - Fix compile issues * - Use sleep_usec instead of usleep * - Delay reconnection on failure due to misconfiguration (high cpu usage) * - Remove the TLS connection config * - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds * - Gather ACLK related code under aclk folder - Add aclk_ functions for abstract layer - Moved low level libs intergration in mqtt.c * - Add README.md file with initial comment * - Clean MQTT v5 * - Code cleanup * - Remove alarm log for now - Remove the heart beat * - Remove message properties for V5 * - Remove message properties for V5 (header) * Fixed the netdata target to use a local static version of libmosquitto. The installer does not yet have steps to pull and build the local library. cd project_root git clone ssh://git@github.com/netdata/mosquitto mosquitto/ (cd mosquitto/lib && make) # Ignore the cpp error This will leave mosquitto/lib/libmosquitto.a for the build process to use. * - Fix compile issues with older < 1.6 libmosquitto lib * - Enable alarm events to check it works - Re arrange includes - Rework topic to be agent/guid/. Actual id will be returned by the is_agent_claimed * - Add initial metadata info - Added helper function in web_api - Added a debug command (info) * Update the claiming state to retrieve the claimed id. * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Use define for constants like command and metadata topics - Function to wait for initialization of the ACLK link - New aclk_subscribe command with QOS parameter for the mqtt subscription - Use the is_agent_claimed function to get the real claim id and use it to build the topics that will be used for the cloud communication - Change in netdata-claim.sh.in to write the claim id without a trailing \n * - Remove the alarm log for now - Add code (but disabled) to send charts * - Use dummy anon, anon as username and password for testing purposes * - Use client id anon as well * Testing without TLS * Switching TLS back on to fix docker environment. * - Added query processing An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results back to the "data" topic - Move the above processing from the message callback to the query handle loop - Added helper "pause" , "resume" commands to stop and resume query processing to stress test loading the queue with queries before executing them - Changed the endpoint topics to "meta", and "cmd" (previously metadata and command) * make info message follow protocol * move metadata msg generation into new func * move metadata msg generation into new func * - Add metadata to the responses - Add hook to queue chart changes on creation and dimensions - Changed the queue mechanism to include delay for X seconds - Add delayed submittion of charts to the cloud so that all DIMs are defined to avoid resubmission * - Add additional data info for aclk_queue command * - Use web_clinet_api_request_v1 to handle the incoming request This will handle all requests coming from the cloud * - Cleanup and aclk_query structure - Add msg_id parameter - Enable the incoming JSON request - Enable the outgoing JSON response * - Added new thread to handle query processing - Add lock and cond wait to wakeup thread when queries are submitted - Cleanup on the main init function * - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed. - During the wait time, no queries will be queued * - Send metadata on query thread init - New generic create header function for the JSON response - Pack info and charts into one message - Modified chart to remove entries (test) - Modified charts mod to remove entries e.g alarms and volatile info - Change input to aclk_update_chart (RRDHOST / instead of hostname) * - When a request fails, add to the payload - We may need to handle in a different key - Error check in json parsing * - Add dummy aclk_update_alarm command * - Move incoming request JSON parsing code away from mqtt.c - Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default - Added version in incoming and outgoing JSON dict * - Disable code if ACLK_ENABLE is not defined - Remove references to the mqtt (mosquitto) lib - Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined * - Disable challenge sample code for now * - Remove libmosquitto from makefile * - Fix spaces in Makefile.am - Remove ifdef to avoid warning from LGTM * - Remove for now the code that builds an along log test message to send to the cloud * - Add check for ACLK_ENABLE definition and avoid calling the chart update functions * - Remove commented code * - Move source files to the correct place (ACLK_PLUGIN_FILES) * - Remove include file thats not needed * - Remove include file thats not needed - Add improved checks for load_claiming_state() * - Fix error message. Used error() that also logs errno and message * - Fix some codacy issues * - Fix more codacy issues, code cleanup * - Revert code to address codacy warnings * - Revert spaces added in a previous commit by mistake * clean up if/else nest * print error if fopen fails * minor - error already logs errno * - Fix version formatting * - Cleanup all ACLK related compiler warnings - Re-arrange include files - Removed unused defines * - More compilation warnings fixed - Bug with thread creation fixed * - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable * - Add condition to skip the libmosquitto * - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags - Typo in info message fix Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com> Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
Diffstat (limited to 'aclk/mqtt.c')
-rw-r--r--aclk/mqtt.c318
1 files changed, 318 insertions, 0 deletions
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
new file mode 100644
index 0000000000..e781ee9f91
--- /dev/null
+++ b/aclk/mqtt.c
@@ -0,0 +1,318 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <libnetdata/json/json.h>
+#include "../daemon/common.h"
+#include "mqtt.h"
+
+void (*_on_connect)(void *ptr) = NULL;
+void (*_on_disconnect)(void *ptr) = NULL;
+extern int cmdpause;
+
+
+#ifndef ENABLE_ACLK
+
+inline const char *_link_strerror(int rc)
+{
+ (void) rc;
+ return "no error";
+}
+
+int _link_event_loop(int timeout)
+{
+ (void) timeout;
+ return 0;
+}
+
+int _link_send_message(char *topic, char *message)
+{
+ (void) topic;
+ (void) message;
+ return 0;
+}
+
+int _link_subscribe(char *topic, int qos)
+{
+ (void) topic;
+ (void) qos;
+ return 0;
+}
+
+void _link_shutdown()
+{
+ return;
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+ (void) aclk_hostname;
+ (void) aclk_port;
+ (void) on_connect;
+ (void) on_disconnect;
+ return 0;
+}
+
+#else
+/*
+ * Just report the library info in the logfile for reference when issues arise
+ *
+ */
+
+struct mosquitto *mosq = NULL;
+
+// Get a string description of the error
+
+inline const char *_link_strerror(int rc)
+{
+ return mosquitto_strerror(rc);
+}
+
+
+void mqtt_message_callback(
+ struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
+{
+ (void) mosq;
+ (void) obj;
+
+ // TODO: handle commands in a more efficient way, if we have many
+
+ if (strcmp((char *)msg->payload, "pause") == 0) {
+ cmdpause = 1;
+ return;
+ }
+
+ if (strcmp((char *)msg->payload, "resume") == 0) {
+ cmdpause = 0;
+ return;
+ }
+
+ if (strcmp((char *)msg->payload, "reload") == 0) {
+ error_log_limit_unlimited();
+ info("Reloading health configuration");
+ health_reload();
+ error_log_limit_reset();
+ return;
+ }
+
+ if (strcmp((char *)msg->payload, "info") == 0) {
+ aclk_send_metadata();
+ return;
+ }
+
+ aclk_handle_cloud_request(msg->payload);
+
+ //info("Received type=[%s], msg-id=[%s], topic=[%s], url=[%s]",cloud_to_agent.type_id, cloud_to_agent.msg_id, cloud_to_agent.topic, cloud_to_agent.url);
+
+}
+
+void connect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ (void) obj;
+ (void) rc;
+
+ info("Connection to cloud estabilished");
+
+ aclk_connection_initialized = 1;
+ _on_connect((void *) mosq);
+
+ return;
+}
+
+
+void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ (void) obj;
+ (void) rc;
+
+ info("Connection to cloud failed");
+ // TODO: Keep the connection "alive" for now. The library will reconnect.
+
+ //mqtt_connection_initialized = 0;
+ _on_disconnect((void *) mosq);
+ //sleep_usec(USEC_PER_SEC * 5);
+ return;
+}
+
+
+void _show_mqtt_info()
+{
+ int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+
+ info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision);
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+ int rc;
+ int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ char *ca_crt;
+ char *server_crt;
+ char *server_key;
+
+ // show library info so can have in in the logfile
+ libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+ ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*");
+ server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*");
+ server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*");
+
+
+ if (ca_crt[0] == '*') {
+ freez(ca_crt);
+ ca_crt = NULL;
+ }
+
+ if (server_crt[0] == '*') {
+ freez(server_crt);
+ server_crt = NULL;
+ }
+
+ if (server_key[0] == '*') {
+ freez(server_key);
+ server_key = NULL;
+ }
+
+ info(
+ "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+ libmosq_revision);
+
+ rc = mosquitto_lib_init();
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ error("Failed to initialize MQTT (libmosquitto library)");
+ return 1;
+ }
+
+ mosq = mosquitto_new("anon", true, NULL);
+ if (unlikely(!mosq)) {
+ mosquitto_lib_cleanup();
+ error("MQTT new structure -- %s", mosquitto_strerror(errno));
+ return 1;
+ }
+
+ _on_connect = on_connect;
+ _on_disconnect = on_disconnect;
+
+ mosquitto_connect_callback_set(mosq, connect_callback);
+ mosquitto_disconnect_callback_set(mosq, disconnect_callback);
+
+ mosquitto_username_pw_set(mosq, "anon", "anon");
+
+ rc = mosquitto_threaded_set(mosq, 1);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+
+#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
+ rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
+
+ rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
+ info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
+#endif
+
+ rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+
+ mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+
+ rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Connect %s MQTT status = %d (%s)", aclk_hostname, rc, mosquitto_strerror(rc));
+ else
+ info("Establishing MQTT link to %s", aclk_hostname);
+
+ return rc;
+}
+
+int _link_event_loop(int timeout)
+{
+ int rc;
+
+ rc = mosquitto_loop(mosq, timeout, 1);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ errno = 0;
+ error("Loop error code %d (%s)", rc, mosquitto_strerror(rc));
+ rc = mosquitto_reconnect(mosq);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ error("Reconnect loop error code %d (%s)", rc, mosquitto_strerror(rc));
+ }
+ // TBD: Using delay
+ sleep_usec(USEC_PER_SEC * 10);
+ }
+ return rc;
+}
+
+void _link_shutdown()
+{
+ int rc;
+
+ rc = mosquitto_disconnect(mosq);
+ switch (rc) {
+ case MOSQ_ERR_SUCCESS:
+ info("MQTT disconnected from broker");
+ break;
+ default:
+ info("MQTT invalid structure");
+ break;
+ };
+
+ mosquitto_destroy(mosq);
+ mosq = NULL;
+ return;
+}
+
+
+int _link_subscribe(char *topic, int qos)
+{
+ int rc;
+
+ if (unlikely(!mosq))
+ return 1;
+
+ mosquitto_message_callback_set(mosq, mqtt_message_callback);
+
+ rc = mosquitto_subscribe(mosq, NULL, topic, qos);
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
+ return 1;
+ }
+
+ return 0;
+}
+
+
+/*
+ * Send a message to the cloud to specific topic
+ *
+ */
+
+int _link_send_message(char *topic, char *message)
+{
+ int rc;
+
+ rc = mosquitto_pub_topic_check(topic);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ return rc;
+
+ int msg_len = strlen(message);
+
+ // TODO: handle encoding validation -- the message should be UFT8 encoded by the sender
+ //rc = mosquitto_validate_utf8(message, msg_len);
+ //if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ // return rc;
+
+ rc = mosquitto_publish(mosq, NULL, topic, msg_len, message, ACLK_QOS, 0);
+
+ // TODO: Add better handling -- error will flood the logfile here
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ error("MQTT message failed : %s", mosquitto_strerror(rc));
+ }
+
+ return rc;
+}
+#endif \ No newline at end of file