summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c38
-rw-r--r--aclk/aclk_otp.c6
-rw-r--r--aclk/aclk_query.c4
-rw-r--r--aclk/aclk_rx_msgs.c2
-rw-r--r--aclk/aclk_rx_msgs.h2
-rw-r--r--aclk/aclk_tx_msgs.c2
-rw-r--r--aclk/aclk_tx_msgs.h2
-rw-r--r--aclk/aclk_util.c6
-rw-r--r--aclk/aclk_util.h6
9 files changed, 53 insertions, 15 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 459d657fb9..cc17a21650 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -223,7 +223,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
aclk_handle_cloud_message(cmsg);
}
-
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
{
if (msglen > RX_MSGLEN_MAX)
@@ -261,6 +261,7 @@ static void msg_callback_new(const char *topic, const void *msg, size_t msglen,
aclk_handle_new_cloud_msg(msgtype, msg, msglen);
}
+#endif
static void puback_callback(uint16_t packet_id)
{
@@ -352,6 +353,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
@@ -359,21 +361,26 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
}
+#endif
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch) {
+#endif
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
error("Sending `connect` payload immediately as popcorning was finished already.");
queue_connect_payloads();
}
ACLK_SHARED_STATE_UNLOCK;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
} else {
aclk_send_agent_connection_update(client, 1);
}
+#endif
}
/* Waits until agent is ready or needs to exit
@@ -416,9 +423,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
error("Preparing to Gracefully Shutdown the ACLK");
aclk_queue_lock();
aclk_queue_flush();
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch)
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
else
+#endif
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
time_t t = now_monotonic_sec();
@@ -626,13 +635,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
} else {
+#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
}
+#endif
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -675,7 +688,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
-#ifdef ACLK_NEWARCH_DEVMODE
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
aclk_use_new_cloud_arch = 1;
#endif
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
@@ -711,7 +724,11 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
+#else
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -1028,6 +1045,11 @@ char *ng_aclk_state(void)
buffer_strcat(wb,
"ACLK Available: Yes\n"
"ACLK Implementation: Next Generation\n"
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ "New Cloud Protocol Support: Yes\n"
+#else
+ "New Cloud Protocol Support: No\n"
+#endif
"Claimed: "
);
@@ -1039,7 +1061,7 @@ char *ng_aclk_state(void)
freez(agent_id);
}
- buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No");
+ buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy");
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
@@ -1056,6 +1078,13 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string("Next Generation");
json_object_object_add(msg, "aclk-implementation", tmp);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ tmp = json_object_new_boolean(1);
+#else
+ tmp = json_object_new_boolean(0);
+#endif
+ json_object_object_add(msg, "new-cloud-protocol-supported", tmp);
+
char *agent_id = is_agent_claimed();
tmp = json_object_new_boolean(agent_id != NULL);
json_object_object_add(msg, "agent-claimed", tmp);
@@ -1070,6 +1099,9 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
+ tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy");
+ json_object_object_add(msg, "used-cloud-protocol", tmp);
+
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index 40c395ef89..e9ebeab99a 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -9,12 +9,6 @@
#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
-// CentOS 7 has older version that doesn't define this
-// same goes for MacOS
-#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
-#endif
-
struct dictionary_singleton {
char *key;
char *result;
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 114589ce31..6ff73f923b 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -257,6 +257,7 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu
return 0;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) {
// TODO create a pending registrations list
// with some timeouts to detect registration requests that
@@ -279,6 +280,7 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
return 0;
}
+#endif
aclk_query_handler aclk_query_handlers[] = {
{ .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
@@ -287,6 +289,7 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
{ .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
{ .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
{ .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
{ .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg },
@@ -296,6 +299,7 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg },
{ .type = ALARM_LOG_HEALTH, .name = "alarm log health", .fnc = send_bin_msg },
{ .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg },
+#endif
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index a97166716f..10378e5309 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -260,6 +260,7 @@ err_cleanup_nojson:
return 1;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
{
// TODO do the look up table with hashes to optimize when there are more
@@ -331,3 +332,4 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
error ("Unknown new cloud arch message type received \"%s\"", message_type);
}
+#endif
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 98024d5d4e..074dc004ae 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -10,6 +10,8 @@
int aclk_handle_cloud_message(char *payload);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
+#endif
#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 0a69808562..279d3f37f3 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -403,6 +403,7 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
return pid;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
@@ -481,6 +482,7 @@ void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_conne
aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
freez(msg);
}
+#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
#pragma endregion
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 4b661049fe..da29a4a32e 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -23,11 +23,13 @@ void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
json_object *aclk_generate_disconnect(const char *message);
int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);
void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation);
void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection);
+#endif
#endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 8401ea02f5..595047a9b6 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -4,12 +4,6 @@
#include "daemon/common.h"
-// CentOS 7 has older version that doesn't define this
-// same goes for MacOS
-#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
-#endif
-
int aclk_use_new_cloud_arch = 0;
usec_t aclk_session_newarch = 0;
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 3b5e7d7451..deb01213ec 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -5,6 +5,12 @@
#include "libnetdata/libnetdata.h"
#include "mqtt_wss_client.h"
+// CentOS 7 has older version that doesn't define this
+// same goes for MacOS
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK