author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2020-02-06 17:58:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-06 17:58:51 +0200 |
commit | b2b3c182548fe81e6d1c9a599b2571dabfdabcaa (patch) | |
tree | be40bfd7605ebf2b453114e2d904a0252abdfc21 /aclk/mqtt.c | |
parent | 9f6c2556ecbbcfeab5049b3299094796457c97b1 (diff) |
ACLK agent 1 (#7894)
* - Add initial mqtt support
* [WIP] Agent cloud link
- Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD)
- Send alarms to "netdata/alarm"
- Add error checks to handle connection failures
- Add params for
Broker, port
Maximum concurrent sent / received messages
- Dummy function to check claiming status
- Generic mqtt_send command to publish a message to a base topic and sub topic
It will end up in the form base_topic/sub_topic
- Add host/port in the connection failure error message
* Test libmosquitto libs
* connect to broker locally (assume localhost:1883)
* subscribe to channel netdata/command
* Test try a reload command to trigger health reload
* publish alerts to netdata/alarm
* - Fix compile issues
* - Use sleep_usec instead of usleep
* - Delay reconnection on failure due to misconfiguration (high cpu usage)
* - Remove the TLS connection config
* - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds
* - Gather ACLK related code under aclk folder
- Add aclk_ functions for abstract layer
- Moved low-level libs integration into mqtt.c
* - Add README.md file with initial comment
* - Clean MQTT v5
* - Code cleanup
* - Remove alarm log for now
- Remove the heart beat
* - Remove message properties for V5
* - Remove message properties for V5 (header)
* Fixed the netdata target to use a local static version of libmosquitto.
The installer does not yet have steps to pull and build the local library.
cd project_root
git clone ssh://git@github.com/netdata/mosquitto mosquitto/
(cd mosquitto/lib && make) # Ignore the cpp error
This will leave mosquitto/lib/libmosquitto.a for the build process to use.
* - Fix compile issues with older < 1.6 libmosquitto lib
* - Enable alarm events to check it works
- Rearrange includes
- Rework topic to be agent/guid/. Actual id will be
returned by the is_agent_claimed
* - Add initial metadata info
- Added helper function in web_api
- Added a debug command (info)
* Update the claiming state to retrieve the claimed id.
* - Use define for constants like command and metadata topics
- Function to wait for initialization of the ACLK link
- New aclk_subscribe command with QOS parameter for the mqtt subscription
- Use the is_agent_claimed function to get the real claim id and use it to build the topics
that will be used for the cloud communication
- Change in netdata-claim.sh.in to write the claim id without a trailing \n
* - Use define for constants like command and metadata topics
- Function to wait for initialization of the ACLK link
- New aclk_subscribe command with QOS parameter for the mqtt subscription
- Use the is_agent_claimed function to get the real claim id and use it to build the topics
that will be used for the cloud communication
- Change in netdata-claim.sh.in to write the claim id without a trailing \n
* - Remove the alarm log for now
- Add code (but disabled) to send charts
* - Use dummy anon, anon as username and password for testing purposes
* - Use client id anon as well
* Testing without TLS
* Switching TLS back on to fix docker environment.
* - Added query processing
An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results
back to the "data" topic
- Move the above processing from the message callback to the query handle loop
- Added helper "pause" and "resume" commands to stop and resume query processing to stress test loading the queue
with queries before executing them
- Changed the endpoint topics to "meta", and "cmd" (previously metadata and command)
* make info message follow protocol
* move metadata msg generation into new func
* move metadata msg generation into new func
* - Add metadata to the responses
- Add hook to queue chart changes on creation and dimensions
- Changed the queue mechanism to include delay for X seconds
- Add delayed submission of charts to the cloud so that all DIMs are defined to avoid resubmission
* - Add additional data info for aclk_queue command
* - Use web_client_api_request_v1 to handle the incoming request
This will handle all requests coming from the cloud
* - Cleanup and aclk_query structure
- Add msg_id parameter
- Enable the incoming JSON request
- Enable the outgoing JSON response
* - Added new thread to handle query processing
- Add lock and cond wait to wakeup thread when queries are submitted
- Cleanup on the main init function
* - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed.
- During the wait time, no queries will be queued
* - Send metadata on query thread init
- New generic create header function for the JSON response
- Pack info and charts into one message
- Modified chart to remove entries (test)
- Modified charts mod to remove entries e.g alarms and volatile info
- Change input to aclk_update_chart (RRDHOST / instead of hostname)
* - When a request fails, add to the payload
- We may need to handle in a different key
- Error check in json parsing
* - Add dummy aclk_update_alarm command
* - Move incoming request JSON parsing code away from mqtt.c
- Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default
- Added version in incoming and outgoing JSON dict
* - Disable code if ACLK_ENABLE is not defined
- Remove references to the mqtt (mosquitto) lib
- Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined
* - Disable challenge sample code for now
* - Remove libmosquitto from makefile
* - Fix spaces in Makefile.am
- Remove ifdef to avoid warning from LGTM
* - Remove for now the code that builds an alarm log test message to send to the cloud
* - Add check for ACLK_ENABLE definition and avoid calling the chart update functions
* - Remove commented code
* - Move source files to the correct place (ACLK_PLUGIN_FILES)
* - Remove include file that's not needed
* - Remove include file that's not needed
- Add improved checks for load_claiming_state()
* - Fix error message. Used error() that also logs errno and message
* - Fix some codacy issues
* - Fix more codacy issues, code cleanup
* - Revert code to address codacy warnings
* - Revert spaces added in a previous commit by mistake
* clean up if/else nest
* print error if fopen fails
* minor - error already logs errno
* - Fix version formatting
* - Cleanup all ACLK related compiler warnings
- Re-arrange include files
- Removed unused defines
* - More compilation warnings fixed
- Bug with thread creation fixed
* - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable
* - Add condition to skip the libmosquitto
* - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags
- Fix typo in info message
Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com>
Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
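The commit message notes that the generic mqtt_send command publishes to a topic of the form base_topic/sub_topic. As a minimal sketch of that composition step only (the helper name `build_topic` and its return convention are assumptions for illustration and do not appear in this commit):

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper, not part of the commit: joins a base topic and a
 * sub topic as "base_topic/sub_topic". Returns 0 on success, -1 if an
 * argument is missing or the result does not fit in dst. */
int build_topic(char *dst, size_t dst_len, const char *base_topic, const char *sub_topic)
{
    if (!dst || !dst_len || !base_topic || !sub_topic)
        return -1;

    int written = snprintf(dst, dst_len, "%s/%s", base_topic, sub_topic);

    /* snprintf reports the length it wanted to write; a value that is
     * negative or >= dst_len means an error or a truncated topic. */
    if (written < 0 || (size_t)written >= dst_len)
        return -1;

    return 0;
}
```

Rejecting a truncated topic outright, rather than publishing to a shortened one, keeps a message from landing on an unintended topic.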
Diffstat (limited to 'aclk/mqtt.c')
-rw-r--r-- | aclk/mqtt.c | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
new file mode 100644
index 0000000000..e781ee9f91
--- /dev/null
+++ b/aclk/mqtt.c
@@ -0,0 +1,318 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <libnetdata/json/json.h>
+#include "../daemon/common.h"
+#include "mqtt.h"
+
+void (*_on_connect)(void *ptr) = NULL;
+void (*_on_disconnect)(void *ptr) = NULL;
+extern int cmdpause;
+
+
+#ifndef ENABLE_ACLK
+
+inline const char *_link_strerror(int rc)
+{
+    (void) rc;
+    return "no error";
+}
+
+int _link_event_loop(int timeout)
+{
+    (void) timeout;
+    return 0;
+}
+
+int _link_send_message(char *topic, char *message)
+{
+    (void) topic;
+    (void) message;
+    return 0;
+}
+
+int _link_subscribe(char *topic, int qos)
+{
+    (void) topic;
+    (void) qos;
+    return 0;
+}
+
+void _link_shutdown()
+{
+    return;
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+    (void) aclk_hostname;
+    (void) aclk_port;
+    (void) on_connect;
+    (void) on_disconnect;
+    return 0;
+}
+
+#else
+/*
+ * Just report the library info in the logfile for reference when issues arise
+ *
+ */
+
+struct mosquitto *mosq = NULL;
+
+// Get a string description of the error
+
+inline const char *_link_strerror(int rc)
+{
+    return mosquitto_strerror(rc);
+}
+
+
+void mqtt_message_callback(
+    struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
+{
+    (void) mosq;
+    (void) obj;
+
+    // TODO: handle commands in a more efficient way, if we have many
+
+    if (strcmp((char *)msg->payload, "pause") == 0) {
+        cmdpause = 1;
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "resume") == 0) {
+        cmdpause = 0;
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "reload") == 0) {
+        error_log_limit_unlimited();
+        info("Reloading health configuration");
+        health_reload();
+        error_log_limit_reset();
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "info") == 0) {
+        aclk_send_metadata();
+        return;
+    }
+
+    aclk_handle_cloud_request(msg->payload);
+
+    //info("Received type=[%s], msg-id=[%s], topic=[%s], url=[%s]",cloud_to_agent.type_id, cloud_to_agent.msg_id, cloud_to_agent.topic, cloud_to_agent.url);
+
+}
+
+void connect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+    (void) obj;
+    (void) rc;
+
+    info("Connection to cloud estabilished");
+
+    aclk_connection_initialized = 1;
+    _on_connect((void *) mosq);
+
+    return;
+}
+
+
+void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+    (void) obj;
+    (void) rc;
+
+    info("Connection to cloud failed");
+    // TODO: Keep the connection "alive" for now. The library will reconnect.
+
+    //mqtt_connection_initialized = 0;
+    _on_disconnect((void *) mosq);
+    //sleep_usec(USEC_PER_SEC * 5);
+    return;
+}
+
+
+void _show_mqtt_info()
+{
+    int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+    libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+
+    info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision);
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+    int rc;
+    int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+    char *ca_crt;
+    char *server_crt;
+    char *server_key;
+
+    // show library info so can have in in the logfile
+    libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+    ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*");
+    server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*");
+    server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*");
+
+
+    if (ca_crt[0] == '*') {
+        freez(ca_crt);
+        ca_crt = NULL;
+    }
+
+    if (server_crt[0] == '*') {
+        freez(server_crt);
+        server_crt = NULL;
+    }
+
+    if (server_key[0] == '*') {
+        freez(server_key);
+        server_key = NULL;
+    }
+
+    info(
+        "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+        libmosq_revision);
+
+    rc = mosquitto_lib_init();
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        error("Failed to initialize MQTT (libmosquitto library)");
+        return 1;
+    }
+
+    mosq = mosquitto_new("anon", true, NULL);
+    if (unlikely(!mosq)) {
+        mosquitto_lib_cleanup();
+        error("MQTT new structure -- %s", mosquitto_strerror(errno));
+        return 1;
+    }
+
+    _on_connect = on_connect;
+    _on_disconnect = on_disconnect;
+
+    mosquitto_connect_callback_set(mosq, connect_callback);
+    mosquitto_disconnect_callback_set(mosq, disconnect_callback);
+
+    mosquitto_username_pw_set(mosq, "anon", "anon");
+
+    rc = mosquitto_threaded_set(mosq, 1);
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+
+#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
+    rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
+
+    rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
+    info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
+#endif
+
+    rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+
+    mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+
+    rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Connect %s MQTT status = %d (%s)", aclk_hostname, rc, mosquitto_strerror(rc));
+    else
+        info("Establishing MQTT link to %s", aclk_hostname);
+
+    return rc;
+}
+
+int _link_event_loop(int timeout)
+{
+    int rc;
+
+    rc = mosquitto_loop(mosq, timeout, 1);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        errno = 0;
+        error("Loop error code %d (%s)", rc, mosquitto_strerror(rc));
+        rc = mosquitto_reconnect(mosq);
+        if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+            error("Reconnect loop error code %d (%s)", rc, mosquitto_strerror(rc));
+        }
+        // TBD: Using delay
+        sleep_usec(USEC_PER_SEC * 10);
+    }
+    return rc;
+}
+
+void _link_shutdown()
+{
+    int rc;
+
+    rc = mosquitto_disconnect(mosq);
+    switch (rc) {
+        case MOSQ_ERR_SUCCESS:
+            info("MQTT disconnected from broker");
+            break;
+        default:
+            info("MQTT invalid structure");
+            break;
+    };
+
+    mosquitto_destroy(mosq);
+    mosq = NULL;
+    return;
+}
+
+
+int _link_subscribe(char *topic, int qos)
+{
+    int rc;
+
+    if (unlikely(!mosq))
+        return 1;
+
+    mosquitto_message_callback_set(mosq, mqtt_message_callback);
+
+    rc = mosquitto_subscribe(mosq, NULL, topic, qos);
+    if (unlikely(rc)) {
+        errno = 0;
+        error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
+        return 1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Send a message to the cloud to specific topic
+ *
+ */
+
+int _link_send_message(char *topic, char *message)
+{
+    int rc;
+
+    rc = mosquitto_pub_topic_check(topic);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        return rc;
+
+    int msg_len = strlen(message);
+
+    // TODO: handle encoding validation -- the message should be UFT8 encoded by the sender
+    //rc = mosquitto_validate_utf8(message, msg_len);
+    //if (unlikely(rc != MOSQ_ERR_SUCCESS))
+    //    return rc;
+
+    rc = mosquitto_publish(mosq, NULL, topic, msg_len, message, ACLK_QOS, 0);
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        error("MQTT message failed : %s", mosquitto_strerror(rc));
+    }
+
+    return rc;
+}
+#endif
\ No newline at end of file
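A note on the version guard in `_link_lib_init`, which reads `#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000`. In the C preprocessor, `defined(...)` evaluates to 0 or 1, so the comparison as written is always false and the guarded `mosquitto_int_option` calls would not be compiled in. The sketch below isolates that behaviour with a made-up macro (`DEMO_LIB_VERSION_NUMBER` and the `guard_*` names are assumptions for illustration, not part of the commit or of libmosquitto) and shows one way to compare the macro's value instead:

```c
/* Hypothetical stand-in for a version macro that a library header would define. */
#define DEMO_LIB_VERSION_NUMBER 1006008

/* Same shape as the guard in the diff: defined(X) yields 0 or 1,
 * so "1 >= 1006000" is false and the #else branch is taken. */
#if defined(DEMO_LIB_VERSION_NUMBER) >= 1006000
#define GUARD_AS_WRITTEN 1
#else
#define GUARD_AS_WRITTEN 0
#endif

/* Check that the macro exists first, then compare its value. */
#if defined(DEMO_LIB_VERSION_NUMBER) && DEMO_LIB_VERSION_NUMBER >= 1006000
#define GUARD_BY_VALUE 1
#else
#define GUARD_BY_VALUE 0
#endif

/* Expose the results so they can be inspected at run time. */
int guard_as_written(void) { return GUARD_AS_WRITTEN; }
int guard_by_value(void) { return GUARD_BY_VALUE; }
```

With the stand-in value above, `guard_as_written()` returns 0 and `guard_by_value()` returns 1.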