From cb13f0787d77c5e36f79ab18f492a52e0ec11123 Mon Sep 17 00:00:00 2001 From: Timotej S <6674623+underhood@users.noreply.github.com> Date: Mon, 27 Jun 2022 16:03:20 +0200 Subject: Removes Legacy JSON Cloud Protocol Support In Agent (#13111) * removes old protocol support (cloud removed support already) --- CMakeLists.txt | 6 - Makefile.am | 16 +- aclk/aclk.c | 424 +++--------------------------------- aclk/aclk.h | 16 -- aclk/aclk_api.c | 18 +- aclk/aclk_api.h | 15 -- aclk/aclk_collector_list.c | 193 ---------------- aclk/aclk_collector_list.h | 41 ---- aclk/aclk_otp.c | 7 +- aclk/aclk_query.c | 128 ++++------- aclk/aclk_query_queue.c | 9 - aclk/aclk_query_queue.h | 19 -- aclk/aclk_rrdhost_state.h | 34 --- aclk/aclk_rx_msgs.c | 14 -- aclk/aclk_rx_msgs.h | 2 - aclk/aclk_stats.c | 18 +- aclk/aclk_stats.h | 2 - aclk/aclk_tx_msgs.c | 217 ------------------ aclk/aclk_tx_msgs.h | 13 -- aclk/aclk_util.c | 19 +- aclk/aclk_util.h | 1 - configure.ac | 83 +++---- daemon/analytics.c | 9 +- daemon/buildinfo.c | 15 +- daemon/main.c | 2 +- daemon/static_threads.c | 2 +- database/rrd.h | 9 +- database/rrddim.c | 6 +- database/rrdhost.c | 15 +- database/rrdset.c | 40 +--- database/sqlite/sqlite_aclk.c | 24 +- database/sqlite/sqlite_aclk.h | 2 - database/sqlite/sqlite_aclk_alert.c | 63 +----- database/sqlite/sqlite_aclk_chart.c | 26 +-- database/sqlite/sqlite_aclk_node.c | 4 +- health/health.c | 16 +- streaming/receiver.c | 4 +- web/api/web_api_v1.c | 12 +- 38 files changed, 165 insertions(+), 1379 deletions(-) delete mode 100644 aclk/aclk_collector_list.c delete mode 100644 aclk/aclk_collector_list.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c88f8fffb8..fd48820cf3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -787,11 +787,6 @@ set(ACLK_ALWAYS_BUILD aclk/aclk_proxy.h ) -set(ACLK_COMMON_FILES - aclk/aclk_collector_list.c - aclk/aclk_collector_list.h - ) - set(ACLK_FILES aclk/aclk.c aclk/aclk.h @@ -1172,7 +1167,6 @@ list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER}) list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD}) list(APPEND NETDATA_FILES ${TIMEX_PLUGIN_FILES}) list(APPEND NETDATA_FILES ${ACLK_FILES} ${ACLK_PROTO_BUILT_SRCS} ${ACLK_PROTO_BUILT_HDRS}) -list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES}) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include) diff --git a/Makefile.am b/Makefile.am index bd5ad4fce0..9ba949e355 100644 --- a/Makefile.am +++ b/Makefile.am @@ -641,11 +641,7 @@ ACLK_FILES = \ mqtt_websockets/c-rbuf/include/ringbuffer.h \ mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \ mqtt_websockets/MQTT-C/src/mqtt.c \ - mqtt_websockets/MQTT-C/include/mqtt.h - $(NULL) - -if ENABLE_NEW_CLOUD_PROTOCOL -ACLK_FILES += \ + mqtt_websockets/MQTT-C/include/mqtt.h \ aclk/aclk_charts_api.c \ aclk/aclk_charts_api.h \ aclk/aclk_alarm_api.c \ @@ -768,17 +764,8 @@ aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.cc \ aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.h: aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ -endif #ENABLE_NEW_CLOUD_PROTOCOL - endif #ENABLE_ACLK -if ENABLE_ACLK -ACLK_COMMON_FILES = \ - aclk/aclk_collector_list.c \ - aclk/aclk_collector_list.h \ - $(NULL) -endif - ACLK_ALWAYS_BUILD_FILES = \ aclk/aclk_rrdhost_state.h \ aclk/aclk_api.c \ @@ -885,7 +872,6 @@ NETDATA_FILES = \ $(CLAIM_FILES) \ $(PARSER_FILES) \ $(ACLK_ALWAYS_BUILD_FILES) \ - $(ACLK_COMMON_FILES) \ $(ACLK_FILES) \ $(SPAWN_PLUGIN_FILES) \ $(TIMEX_PLUGIN_FILES) \ diff --git a/aclk/aclk.c b/aclk/aclk.c index 5d2b5405c9..6f0a0d0ef3 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -10,7 +10,6 @@ #include "aclk_query_queue.h" #include "aclk_util.h" #include "aclk_rx_msgs.h" -#include "aclk_collector_list.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" @@ -46,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = ACLK_HOST_INITIALIZING, - .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -188,54 +185,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) -{ - UNUSED(qos); - char cmsg[RX_MSGLEN_MAX]; - size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); - const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); - if (!cmd_topic) { - error("Error retrieving command topic"); - return; - } - - if (msglen > RX_MSGLEN_MAX - 1) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); - - memcpy(cmsg, - msg, - len); - cmsg[len] = 0; - -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 512 - char filename[FN_MAX_LEN]; - int logfd; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); - logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); - if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); - write(logfd, msg, msglen); - close(logfd); -#endif - - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg); - - if (strcmp(cmd_topic, topic)) - error("Received message on unexpected topic %s", topic); - - if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); - return; - } - - aclk_handle_cloud_cmd_message(cmsg); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); + aclk_rcvd_cloud_msgs++; if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -272,15 +225,6 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t aclk_handle_new_cloud_msg(msgtype, msg, msglen); } -static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { - aclk_rcvd_cloud_msgs++; - if (aclk_use_new_cloud_arch) - msg_callback_new_protocol(topic, msg, msglen, qos); - else - msg_callback_old_protocol(topic, msg, msglen, qos); -} -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ - static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { @@ -356,40 +300,6 @@ static int handle_connection(mqtt_wss_client client) return 0; } -inline static int aclk_popcorn_check() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -inline static int aclk_popcorn_check_bump() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -static inline void queue_connect_payloads(void) -{ - aclk_query_t query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; - query->data.metadata_info.initial_on_connect = 1; - aclk_queue_query(query); - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 1; - aclk_queue_query(query); -} - static inline void mqtt_connected_actions(mqtt_wss_client client) { char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -399,15 +309,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); - if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); - else - mqtt_wss_subscribe(client, topic, 1); - } -#endif + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; @@ -415,55 +321,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!aclk_use_new_cloud_arch) { -#endif - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); - } - ACLK_SHARED_STATE_UNLOCK; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } else { - aclk_send_agent_connection_update(client, 1); - } -#endif -} - -/* Waits until agent is ready or needs to exit - * @param client instance of mqtt_wss_client - * @param query_threads pointer to aclk_query_threads - * structure where to store data about started query threads - * @return 0 - Popcorning Finished - Agent STABLE, - * !0 - netdata_exit - */ -static int wait_popcorning_finishes() -{ - time_t elapsed; - int need_wait; - if (aclk_use_new_cloud_arch) - return 0; - - while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = ACLK_HOST_STABLE; - ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popcorn timer finished"); - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); - sleep(need_wait); - } - return 1; + aclk_send_agent_connection_update(client, 1); } void aclk_graceful_disconnect(mqtt_wss_client client) @@ -471,12 +329,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); - else -#endif - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { @@ -594,8 +448,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt = NULL; - while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { @@ -629,8 +481,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; - aclk_use_new_cloud_arch = 0; - #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -649,19 +499,16 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; - if (aclk_env->encoding == ACLK_ENC_PROTO) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); + continue; + } + + if (!aclk_env_has_capa("proto")) { + error ("Can't use encoding=proto without at least \"proto\" capability."); continue; -#else - if (!aclk_env_has_capa("proto")) { - error ("Can't encoding=proto without at least \"proto\" capability."); - continue; - } - info("Switching ACLK to new protobuf protocol. Due to /env response."); - aclk_use_new_cloud_arch = 1; -#endif } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -679,10 +526,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - if (aclk_use_new_cloud_arch) - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); - else - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); @@ -708,17 +552,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); - } else { -#endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } -#endif + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -732,10 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - if (aclk_use_new_cloud_arch) - freez((char *)mqtt_conn_params.will_msg); - else - json_object_put(lwt); + freez((char *)mqtt_conn_params.will_msg); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -778,10 +609,7 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - proto_hdl_cnt = aclk_init_rx_msg_handlers(); -#endif + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -792,7 +620,6 @@ void *aclk_main(void *ptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif - aclk_popcorn_check_bump(); // start localhost popcorn timer query_threads.count = read_query_thread_count(); if (wait_till_cloud_enabled()) @@ -803,11 +630,7 @@ void *aclk_main(void *ptr) use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { -#else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { -#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -835,28 +658,9 @@ void *aclk_main(void *ptr) if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; -#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL) - error_report("############################ WARNING ###############################"); - error_report("# Your agent is configured to connect to cloud but has #"); - error_report("# no protobuf protocol support (uses legacy JSON protocol) #"); - error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #"); - error_report("# Visit following link for more info and instructions how to solve #"); - error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #"); - error_report("######################################################################"); -#endif - - // warning this assumes the popcorning is relative short (3s) - // if that changes call mqtt_wss_service from within - // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes()) - goto exit_full; - if (unlikely(!query_threads.thread_list)) aclk_query_threads_start(&query_threads, mqttwss_client); - if (!aclk_use_new_cloud_arch) - queue_connect_payloads(); - if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); @@ -890,168 +694,12 @@ exit: return NULL; } -// TODO this is taken over as workaround from old ACLK -// fix this in both old and new ACLK -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void aclk_alarm_reload(void) -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return; - } - ACLK_SHARED_STATE_UNLOCK; - - aclk_queue_query(aclk_query_new(METADATA_ALARMS)); -} - -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer; - json_object *msg; - - if (host != localhost) - return 0; - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - msg = json_tokener_parse(local_buffer->buffer); - - struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); - query->data.alarm_update = msg; - aclk_queue_query(query); - - buffer_free(local_buffer); - return 0; -} - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ - struct aclk_query *query; - - if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) - return 0; - - query = aclk_query_new(create ? CHART_NEW : CHART_DEL); - if(create) { - query->data.chart_add_del.host = host; - query->data.chart_add_del.chart_name = strdupz(chart_name); - } else { - query->data.metadata_info.host = host; - query->data.metadata_info.initial_on_connect = 0; - } - - aclk_queue_query(query); - return 0; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; int ret; - if (!aclk_connected || !aclk_use_new_cloud_arch) + if (!aclk_connected) return; ret = get_node_id(&host->host_uuid, &node_id); @@ -1158,14 +806,12 @@ void aclk_send_node_instances() } freez(list_head); } -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { struct proto_alert_status status; @@ -1221,7 +867,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) ); freez(stats); } -#endif char *ng_aclk_state(void) { @@ -1232,13 +877,9 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Version: 2\n" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - "Protocols Supported: Legacy, Protobuf\n" -#else - "Protocols Supported: Legacy\n" -#endif + "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1274,7 +915,6 @@ char *ng_aclk_state(void) if (aclk_connected) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { @@ -1309,7 +949,6 @@ char *ng_aclk_state(void) fill_chart_status_for_host(wb, host); } rrd_unlock(); -#endif } ret = strdupz(buffer_tostring(wb)); @@ -1317,7 +956,6 @@ char *ng_aclk_state(void) return ret; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1382,7 +1020,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) freez(stats); } -#endif static json_object *timestamp_to_json(const time_t *t) { @@ -1406,15 +1043,8 @@ char *ng_aclk_state_json(void) json_object_object_add(msg, "aclk-version", tmp); grp = json_object_new_array(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); tmp = json_object_new_string("Protobuf"); json_object_array_add(grp, tmp); -#else - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); -#endif json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); @@ -1435,7 +1065,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); @@ -1462,7 +1092,6 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_disable_runtime); json_object_object_add(msg, "banned-by-cloud", tmp); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL grp = json_object_new_array(); RRDHOST *host; @@ -1514,7 +1143,6 @@ char *ng_aclk_state_json(void) } rrd_unlock(); json_object_object_add(msg, "node-instances", grp); -#endif char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); diff --git a/aclk/aclk.h b/aclk/aclk.h index 41c4e05e40..5065ac2bfc 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) extern struct aclk_shared_state { - ACLK_AGENT_STATE agent_state; - time_t last_popcorn_interrupt; - // To wait for `disconnect` message PUBACK // when shutting down // at the same time if > 0 we know link is @@ -32,21 +29,8 @@ extern struct aclk_shared_state { int mqtt_shutdown_msg_rcvd; } aclk_shared_state; -void aclk_alarm_reload(void); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -/* Informs ACLK about created/deleted chart - * @param create 0 - if chart was deleted, other if chart created - */ -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd); void aclk_send_node_instances(void); -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 1f63b748fe..9446b407f2 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -13,7 +13,6 @@ usec_t aclk_session_us = 0; time_t aclk_session_sec = 0; int aclk_disable_runtime = 0; -int aclk_disable_single_updates = 0; int aclk_stats_enabled; int use_mqtt_5 = 0; @@ -33,16 +32,6 @@ void *aclk_starter(void *ptr) { } return aclk_main(ptr); } - -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} #endif /* ENABLE_ACLK */ void add_aclk_host_labels(void) { @@ -71,16 +60,13 @@ void add_aclk_host_labels(void) { break; } + int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); + rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#else - rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#endif #endif } diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index b00a2a4e12..8bf7c72913 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -15,31 +15,16 @@ extern usec_t aclk_session_us; extern time_t aclk_session_sec; extern int aclk_disable_runtime; -extern int aclk_disable_single_updates; extern int aclk_stats_enabled; extern int aclk_alert_reloaded; -extern int aclk_ng; extern int use_mqtt_5; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); -void aclk_single_update_disable(); -void aclk_single_update_enable(); - -void aclk_alarm_reload(void); - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int connect); -#endif #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c deleted file mode 100644 index 2920c9a5c8..0000000000 --- a/aclk/aclk_collector_list.c +++ /dev/null @@ -1,193 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Author: amoss - -// TODO unmess this - -#include "aclk_collector_list.h" - -netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; - -struct _collector *collector_list = NULL; - -/* - * Free a collector structure - */ -void _free_collector(struct _collector *collector) -{ - if (likely(collector->plugin_name)) - freez(collector->plugin_name); - - if (likely(collector->module_name)) - freez(collector->module_name); - - if (likely(collector->hostname)) - freez(collector->hostname); - - freez(collector); -} - -/* - * This will report the collector list - * - */ -#ifdef ACLK_DEBUG -static void _dump_collector_list() -{ - struct _collector *tmp_collector; - - COLLECTOR_LOCK; - - info("DUMPING ALL COLLECTORS"); - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - info("DUMPING ALL COLLECTORS -- nothing found"); - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - - while (tmp_collector) { - info( - "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, - tmp_collector->plugin_name ? tmp_collector->plugin_name : "", - tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); - - tmp_collector = tmp_collector->next; - } - info("DUMPING ALL COLLECTORS DONE"); - COLLECTOR_UNLOCK; -} -#endif - -/* - * This will cleanup the collector list - * - */ -void _reset_collector_list() -{ - struct _collector *tmp_collector, *next_collector; - - COLLECTOR_LOCK; - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - collector_list->count = 0; - collector_list->next = NULL; - - // We broke the link; we can unlock - COLLECTOR_UNLOCK; - - while (tmp_collector) { - next_collector = tmp_collector->next; - _free_collector(tmp_collector); - tmp_collector = next_collector; - } -} - -/* - * Find a collector (if it exists) - * Must lock before calling this - * If last_collector is not null, it will return the previous collector in the linked - * list (used in collector delete) - */ -static struct _collector *_find_collector( - const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) -{ - struct _collector *tmp_collector, *prev_collector; - uint32_t plugin_hash; - uint32_t module_hash; - uint32_t hostname_hash; - - if (unlikely(!collector_list)) { - collector_list = callocz(1, sizeof(struct _collector)); - return NULL; - } - - if (unlikely(!collector_list->next)) - return NULL; - - plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - module_hash = module_name ? simple_hash(module_name) : 1; - hostname_hash = simple_hash(hostname); - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - prev_collector = collector_list; - while (tmp_collector) { - if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && - hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && - (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && - (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { - if (unlikely(last_collector)) - *last_collector = prev_collector; - - return tmp_collector; - } - - prev_collector = tmp_collector; - tmp_collector = tmp_collector->next; - } - - return tmp_collector; -} - -/* - * Called to delete a collector - * It will reduce the count (chart_count) and will remove it - * from the linked list if the count reaches zero - * The structure will be returned to the caller to free - * the resources - * - */ -struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector, *prev_collector = NULL; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); - - if (likely(tmp_collector)) { - --tmp_collector->count; - if (unlikely(!tmp_collector->count)) - prev_collector->next = tmp_collector->next; - } - return tmp_collector; -} - -/* - * Add a new collector (plugin / module) to the list - * If it already exists just update the chart count - * - * Lock before calling - */ -struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); - - if (unlikely(!tmp_collector)) { - tmp_collector = callocz(1, sizeof(struct _collector)); - tmp_collector->hostname_hash = simple_hash(hostname); - tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; - - tmp_collector->hostname = strdupz(hostname); - tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; - tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; - - tmp_collector->next = collector_list->next; - collector_list->next = tmp_collector; - } - tmp_collector->count++; - debug( - D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", - module_name ? module_name : "*", tmp_collector->count); - return tmp_collector; -} diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h deleted file mode 100644 index 09c06b14a0..0000000000 --- a/aclk/aclk_collector_list.h +++ /dev/null @@ -1,41 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Author: amoss - -// TODO unmess this - -#ifndef ACLK_COLLECTOR_LIST_H -#define ACLK_COLLECTOR_LIST_H - -#include "libnetdata/libnetdata.h" - -extern netdata_mutex_t collector_mutex; - -#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) -#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) - -/* - * Maintain a list of collectors and chart count - * If all the charts of a collector are deleted - * then a new metadata dataset must be send to the cloud - * - */ -struct _collector { - time_t created; - uint32_t count; //chart count - uint32_t hostname_hash; - uint32_t plugin_hash; - uint32_t module_hash; - char *hostname; - char *plugin_name; - char *module_name; - struct _collector *next; -}; - -extern struct _collector *collector_list; - -struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name); -struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name); -void _reset_collector_list(); -void _free_collector(struct _collector *collector); - -#endif /* ACLK_COLLECTOR_LIST_H */ diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index c99c656373..47fdf1b598 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -814,11 +814,8 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); -#else - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); -#endif + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + freez(agent_id); req.host = (char*)aclk_hostname; diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index de970fc3d6..f722551b0c 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) -typedef struct aclk_query_handler { - aclk_query_type_t type; - char *name; // for logging purposes - int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query); -} aclk_query_handler; - -static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_info_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect, - query->data.metadata_info.host); - return 0; -} - -static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_alarm_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect); - return 0; -} - static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) { usec_t t; @@ -277,84 +256,61 @@ cleanup: return retval; } -static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); - return 0; -} - -static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_alarm_state_msg(query_thr->client, query->data.alarm_update); - // aclk_alarm_state_msg frees the json object including the header it generates - query->data.alarm_update = NULL; - return 0; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); return 0; } -#endif - -aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, - { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, - { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, - { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, - { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg }, - { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg }, - { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg }, - { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg }, - { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg }, -#endif - { .type = UNKNOWN, .name = NULL, .fnc = NULL } -}; const char *aclk_query_get_name(aclk_query_type_t qt) { - aclk_query_handler *ptr = aclk_query_handlers; - while (ptr->type != UNKNOWN) { - if (ptr->type == qt) - return ptr->name; - ptr++; + switch (qt) { + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_LOG_HEALTH: return "alarm_log_health"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + default: + error_report("Unknown query type used %d", (int) qt); + return "unknown"; } - return "unknown"; } static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - if (aclk_query_handlers[i].type == query->type) { - worker_is_busy(i); - - debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); - aclk_query_handlers[i].fnc(query_thr, query); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[query_thr->idx]++; - aclk_metrics_per_sample.queries_per_type[query->type]++; - ACLK_STATS_UNLOCK; - } - aclk_query_free(query); +{ + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + aclk_query_free(query); + return; + } - worker_is_idle(); - return; - } + worker_is_busy(query->type); + if (query->type == HTTP_API_V2) { + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); + } else { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + send_bin_msg(query_thr, query); } - fatal("Unknown query in query queue. %u", query->type); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(query); + + worker_is_idle(); } /* Processes messages from queue. Compete for work with other threads @@ -370,8 +326,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) static void worker_aclk_register(void) { worker_register("ACLKQUERY"); - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - worker_register_job_name(i, aclk_query_handlers[i].name); + for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { + worker_register_job_name(i, aclk_query_get_name(i)); } } diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 2422b01e12..8ac14305d6 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -111,15 +111,6 @@ void aclk_query_free(aclk_query_t query) freez(query->data.http_api_v2.query); break; - case CHART_NEW: - freez(query->data.chart_add_del.chart_name); - break; - - case ALARM_STATE_UPDATE: - if (query->data.alarm_update) - json_object_put(query->data.alarm_update); - break; - case NODE_STATE_UPDATE: case REGISTER_NODE: case CHART_DIMS_UPDATE: diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 0b5ef8faaa..523841c919 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -11,12 +11,7 @@ typedef enum { UNKNOWN = 0, - METADATA_INFO, - METADATA_ALARMS, HTTP_API_V2, - CHART_NEW, - CHART_DEL, - ALARM_STATE_UPDATE, REGISTER_NODE, NODE_STATE_UPDATE, CHART_DIMS_UPDATE, @@ -30,16 +25,6 @@ typedef enum { ACLK_QUERY_TYPE_COUNT // always keep this as last } aclk_query_type_t; -struct aclk_query_metadata { - RRDHOST *host; - int initial_on_connect; -}; - -struct aclk_query_chart_add_del { - RRDHOST *host; - char* chart_name; -}; - struct aclk_query_http_api_v2 { char *payload; char *query; @@ -73,12 +58,8 @@ struct aclk_query { // TODO maybe remove? int version; union { - struct aclk_query_metadata metadata_info; - struct aclk_query_metadata metadata_alarms; struct aclk_query_http_api_v2 http_api_v2; - struct aclk_query_chart_add_del chart_add_del; struct aclk_bin_payload bin_payload; - json_object *alarm_update; } data; }; diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h index 9138123dfe..5c8a2ddc94 100644 --- a/aclk/aclk_rrdhost_state.h +++ b/aclk/aclk_rrdhost_state.h @@ -3,43 +3,9 @@ #include "libnetdata/libnetdata.h" -#ifdef ACLK_LEGACY -typedef enum aclk_cmd { - ACLK_CMD_CLOUD, - ACLK_CMD_ONCONNECT, - ACLK_CMD_INFO, - ACLK_CMD_CHART, - ACLK_CMD_CHARTDEL, - ACLK_CMD_ALARM, - ACLK_CMD_CLOUD_QUERY_2, - ACLK_CMD_CHILD_CONNECT, - ACLK_CMD_CHILD_DISCONNECT -} ACLK_CMD; - -typedef enum aclk_metadata_state { - ACLK_METADATA_REQUIRED, - ACLK_METADATA_CMD_QUEUED, - ACLK_METADATA_SENT -} ACLK_METADATA_STATE; -#endif - -typedef enum aclk_agent_state { - ACLK_HOST_INITIALIZING, - ACLK_HOST_STABLE -} ACLK_AGENT_STATE; - typedef struct aclk_rrdhost_state { char *claimed_id; // Claimed ID if host has one otherwise NULL char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime - -#ifdef ACLK_LEGACY - // per child popcorning - ACLK_AGENT_STATE state; - ACLK_METADATA_STATE metadata; - - time_t timestamp_created; - time_t t_last_popcorn_update; -#endif /* ACLK_LEGACY */ } aclk_rrdhost_state; #endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 27f1bf2dc7..65e4955b82 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -116,20 +116,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur return 0; } -#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\ - debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ - ACLK_SHARED_STATE_UNLOCK;\ - return 1;\ - }\ - ACLK_SHARED_STATE_UNLOCK; - static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - if (!aclk_use_new_cloud_arch) { - HTTP_CHECK_AGENT_INITIALIZED(); - } - aclk_query_t query; errno = 0; @@ -229,7 +217,6 @@ err_cleanup: return 1; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL typedef uint32_t simple_hash_t; typedef int(*rx_msg_handler)(const char *msg, size_t msg_len); @@ -524,4 +511,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t return; } } -#endif diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index 00f88c6a8d..0df612786e 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -10,10 +10,8 @@ int aclk_handle_cloud_cmd_message(char *payload); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL const char *rx_handler_get_name(size_t i); unsigned int aclk_init_rx_msg_handlers(void); void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len); -#endif #endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index b6e8a673eb..241e9b724d 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; struct { int query_thread_count; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL unsigned int proto_hdl_cnt; uint32_t *aclk_proto_rx_msgs_sample; RRDDIM **rx_msg_dims; -#endif } aclk_stats_cfg; // there is only 1 stats thread at a time // data ACLK stats need per query thread @@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL const char *rx_handler_get_name(size_t i); static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) { @@ -259,7 +256,6 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) rrdset_done(st); } -#endif static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) { @@ -290,31 +286,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - UNUSED(proto_hdl_cnt); -#endif - aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); -#endif } void aclk_stats_thread_cleanup() { -#ifdef ENABLE_NEW_CLOUD_PROTOCOL freez(aclk_stats_cfg.rx_msg_dims); freez(aclk_proto_rx_msgs_sample); freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); @@ -345,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr) // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); -#endif + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); @@ -373,9 +361,7 @@ void *aclk_stats_main_thread(void *ptr) struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client); aclk_stats_mqtt_wss(&mqtt_wss_stats); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif } return 0; diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index aec13e212d..bec9ac2476 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -62,9 +62,7 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_q_process_max; } aclk_metrics_per_sample; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL extern uint32_t *aclk_proto_rx_msgs_sample; -#endif extern uint32_t *aclk_queries_per_thread; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 3530dccff8..4f094b94ee 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -13,29 +13,6 @@ // version for aclk legacy (old cloud arch) #define ACLK_VERSION 2 -static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return; - } - - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif -} - uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR @@ -71,30 +48,6 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return packet_id; } -static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return 0; - } - - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif - return packet_id; -} - /* UNUSED now but can be used soon MVP1? static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic) { @@ -231,17 +184,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time return obj; } -static char *create_uuid() -{ - uuid_t uuid; - char *uuid_str = mallocz(36 + 1); - - uuid_generate(uuid); - uuid_unparse(uuid, uuid_str); - - return uuid_str; -} - #ifndef __GNUC__ #pragma endregion #endif @@ -250,90 +192,6 @@ static char *create_uuid() #pragma region aclk_tx_msgs message generators #endif -/* - * This will send the /api/v1/info - */ -#define BUFFER_INITIAL_SIZE (1024 * 16) -void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - web_client_api_request_v1_info_fill_buffer(host, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "info", tmp); - - buffer_flush(local_buffer); - - charts2json(host, local_buffer, 1, 0); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "charts", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - -// TODO should include header instead -void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); - -void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - - if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - health_alarms2json(localhost, local_buffer, 1); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "configured-alarms", tmp); - - buffer_flush(local_buffer); - - health_active_log_alarms_2json(localhost, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "alarms-active", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -384,80 +242,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg } } -void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) -{ - json_object *msg, *payload; - BUFFER *tmp_buffer; - RRDSET *st; - - st = rrdset_find(host, chart); - if (!st) - st = rrdset_find_byname(host, chart); - if (!st) { - info("FAILED to find chart %s", chart); - return; - } - - tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE); - rrdset2json(st, tmp_buffer, NULL, NULL, 1); - payload = json_tokener_parse(tmp_buffer->buffer); - if (!payload) { - error("Failed to parse JSON from rrdset2json"); - buffer_free(tmp_buffer); - return; - } - - msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(msg, "payload", payload); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); - - buffer_free(tmp_buffer); - json_object_put(msg); -} - -void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) -{ - // we create header here on purpose (and not send message with it already as `msg` param) - // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy - // send message with timestamps already to Query Queue they would be incorrect at time - // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(obj, "payload", msg); - - aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); - json_object_put(obj); -} - -/* - * Will generate disconnect message. - * @param message if NULL it will generate LWT message (unexpected). - * Otherwise string pointed to by this parameter will be used as - * reason. - */ -json_object *aclk_generate_disconnect(const char *message) -{ - json_object *tmp, *msg; - - msg = create_hdr("disconnect", NULL, 0, 0, 2); - - tmp = json_object_new_string(message ? message : "unexpected"); - json_object_object_add(msg, "payload", tmp); - - return msg; -} - -int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) -{ - int pid; - json_object *msg = aclk_generate_disconnect(message); - pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA); - json_object_put(msg); - return pid; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -// new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { size_t len; uint16_t pid; @@ -532,7 +316,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ #pragma endregion diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 44281eb688..31e5924100 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -11,23 +11,10 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); -void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); -void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); - void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len); void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); -void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); - -void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg); - -json_object *aclk_generate_disconnect(const char *message); -int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -// new protobuf msgs uint16_