diff options
31 files changed, 933 insertions, 93 deletions
diff --git a/.gitmodules b/.gitmodules index ef9349b389..83ef65e8d1 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "mqtt_websockets"] path = mqtt_websockets url = https://github.com/underhood/mqtt_websockets.git +[submodule "aclk/aclk-schemas"] + path = aclk/aclk-schemas + url = https://github.com/netdata/aclk-schemas.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a50ff51d3..f836c718af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -796,6 +796,14 @@ set(ACLK_NG_FILES mqtt_websockets/c-rbuf/src/ringbuffer_internal.h mqtt_websockets/MQTT-C/src/mqtt.c mqtt_websockets/MQTT-C/include/mqtt.h + aclk/schema-wrappers/connection.cc + aclk/schema-wrappers/connection.h + aclk/schema-wrappers/node_connection.cc + aclk/schema-wrappers/node_connection.h + aclk/schema-wrappers/node_creation.cc + aclk/schema-wrappers/node_creation.h + aclk/schema-wrappers/schema_wrappers.h + aclk/schema-wrappers/schema_wrapper_utils.h ) set(SPAWN_PLUGIN_FILES @@ -1038,9 +1046,58 @@ ELSE() message(STATUS "agent-cloud-link Legacy: disabled") ENDIF() +find_package(Protobuf REQUIRED) + +function(PROTOBUF_ACLK_GENERATE_CPP SRCS HDRS) + if(NOT ARGN) + message(SEND_ERROR "Error: PROTOBUF_ACLK_GENERATE_CPP() called without any proto files") + return() + endif() + + set(${SRCS}) + set(${HDRS}) + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(DIR ${ABS_FIL} DIRECTORY) + get_filename_component(FIL_WE ${FIL} NAME_WE) + set(GENERATED_PB_CC "${DIR}/${FIL_WE}.pb.cc") + set(GENERATED_PB_H "${DIR}/${FIL_WE}.pb.h") +# cmake > 3.20 required :( +# cmake_path(SET GENERATED_PB_CC "${DIR}") +# cmake_path(SET GENERATED_PB_H "${DIR}") +# cmake_path(APPEND GENERATED_PB_CC "${FIL_WE}.pb.cc") +# cmake_path(APPEND GENERATED_PB_H "${FIL_WE}.pb.h") + + list(APPEND ${SRCS} ${GENERATED_PB_CC}) + list(APPEND ${HDRS} ${GENERATED_PB_H}) + add_custom_command( + OUTPUT ${GENERATED_PB_CC} + ${GENERATED_PB_H} + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} + ARGS -I=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas --cpp_out=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas ${ABS_FIL} + DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} + COMMENT "Running C++ protocol buffer compiler on ${FIL}" + VERBATIM ) + endforeach() + set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE) + set(${SRCS} ${${SRCS}} PARENT_SCOPE) + set(${HDRS} ${${HDRS}} PARENT_SCOPE) +endfunction() + +set(ACLK_NG_PROTO_DEFS + aclk/aclk-schemas/proto/agent/v1/connection.proto + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto + ) +PROTOBUF_ACLK_GENERATE_CPP(ACLK_NG_PROTO_BUILT_SRCS ACLK_NG_PROTO_BUILT_HDRS ${ACLK_NG_PROTO_DEFS}) + +list(APPEND NETDATA_COMMON_LIBRARIES ${PROTOBUF_LIBRARIES}) +list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${PROTOBUF_INCLUDE_DIRS}) +list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER}) list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD}) -list(APPEND NETDATA_FILES ${ACLK_NG_FILES}) +list(APPEND NETDATA_FILES ${ACLK_NG_FILES} ${ACLK_NG_PROTO_BUILT_SRCS} ${ACLK_NG_PROTO_BUILT_HDRS}) list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES}) +include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include) include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/c-rbuf/include) diff --git a/Makefile.am b/Makefile.am index 3c47b62e6d..e3acf87515 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,6 +3,9 @@ AUTOMAKE_OPTIONS = foreign subdir-objects 1.11 ACLOCAL_AMFLAGS = -I build/m4 +nodist_netdata_SOURCES=$(NULL) +BUILT_SOURCES=$(NULL) + MAINTAINERCLEANFILES = \ config.log config.status \ $(srcdir)/Makefile.in \ @@ -572,7 +575,39 @@ ACLK_NG_FILES = \ mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \ mqtt_websockets/MQTT-C/src/mqtt.c \ mqtt_websockets/MQTT-C/include/mqtt.h \ + aclk/schema-wrappers/connection.cc \ + aclk/schema-wrappers/connection.h \ + aclk/schema-wrappers/node_connection.cc \ + aclk/schema-wrappers/node_connection.h \ + aclk/schema-wrappers/node_creation.cc \ + aclk/schema-wrappers/node_creation.h \ + aclk/schema-wrappers/schema_wrappers.h \ + aclk/schema-wrappers/schema_wrapper_utils.h \ + $(NULL) + +ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ + aclk/aclk-schemas/proto/agent/v1/connection.pb.h \ + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \ + aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \ + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ + aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \ $(NULL) + +BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) +nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) + +aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ +aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \ +aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h: aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ +aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + endif #ACLK_NG if ENABLE_ACLK @@ -784,12 +819,15 @@ netdata_LDADD = \ $(NETDATA_COMMON_LIBS) \ $(NULL) +if ACLK_NG + netdata_LDADD += $(OPTIONAL_PROTOBUF_LIBS) +endif + if ACLK_LEGACY netdata_LDADD += \ $(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \ $(OPTIONAL_LIBCAP_LIBS) \ $(OPTIONAL_LWS_LIBS) \ - $(NETDATA_COMMON_LIBS) \ $(NULL) endif #ACLK_LEGACY @@ -899,12 +937,15 @@ endif if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) - netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) - BUILT_SOURCES = \ + netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \ + $(OPTIONAL_PROTOBUF_LIBS) \ + $(NULL) + BACKEND_PROMETHEUS_BUILT_SOURCES = \ exporting/prometheus/remote_write/remote_write.pb.cc \ exporting/prometheus/remote_write/remote_write.pb.h \ $(NULL) - nodist_netdata_SOURCES = $(BUILT_SOURCES) + BUILT_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES) + nodist_netdata_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES) exporting/prometheus/remote_write/remote_write.pb.cc \ exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote_write/remote_write.proto @@ -1033,14 +1074,17 @@ if ENABLE_UNITTESTS exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE exporting_tests_exporting_engine_testdriver_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES) - exporting_tests_exporting_engine_testdriver_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) + exporting_tests_exporting_engine_testdriver_LDADD += \ + $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \ + $(OPTIONAL_PROTOBUF_LIBS) \ + $(NULL) exporting_tests_exporting_engine_testdriver_LDFLAGS += \ -Wl,--wrap=init_write_request \ -Wl,--wrap=add_host_info \ -Wl,--wrap=add_label \ -Wl,--wrap=add_metric \ $(NULL) - nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BUILT_SOURCES) + nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BACKEND_PROMETHEUS_BUILT_SOURCES) endif if ENABLE_BACKEND_KINESIS exporting_tests_exporting_engine_testdriver_SOURCES += $(KINESIS_EXPORTING_FILES) diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas new file mode 160000 +Subproject b5fef3f3a84e6a5013b36b906f4677012c73441 diff --git a/aclk/aclk.c b/aclk/aclk.c index 7e8c1c32e0..3cb25a67d3 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int aclk_handle_cloud_message(cmsg); } + +static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos) +{ + if (msglen > RX_MSGLEN_MAX) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring message."); + return; + } + + const char *msgtype = strrchr(topic, '/'); + if (unlikely(!msgtype)) { + error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic); + return; + } + msgtype++; + if (unlikely(!*msgtype)) { + error_report("Message type empty. Ignoring message from topic \"%s\"", topic); + return; + } + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + aclk_handle_new_cloud_msg(msgtype, msg, msglen); +} + static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) @@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - // TODO global vars? - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) @@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); + if (aclk_use_new_cloud_arch) { + topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + } + aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); + if (!aclk_use_new_cloud_arch) { + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { + error("Sending `connect` payload immediately as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; + } else { + aclk_send_agent_connection_update(client, 1); } - ACLK_SHARED_STATE_UNLOCK; } /* Waits until agent is ready or needs to exit @@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) * @return 0 - Popcorning Finished - Agent STABLE, * !0 - netdata_exit */ -static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +static int wait_popcorning_finishes() { time_t elapsed; int need_wait; + if (aclk_use_new_cloud_arch) + return 0; + while (!netdata_exit) { ACLK_SHARED_STATE_LOCK; if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { @@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th aclk_shared_state.agent_state = ACLK_HOST_STABLE; ACLK_SHARED_STATE_UNLOCK; error("ACLK localhost popocorn finished"); - if (unlikely(!query_threads->thread_list)) - aclk_query_threads_start(query_threads, client); - queue_connect_payloads(); return 0; } ACLK_SHARED_STATE_UNLOCK; @@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client) error("Preparing to Gracefully Shutdown the ACLK"); aclk_queue_lock(); aclk_queue_flush(); - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + if (aclk_use_new_cloud_arch) + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); + else + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { @@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt; + json_object *lwt = NULL; while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); @@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (aclk_use_new_cloud_arch) + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); + else + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); continue; @@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } #endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + aclk_session_newarch = now_realtime_usec(); + aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; + aclk_session_us = aclk_session_newarch % USEC_PER_SEC; + + if (aclk_use_new_cloud_arch) { + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); + } else { + lwt = aclk_generate_disconnect(NULL); + mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); + mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + } #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - json_object_put(lwt); + if (aclk_use_new_cloud_arch) + freez((char *)mqtt_conn_params.will_msg); + else + json_object_put(lwt); if (!ret) { info("MQTTWSS connection succeeded"); @@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) */ void *aclk_main(void *ptr) { +#ifdef ACLK_NEWARCH_DEVMODE + aclk_use_new_cloud_arch = 1; +#endif struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_stats_thread *stats_thread = NULL; @@ -642,7 +710,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -666,8 +734,14 @@ void *aclk_main(void *ptr) // warning this assumes the popcorning is relative short (3s) // if that changes call mqtt_wss_service from within // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes(mqttwss_client, &query_threads)) + if (wait_popcorning_finishes()) goto exit_full; + + if (unlikely(!query_threads.thread_list)) + aclk_query_threads_start(&query_threads, mqttwss_client); + + if (!aclk_use_new_cloud_arch) + queue_connect_payloads(); if (!handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); @@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m query->data.metadata_alarms.initial_on_connect = 0; aclk_queue_query(query); } + +void ng_aclk_host_state_update(RRDHOST *host, int cmd) +{ + uuid_t node_id; + int ret; + + if (!aclk_connected || !aclk_use_new_cloud_arch) + return; + + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded + create_query->data.node_creation.hostname = strdupz(host->hostname); + create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + aclk_queue_query(create_query); + return; + } + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = cmd; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); +} + +void aclk_send_node_instances() +{ + struct node_instance_list *list = get_node_list(); + while (!uuid_is_null(list->host_id)) { + if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = list->live; + query->data.node_update.hops = list->hops; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops + create_query->data.node_creation.hostname = list->hostname; + create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); + aclk_queue_query(create_query); + } + + list++; + } +} diff --git a/aclk/aclk.h b/aclk/aclk.h index ab5332dc14..18cdbd248c 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -43,4 +43,8 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create); void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void ng_aclk_host_state_update(RRDHOST *host, int cmd); + +void aclk_send_node_instances(void); + #endif /* ACLK_H */ diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index edbe02be34..4838f4b7f4 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -146,6 +146,19 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu error_report("No usable aclk_del_collector implementation"); } +void aclk_host_state_update(RRDHOST *host, int connect) +{ +#ifdef ACLK_NG + if (aclk_ng) + return ng_aclk_host_state_update(host, connect); +#endif +#ifdef ACLK_LEGACY + if (!aclk_ng) + return legacy_aclk_host_state_update(host, connect); +#endif + error_report("Couldn't use any version of aclk_host_state_update"); +} + #endif /* ENABLE_ACLK */ struct label *add_aclk_host_labels(struct label *label) { diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index b76530c5ea..b0e9a075b4 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -35,6 +35,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_host_state_update(RRDHOST *host, int connect); + #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ .config_section = NULL, \ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 3e2f88e468..2bf60532ac 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -55,11 +55,33 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char return t; } +static RRDHOST *node_id_2_rrdhost(const char *node_id) +{ + int res; + uuid_t node_id_bin, host_id_bin; + char host_id[UUID_STR_LEN]; + if (uuid_parse(node_id, node_id_bin)) { + error("Couldn't parse UUID %s", node_id); + return NULL; + } + if ((res = get_host_id(&node_id_bin, &host_id_bin))) { + error("node not found rc=%d", res); + return NULL; + } + uuid_unparse_lower(host_id_bin, host_id); + return rrdhost_find_by_guid(host_id, 0); +} + +#define NODE_ID_QUERY "/node/" +// TODO this function should be quarantied and written nicely +// lots of skeletons from initial ACLK Legacy impl. +// quick and dirty from the start static int http_api_v2(mqtt_wss_client client, aclk_query_t query) { int retval = 0; usec_t t; BUFFER *local_buffer = NULL; + RRDHOST *query_host = localhost; #ifdef NETDATA_WITH_ZLIB int z_ret; @@ -76,6 +98,24 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); + char nodeid[UUID_STR_LEN]; + if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { + error("URL requests node_id but there is not enough chars following"); + retval = 1; + goto cleanup; + } + strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); + + query_host = node_id_2_rrdhost(nodeid); + if (!query_host) { + error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid); + retval = 1; + goto cleanup; + } + } + char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -86,7 +126,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) mysep = strrchr(query->data.http_api_v2.query, '/'); // execute the query - t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop"); + t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -187,6 +227,22 @@ static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query) return 0; } +static int register_node(mqtt_wss_client client, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_registration(client, &query->data.node_creation); + return 0; +} + +static int node_state_update(mqtt_wss_client client, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_state_update(client, &query->data.node_update); + return 0; +} + aclk_query_handler aclk_query_handlers[] = { { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, @@ -194,6 +250,8 @@ aclk_query_handler aclk_query_handlers[] = { { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, + { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, { .type = UNKNOWN, .name = NULL, .fnc = NULL } }; diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index c9461b2338..baca4a2f5d 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -114,6 +114,17 @@ void aclk_query_free(aclk_query_t query) if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update) json_object_put(query->data.alarm_update); + if (query->type == NODE_STATE_UPDATE) { + freez((void*)query->data.node_update.claim_id); + freez((void*)query->data.node_update.node_id); + } + + if (query->type == REGISTER_NODE) { + freez((void*)query->data.node_creation.claim_id); + freez((void*)query->data.node_creation.hostname); + freez((void*)query->data.node_creation.machine_guid); + } + freez(query->dedup_id); freez(query->callback_topic); freez(query->msg_id); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 050dc7d224..cbc31ae3cc 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -5,6 +5,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" +#include "schema-wra |