summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-06-27 16:03:20 +0200
committerGitHub <noreply@github.com>2022-06-27 16:03:20 +0200
commitcb13f0787d77c5e36f79ab18f492a52e0ec11123 (patch)
tree3f3f0269b34d895bb38d8c7bbc45cbd18022ac6c /aclk
parentb8bfe953fbc8b35e13bd85975d4b23b90a6346b8 (diff)
Removes Legacy JSON Cloud Protocol Support In Agent (#13111)
* removes old protocol support (cloud removed support already)
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c424
-rw-r--r--aclk/aclk.h16
-rw-r--r--aclk/aclk_api.c18
-rw-r--r--aclk/aclk_api.h15
-rw-r--r--aclk/aclk_collector_list.c193
-rw-r--r--aclk/aclk_collector_list.h41
-rw-r--r--aclk/aclk_otp.c7
-rw-r--r--aclk/aclk_query.c128
-rw-r--r--aclk/aclk_query_queue.c9
-rw-r--r--aclk/aclk_query_queue.h19
-rw-r--r--aclk/aclk_rrdhost_state.h34
-rw-r--r--aclk/aclk_rx_msgs.c14
-rw-r--r--aclk/aclk_rx_msgs.h2
-rw-r--r--aclk/aclk_stats.c18
-rw-r--r--aclk/aclk_stats.h2
-rw-r--r--aclk/aclk_tx_msgs.c217
-rw-r--r--aclk/aclk_tx_msgs.h13
-rw-r--r--aclk/aclk_util.c19
-rw-r--r--aclk/aclk_util.h1
19 files changed, 78 insertions, 1112 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 5d2b5405c9..6f0a0d0ef3 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -10,7 +10,6 @@
#include "aclk_query_queue.h"
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
-#include "aclk_collector_list.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
@@ -46,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
- .agent_state = ACLK_HOST_INITIALIZING,
- .last_popcorn_interrupt = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
@@ -188,54 +185,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
-static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
-{
- UNUSED(qos);
- char cmsg[RX_MSGLEN_MAX];
- size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
- const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
- if (!cmd_topic) {
- error("Error retrieving command topic");
- return;
- }
-
- if (msglen > RX_MSGLEN_MAX - 1)
- error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
-
- memcpy(cmsg,
- msg,
- len);
- cmsg[len] = 0;
-
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 512
- char filename[FN_MAX_LEN];
- int logfd;
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
- logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
- if(logfd < 0)
- error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
- write(logfd, msg, msglen);
- close(logfd);
-#endif
-
- debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
-
- if (strcmp(cmd_topic, topic))
- error("Received message on unexpected topic %s", topic);
-
- if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
- error("Link is shutting down. Ignoring incoming message.");
- return;
- }
-
- aclk_handle_cloud_cmd_message(cmsg);
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
+static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
+ aclk_rcvd_cloud_msgs++;
if (msglen > RX_MSGLEN_MAX)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -272,15 +225,6 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
aclk_handle_new_cloud_msg(msgtype, msg, msglen);
}
-static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
- aclk_rcvd_cloud_msgs++;
- if (aclk_use_new_cloud_arch)
- msg_callback_new_protocol(topic, msg, msglen, qos);
- else
- msg_callback_old_protocol(topic, msg, msglen, qos);
-}
-#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
-
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
@@ -356,40 +300,6 @@ static int handle_connection(mqtt_wss_client client)
return 0;
}
-inline static int aclk_popcorn_check()
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
- }
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
-}
-
-inline static int aclk_popcorn_check_bump()
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
- }
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
-}
-
-static inline void queue_connect_payloads(void)
-{
- aclk_query_t query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost;
- query->data.metadata_info.initial_on_connect = 1;
- aclk_queue_query(query);
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 1;
- aclk_queue_query(query);
-}
-
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
@@ -399,15 +309,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch) {
- topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
- if (!topic)
- error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
- else
- mqtt_wss_subscribe(client, topic, 1);
- }
-#endif
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
@@ -415,55 +321,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!aclk_use_new_cloud_arch) {
-#endif
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
- }
- ACLK_SHARED_STATE_UNLOCK;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- } else {
- aclk_send_agent_connection_update(client, 1);
- }
-#endif
-}
-
-/* Waits until agent is ready or needs to exit
- * @param client instance of mqtt_wss_client
- * @param query_threads pointer to aclk_query_threads
- * structure where to store data about started query threads
- * @return 0 - Popcorning Finished - Agent STABLE,
- * !0 - netdata_exit
- */
-static int wait_popcorning_finishes()
-{
- time_t elapsed;
- int need_wait;
- if (aclk_use_new_cloud_arch)
- return 0;
-
- while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
- }
- elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
- if (elapsed >= ACLK_STABLE_TIMEOUT) {
- aclk_shared_state.agent_state = ACLK_HOST_STABLE;
- ACLK_SHARED_STATE_UNLOCK;
- error("ACLK localhost popcorn timer finished");
- return 0;
- }
- ACLK_SHARED_STATE_UNLOCK;
- need_wait = ACLK_STABLE_TIMEOUT - elapsed;
- error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
- sleep(need_wait);
- }
- return 1;
+ aclk_send_agent_connection_update(client, 1);
}
void aclk_graceful_disconnect(mqtt_wss_client client)
@@ -471,12 +329,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch)
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
- else
-#endif
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
@@ -594,8 +448,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt = NULL;
-
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
@@ -629,8 +481,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
- aclk_use_new_cloud_arch = 0;
-
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
aclk_env_t_destroy(aclk_env);
@@ -649,19 +499,16 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
- if (aclk_env->encoding == ACLK_ENC_PROTO) {
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
- error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
+ if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ error_report("This agent can only use the new cloud protocol but cloud requested old one.");
+ continue;
+ }
+
+ if (!aclk_env_has_capa("proto")) {
+ error ("Can't use encoding=proto without at least \"proto\" capability.");
continue;
-#else
- if (!aclk_env_has_capa("proto")) {
- error ("Can't encoding=proto without at least \"proto\" capability.");
- continue;
- }
- info("Switching ACLK to new protobuf protocol. Due to /env response.");
- aclk_use_new_cloud_arch = 1;
-#endif
}
+ info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
@@ -679,10 +526,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- if (aclk_use_new_cloud_arch)
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
- else
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
@@ -708,17 +552,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch) {
- mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
- } else {
-#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- }
-#endif
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -732,10 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- if (aclk_use_new_cloud_arch)
- freez((char *)mqtt_conn_params.will_msg);
- else
- json_object_put(lwt);
+ freez((char *)mqtt_conn_params.will_msg);
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
@@ -778,10 +609,7 @@ void *aclk_main(void *ptr)
return NULL;
}
- unsigned int proto_hdl_cnt;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- proto_hdl_cnt = aclk_init_rx_msg_handlers();
-#endif
+ unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
@@ -792,7 +620,6 @@ void *aclk_main(void *ptr)
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
- aclk_popcorn_check_bump(); // start localhost popcorn timer
query_threads.count = read_query_thread_count();
if (wait_till_cloud_enabled())
@@ -803,11 +630,7 @@ void *aclk_main(void *ptr)
use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
-#else
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
-#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -835,28 +658,9 @@ void *aclk_main(void *ptr)
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;
-#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL)
- error_report("############################ WARNING ###############################");
- error_report("# Your agent is configured to connect to cloud but has #");
- error_report("# no protobuf protocol support (uses legacy JSON protocol) #");
- error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #");
- error_report("# Visit following link for more info and instructions how to solve #");
- error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #");
- error_report("######################################################################");
-#endif
-
- // warning this assumes the popcorning is relative short (3s)
- // if that changes call mqtt_wss_service from within
- // to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes())
- goto exit_full;
-
if (unlikely(!query_threads.thread_list))
aclk_query_threads_start(&query_threads, mqttwss_client);
- if (!aclk_use_new_cloud_arch)
- queue_connect_payloads();
-
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
@@ -890,168 +694,12 @@ exit:
return NULL;
}
-// TODO this is taken over as workaround from old ACLK
-// fix this in both old and new ACLK
-extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-
-void aclk_alarm_reload(void)
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return;
- }
- ACLK_SHARED_STATE_UNLOCK;
-
- aclk_queue_query(aclk_query_new(METADATA_ALARMS));
-}
-
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
-{
- BUFFER *local_buffer;
- json_object *msg;
-
- if (host != localhost)
- return 0;
-
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
- }
- ACLK_SHARED_STATE_UNLOCK;
-
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
-
- netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
- health_alarm_entry2json_nolock(local_buffer, ae, host);
- netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
-
- msg = json_tokener_parse(local_buffer->buffer);
-
- struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
- query->data.alarm_update = msg;
- aclk_queue_query(query);
-
- buffer_free(local_buffer);
- return 0;
-}
-
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
-{
- struct aclk_query *query;
-
- if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
- return 0;
-
- query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
- if(create) {
- query->data.chart_add_del.host = host;
- query->data.chart_add_del.chart_name = strdupz(chart_name);
- } else {
- query->data.metadata_info.host = host;
- query->data.metadata_info.initial_on_connect = 0;
- }
-
- aclk_queue_query(query);
- return 0;
-}
-
-/*
- * Add a new collector to the list
- * If it exists, update the chart count
- */
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct aclk_query *query;
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(tmp_collector->count != 1)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- COLLECTOR_UNLOCK;
-
- if (aclk_popcorn_check_bump())
- return;
-
- if (host != localhost)
- return;
-
- query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost; //TODO
- query->data.metadata_info.initial_on_connect = 0;
- aclk_queue_query(query);
-
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 0;
- aclk_queue_query(query);
-}
-
-/*
- * Delete a collector from the list
- * If the chart count reaches zero the collector will be removed
- * from the list by calling del_collector.
- *
- * This function will release the memory used and schedule
- * a cloud update
- */
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct aclk_query *query;
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(!tmp_collector || tmp_collector->count)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- debug(
- D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
- tmp_collector->count);
-
- COLLECTOR_UNLOCK;
-
- _free_collector(tmp_collector);
-
- if (aclk_popcorn_check_bump())
- return;
-
- if (host != localhost)
- return;
-
- query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost; //TODO
- query->data.metadata_info.initial_on_connect = 0;
- aclk_queue_query(query);
-
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 0;
- aclk_queue_query(query);
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
int ret;
- if (!aclk_connected || !aclk_use_new_cloud_arch)
+ if (!aclk_connected)
return;
ret = get_node_id(&host->host_uuid, &node_id);
@@ -1158,14 +806,12 @@ void aclk_send_node_instances()
}
freez(list_head);
}
-#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1221,7 +867,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
);
freez(stats);
}
-#endif
char *ng_aclk_state(void)
{
@@ -1232,13 +877,9 @@ char *ng_aclk_state(void)
buffer_strcat(wb,
"ACLK Available: Yes\n"
"ACLK Version: 2\n"
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- "Protocols Supported: Legacy, Protobuf\n"
-#else
- "Protocols Supported: Legacy\n"
-#endif
+ "Protocols Supported: Protobuf\n"
);
- buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
@@ -1274,7 +915,6 @@ char *ng_aclk_state(void)
if (aclk_connected) {
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
RRDHOST *host;
rrd_rdlock();
rrdhost_foreach_read(host) {
@@ -1309,7 +949,6 @@ char *ng_aclk_state(void)
fill_chart_status_for_host(wb, host);
}
rrd_unlock();
-#endif
}
ret = strdupz(buffer_tostring(wb));
@@ -1317,7 +956,6 @@ char *ng_aclk_state(void)
return ret;
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1382,7 +1020,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
freez(stats);
}
-#endif
static json_object *timestamp_to_json(const time_t *t)
{
@@ -1406,15 +1043,8 @@ char *ng_aclk_state_json(void)
json_object_object_add(msg, "aclk-version", tmp);
grp = json_object_new_array();
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- tmp = json_object_new_string("Legacy");
- json_object_array_add(grp, tmp);
tmp = json_object_new_string("Protobuf");
json_object_array_add(grp, tmp);
-#else
- tmp = json_object_new_string("Legacy");
- json_object_array_add(grp, tmp);
-#endif
json_object_object_add(msg, "protocols-supported", grp);
char *agent_id = is_agent_claimed();
@@ -1435,7 +1065,7 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
- tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
+ tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
@@ -1462,7 +1092,6 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_disable_runtime);
json_object_object_add(msg, "banned-by-cloud", tmp);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
grp = json_object_new_array();
RRDHOST *host;
@@ -1514,7 +1143,6 @@ char *ng_aclk_state_json(void)
}
rrd_unlock();
json_object_object_add(msg, "node-instances", grp);
-#endif
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 41c4e05e40..5065ac2bfc 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
extern struct aclk_shared_state {
- ACLK_AGENT_STATE agent_state;
- time_t last_popcorn_interrupt;
-
// To wait for `disconnect` message PUBACK
// when shutting down
// at the same time if > 0 we know link is
@@ -32,21 +29,8 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
-void aclk_alarm_reload(void);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-
-/* Informs ACLK about created/deleted chart
- * @param create 0 - if chart was deleted, other if chart created
- */
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd);
void aclk_send_node_instances(void);
-#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 1f63b748fe..9446b407f2 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -13,7 +13,6 @@ usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
int aclk_disable_runtime = 0;
-int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
int use_mqtt_5 = 0;
@@ -33,16 +32,6 @@ void *aclk_starter(void *ptr) {
}
return aclk_main(ptr);
}
-
-void aclk_single_update_disable()
-{
- aclk_disable_single_updates = 1;
-}
-
-void aclk_single_update_enable()
-{
- aclk_disable_single_updates = 0;
-}
#endif /* ENABLE_ACLK */
void add_aclk_host_labels(void) {
@@ -71,16 +60,13 @@ void add_aclk_host_labels(void) {
break;
}
+
int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
+
rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#else
- rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#endif
#endif
}
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index b00a2a4e12..8bf7c72913 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -15,31 +15,16 @@ extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern int aclk_disable_runtime;
-extern int aclk_disable_single_updates;
extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
-extern int aclk_ng;
extern int use_mqtt_5;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
-void aclk_single_update_disable();
-void aclk_single_update_enable();
-
-void aclk_alarm_reload(void);
-
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int connect);
-#endif
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c
deleted file mode 100644
index 2920c9a5c8..0000000000
--- a/aclk/aclk_collector_list.c
+++ /dev/null
@@ -1,193 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-// This is copied from Legacy ACLK, Original Author: amoss
-
-// TODO unmess this
-
-#include "aclk_collector_list.h"
-
-netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
-
-struct _collector *collector_list = NULL;
-
-/*
- * Free a collector structure
- */
-void _free_collector(struct _collector *collector)
-{
- if (likely(collector->plugin_name))
- freez(collector->plugin_name);
-
- if (likely(collector->module_name))
- freez(collector->module_name);
-
- if (likely(collector->hostname))
- freez(collector->hostname);
-
- freez(collector);
-}
-
-/*
- * This will report the collector list
- *
- */
-#ifdef ACLK_DEBUG
-static void _dump_collector_list()
-{
- struct _collector *tmp_collector;
-
- COLLECTOR_LOCK;
-
- info("DUMPING ALL COLLECTORS");
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- info("DUMPING ALL COLLECTORS -- nothing found");
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
-
- while (tmp_collector) {
- info(
- "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
- tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
- tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
-
- tmp_collector = tmp_collector->next;
- }
- info("DUMPING ALL COLLECTORS DONE");
- COLLECTOR_UNLOCK;
-}
-#endif
-
-/*
- * This will cleanup the collector list
- *
- */
-void _reset_collector_list()
-{
- struct _collector *tmp_collector, *next_collector;
-
- COLLECTOR_LOCK;
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- collector_list->count = 0;
- collector_list->next = NULL;
-
-