diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-06-27 16:03:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-27 16:03:20 +0200 |
commit | cb13f0787d77c5e36f79ab18f492a52e0ec11123 (patch) | |
tree | 3f3f0269b34d895bb38d8c7bbc45cbd18022ac6c /aclk | |
parent | b8bfe953fbc8b35e13bd85975d4b23b90a6346b8 (diff) |
Removes Legacy JSON Cloud Protocol Support In Agent (#13111)
* removes old protocol support (cloud removed support already)
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 424 | ||||
-rw-r--r-- | aclk/aclk.h | 16 | ||||
-rw-r--r-- | aclk/aclk_api.c | 18 | ||||
-rw-r--r-- | aclk/aclk_api.h | 15 | ||||
-rw-r--r-- | aclk/aclk_collector_list.c | 193 | ||||
-rw-r--r-- | aclk/aclk_collector_list.h | 41 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 7 | ||||
-rw-r--r-- | aclk/aclk_query.c | 128 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 9 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 19 | ||||
-rw-r--r-- | aclk/aclk_rrdhost_state.h | 34 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 14 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 2 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 18 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 2 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 217 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 13 | ||||
-rw-r--r-- | aclk/aclk_util.c | 19 | ||||
-rw-r--r-- | aclk/aclk_util.h | 1 |
19 files changed, 78 insertions, 1112 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 5d2b5405c9..6f0a0d0ef3 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -10,7 +10,6 @@ #include "aclk_query_queue.h" #include "aclk_util.h" #include "aclk_rx_msgs.h" -#include "aclk_collector_list.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" @@ -46,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = ACLK_HOST_INITIALIZING, - .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -188,54 +185,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) -{ - UNUSED(qos); - char cmsg[RX_MSGLEN_MAX]; - size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); - const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); - if (!cmd_topic) { - error("Error retrieving command topic"); - return; - } - - if (msglen > RX_MSGLEN_MAX - 1) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); - - memcpy(cmsg, - msg, - len); - cmsg[len] = 0; - -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 512 - char filename[FN_MAX_LEN]; - int logfd; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); - logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); - if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); - write(logfd, msg, msglen); - close(logfd); -#endif - - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg); - - if (strcmp(cmd_topic, topic)) - error("Received message on unexpected topic %s", topic); - - if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); - return; - } - - aclk_handle_cloud_cmd_message(cmsg); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); + aclk_rcvd_cloud_msgs++; if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -272,15 +225,6 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t aclk_handle_new_cloud_msg(msgtype, msg, msglen); } -static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { - aclk_rcvd_cloud_msgs++; - if (aclk_use_new_cloud_arch) - msg_callback_new_protocol(topic, msg, msglen, qos); - else - msg_callback_old_protocol(topic, msg, msglen, qos); -} -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ - static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { @@ -356,40 +300,6 @@ static int handle_connection(mqtt_wss_client client) return 0; } -inline static int aclk_popcorn_check() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -inline static int aclk_popcorn_check_bump() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -static inline void queue_connect_payloads(void) -{ - aclk_query_t query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; - query->data.metadata_info.initial_on_connect = 1; - aclk_queue_query(query); - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 1; - aclk_queue_query(query); -} - static inline void mqtt_connected_actions(mqtt_wss_client client) { char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -399,15 +309,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); - if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); - else - mqtt_wss_subscribe(client, topic, 1); - } -#endif + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; @@ -415,55 +321,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!aclk_use_new_cloud_arch) { -#endif - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); - } - ACLK_SHARED_STATE_UNLOCK; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } else { - aclk_send_agent_connection_update(client, 1); - } -#endif -} - -/* Waits until agent is ready or needs to exit - * @param client instance of mqtt_wss_client - * @param query_threads pointer to aclk_query_threads - * structure where to store data about started query threads - * @return 0 - Popcorning Finished - Agent STABLE, - * !0 - netdata_exit - */ -static int wait_popcorning_finishes() -{ - time_t elapsed; - int need_wait; - if (aclk_use_new_cloud_arch) - return 0; - - while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = ACLK_HOST_STABLE; - ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popcorn timer finished"); - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); - sleep(need_wait); - } - return 1; + aclk_send_agent_connection_update(client, 1); } void aclk_graceful_disconnect(mqtt_wss_client client) @@ -471,12 +329,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); - else -#endif - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { @@ -594,8 +448,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt = NULL; - while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { @@ -629,8 +481,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; - aclk_use_new_cloud_arch = 0; - #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -649,19 +499,16 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; - if (aclk_env->encoding == ACLK_ENC_PROTO) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); + continue; + } + + if (!aclk_env_has_capa("proto")) { + error ("Can't use encoding=proto without at least \"proto\" capability."); continue; -#else - if (!aclk_env_has_capa("proto")) { - error ("Can't encoding=proto without at least \"proto\" capability."); - continue; - } - info("Switching ACLK to new protobuf protocol. Due to /env response."); - aclk_use_new_cloud_arch = 1; -#endif } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -679,10 +526,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - if (aclk_use_new_cloud_arch) - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); - else - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); @@ -708,17 +552,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); - } else { -#endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } -#endif + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -732,10 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - if (aclk_use_new_cloud_arch) - freez((char *)mqtt_conn_params.will_msg); - else - json_object_put(lwt); + freez((char *)mqtt_conn_params.will_msg); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -778,10 +609,7 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - proto_hdl_cnt = aclk_init_rx_msg_handlers(); -#endif + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -792,7 +620,6 @@ void *aclk_main(void *ptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif - aclk_popcorn_check_bump(); // start localhost popcorn timer query_threads.count = read_query_thread_count(); if (wait_till_cloud_enabled()) @@ -803,11 +630,7 @@ void *aclk_main(void *ptr) use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { -#else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { -#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -835,28 +658,9 @@ void *aclk_main(void *ptr) if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; -#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL) - error_report("############################ WARNING ###############################"); - error_report("# Your agent is configured to connect to cloud but has #"); - error_report("# no protobuf protocol support (uses legacy JSON protocol) #"); - error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #"); - error_report("# Visit following link for more info and instructions how to solve #"); - error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #"); - error_report("######################################################################"); -#endif - - // warning this assumes the popcorning is relative short (3s) - // if that changes call mqtt_wss_service from within - // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes()) - goto exit_full; - if (unlikely(!query_threads.thread_list)) aclk_query_threads_start(&query_threads, mqttwss_client); - if (!aclk_use_new_cloud_arch) - queue_connect_payloads(); - if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); @@ -890,168 +694,12 @@ exit: return NULL; } -// TODO this is taken over as workaround from old ACLK -// fix this in both old and new ACLK -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void aclk_alarm_reload(void) -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return; - } - ACLK_SHARED_STATE_UNLOCK; - - aclk_queue_query(aclk_query_new(METADATA_ALARMS)); -} - -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer; - json_object *msg; - - if (host != localhost) - return 0; - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - msg = json_tokener_parse(local_buffer->buffer); - - struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); - query->data.alarm_update = msg; - aclk_queue_query(query); - - buffer_free(local_buffer); - return 0; -} - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ - struct aclk_query *query; - - if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) - return 0; - - query = aclk_query_new(create ? CHART_NEW : CHART_DEL); - if(create) { - query->data.chart_add_del.host = host; - query->data.chart_add_del.chart_name = strdupz(chart_name); - } else { - query->data.metadata_info.host = host; - query->data.metadata_info.initial_on_connect = 0; - } - - aclk_queue_query(query); - return 0; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; int ret; - if (!aclk_connected || !aclk_use_new_cloud_arch) + if (!aclk_connected) return; ret = get_node_id(&host->host_uuid, &node_id); @@ -1158,14 +806,12 @@ void aclk_send_node_instances() } freez(list_head); } -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { struct proto_alert_status status; @@ -1221,7 +867,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) ); freez(stats); } -#endif char *ng_aclk_state(void) { @@ -1232,13 +877,9 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Version: 2\n" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - "Protocols Supported: Legacy, Protobuf\n" -#else - "Protocols Supported: Legacy\n" -#endif + "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); char *agent_id = is_agent_claimed(); if (agent_id == NULL) @@ -1274,7 +915,6 @@ char *ng_aclk_state(void) if (aclk_connected) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { @@ -1309,7 +949,6 @@ char *ng_aclk_state(void) fill_chart_status_for_host(wb, host); } rrd_unlock(); -#endif } ret = strdupz(buffer_tostring(wb)); @@ -1317,7 +956,6 @@ char *ng_aclk_state(void) return ret; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1382,7 +1020,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) freez(stats); } -#endif static json_object *timestamp_to_json(const time_t *t) { @@ -1406,15 +1043,8 @@ char *ng_aclk_state_json(void) json_object_object_add(msg, "aclk-version", tmp); grp = json_object_new_array(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); tmp = json_object_new_string("Protobuf"); json_object_array_add(grp, tmp); -#else - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); -#endif json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); @@ -1435,7 +1065,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); @@ -1462,7 +1092,6 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_disable_runtime); json_object_object_add(msg, "banned-by-cloud", tmp); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL grp = json_object_new_array(); RRDHOST *host; @@ -1514,7 +1143,6 @@ char *ng_aclk_state_json(void) } rrd_unlock(); json_object_object_add(msg, "node-instances", grp); -#endif char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); diff --git a/aclk/aclk.h b/aclk/aclk.h index 41c4e05e40..5065ac2bfc 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) extern struct aclk_shared_state { - ACLK_AGENT_STATE agent_state; - time_t last_popcorn_interrupt; - // To wait for `disconnect` message PUBACK // when shutting down // at the same time if > 0 we know link is @@ -32,21 +29,8 @@ extern struct aclk_shared_state { int mqtt_shutdown_msg_rcvd; } aclk_shared_state; -void aclk_alarm_reload(void); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -/* Informs ACLK about created/deleted chart - * @param create 0 - if chart was deleted, other if chart created - */ -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd); void aclk_send_node_instances(void); -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 1f63b748fe..9446b407f2 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -13,7 +13,6 @@ usec_t aclk_session_us = 0; time_t aclk_session_sec = 0; int aclk_disable_runtime = 0; -int aclk_disable_single_updates = 0; int aclk_stats_enabled; int use_mqtt_5 = 0; @@ -33,16 +32,6 @@ void *aclk_starter(void *ptr) { } return aclk_main(ptr); } - -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} #endif /* ENABLE_ACLK */ void add_aclk_host_labels(void) { @@ -71,16 +60,13 @@ void add_aclk_host_labels(void) { break; } + int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); + rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#else - rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#endif #endif } diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index b00a2a4e12..8bf7c72913 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -15,31 +15,16 @@ extern usec_t aclk_session_us; extern time_t aclk_session_sec; extern int aclk_disable_runtime; -extern int aclk_disable_single_updates; extern int aclk_stats_enabled; extern int aclk_alert_reloaded; -extern int aclk_ng; extern int use_mqtt_5; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); -void aclk_single_update_disable(); -void aclk_single_update_enable(); - -void aclk_alarm_reload(void); - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int connect); -#endif #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c deleted file mode 100644 index 2920c9a5c8..0000000000 --- a/aclk/aclk_collector_list.c +++ /dev/null @@ -1,193 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Author: amoss - -// TODO unmess this - -#include "aclk_collector_list.h" - -netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; - -struct _collector *collector_list = NULL; - -/* - * Free a collector structure - */ -void _free_collector(struct _collector *collector) -{ - if (likely(collector->plugin_name)) - freez(collector->plugin_name); - - if (likely(collector->module_name)) - freez(collector->module_name); - - if (likely(collector->hostname)) - freez(collector->hostname); - - freez(collector); -} - -/* - * This will report the collector list - * - */ -#ifdef ACLK_DEBUG -static void _dump_collector_list() -{ - struct _collector *tmp_collector; - - COLLECTOR_LOCK; - - info("DUMPING ALL COLLECTORS"); - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - info("DUMPING ALL COLLECTORS -- nothing found"); - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - - while (tmp_collector) { - info( - "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, - tmp_collector->plugin_name ? tmp_collector->plugin_name : "", - tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); - - tmp_collector = tmp_collector->next; - } - info("DUMPING ALL COLLECTORS DONE"); - COLLECTOR_UNLOCK; -} -#endif - -/* - * This will cleanup the collector li |