From d005dee55800818b26f6308c433e6aed8079f7fe Mon Sep 17 00:00:00 2001 From: Timotej S <6674623+underhood@users.noreply.github.com> Date: Wed, 7 Jul 2021 16:32:37 +0200 Subject: ACLK-NG New Cloud NodeInstance related msgs (#11234) Adds new cloud arch NodeInstance messages as per design. Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> --- .gitmodules | 3 + CMakeLists.txt | 59 ++++++++- Makefile.am | 56 +++++++- aclk/aclk-schemas | 1 + aclk/aclk.c | 198 ++++++++++++++++++++++++---- aclk/aclk.h | 4 + aclk/aclk_api.c | 13 ++ aclk/aclk_api.h | 2 + aclk/aclk_query.c | 60 ++++++++- aclk/aclk_query_queue.c | 11 ++ aclk/aclk_query_queue.h | 7 +- aclk/aclk_rx_msgs.c | 61 ++++++++- aclk/aclk_rx_msgs.h | 2 + aclk/aclk_tx_msgs.c | 110 ++++++++++++++++ aclk/aclk_tx_msgs.h | 8 ++ aclk/aclk_util.c | 36 ++++- aclk/aclk_util.h | 17 ++- aclk/legacy/agent_cloud_link.c | 23 ++-- aclk/legacy/agent_cloud_link.h | 2 +- aclk/schema-wrappers/connection.cc | 34 +++++ aclk/schema-wrappers/connection.h | 34 +++++ aclk/schema-wrappers/node_connection.cc | 37 ++++++ aclk/schema-wrappers/node_connection.h | 29 ++++ aclk/schema-wrappers/node_creation.cc | 39 ++++++ aclk/schema-wrappers/node_creation.h | 31 +++++ aclk/schema-wrappers/schema_wrapper_utils.h | 12 ++ aclk/schema-wrappers/schema_wrappers.h | 12 ++ configure.ac | 81 ++++++++---- database/sqlite/sqlite_functions.c | 36 +++++ database/sqlite/sqlite_functions.h | 1 + streaming/receiver.c | 8 +- 31 files changed, 934 insertions(+), 93 deletions(-) create mode 160000 aclk/aclk-schemas create mode 100644 aclk/schema-wrappers/connection.cc create mode 100644 aclk/schema-wrappers/connection.h create mode 100644 aclk/schema-wrappers/node_connection.cc create mode 100644 aclk/schema-wrappers/node_connection.h create mode 100644 aclk/schema-wrappers/node_creation.cc create mode 100644 aclk/schema-wrappers/node_creation.h create mode 100644 aclk/schema-wrappers/schema_wrapper_utils.h create mode 100644 aclk/schema-wrappers/schema_wrappers.h diff --git a/.gitmodules b/.gitmodules index ef9349b389..83ef65e8d1 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "mqtt_websockets"] path = mqtt_websockets url = https://github.com/underhood/mqtt_websockets.git +[submodule "aclk/aclk-schemas"] + path = aclk/aclk-schemas + url = https://github.com/netdata/aclk-schemas.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a50ff51d3..f836c718af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -796,6 +796,14 @@ set(ACLK_NG_FILES mqtt_websockets/c-rbuf/src/ringbuffer_internal.h mqtt_websockets/MQTT-C/src/mqtt.c mqtt_websockets/MQTT-C/include/mqtt.h + aclk/schema-wrappers/connection.cc + aclk/schema-wrappers/connection.h + aclk/schema-wrappers/node_connection.cc + aclk/schema-wrappers/node_connection.h + aclk/schema-wrappers/node_creation.cc + aclk/schema-wrappers/node_creation.h + aclk/schema-wrappers/schema_wrappers.h + aclk/schema-wrappers/schema_wrapper_utils.h ) set(SPAWN_PLUGIN_FILES @@ -1038,9 +1046,58 @@ ELSE() message(STATUS "agent-cloud-link Legacy: disabled") ENDIF() +find_package(Protobuf REQUIRED) + +function(PROTOBUF_ACLK_GENERATE_CPP SRCS HDRS) + if(NOT ARGN) + message(SEND_ERROR "Error: PROTOBUF_ACLK_GENERATE_CPP() called without any proto files") + return() + endif() + + set(${SRCS}) + set(${HDRS}) + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(DIR ${ABS_FIL} DIRECTORY) + get_filename_component(FIL_WE ${FIL} NAME_WE) + set(GENERATED_PB_CC "${DIR}/${FIL_WE}.pb.cc") + set(GENERATED_PB_H "${DIR}/${FIL_WE}.pb.h") +# cmake > 3.20 required :( +# cmake_path(SET GENERATED_PB_CC "${DIR}") +# cmake_path(SET GENERATED_PB_H "${DIR}") +# cmake_path(APPEND GENERATED_PB_CC "${FIL_WE}.pb.cc") +# cmake_path(APPEND GENERATED_PB_H "${FIL_WE}.pb.h") + + list(APPEND ${SRCS} ${GENERATED_PB_CC}) + list(APPEND ${HDRS} ${GENERATED_PB_H}) + add_custom_command( + OUTPUT ${GENERATED_PB_CC} + ${GENERATED_PB_H} + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} + ARGS -I=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas --cpp_out=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas ${ABS_FIL} + DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} + COMMENT "Running C++ protocol buffer compiler on ${FIL}" + VERBATIM ) + endforeach() + set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE) + set(${SRCS} ${${SRCS}} PARENT_SCOPE) + set(${HDRS} ${${HDRS}} PARENT_SCOPE) +endfunction() + +set(ACLK_NG_PROTO_DEFS + aclk/aclk-schemas/proto/agent/v1/connection.proto + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto + ) +PROTOBUF_ACLK_GENERATE_CPP(ACLK_NG_PROTO_BUILT_SRCS ACLK_NG_PROTO_BUILT_HDRS ${ACLK_NG_PROTO_DEFS}) + +list(APPEND NETDATA_COMMON_LIBRARIES ${PROTOBUF_LIBRARIES}) +list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${PROTOBUF_INCLUDE_DIRS}) +list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER}) list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD}) -list(APPEND NETDATA_FILES ${ACLK_NG_FILES}) +list(APPEND NETDATA_FILES ${ACLK_NG_FILES} ${ACLK_NG_PROTO_BUILT_SRCS} ${ACLK_NG_PROTO_BUILT_HDRS}) list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES}) +include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/c-rbuf/include) diff --git a/Makefile.am b/Makefile.am index 3c47b62e6d..e3acf87515 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,6 +3,9 @@ AUTOMAKE_OPTIONS = foreign subdir-objects 1.11 ACLOCAL_AMFLAGS = -I build/m4 +nodist_netdata_SOURCES=$(NULL) +BUILT_SOURCES=$(NULL) + MAINTAINERCLEANFILES = \ config.log config.status \ $(srcdir)/Makefile.in \ @@ -572,7 +575,39 @@ ACLK_NG_FILES = \ mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \ mqtt_websockets/MQTT-C/src/mqtt.c \ mqtt_websockets/MQTT-C/include/mqtt.h \ + aclk/schema-wrappers/connection.cc \ + aclk/schema-wrappers/connection.h \ + aclk/schema-wrappers/node_connection.cc \ + aclk/schema-wrappers/node_connection.h \ + aclk/schema-wrappers/node_creation.cc \ + aclk/schema-wrappers/node_creation.h \ + aclk/schema-wrappers/schema_wrappers.h \ + aclk/schema-wrappers/schema_wrapper_utils.h \ + $(NULL) + +ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ + aclk/aclk-schemas/proto/agent/v1/connection.pb.h \ + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \ + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \ + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \ $(NULL) + +BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) +nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) + +aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ +aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \ +aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h: aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ +aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + endif #ACLK_NG if ENABLE_ACLK @@ -784,12 +819,15 @@ netdata_LDADD = \ $(NETDATA_COMMON_LIBS) \ $(NULL) +if ACLK_NG + netdata_LDADD += $(OPTIONAL_PROTOBUF_LIBS) +endif + if ACLK_LEGACY netdata_LDADD += \ $(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \ $(OPTIONAL_LIBCAP_LIBS) \ $(OPTIONAL_LWS_LIBS) \ - $(NETDATA_COMMON_LIBS) \ $(NULL) endif #ACLK_LEGACY @@ -899,12 +937,15 @@ endif if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) - netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) - BUILT_SOURCES = \ + netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \ + $(OPTIONAL_PROTOBUF_LIBS) \ + $(NULL) + BACKEND_PROMETHEUS_BUILT_SOURCES = \ exporting/prometheus/remote_write/remote_write.pb.cc \ exporting/prometheus/remote_write/remote_write.pb.h \ $(NULL) - nodist_netdata_SOURCES = $(BUILT_SOURCES) + BUILT_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES) + nodist_netdata_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES) exporting/prometheus/remote_write/remote_write.pb.cc \ exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote_write/remote_write.proto @@ -1033,14 +1074,17 @@ if ENABLE_UNITTESTS exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE exporting_tests_exporting_engine_testdriver_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) - exporting_tests_exporting_engine_testdriver_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) + exporting_tests_exporting_engine_testdriver_LDADD += \ + $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \ + $(OPTIONAL_PROTOBUF_LIBS) \ + $(NULL) exporting_tests_exporting_engine_testdriver_LDFLAGS += \ -Wl,--wrap=init_write_request \ -Wl,--wrap=add_host_info \ -Wl,--wrap=add_label \ -Wl,--wrap=add_metric \ $(NULL) - nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BUILT_SOURCES) + nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BACKEND_PROMETHEUS_BUILT_SOURCES) endif if ENABLE_BACKEND_KINESIS exporting_tests_exporting_engine_testdriver_SOURCES += $(KINESIS_EXPORTING_FILES) diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas new file mode 160000 index 0000000000..b5fef3f3a8 --- /dev/null +++ b/aclk/aclk-schemas @@ -0,0 +1 @@ +Subproject commit b5fef3f3a84e6a5013b36b906f4677012c734416 diff --git a/aclk/aclk.c b/aclk/aclk.c index 7e8c1c32e0..3cb25a67d3 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int aclk_handle_cloud_message(cmsg); } + +static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos) +{ + if (msglen > RX_MSGLEN_MAX) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring message."); + return; + } + + const char *msgtype = strrchr(topic, '/'); + if (unlikely(!msgtype)) { + error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic); + return; + } + msgtype++; + if (unlikely(!*msgtype)) { + error_report("Message type empty. Ignoring message from topic \"%s\"", topic); + return; + } + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + aclk_handle_new_cloud_msg(msgtype, msg, msglen); +} + static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) @@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - // TODO global vars? - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) @@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); + if (aclk_use_new_cloud_arch) { + topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + } + aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); + if (!aclk_use_new_cloud_arch) { + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { + error("Sending `connect` payload immediately as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; + } else { + aclk_send_agent_connection_update(client, 1); } - ACLK_SHARED_STATE_UNLOCK; } /* Waits until agent is ready or needs to exit @@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) * @return 0 - Popcorning Finished - Agent STABLE, * !0 - netdata_exit */ -static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +static int wait_popcorning_finishes() { time_t elapsed; int need_wait; + if (aclk_use_new_cloud_arch) + return 0; + while (!netdata_exit) { ACLK_SHARED_STATE_LOCK; if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { @@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th aclk_shared_state.agent_state = ACLK_HOST_STABLE; ACLK_SHARED_STATE_UNLOCK; error("ACLK localhost popocorn finished"); - if (unlikely(!query_threads->thread_list)) - aclk_query_threads_start(query_threads, client); - queue_connect_payloads(); return 0; } ACLK_SHARED_STATE_UNLOCK; @@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client) error("Preparing to Gracefully Shutdown the ACLK"); aclk_queue_lock(); aclk_queue_flush(); - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + if (aclk_use_new_cloud_arch) + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); + else + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { @@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt; + json_object *lwt = NULL; while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); @@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (aclk_use_new_cloud_arch) + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); + else + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); continue; @@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } #endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + aclk_session_newarch = now_realtime_usec(); + aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; + aclk_session_us = aclk_session_newarch % USEC_PER_SEC; + + if (aclk_use_new_cloud_arch) { + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); + } else { + lwt = aclk_generate_disconnect(NULL); + mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); + mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + } #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - json_object_put(lwt); + if (aclk_use_new_cloud_arch) + freez((char *)mqtt_conn_params.will_msg); + else + json_object_put(lwt); if (!ret) { info("MQTTWSS connection succeeded"); @@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) */ void *aclk_main(void *ptr) { +#ifdef ACLK_NEWARCH_DEVMODE + aclk_use_new_cloud_arch = 1; +#endif struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_stats_thread *stats_thread = NULL; @@ -642,7 +710,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -666,8 +734,14 @@ void *aclk_main(void *ptr) // warning this assumes the popcorning is relative short (3s) // if that changes call mqtt_wss_service from within // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes(mqttwss_client, &query_threads)) + if (wait_popcorning_finishes()) goto exit_full; + + if (unlikely(!query_threads.thread_list)) + aclk_query_threads_start(&query_threads, mqttwss_client); + + if (!aclk_use_new_cloud_arch) + queue_connect_payloads(); if (!handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); @@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m query->data.metadata_alarms.initial_on_connect = 0; aclk_queue_query(query); } + +void ng_aclk_host_state_update(RRDHOST *host, int cmd) +{ + uuid_t node_id; + int ret; + + if (!aclk_connected || !aclk_use_new_cloud_arch) + return; + + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded + create_query->data.node_creation.hostname = strdupz(host->hostname); + create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + aclk_queue_query(create_query); + return; + } + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = cmd; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); +} + +void aclk_send_node_instances() +{ + struct node_instance_list *list = get_node_list(); + while (!uuid_is_null(list->host_id)) { + if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = list->live; + query->data.node_update.hops = list->hops; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops + create_query->data.node_creation.hostname = list->hostname; + create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); + aclk_queue_query(create_query); + } + + list++; + } +} diff --git a/aclk/aclk.h b/aclk/aclk.h index ab5332dc14..18cdbd248c 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -43,4 +43,8 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create); void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void ng_aclk_host_state_update(RRDHOST *host, int cmd); + +void aclk_send_node_instances(void); + #endif /* ACLK_H */ diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index edbe02be34..4838f4b7f4 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -146,6 +146,19 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu error_report("No usable aclk_del_collector implementation"); } +void aclk_host_state_update(RRDHOST *host, int connect) +{ +#ifdef ACLK_NG + if (aclk_ng) + return ng_aclk_host_state_update(host, connect); +#endif +#ifdef ACLK_LEGACY + if (!aclk_ng) + return legacy_aclk_host_state_update(host, connect); +#endif + error_report("Couldn't use any version of aclk_host_state_update"); +} + #endif /* ENABLE_ACLK */ struct label *add_aclk_host_labels(struct label *label) { diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index b76530c5ea..b0e9a075b4 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -35,6 +35,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_host_state_update(RRDHOST *host, int connect); + #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ .config_section = NULL, \ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 3e2f88e468..2bf60532ac 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -55,11 +55,33 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char return t; } +static RRDHOST *node_id_2_rrdhost(const char *node_id) +{ + int res; + uuid_t node_id_bin, host_id_bin; + char host_id[UUID_STR_LEN]; + if (uuid_parse(node_id, node_id_bin)) { + error("Couldn't parse UUID %s", node_id); + return NULL; + } + if ((res = get_host_id(&node_id_bin, &host_id_bin))) { + error("node not found rc=%d", res); + return NULL; + } + uuid_unparse_lower(host_id_bin, host_id); + return rrdhost_find_by_guid(host_id, 0); +} + +#define NODE_ID_QUERY "/node/" +// TODO this function should be quarantied and written nicely +// lots of skeletons from initial ACLK Legacy impl. +// quick and dirty from the start static int http_api_v2(mqtt_wss_client client, aclk_query_t query) { int retval = 0; usec_t t; BUFFER *local_buffer = NULL; + RRDHOST *query_host = localhost; #ifdef NETDATA_WITH_ZLIB int z_ret; @@ -76,6 +98,24 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); + char nodeid[UUID_STR_LEN]; + if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { + error("URL requests node_id but there is not enough chars following"); + retval = 1; + goto cleanup; + } + strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); + + query_host = node_id_2_rrdhost(nodeid); + if (!query_host) { + error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid); + retval = 1; + goto cleanup; + } + } + char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -86,7 +126,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) mysep = strrchr(query->data.http_api_v2.query, '/'); // execute the query - t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop"); + t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -187,6 +227,22 @@ static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query) return 0; } +static int register_node(mqtt_wss_client client, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_registration(client, &query->data.node_creation); + return 0; +} + +static int node_state_update(mqtt_wss_client client, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_state_update(client, &query->data.node_update); + return 0; +} + aclk_query_handler aclk_query_handlers[] = { { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, @@ -194,6 +250,8 @@ aclk_query_handler aclk_query_handlers[] = { { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, + { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, { .type = UNKNOWN, .name = NULL, .fnc = NULL } }; diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index c9461b2338..baca4a2f5d 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -114,6 +114,17 @@ void aclk_query_free(aclk_query_t query) if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update) json_object_put(query->data.alarm_update); + if (query->type == NODE_STATE_UPDATE) { + freez((void*)query->data.node_update.claim_id); + freez((void*)query->data.node_update.node_id); + } + + if (query->type == REGISTER_NODE) { + freez((void*)query->data.node_creation.claim_id); + freez((void*)query->data.node_creation.hostname); + freez((void*)query->data.node_creation.machine_guid); + } + freez(query->dedup_id); freez(query->callback_topic); freez(query->msg_id); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 050dc7d224..cbc31ae3cc 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -5,6 +5,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" +#include "schema-wrappers/schema_wrappers.h" typedef enum { UNKNOWN, @@ -13,7 +14,9 @@ typedef enum { HTTP_API_V2, CHART_NEW, CHART_DEL, - ALARM_STATE_UPDATE + ALARM_STATE_UPDATE, + REGISTER_NODE, + NODE_STATE_UPDATE } aclk_query_type_t; struct aclk_query_metadata { @@ -55,6 +58,8 @@ struct aclk_query { struct aclk_query_metadata metadata_alarms; struct aclk_query_http_api_v2 http_api_v2; struct aclk_query_chart_add_del chart_add_del; + node_instance_creation_t node_creation; + node_instance_connection_t node_update; json_object *alarm_update; } data; }; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index ef83461a35..26e8fdc51b 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -7,7 +7,7 @@ #include "aclk.h" #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" -#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /" #define ACLK_V_COMPRESSION 2 @@ -91,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur { const char *start, *end; + // TODO better check of URL if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { errno = 0; error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); @@ -120,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - HTTP_CHECK_AGENT_INITIALIZED(); + if (!aclk_use_new_cloud_arch) { + HTTP_CHECK_AGENT_INITIALIZED(); + } aclk_query_t query; @@ -256,3 +259,57 @@ err_cleanup_nojson: return 1; } + +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +{ + // TODO do the look up table with hashes to optimize when there are more + // than few + if (!strcmp(message_type, "cmd")) { + aclk_handle_cloud_message((char *)msg); + return; + } + if (!strcmp(message_type, "CreateNodeInstanceResult")) { + node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); + debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); + uuid_t host_id, node_id; + uuid_parse(res.machine_guid, host_id); + uuid_parse(res.node_id, node_id); + update_node_id(&host_id, &node_id); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + query->data.node_update.live = 0; + + if (host) { + // not all host must have RRDHOST struct created for them + // if they never connected during runtime of agent + if (host == localhost) { + query->data.node_update.live = 1; + query->data.node_update.hops = 0; + } else { + netdata_mutex_lock(&host->receiver_lock); + query->data.node_update.live = (host->receiver != NULL); + netdata_mutex_unlock(&host->receiver_lock); + } + } + + query->data.node_update.node_id = res.node_id; // aclk_query_free will free it + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + freez(res.machine_guid); + return; + } + if (!strcmp(message_type, "SendNodeInstances")) { + debug(D_ACLK, "Got SendNodeInstances"); + aclk_send_node_instances(); + return; + } + + error ("Unknown new cloud arch message type received \"%s\"", message_type); +} diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index 21c202dee2..98024d5d4e 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -10,4 +10,6 @@ int aclk_handle_cloud_message(char *payload); +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len); + #endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index e53d966146..2a0fdd5e33 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -36,6 +36,37 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, #endif } +static uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ +#ifndef ACLK_LOG_CONVERSATION_DIR + UNUSED(msgname); +#endif + uint16_t packet_id; + const char *topic = aclk_get_topic(subtopic); + + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting message send."); + return 0; + } + + mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname); + FILE *fptr; + if (fptr = fopen(filename,"w")) { + fwrite(msg, msg_len, 1, fptr); + fclose(fptr); + } +#endif + + return packet_id; +} + static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) { uint16_t packet_id; @@ -372,6 +403,85 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) return pid; } +// new protobuf msgs +uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { + size_t len; + uint16_t pid; + update_agent_connection_t conn = { + .reachable = (reachable ? 1 : 0), + .lwt = 0, + .session_id = aclk_session_newarch + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return 0; + } + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(&len, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) { + error("Error generating agent::v1::UpdateAgentConnection payload"); + return 0; + } + + pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); + freez(msg); + return pid; +} + +char *aclk_generate_lwt(size_t *size) { + update_agent_connection_t conn = { + .reachable = 0, + .lwt = 1, + .session_id = aclk_session_newarch + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return NULL; + } + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(size, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) + error("Error generating agent::v1::UpdateAgentConnection payload for LWT"); + + return msg; +} + +void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) { + size_t len; + char *msg = generate_node_instance_creation(&len, node_creation); + if (!msg) { + error("Error generating nodeinstance::create::v1::CreateNodeInstance"); + return; + } + + aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + freez(msg); +} + +void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) { + size_t len; + char *msg = generate_node_instance_connection(&len, node_connection); + if (!msg) { + error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection"); + return; + } + + aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + freez(msg); +} + #ifndef __GNUC__ #pragma endregion #endif diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 9bcf7a5bc0..e4445f4427 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -6,6 +6,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" #include "mqtt_wss_client.h" +#include "schema-wrappers/schema_wrappers.h" void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); @@ -19,4 +20,11 @@ void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg); json_object *aclk_generate_disconnect(const char *message); int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); +// new protobuf msgs +uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); +char *aclk_generate_lwt(size_t *size); + +void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation); +void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection); + #endif diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 4910bbdb62..2f0035d825 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -10,6 +10,9 @@ #define UUID_STR_LEN 37 #endif +int aclk_use_new_cloud_arch = 0; +usec_t aclk_session_newarch = 0; + aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { if (!strcmp(str, "json")) { return ACLK_ENC_JSON; @@ -107,14 +110,18 @@ struct topic_name { // in answer to /password endpoint const char *name; } topic_names[] = { - { .id = ACLK_TOPICID_CHART, .name = "chart" }, - { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, - { .id = ACLK_TOPICID_METADATA, .name = "meta" }, - { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, - { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" }, + { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" }, + { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" }, + { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } }; -enum aclk_topics compulsory_topics[] = { +enum aclk_topics compulsory_topics_legacy[] = { ACLK_TOPICID_CHART, ACLK_TOPICID_ALARMS, ACLK_TOPICID_METADATA, @@ -122,6 +129,19 @@ enum aclk_topics compulsory_topics[] = { ACLK_TOPICID_UNKNOWN }; +enum aclk_topics compulsory_topics_new_cloud_arch[] = { +// TODO remove old topics once not needed anymore + ACLK_TOPICID_CHART, + ACLK_TOPICID_ALARMS, + ACLK_TOPICID_METADATA, + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_AGENT_CONN, + ACLK_TOPICID_CMD_NG_V1, + ACLK_TOPICID_CREATE_NODE, + ACLK_TOPICID_NODE_CONN, + ACLK_TOPICID_UNKNOWN +}; + static enum aclk_topics topic_name_to_id(const char *name) { struct topic_name *topic = topic_names; while (topic->name) { @@ -186,7 +206,7 @@ static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *to } topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { - info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); } json_object_iter_next(&it); continue; @@ -244,6 +264,8 @@ int aclk_generate_topic_cache(struct json_object *json) } } + enum aclk_topics *compulsory_topics = aclk_use_new_cloud_arch ? compulsory_topics_new_cloud_arch : compulsory_topics_legacy; + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { if (!aclk_get_topic(compulsory_topics[i])) { error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index a96e71bbbb..04897be809 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -8,6 +8,9 @@ // Helper stuff which should not have any further inside ACLK dependency // and are supposed not to be needed outside of ACLK +extern int aclk_use_new_cloud_arch; +extern usec_t aclk_session_newarch; + typedef enum { ACLK_ENC_UNKNOWN = 0, ACLK_ENC_JSON, @@ -51,11 +54,15 @@ void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc); void aclk_env_t_destroy(aclk_env_t *env); enum aclk_topics { - ACLK_TOPICID_UNKNOWN = 0, - ACLK_TOPICID_CHART = 1, - ACLK_TOPICID_ALARMS = 2, - ACLK_TOPICID_METADATA = 3, - ACLK_TOPICID_COMMAND = 4 + ACLK_TOPICID_UNKNOWN = 0, + ACLK_TOPICID_CHART = 1, + ACLK_TOPICID_ALARMS = 2, + ACLK_TOPICID_METADATA = 3, + ACLK_TOPICID_COMMAND = 4, + ACLK_TOPICID_AGENT_CONN = 5, + ACLK_TOPICID_CMD_NG_V1 = 6, + ACLK_TOPICID_CREATE_NODE = 7, + ACLK_TOPICID_NODE_CONN = 8 }; const char *aclk_get_topic(enum aclk_topics topic); diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c index da0af73373..3c579933bd 100644 --- a/aclk/legacy/agent_cloud_link.c +++ b/aclk/legacy/agent_cloud_link.c @@ -1269,7 +1269,7 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) return 0; } -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +void legacy_aclk_host_state_update(RRDHOST *host, int connect) { #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) @@ -1281,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) if (unlikely(aclk_host_initializing(localhost))) return; - switch (cmd) { - case ACLK_CMD_CHILD_CONNECT: - debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); - aclk_start_host_popcorning(host); - legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); - break; - case ACLK_CMD_CHILD_DISCONNECT: - debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); - aclk_stop_host_popcorning(host); - legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); - break; - default: - error("Unknown command for aclk_host_state_update %d.", (int)cmd); + if (connect) { + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + } else { + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); } } diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h index 912034b7d7..25f8694ca8 100644 --- a/aclk/legacy/agent_cloud_link.h +++ b/aclk/legacy/agent_cloud_link.h @@ -73,7 +73,7 @@ void legacy_aclk_alarm_reload(void); unsigned long int aclk_reconnect_delay(int mode); extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd); +void legacy_aclk_host_state_update(RRDHOST *host, int connect); int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); void aclk_update_next_child_to_popcorn(void); diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc new file mode 100644 index 0000000000..9dbc0e6b5c --- /dev/null +++ b/aclk/schema-wrappers/connection.cc @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/connection.pb.h" +#include "connection.h" + +#include "schema_wrapper_utils.h" + +#include +#include + +char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data) +{ + agent::v1::UpdateAgentConnection connupd; + + connupd.set_claim_id(data->claim_id); + connupd.set_reachable(data->reachable); + connupd.set_session_id(data->session_id); + + connupd.set_update_source((data->lwt) ? agent::v1::CONNECTION_UPDATE_SOURCE_LWT : agent::v1::CONNECTION_UPDATE_SOURCE_AGENT); + + struct timeval tv; + gettimeofday(&tv, NULL); + + google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at(); + timestamp->set_seconds(tv.tv_sec); + timestamp->set_nanos(tv.tv_usec * 1000); + + *len = PROTO_COMPAT_MSG_SIZE(connupd); + char *msg = (char*)malloc(*len); + if (msg) + connupd.SerializeToArray(msg, *len); + + return msg; +} diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h new file mode 100644 index 0000000000..ac661c9542 --- /dev/null +++ b/aclk/schema-wrappers/connection.h @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H +#define ACLK_SCHEMA_WRAPPER_CONNECTION_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *claim_id; + unsigned int reachable:1; + + int64_t session_id; + + unsigned int lwt:1; + +// TODO in future optional fields +// > 15 optional fields: +// How long the system was running until connection (only applicable when reachable=true) +// google.protobuf.Duration system_uptime = 15; +// How long the netdata agent was running until connection (only applicable when reachable=true) +// google.protobuf.Duration agent_uptime = 16; + + +} update_agent_connection_t; + +char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */ diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc new file mode 100644 index 0000000000..0a4c8ece1b --- /dev/null +++ b/aclk/schema-wrappers/node_connection.cc @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/nodeinstance/connection/v1/connection.pb.h" +#include "node_connection.h" + +#include "schema_wrapper_utils.h" + +#include +#include + +char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) { + nodeinstance::v1::UpdateNodeInstanceConnection msg; + + if(data->claim_id) + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + + msg.set_liveness(data->live); + msg.set_queryable(data->queryable); + + msg.set_session_id(data->session_id); + msg.set_hops(data->hops); + + struct timeval tv; + gettimeofday(&tv, NULL); + + google::protobuf::Timestamp *timestamp = msg.mutable_updated_at(); + timestamp->set_seconds(tv.tv_sec); + timestamp->set_nanos(tv.tv_usec * 1000); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)malloc(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h new file mode 100644 index 0000000000..3fd2072134 --- /dev/null +++ b/aclk/schema-wrappers/node_connection.h @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H +#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char* claim_id; + const char* node_id; + + unsigned int live:1; + unsigned int queryable:1; + + int64_t session_id; + + int32_t hops; +} node_instance_connection_t; + +char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */ diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc new file mode 100644 index 0000000000..c696bb27ba --- /dev/null +++ b/aclk/schema-wrappers/node_creation.cc @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/nodeinstance/create/v1/creation.pb.h" +#include "node_creation.h" + +#include "schema_wrapper_utils.h" + +#include + +char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data) +{ + nodeinstance::create::v1::CreateNodeInstance msg; + + if (data->claim_id) + msg.set_claim_id(data->claim_id); + msg.set_machine_guid(data->machine_guid); + msg.set_hostname(data->hostname); + msg.set_hops(data->hops); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)malloc(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} + +node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len) +{ + nodeinstance::create::v1::CreateNodeInstanceResult msg; + node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL }; + + if (!msg.ParseFromArray(data, len)) + return res; + + res.node_id = strdup(msg.node_id().c_str()); + res.machine_guid = strdup(msg.machine_guid().c_str()); + return res; +} diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h new file mode 100644 index 0000000000..71e45ef55e --- /dev/null +++ b/aclk/schema-wrappers/node_creation.h @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H +#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char* claim_id; + const char* machine_guid; + const char* hostname; + + int32_t hops; +} node_instance_creation_t; + +typedef struct { + char *node_id; + char *machine_guid; +} node_instance_creation_result_t; + +char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data); +node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */ diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h new file mode 100644 index 0000000000..ba9f82866a --- /dev/null +++ b/aclk/schema-wrappers/schema_wrapper_utils.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef SCHEMA_WRAPPER_UTILS_H +#define SCHEMA_WRAPPER_UTILS_H + +#if GOOGLE_PROTOBUF_VERSION < 3001000 +#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize(); +#else +#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong(); +#endif + +#endif /* SCHEMA_WRAPPER_UTILS_H */ diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h new file mode 100644 index 0000000000..74fd018d49 --- /dev/null +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +// utility header to include all the message wrappers at once + +#ifndef SCHEMA_WRAPPERS_H +#define SCHEMA_WRAPPERS_H + +#include "connection.h" +#include "node_connection.h" +#include "node_creation.h" + +#endif /* SCHEMA_WRAPPERS_H */ diff --git a/configure.ac b/configure.ac index 9d0865e15a..8c81fe494a 100644 --- a/configure.ac +++ b/configure.ac @@ -642,6 +642,27 @@ AM_CONDITIONAL([ENABLE_CAPABILITY], [test "${with_libcap}" = "yes"]) # ----------------------------------------------------------------------------- # ACLK +PKG_CHECK_MODULES( + [PROTOBUF], + [protobuf >= 3], + [have_libprotobuf=yes], + [have_libprotobuf=no] +) + +AC_PATH_PROG([PROTOC], [protoc], [no]) +AS_IF( + [test x"${PROTOC}" == x"no"], + [have_protoc=no], + [have_protoc=yes] +) + +AC_PATH_PROG([CXX_BINARY], [${CXX}], [no]) +AS_IF( + [test x"${CXX_BINARY}" == x"no"], + [have_CXX_compiler=no], + [have_CXX_compiler=yes] +) + AC_MSG_CHECKING([if Cloud functionality should be enabled]) AC_MSG_RESULT([${enable_cloud}]) if test "$aclk_ng" = "no"; then @@ -684,6 +705,27 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "no"; then else AC_MSG_RESULT([yes]) fi + AC_MSG_CHECKING([if protobuf available for ACLK Next Generation]) + if test "${have_libprotobuf}" != "yes"; then + AC_MSG_RESULT([no]) + can_enable_ng="no" + else + AC_MSG_RESULT([yes]) + fi + AC_MSG_CHECKING([if protoc available for ACLK Next Generation]) + if test "${have_protoc}" != "yes"; then + AC_MSG_RESULT([no]) + can_enable_ng="no" + else + AC_MSG_RESULT([yes]) + fi + AC_MSG_CHECKING([if C++ compiler available for ACLK Next Generation]) + if test "${have_CXX_compiler}" != "yes"; then + AC_MSG_RESULT([no]) + can_enable_ng="no" + else + AC_MSG_RESULT([yes]) + fi AC_MSG_CHECKING([ACLK Next Generation can be built]) AC_MSG_RESULT([${can_enable_ng}]) if test "$can_enable_ng" = "no" -a "$aclk_ng" = "yes"; then @@ -694,7 +736,10 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "no"; then enable_aclk="yes" AC_DEFINE([ACLK_NG], [1], [ACLK Next Generation Should be used]) AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK]) - OPTIONAL_ACLK_NG_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include" + OPTIONAL_ACLK_NG_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include -I \$(abs_top_srcdir)/aclk/aclk-schemas" + OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}" + CXX11FLAG="-std=c++11" + OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}" fi fi @@ -1291,13 +1336,6 @@ AM_CONDITIONAL([ENABLE_EXPORTING_PUBSUB], [test "${enable_exporting_pubsub}" = " # ----------------------------------------------------------------------------- # Prometheus remote write backend - libprotobuf, libsnappy, protoc -PKG_CHECK_MODULES( - [PROTOBUF], - [protobuf >= 3], - [have_libprotobuf=yes], - [have_libprotobuf=no] -) - AC_MSG_CHECKING([for snappy::RawCompress in -lsnappy]) AC_LANG_SAVE @@ -1333,20 +1371,6 @@ AC_MSG_CHECKING([for snappy::RawCompress in -lsnappy]) AC_MSG_RESULT([${have_libsnappy}]) -AC_PATH_PROG([PROTOC], [protoc], [no]) -AS_IF( - [test x"${PROTOC}" == x"no"], - [have_protoc=no], - [have_protoc=yes] -) - -AC_PATH_PROG([CXX_BINARY], [${CXX}], [no]) -AS_IF( - [test x"${CXX_BINARY}" == x"no"], - [have_CXX_compiler=no], - [have_CXX_compiler=yes] -) - test "${enable_backend_prometheus_remote_write}" = "yes" -a "${have_libprotobuf}" != "yes" && \ AC_MSG_ERROR([libprotobuf required but not found. try installing protobuf]) @@ -1364,9 +1388,11 @@ if test "${enable_backend_prometheus_remote_write}" != "no" -a "${have_libprotob -a "${have_protoc}" = "yes" -a "${have_CXX_compiler}" = "yes"; then enable_backend_prometheus_remote_write="yes" AC_DEFINE([ENABLE_PROMETHEUS_REMOTE_WRITE], [1], [Prometheus remote write API usability]) - OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS="${PROTOBUF_CFLAGS} ${SNAPPY_CFLAGS} -I \$(abs_top_srcdir)/exporting/prometheus/remote_write" + OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS="${SNAPPY_CFLAGS} -I \$(abs_top_srcdir)/exporting/prometheus/remote_write" CXX11FLAG="-std=c++11" - OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS="${PROTOBUF_LIBS} ${SNAPPY_LIBS}" + OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS="${SNAPPY_LIBS}" + OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}" + OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}" else enable_backend_prometheus_remote_write="no" fi @@ -1449,7 +1475,8 @@ AC_MSG_RESULT([${enable_lto}]) AM_CONDITIONAL([ENABLE_CXX_LINKER], [test "${enable_backend_kinesis}" = "yes" \ -o "${enable_exporting_pubsub}" = "yes" \ - -o "${enable_backend_prometheus_remote_write}" = "yes"]) + -o "${enable_backend_prometheus_remote_write}" = "yes" \ + -o "${aclk_ng}" = "yes"]) AC_DEFINE_UNQUOTED([NETDATA_USER], ["${with_user}"], [use this user to drop privileged]) @@ -1481,7 +1508,7 @@ CFLAGS="${CFLAGS} ${OPTIONAL_MATH_CFLAGS} ${OPTIONAL_NFACCT_CFLAGS} ${OPTIONAL_Z ${OPTIONAL_LIBCAP_CFLAGS} ${OPTIONAL_IPMIMONITORING_CFLAGS} ${OPTIONAL_CUPS_CFLAGS} ${OPTIONAL_XENSTAT_FLAGS} \ ${OPTIONAL_KINESIS_CFLAGS} ${OPTIONAL_PUBSUB_CFLAGS} ${OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS} \ ${OPTIONAL_MONGOC_CFLAGS} ${LWS_CFLAGS} ${OPTIONAL_JSONC_STATIC_CFLAGS} ${OPTIONAL_BPF_CFLAGS} ${OPTIONAL_JUDY_CFLAGS} \ - ${OPTIONAL_ACLK_NG_CFLAGS}" + ${OPTIONAL_ACLK_NG_CFLAGS} ${OPTIONAL_PROTOBUF_CFLAGS}" CXXFLAGS="${CFLAGS} ${CXX11FLAG}" @@ -1532,6 +1559,8 @@ AC_SUBST([OPTIONAL_MONGOC_CFLAGS]) AC_SUBST([OPTIONAL_MONGOC_LIBS]) AC_SUBST([OPTIONAL_LWS_LIBS]) AC_SUBST([OPTIONAL_ACLK_NG_CFLAGS]) +AC_SUBST([OPTIONAL_PROTOBUF_CFLAGS]) +AC_SUBST([OPTIONAL_PROTOBUF_LIBS]) # ----------------------------------------------------------------------------- # Check if cmocka is available - needed for unit testing diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index b7c997ae9a..7fa340bdb8 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -1430,6 +1430,42 @@ failed: return rc - 1; } +#define SQL_SELECT_HOST_BY_NODE_ID "select host_id from node_instance where node_id = @node_id;" + +int get_host_id(uuid_t *node_id, uuid_t *host_id) +{ + sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) + error_report("Database has not been initialized"); + return 1; + } + + rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_NODE_ID, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to select node instance information for a node"); + return 1; + } + + rc = sqlite3_bind_blob(res, 1, node_id, sizeof(*node_id), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to select node instance information"); + goto failed; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW && host_id)) + uuid_copy(*host_id, *((uuid_t *) sqlite3_column_blob(res, 0))); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when selecting node instance information"); + + return (rc == SQLITE_ROW) ? 0 : -1; +} + #define SQL_SELECT_NODE_ID "select node_id from node_instance where host_id = @host_id and node_id not null;" int get_node_id(uuid_t *host_id, uuid_t *node_id) diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index fe1b2acb69..4fb43c2640 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -74,6 +74,7 @@ extern void sql_build_context_param_list(struct context_param **param_list, RRDH extern void store_claim_id(uuid_t *host_id, uuid_t *claim_id); extern int update_node_id(uuid_t *host_id, uuid_t *node_id); extern int get_node_id(uuid_t *host_id, uuid_t *node_id); +extern int get_host_id(uuid_t *node_id, uuid_t *host_id); extern void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id); extern struct node_instance_list *get_node_list(void); extern void sql_load_node_id(RRDHOST *host); diff --git a/streaming/receiver.c b/streaming/receiver.c index e1fcaafb38..6771387920 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -454,11 +454,11 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; -#if defined(ENABLE_ACLK) && !defined(ACLK_NG) +#if defined(ENABLE_ACLK) // in case we have cloud connection we inform cloud // new slave connected if (netdata_cloud_setting) - aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT); + aclk_host_state_update(rpt->host, 1); #endif size_t count = streaming_parser(rpt, &cd, fp); @@ -468,11 +468,11 @@ static int rrdpush_receive(struct