diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-06-27 16:03:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-27 16:03:20 +0200 |
commit | cb13f0787d77c5e36f79ab18f492a52e0ec11123 (patch) | |
tree | 3f3f0269b34d895bb38d8c7bbc45cbd18022ac6c | |
parent | b8bfe953fbc8b35e13bd85975d4b23b90a6346b8 (diff) |
Removes Legacy JSON Cloud Protocol Support In Agent (#13111)
* removes old protocol support (cloud removed support already)
38 files changed, 165 insertions, 1379 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index c88f8fffb8..fd48820cf3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -787,11 +787,6 @@ set(ACLK_ALWAYS_BUILD aclk/aclk_proxy.h ) -set(ACLK_COMMON_FILES - aclk/aclk_collector_list.c - aclk/aclk_collector_list.h - ) - set(ACLK_FILES aclk/aclk.c aclk/aclk.h @@ -1172,7 +1167,6 @@ list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER}) list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD}) list(APPEND NETDATA_FILES ${TIMEX_PLUGIN_FILES}) list(APPEND NETDATA_FILES ${ACLK_FILES} ${ACLK_PROTO_BUILT_SRCS} ${ACLK_PROTO_BUILT_HDRS}) -list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES}) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include) diff --git a/Makefile.am b/Makefile.am index bd5ad4fce0..9ba949e355 100644 --- a/Makefile.am +++ b/Makefile.am @@ -641,11 +641,7 @@ ACLK_FILES = \ mqtt_websockets/c-rbuf/include/ringbuffer.h \ mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \ mqtt_websockets/MQTT-C/src/mqtt.c \ - mqtt_websockets/MQTT-C/include/mqtt.h - $(NULL) - -if ENABLE_NEW_CLOUD_PROTOCOL -ACLK_FILES += \ + mqtt_websockets/MQTT-C/include/mqtt.h \ aclk/aclk_charts_api.c \ aclk/aclk_charts_api.h \ aclk/aclk_alarm_api.c \ @@ -768,17 +764,8 @@ aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.cc \ aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.h: aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ -endif #ENABLE_NEW_CLOUD_PROTOCOL - endif #ENABLE_ACLK -if ENABLE_ACLK -ACLK_COMMON_FILES = \ - aclk/aclk_collector_list.c \ - aclk/aclk_collector_list.h \ - $(NULL) -endif - ACLK_ALWAYS_BUILD_FILES = \ aclk/aclk_rrdhost_state.h \ aclk/aclk_api.c \ @@ -885,7 +872,6 @@ NETDATA_FILES = \ $(CLAIM_FILES) \ $(PARSER_FILES) \ $(ACLK_ALWAYS_BUILD_FILES) \ - $(ACLK_COMMON_FILES) \ $(ACLK_FILES) \ $(SPAWN_PLUGIN_FILES) \ $(TIMEX_PLUGIN_FILES) \ diff --git a/aclk/aclk.c b/aclk/aclk.c index 5d2b5405c9..6f0a0d0ef3 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -10,7 +10,6 @@ #include "aclk_query_queue.h" #include "aclk_util.h" #include "aclk_rx_msgs.h" -#include "aclk_collector_list.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" @@ -46,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = ACLK_HOST_INITIALIZING, - .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -188,54 +185,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) -{ - UNUSED(qos); - char cmsg[RX_MSGLEN_MAX]; - size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); - const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); - if (!cmd_topic) { - error("Error retrieving command topic"); - return; - } - - if (msglen > RX_MSGLEN_MAX - 1) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); - - memcpy(cmsg, - msg, - len); - cmsg[len] = 0; - -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 512 - char filename[FN_MAX_LEN]; - int logfd; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); - logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); - if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); - write(logfd, msg, msglen); - close(logfd); -#endif - - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg); - - if (strcmp(cmd_topic, topic)) - error("Received message on unexpected topic %s", topic); - - if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); - return; - } - - aclk_handle_cloud_cmd_message(cmsg); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); + aclk_rcvd_cloud_msgs++; if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -272,15 +225,6 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t aclk_handle_new_cloud_msg(msgtype, msg, msglen); } -static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { - aclk_rcvd_cloud_msgs++; - if (aclk_use_new_cloud_arch) - msg_callback_new_protocol(topic, msg, msglen, qos); - else - msg_callback_old_protocol(topic, msg, msglen, qos); -} -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ - static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { @@ -356,40 +300,6 @@ static int handle_connection(mqtt_wss_client client) return 0; } -inline static int aclk_popcorn_check() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -inline static int aclk_popcorn_check_bump() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -static inline void queue_connect_payloads(void) -{ - aclk_query_t query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; - query->data.metadata_info.initial_on_connect = 1; - aclk_queue_query(query); - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 1; - aclk_queue_query(query); -} - static inline void mqtt_connected_actions(mqtt_wss_client client) { char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -399,15 +309,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); - if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); - else - mqtt_wss_subscribe(client, topic, 1); - } -#endif + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; @@ -415,55 +321,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!aclk_use_new_cloud_arch) { -#endif - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); - } - ACLK_SHARED_STATE_UNLOCK; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } else { - aclk_send_agent_connection_update(client, 1); - } -#endif -} - -/* Waits until agent is ready or needs to exit - * @param client instance of mqtt_wss_client - * @param query_threads pointer to aclk_query_threads - * structure where to store data about started query threads - * @return 0 - Popcorning Finished - Agent STABLE, - * !0 - netdata_exit - */ -static int wait_popcorning_finishes() -{ - time_t elapsed; - int need_wait; - if (aclk_use_new_cloud_arch) - return 0; - - while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = ACLK_HOST_STABLE; - ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popcorn timer finished"); - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); - sleep(need_wait); - } - return 1; + aclk_send_agent_connection_update(client, 1); } void aclk_graceful_disconnect(mqtt_wss_client client) @@ -471,12 +329,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); - else -#endif - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { @@ -594,8 +448,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt = NULL; - while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { @@ -629,8 +481,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; - aclk_use_new_cloud_arch = 0; - #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -649,19 +499,16 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; - if (aclk_env->encoding == ACLK_ENC_PROTO) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); + continue; + } + + if (!aclk_env_has_capa("proto")) { + error ("Can't use encoding=proto without at least \"proto\" capability."); continue; -#else - if (!aclk_env_has_capa("proto")) { - error ("Can't encoding=proto without at least \"proto\" capability."); - continue; - } - info("Switching ACLK to new protobuf protocol. Due to /env response."); - aclk_use_new_cloud_arch = 1; -#endif } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -679,10 +526,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - if (aclk_use_new_cloud_arch) - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); - else - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); @@ -708,17 +552,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); - } else { -#endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } -#endif + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -732,10 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - if (aclk_use_new_cloud_arch) - freez((char *)mqtt_conn_params.will_msg); - else - json_object_put(lwt); + freez((char *)mqtt_conn_params.will_msg); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -778,10 +609,7 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - proto_hdl_cnt = aclk_init_rx_msg_handlers(); -#endif + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -792,7 +620,6 @@ void *aclk_main(void *ptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif - aclk_popcorn_check_bump(); // start localhost popcorn timer query_threads.count = read_query_thread_count(); if (wait_till_cloud_enabled()) @@ -803,11 +630,7 @@ void *aclk_main(void *ptr) use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { -#else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { -#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -835,28 +658,9 @@ void *aclk_main(void *ptr) if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; -#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL) - error_report("############################ WARNING ###############################"); - error_report("# Your agent is configured to connect to cloud but has #"); - error_report("# no protobuf protocol support (uses legacy JSON protocol) #"); - error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #"); - error_report("# Visit following link for more info and instructions how to solve #"); - error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #"); - error_report("######################################################################"); -#endif - - // warning this assumes the popcorning is relative short (3s) - // if that changes call mqtt_wss_service from within - // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes()) - goto exit_full; - if (unlikely(!query_threads.thread_list)) aclk_query_threads_start(&query_threads, mqttwss_client); - if (!aclk_use_new_cloud_arch) - queue_connect_payloads(); - if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); @@ -890,168 +694,12 @@ exit: return NULL; } -// TODO this is taken over as workaround from old ACLK -// fix this in both old and new ACLK -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void aclk_alarm_reload(void) -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return; - } - ACLK_SHARED_STATE_UNLOCK; - - aclk_queue_query(aclk_query_new(METADATA_ALARMS)); -} - -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer; - json_object *msg; - - if (host != localhost) - return 0; - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - msg = json_tokener_parse(local_buffer->buffer); - - struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); - query->data.alarm_update = msg; - aclk_queue_query(query); - - buffer_free(local_buffer); - return 0; -} - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ - struct aclk_query *query; - - if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) - return 0; - - query = aclk_query_new(create ? CHART_NEW : CHART_DEL); - if(create) { - query->data.chart_add_del.host = host; - query->data.chart_add_del.chart_name = strdupz(chart_name); - } else { - query->data.metadata_info.host = host; - query->data.metadata_info.initial_on_connect = 0; - } - - aclk_queue_query(query); - return 0; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; int ret; - if (!aclk_connected || !aclk_use_new_cloud_arch) + if (!aclk_connected) return; ret = get_node_id(&host->host_uuid, &node_id); @@ -1158,14 +806,12 @@ void aclk_send_node_instances() } freez(list_head); } -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { struct proto_alert_status status; @@ -1221,7 +867,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) ); freez(stats); } -#endif char *ng_aclk_state(void) { @@ -1232,13 +877,9 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Version: 2\n" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - "Protocols Supported: Legacy, Protobuf\n" -#else - "Protocols Supported: Legacy\n" -#endif + "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1274,7 +915,6 @@ char *ng_aclk_state(void) if (aclk_connected) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { @@ -1309,7 +949,6 @@ char *ng_aclk_state(void) fill_chart_status_for_host(wb, host); } rrd_unlock(); -#endif } ret = strdupz(buffer_tostring(wb)); @@ -1317,7 +956,6 @@ char *ng_aclk_state(void) return ret; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1382,7 +1020,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) freez(stats); } -#endif static json_object *timestamp_to_json(const time_t *t) { @@ -1406,15 +1043,8 @@ char *ng_aclk_state_json(void) json_object_object_add(msg, "aclk-version", tmp); grp = json_object_new_array(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); tmp = json_object_new_string("Protobuf"); json_object_array_add(grp, tmp); -#else - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); -#endif json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); @@ -1435,7 +1065,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); @@ -1462,7 +1092,6 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_disable_runtime); json_object_object_add(msg, "banned-by-cloud", tmp); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL grp = json_object_new_array(); RRDHOST *host; @@ -1514,7 +1143,6 @@ char *ng_aclk_state_json(void) } rrd_unlock(); json_object_object_add(msg, "node-instances", grp); -#endif char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); diff --git a/aclk/aclk.h b/aclk/aclk.h index 41c4e05e40..5065ac2bfc 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) extern struct aclk_shared_state { - ACLK_AGENT_STATE a |