diff options
42 files changed, 3905 insertions, 20 deletions
diff --git a/.github/workflows/build-and-install.yml b/.github/workflows/build-and-install.yml index 26a144acd1..c8b0c13697 100644 --- a/.github/workflows/build-and-install.yml +++ b/.github/workflows/build-and-install.yml @@ -12,6 +12,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - run: | git fetch --prune --unshallow --tags - name: Build @@ -98,6 +100,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - name: install-required-packages.sh on ${{ matrix.distro }} env: PRE: ${{ matrix.pre }} @@ -183,6 +187,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - name: install-required-packages.sh on ${{ matrix.distro }} env: PRE: ${{ matrix.pre }} diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index cf494e95c3..7225d3dbee 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -12,6 +12,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - name: Run checksum checks on kickstart files env: LOCAL_ONLY: "true" @@ -23,6 +25,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - name: Install required packages run: | ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata @@ -43,6 +47,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Build run: > docker run -v "$PWD":/netdata -w /netdata alpine:latest /bin/sh -c @@ -68,6 +74,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Prepare environment run: | ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata @@ -98,6 +106,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Prepare environment run: ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata - name: Build netdata diff --git a/.github/workflows/coverity.yml b/.github/workflows/coverity.yml index 926257dc08..1b7847844d 100644 --- a/.github/workflows/coverity.yml +++ b/.github/workflows/coverity.yml @@ -15,6 +15,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Prepare environment env: DEBIAN_FRONTEND: 'noninteractive' diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 5d5e8b0a7b..c25620e1b2 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -26,6 +26,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Determine if we should push changes and which tags to use if: github.event_name == 'workflow_dispatch' && github.event.inputs.version != 'nightly' run: | diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 2a4fe87e4a..9f7234f92d 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -16,6 +16,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Run link check uses: gaurav-nelson/github-action-markdown-link-check@v1 with: diff --git a/.github/workflows/review.yml b/.github/workflows/review.yml index ca8f6de130..a267fea3f4 100644 --- a/.github/workflows/review.yml +++ b/.github/workflows/review.yml @@ -16,6 +16,7 @@ jobs: - name: Git clone repository uses: actions/checkout@v2 with: + submodules: recursive fetch-depth: 0 - name: Check files run: | @@ -57,6 +58,7 @@ jobs: - name: Git clone repository uses: actions/checkout@v2 with: + submodules: recursive fetch-depth: 0 - name: Check files run: | @@ -80,6 +82,7 @@ jobs: - name: Git clone repository uses: actions/checkout@v2 with: + submodules: recursive fetch-depth: 0 - name: Check files run: | diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ef6bfbc2ab..c166c74421 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,6 +21,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Prepare environment run: | ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all @@ -39,6 +41,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v2 + with: + submodules: recursive - name: Prepare environment run: | ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all @@ -48,7 +52,7 @@ jobs: - name: Configure run: | autoreconf -ivf - ./configure + ./configure --without-aclk-ng # XXX: Work-around for bug with libbson-1.0 in Ubuntu 18.04 # See: https://bugs.launchpad.net/ubuntu/+source/libmongoc/+bug/1790771 # https://jira.mongodb.org/browse/CDRIVER-2818 diff --git a/.github/workflows/updater.yml b/.github/workflows/updater.yml index 48e5ac116a..309d684ac7 100644 --- a/.github/workflows/updater.yml +++ b/.github/workflows/updater.yml @@ -34,6 +34,8 @@ jobs: steps: - name: Git clone repository uses: actions/checkout@v2 + with: + submodules: recursive - name: Install required packages & build tarball run: | ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..ef9349b389 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "mqtt_websockets"] + path = mqtt_websockets + url = https://github.com/underhood/mqtt_websockets.git diff --git a/Makefile.am b/Makefile.am index a9707cce2e..18828ea463 100644 --- a/Makefile.am +++ b/Makefile.am @@ -116,10 +116,18 @@ SUBDIRS += \ web \ claim \ parser \ - aclk/legacy \ spawn \ $(NULL) +if ACLK_NG +SUBDIRS += \ + mqtt_websockets \ + $(NULL) +else +SUBDIRS += \ + aclk/legacy \ + $(NULL) +endif AM_CFLAGS = \ $(OPTIONAL_MATH_CFLAGS) \ @@ -525,6 +533,28 @@ PARSER_FILES = \ parser/parser.h \ $(NULL) +if ACLK_NG +ACLK_FILES = \ + aclk/aclk.c \ + aclk/aclk.h \ + aclk/aclk_util.c \ + aclk/aclk_util.h \ + aclk/aclk_stats.c \ + aclk/aclk_stats.h \ + aclk/aclk_query.c \ + aclk/aclk_query.h \ + aclk/aclk_query_queue.c \ + aclk/aclk_query_queue.h \ + aclk/aclk_collector_list.c \ + aclk/aclk_collector_list.h \ + aclk/aclk_otp.c \ + aclk/aclk_otp.h \ + aclk/aclk_tx_msgs.c \ + aclk/aclk_tx_msgs.h \ + aclk/aclk_rx_msgs.c \ + aclk/aclk_rx_msgs.h \ + $(NULL) +else #ACLK_NG ACLK_FILES = \ aclk/legacy/aclk_rrdhost_state.h \ aclk/legacy/aclk_common.c \ @@ -548,9 +578,8 @@ ACLK_FILES += \ aclk/legacy/aclk_lws_https_client.c \ aclk/legacy/aclk_lws_https_client.h \ $(NULL) -endif - - +endif #ENABLE_ACLK +endif #ACLK_NG SPAWN_PLUGIN_FILES = \ spawn/spawn.c \ @@ -714,6 +743,12 @@ NETDATACLI_FILES = \ sbin_PROGRAMS += netdata netdata_SOURCES = $(NETDATA_FILES) +if ACLK_NG +netdata_LDADD = \ + mqtt_websockets/libmqttwebsockets.a \ + $(NETDATA_COMMON_LIBS) \ + $(NULL) +else #ACLK_NG if ENABLE_ACLK netdata_LDADD = \ externaldeps/mosquitto/libmosquitto.a \ @@ -721,11 +756,12 @@ netdata_LDADD = \ $(OPTIONAL_LWS_LIBS) \ $(NETDATA_COMMON_LIBS) \ $(NULL) -else +else #ENABLE_ACLK netdata_LDADD = \ $(NETDATA_COMMON_LIBS) \ $(NULL) -endif +endif #ENABLE_ACLK +endif #ACLK_NG if ENABLE_CXX_LINKER netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@ diff --git a/aclk/aclk.c b/aclk/aclk.c new file mode 100644 index 0000000000..f0cbac249e --- /dev/null +++ b/aclk/aclk.c @@ -0,0 +1,820 @@ +#include "aclk.h" + +#include "aclk_stats.h" +#include "mqtt_wss_client.h" +#include "aclk_otp.h" +#include "aclk_tx_msgs.h" +#include "aclk_query.h" +#include "aclk_query_queue.h" +#include "aclk_util.h" +#include "aclk_rx_msgs.h" +#include "aclk_collector_list.h" + +#ifdef ACLK_LOG_CONVERSATION_DIR +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#endif + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +//TODO remove most (as in 99.999999999%) of this crap +int aclk_connected = 0; +int aclk_disable_runtime = 0; +int aclk_disable_single_updates = 0; +int aclk_kill_link = 0; + +int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. + +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer + +mqtt_wss_client mqttwss_client; + +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +struct aclk_shared_state aclk_shared_state = { + .agent_state = AGENT_INITIALIZING, + .last_popcorn_interrupt = 0, + .version_neg = 0, + .version_neg_wait_till = 0, + .mqtt_shutdown_msg_id = -1, + .mqtt_shutdown_msg_rcvd = 0 +}; + +void aclk_single_update_disable() +{ + aclk_disable_single_updates = 1; +} + +void aclk_single_update_enable() +{ + aclk_disable_single_updates = 0; +} + +//ENDTODO + +static RSA *aclk_private_key = NULL; +static int load_private_key() +{ + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +static int wait_till_cloud_enabled() +{ + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + } + return 0; +} + +/** + * Will block until agent is claimed. Returns only if agent claimed + * or if agent needs to shutdown. + * + * @return `0` if agent has been claimed, + * `1` if interrupted due to agent shutting down + */ +static int wait_till_agent_claimed(void) +{ + //TODO prevent malloc and freez + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + agent_id = is_agent_claimed(); + } + freez(agent_id); + return 0; +} + +/** + * Checks everything is ready for connection + * agent claimed, cloud url set and private key available + * + * @param aclk_hostname points to location where string pointer to hostname will be set + * @param ackl_port port to int where port will be saved + * + * @return If non 0 returned irrecoverable error happened and ACLK should be terminated + */ +static int wait_till_agent_claim_ready() +{ + int port; + char *hostname = NULL; + while (!netdata_exit) { + if (wait_till_agent_claimed()) + return 1; + + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + return 1; + } + + // We just check configuration is valid here + // TODO make it without malloc/free + if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) { + error("Agent is claimed but the configuration is invalid, please fix"); + freez(hostname); + hostname = NULL; + sleep(5); + continue; + } + freez(hostname); + hostname = NULL; + + if (!load_private_key()) { + sleep(5); + break; + } + } + + return 0; +} + +void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) +{ + switch(log_type) { + case MQTT_WSS_LOG_ERROR: + case MQTT_WSS_LOG_FATAL: + case MQTT_WSS_LOG_WARN: + error("%s", str); + return; + case MQTT_WSS_LOG_INFO: + info("%s", str); + return; + case MQTT_WSS_LOG_DEBUG: + debug(D_ACLK, "%s", str); + return; + default: + error("Unknown log type from mqtt_wss"); + } +} + +//TODO prevent big buffer on stack +#define RX_MSGLEN_MAX 4096 +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) +{ + char cmsg[RX_MSGLEN_MAX]; + size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); + + if (msglen > RX_MSGLEN_MAX - 1) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + memcpy(cmsg, + msg, + len); + cmsg[len] = 0; + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg); + + if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic)) + error("Received message on unexpected topic %s", topic); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring message."); + return; + } + + aclk_handle_cloud_message(cmsg); +} + +static void puback_callback(uint16_t packet_id) +{ + if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) + aclk_reconnect_delay(0); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_puback(packet_id); +#endif + + if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { + error("Got PUBACK for shutdown message. Can exit gracefully."); + aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; + } +} + +static int read_query_thread_count() +{ + int threads = MIN(processors/2, 6); + threads = MAX(threads, 2); + threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + if(threads < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + threads = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + } + return threads; +} + +/* Keeps connection alive and handles all network comms. + * Returns on error or when netdata is shutting down. + * @param client instance of mqtt_wss_client + * @returns 0 - Netdata Exits + * >0 - Error happened. Reconnect and start over. + */ +static int handle_connection(mqtt_wss_client client) +{ + time_t last_periodic_query_wakeup = now_monotonic_sec(); + while (!netdata_exit) { + // timeout 1000 to check at least once a second + // for netdata_exit + if (mqtt_wss_service(client, 1000) < 0){ + error("Connection Error or Dropped"); + return 1; + } + + // mqtt_wss_service will return faster than in one second + // if there is enough work to do + time_t now = now_monotonic_sec(); + if (last_periodic_query_wakeup < now) { + // wake up at least one Query Thread at least + // once per second + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } + return 0; +} + +inline static int aclk_popcorn_check_bump() +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); + ACLK_SHARED_STATE_UNLOCK; + return 1; + } + ACLK_SHARED_STATE_UNLOCK; + return 0; +} + +static inline void queue_connect_payloads(void) +{ + aclk_query_t query = aclk_query_new(METADATA_INFO); + query->data.metadata_info.host = localhost; + query->data.metadata_info.initial_on_connect = 1; + aclk_queue_query(query); + query = aclk_query_new(METADATA_ALARMS); + query->data.metadata_alarms.initial_on_connect = 1; + aclk_queue_query(query); +} + +static inline void mqtt_connected_actions(mqtt_wss_client client) +{ + // TODO global vars? + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + + mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1); + + aclk_stats_upd_online(1); + aclk_connected = 1; + aclk_pubacks_per_conn = 0; + aclk_hello_msg(client); + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { + error("Sending `connect` payload immediatelly as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; +} + +/* Waits until agent is ready or needs to exit + * @param client instance of mqtt_wss_client + * @param query_threads pointer to aclk_query_threads + * structure where to store data about started query threads + * @return 0 - Popcorning Finished - Agent STABLE, + * !0 - netdata_exit + */ +static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +{ + time_t elapsed; + int need_wait; + while (!netdata_exit) { + ACLK_SHARED_STATE_LOCK; + if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return 0; + } + elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; + if (elapsed >= ACLK_STABLE_TIMEOUT) { + aclk_shared_state.agent_state = AGENT_STABLE; + ACLK_SHARED_STATE_UNLOCK; + error("ACLK localhost popocorn finished"); + if (unlikely(!query_threads->thread_list)) + aclk_query_threads_start(query_threads, client); + queue_connect_payloads(); + return 0; + } + ACLK_SHARED_STATE_UNLOCK; + need_wait = ACLK_STABLE_TIMEOUT - elapsed; + error("ACLK localhost popocorn wait %d seconds longer", need_wait); + sleep(need_wait); + } + return 1; +} + +void aclk_graceful_disconnect(mqtt_wss_client client) +{ + error("Preparing to Gracefully Shutdown the ACLK"); + aclk_queue_lock(); + aclk_queue_flush(); + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); + while (!mqtt_wss_service(client, 100)) { + if (now_monotonic_sec() - t >= 2) { + error("Wasn't able to gracefully shutdown ACLK in time!"); + break; + } + if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { + error("MQTT App Layer `disconnect` message sent successfully"); + break; + } + } + aclk_stats_upd_online(0); + aclk_connected = 0; + + error("Attempting to Gracefully Shutdown MQTT/WSS connection"); + mqtt_wss_disconnect(client, 1000); +} + |