summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitmodules3
-rw-r--r--CMakeLists.txt59
-rw-r--r--Makefile.am56
m---------aclk/aclk-schemas0
-rw-r--r--aclk/aclk.c198
-rw-r--r--aclk/aclk.h4
-rw-r--r--aclk/aclk_api.c13
-rw-r--r--aclk/aclk_api.h2
-rw-r--r--aclk/aclk_query.c60
-rw-r--r--aclk/aclk_query_queue.c11
-rw-r--r--aclk/aclk_query_queue.h7
-rw-r--r--aclk/aclk_rx_msgs.c61
-rw-r--r--aclk/aclk_rx_msgs.h2
-rw-r--r--aclk/aclk_tx_msgs.c110
-rw-r--r--aclk/aclk_tx_msgs.h8
-rw-r--r--aclk/aclk_util.c36
-rw-r--r--aclk/aclk_util.h17
-rw-r--r--aclk/legacy/agent_cloud_link.c23
-rw-r--r--aclk/legacy/agent_cloud_link.h2
-rw-r--r--aclk/schema-wrappers/connection.cc34
-rw-r--r--aclk/schema-wrappers/connection.h34
-rw-r--r--aclk/schema-wrappers/node_connection.cc37
-rw-r--r--aclk/schema-wrappers/node_connection.h29
-rw-r--r--aclk/schema-wrappers/node_creation.cc39
-rw-r--r--aclk/schema-wrappers/node_creation.h31
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h12
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h12
-rw-r--r--configure.ac81
-rw-r--r--database/sqlite/sqlite_functions.c36
-rw-r--r--database/sqlite/sqlite_functions.h1
-rw-r--r--streaming/receiver.c8
31 files changed, 933 insertions, 93 deletions
diff --git a/.gitmodules b/.gitmodules
index ef9349b389..83ef65e8d1 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
[submodule "mqtt_websockets"]
path = mqtt_websockets
url = https://github.com/underhood/mqtt_websockets.git
+[submodule "aclk/aclk-schemas"]
+ path = aclk/aclk-schemas
+ url = https://github.com/netdata/aclk-schemas.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2a50ff51d3..f836c718af 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -796,6 +796,14 @@ set(ACLK_NG_FILES
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h
mqtt_websockets/MQTT-C/src/mqtt.c
mqtt_websockets/MQTT-C/include/mqtt.h
+ aclk/schema-wrappers/connection.cc
+ aclk/schema-wrappers/connection.h
+ aclk/schema-wrappers/node_connection.cc
+ aclk/schema-wrappers/node_connection.h
+ aclk/schema-wrappers/node_creation.cc
+ aclk/schema-wrappers/node_creation.h
+ aclk/schema-wrappers/schema_wrappers.h
+ aclk/schema-wrappers/schema_wrapper_utils.h
)
set(SPAWN_PLUGIN_FILES
@@ -1038,9 +1046,58 @@ ELSE()
message(STATUS "agent-cloud-link Legacy: disabled")
ENDIF()
+find_package(Protobuf REQUIRED)
+
+function(PROTOBUF_ACLK_GENERATE_CPP SRCS HDRS)
+ if(NOT ARGN)
+ message(SEND_ERROR "Error: PROTOBUF_ACLK_GENERATE_CPP() called without any proto files")
+ return()
+ endif()
+
+ set(${SRCS})
+ set(${HDRS})
+ foreach(FIL ${ARGN})
+ get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+ get_filename_component(DIR ${ABS_FIL} DIRECTORY)
+ get_filename_component(FIL_WE ${FIL} NAME_WE)
+ set(GENERATED_PB_CC "${DIR}/${FIL_WE}.pb.cc")
+ set(GENERATED_PB_H "${DIR}/${FIL_WE}.pb.h")
+# cmake > 3.20 required :(
+# cmake_path(SET GENERATED_PB_CC "${DIR}")
+# cmake_path(SET GENERATED_PB_H "${DIR}")
+# cmake_path(APPEND GENERATED_PB_CC "${FIL_WE}.pb.cc")
+# cmake_path(APPEND GENERATED_PB_H "${FIL_WE}.pb.h")
+
+ list(APPEND ${SRCS} ${GENERATED_PB_CC})
+ list(APPEND ${HDRS} ${GENERATED_PB_H})
+ add_custom_command(
+ OUTPUT ${GENERATED_PB_CC}
+ ${GENERATED_PB_H}
+ COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
+ ARGS -I=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas --cpp_out=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas ${ABS_FIL}
+ DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE}
+ COMMENT "Running C++ protocol buffer compiler on ${FIL}"
+ VERBATIM )
+ endforeach()
+ set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE)
+ set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+ set(${HDRS} ${${HDRS}} PARENT_SCOPE)
+endfunction()
+
+set(ACLK_NG_PROTO_DEFS
+ aclk/aclk-schemas/proto/agent/v1/connection.proto
+ aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
+ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
+ )
+PROTOBUF_ACLK_GENERATE_CPP(ACLK_NG_PROTO_BUILT_SRCS ACLK_NG_PROTO_BUILT_HDRS ${ACLK_NG_PROTO_DEFS})
+
+list(APPEND NETDATA_COMMON_LIBRARIES ${PROTOBUF_LIBRARIES})
+list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${PROTOBUF_INCLUDE_DIRS})
+list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER})
list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD})
-list(APPEND NETDATA_FILES ${ACLK_NG_FILES})
+list(APPEND NETDATA_FILES ${ACLK_NG_FILES} ${ACLK_NG_PROTO_BUILT_SRCS} ${ACLK_NG_PROTO_BUILT_HDRS})
list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES})
+include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/c-rbuf/include)
diff --git a/Makefile.am b/Makefile.am
index 3c47b62e6d..e3acf87515 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -3,6 +3,9 @@
AUTOMAKE_OPTIONS = foreign subdir-objects 1.11
ACLOCAL_AMFLAGS = -I build/m4
+nodist_netdata_SOURCES=$(NULL)
+BUILT_SOURCES=$(NULL)
+
MAINTAINERCLEANFILES = \
config.log config.status \
$(srcdir)/Makefile.in \
@@ -572,7 +575,39 @@ ACLK_NG_FILES = \
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \
mqtt_websockets/MQTT-C/src/mqtt.c \
mqtt_websockets/MQTT-C/include/mqtt.h \
+ aclk/schema-wrappers/connection.cc \
+ aclk/schema-wrappers/connection.h \
+ aclk/schema-wrappers/node_connection.cc \
+ aclk/schema-wrappers/node_connection.h \
+ aclk/schema-wrappers/node_creation.cc \
+ aclk/schema-wrappers/node_creation.h \
+ aclk/schema-wrappers/schema_wrappers.h \
+ aclk/schema-wrappers/schema_wrapper_utils.h \
+ $(NULL)
+
+ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
+ aclk/aclk-schemas/proto/agent/v1/connection.pb.h \
+ aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \
+ aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \
+ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
+ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \
$(NULL)
+
+BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
+nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
+
+aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
+aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \
+aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h: aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
+aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
endif #ACLK_NG
if ENABLE_ACLK
@@ -784,12 +819,15 @@ netdata_LDADD = \
$(NETDATA_COMMON_LIBS) \
$(NULL)
+if ACLK_NG
+ netdata_LDADD += $(OPTIONAL_PROTOBUF_LIBS)
+endif
+
if ACLK_LEGACY
netdata_LDADD += \
$(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \
$(OPTIONAL_LIBCAP_LIBS) \
$(OPTIONAL_LWS_LIBS) \
- $(NETDATA_COMMON_LIBS) \
$(NULL)
endif #ACLK_LEGACY
@@ -899,12 +937,15 @@ endif
if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
- netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
- BUILT_SOURCES = \
+ netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \
+ $(OPTIONAL_PROTOBUF_LIBS) \
+ $(NULL)
+ BACKEND_PROMETHEUS_BUILT_SOURCES = \
exporting/prometheus/remote_write/remote_write.pb.cc \
exporting/prometheus/remote_write/remote_write.pb.h \
$(NULL)
- nodist_netdata_SOURCES = $(BUILT_SOURCES)
+ BUILT_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES)
+ nodist_netdata_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES)
exporting/prometheus/remote_write/remote_write.pb.cc \
exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote_write/remote_write.proto
@@ -1033,14 +1074,17 @@ if ENABLE_UNITTESTS
exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
exporting_tests_exporting_engine_testdriver_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
- exporting_tests_exporting_engine_testdriver_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
+ exporting_tests_exporting_engine_testdriver_LDADD += \
+ $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \
+ $(OPTIONAL_PROTOBUF_LIBS) \
+ $(NULL)
exporting_tests_exporting_engine_testdriver_LDFLAGS += \
-Wl,--wrap=init_write_request \
-Wl,--wrap=add_host_info \
-Wl,--wrap=add_label \
-Wl,--wrap=add_metric \
$(NULL)
- nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BUILT_SOURCES)
+ nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BACKEND_PROMETHEUS_BUILT_SOURCES)
endif
if ENABLE_BACKEND_KINESIS
exporting_tests_exporting_engine_testdriver_SOURCES += $(KINESIS_EXPORTING_FILES)
diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas
new file mode 160000
+Subproject b5fef3f3a84e6a5013b36b906f4677012c73441
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7e8c1c32e0..3cb25a67d3 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
aclk_handle_cloud_message(cmsg);
}
+
+static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ if (msglen > RX_MSGLEN_MAX)
+ error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+ debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+ error("Link is shutting down. Ignoring message.");
+ return;
+ }
+
+ const char *msgtype = strrchr(topic, '/');
+ if (unlikely(!msgtype)) {
+ error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+ msgtype++;
+ if (unlikely(!*msgtype)) {
+ error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+ char filename[FN_MAX_LEN];
+ int logfd;
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+ logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+ if(logfd < 0)
+ error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ write(logfd, msg, msglen);
+ close(logfd);
+#endif
+
+ aclk_handle_new_cloud_msg(msgtype, msg, msglen);
+}
+
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
@@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- // TODO global vars?
- usec_t now = now_realtime_usec();
- aclk_session_sec = now / USEC_PER_SEC;
- aclk_session_us = now % USEC_PER_SEC;
-
const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
@@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
+ if (aclk_use_new_cloud_arch) {
+ topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+ }
+
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
+ if (!aclk_use_new_cloud_arch) {
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
+ error("Sending `connect` payload immediately as popcorning was finished already.");
+ queue_connect_payloads();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ } else {
+ aclk_send_agent_connection_update(client, 1);
}
- ACLK_SHARED_STATE_UNLOCK;
}
/* Waits until agent is ready or needs to exit
@@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
-static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+static int wait_popcorning_finishes()
{
time_t elapsed;
int need_wait;
+ if (aclk_use_new_cloud_arch)
+ return 0;
+
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
@@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popocorn finished");
- if (unlikely(!query_threads->thread_list))
- aclk_query_threads_start(query_threads, client);
- queue_connect_payloads();
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
@@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
error("Preparing to Gracefully Shutdown the ACLK");
aclk_queue_lock();
aclk_queue_flush();
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+ if (aclk_use_new_cloud_arch)
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+ else
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
@@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt;
+ json_object *lwt = NULL;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
@@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (aclk_use_new_cloud_arch)
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+ else
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
continue;
@@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+ if (aclk_use_new_cloud_arch) {
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+ } else {
+ lwt = aclk_generate_disconnect(NULL);
+ mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+ mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ }
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- json_object_put(lwt);
+ if (aclk_use_new_cloud_arch)
+ freez((char *)mqtt_conn_params.will_msg);
+ else
+ json_object_put(lwt);
if (!ret) {
info("MQTTWSS connection succeeded");
@@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
+#ifdef ACLK_NEWARCH_DEVMODE
+ aclk_use_new_cloud_arch = 1;
+#endif
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_stats_thread *stats_thread = NULL;
@@ -642,7 +710,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -666,8 +734,14 @@ void *aclk_main(void *ptr)
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+ if (wait_popcorning_finishes())
goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (!aclk_use_new_cloud_arch)
+ queue_connect_payloads();
if (!handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
@@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
+
+void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+{
+ uuid_t node_id;
+ int ret;
+
+ if (!aclk_connected || !aclk_use_new_cloud_arch)
+ return;
+
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded
+ create_query->data.node_creation.hostname = strdupz(host->hostname);
+ create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+ aclk_queue_query(create_query);
+ return;
+ }
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ query->data.node_update.live = cmd;
+ query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+}
+
+void aclk_send_node_instances()
+{
+ struct node_instance_list *list = get_node_list();
+ while (!uuid_is_null(list->host_id)) {
+ if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ query->data.node_update.live = list->live;
+ query->data.node_update.hops = list->hops;
+ query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+ } else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops
+ create_query->data.node_creation.hostname = list->hostname;
+ create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
+ aclk_queue_query(create_query);
+ }
+
+ list++;
+ }
+}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index ab5332dc14..18cdbd248c 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -43,4 +43,8 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void ng_aclk_host_state_update(RRDHOST *host, int cmd);
+
+void aclk_send_node_instances(void);
+
#endif /* ACLK_H */
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index edbe02be34..4838f4b7f4 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -146,6 +146,19 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
error_report("No usable aclk_del_collector implementation");
}
+void aclk_host_state_update(RRDHOST *host, int connect)
+{
+#ifdef ACLK_NG
+ if (aclk_ng)
+ return ng_aclk_host_state_update(host, connect);
+#endif
+#ifdef ACLK_LEGACY
+ if (!aclk_ng)
+ return legacy_aclk_host_state_update(host, connect);
+#endif
+ error_report("Couldn't use any version of aclk_host_state_update");
+}
+
#endif /* ENABLE_ACLK */
struct label *add_aclk_host_labels(struct label *label) {
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index b76530c5ea..b0e9a075b4 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -35,6 +35,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_host_state_update(RRDHOST *host, int connect);
+
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 3e2f88e468..2bf60532ac 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -55,11 +55,33 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char
return t;
}
+static RRDHOST *node_id_2_rrdhost(const char *node_id)
+{
+ int res;
+ uuid_t node_id_bin, host_id_bin;
+ char host_id[UUID_STR_LEN];
+ if (uuid_parse(node_id, node_id_bin)) {
+ error("Couldn't parse UUID %s", node_id);
+ return NULL;
+ }
+ if ((res = get_host_id(&node_id_bin, &host_id_bin))) {
+ error("node not found rc=%d", res);
+ return NULL;
+ }
+ uuid_unparse_lower(host_id_bin, host_id);
+ return rrdhost_find_by_guid(host_id, 0);
+}
+
+#define NODE_ID_QUERY "/node/"
+// TODO this function should be quarantied and written nicely
+// lots of skeletons from initial ACLK Legacy impl.
+// quick and dirty from the start
static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
{
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
+ RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
@@ -76,6 +98,24 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
+ if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+ char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+ char nodeid[UUID_STR_LEN];
+ if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
+ error("URL requests node_id but there is not enough chars following");
+ retval = 1;
+ goto cleanup;
+ }
+ strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
+
+ query_host = node_id_2_rrdhost(nodeid);
+ if (!query_host) {
+ error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid);
+ retval = 1;
+ goto cleanup;
+ }
+ }
+
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@@ -86,7 +126,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
mysep = strrchr(query->data.http_api_v2.query, '/');
// execute the query
- t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop");
+ t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -187,6 +227,22 @@ static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query)
return 0;
}
+static int register_node(mqtt_wss_client client, aclk_query_t query) {
+ // TODO create a pending registrations list
+ // with some timeouts to detect registration requests that
+ // go unanswered from the cloud
+ aclk_generate_node_registration(client, &query->data.node_creation);
+ return 0;
+}
+
+static int node_state_update(mqtt_wss_client client, aclk_query_t query) {
+ // TODO create a pending registrations list
+ // with some timeouts to detect registration requests that
+ // go unanswered from the cloud
+ aclk_generate_node_state_update(client, &query->data.node_update);
+ return 0;
+}
+
aclk_query_handler aclk_query_handlers[] = {
{ .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
{ .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
@@ -194,6 +250,8 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
{ .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
+ { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index c9461b2338..baca4a2f5d 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -114,6 +114,17 @@ void aclk_query_free(aclk_query_t query)
if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
json_object_put(query->data.alarm_update);
+ if (query->type == NODE_STATE_UPDATE) {
+ freez((void*)query->data.node_update.claim_id);
+ freez((void*)query->data.node_update.node_id);
+ }
+
+ if (query->type == REGISTER_NODE) {
+ freez((void*)query->data.node_creation.claim_id);
+ freez((void*)query->data.node_creation.hostname);
+ freez((void*)query->data.node_creation.machine_guid);
+ }