summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2021-07-07 16:32:37 +0200
committerGitHub <noreply@github.com>2021-07-07 16:32:37 +0200
commitd005dee55800818b26f6308c433e6aed8079f7fe (patch)
tree2e884e10d333dfa947b1b20d360dcad805a492ed /aclk
parent59394b5f9d8891cb59c42ac87fd8f0d41b28db94 (diff)
ACLK-NG New Cloud NodeInstance related msgs (#11234)
Adds new cloud arch NodeInstance messages as per design. Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'aclk')
m---------aclk/aclk-schemas0
-rw-r--r--aclk/aclk.c198
-rw-r--r--aclk/aclk.h4
-rw-r--r--aclk/aclk_api.c13
-rw-r--r--aclk/aclk_api.h2
-rw-r--r--aclk/aclk_query.c60
-rw-r--r--aclk/aclk_query_queue.c11
-rw-r--r--aclk/aclk_query_queue.h7
-rw-r--r--aclk/aclk_rx_msgs.c61
-rw-r--r--aclk/aclk_rx_msgs.h2
-rw-r--r--aclk/aclk_tx_msgs.c110
-rw-r--r--aclk/aclk_tx_msgs.h8
-rw-r--r--aclk/aclk_util.c36
-rw-r--r--aclk/aclk_util.h17
-rw-r--r--aclk/legacy/agent_cloud_link.c23
-rw-r--r--aclk/legacy/agent_cloud_link.h2
-rw-r--r--aclk/schema-wrappers/connection.cc34
-rw-r--r--aclk/schema-wrappers/connection.h34
-rw-r--r--aclk/schema-wrappers/node_connection.cc37
-rw-r--r--aclk/schema-wrappers/node_connection.h29
-rw-r--r--aclk/schema-wrappers/node_creation.cc39
-rw-r--r--aclk/schema-wrappers/node_creation.h31
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h12
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h12
24 files changed, 726 insertions, 56 deletions
diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas
new file mode 160000
+Subproject b5fef3f3a84e6a5013b36b906f4677012c73441
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7e8c1c32e0..3cb25a67d3 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
aclk_handle_cloud_message(cmsg);
}
+
+/* MQTT receive callback used when the new cloud architecture is enabled.
+ *
+ * The message type is taken from the last path component of `topic` (the
+ * text after the final '/'); the payload is then forwarded, together with
+ * its length, to aclk_handle_new_cloud_msg(). Messages are dropped while a
+ * graceful shutdown is in progress (mqtt_shutdown_msg_id > 0) or when no
+ * message type can be extracted from the topic.
+ *
+ * NOTE(review): the oversize warning says the message "got truncated", but
+ * nothing in this function truncates it - confirm where that happens.
+ */
+static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
+{
+    if (msglen > RX_MSGLEN_MAX)
+        error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+    debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+    if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+        error("Link is shutting down. Ignoring message.");
+        return;
+    }
+
+    const char *msgtype = strrchr(topic, '/');
+    if (unlikely(!msgtype)) {
+        error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+    msgtype++; // step over the '/' itself
+    if (unlikely(!*msgtype)) {
+        error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+    char filename[FN_MAX_LEN];
+    snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+    int logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+    if (logfd < 0) {
+        error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+    } else {
+        // only touch the descriptor when open() actually succeeded
+        if (write(logfd, msg, msglen) < 0)
+            error("Error writing ACLK Conversation logfile \"%s\" for RX message.", filename);
+        close(logfd);
+    }
+#endif
+
+    aclk_handle_new_cloud_msg(msgtype, msg, msglen);
+}
+
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
@@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- // TODO global vars?
- usec_t now = now_realtime_usec();
- aclk_session_sec = now / USEC_PER_SEC;
- aclk_session_us = now % USEC_PER_SEC;
-
const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
@@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
+ if (aclk_use_new_cloud_arch) {
+ topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+ }
+
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
+ if (!aclk_use_new_cloud_arch) {
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
+ error("Sending `connect` payload immediately as popcorning was finished already.");
+ queue_connect_payloads();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ } else {
+ aclk_send_agent_connection_update(client, 1);
}
- ACLK_SHARED_STATE_UNLOCK;
}
/* Waits until agent is ready or needs to exit
@@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
-static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+static int wait_popcorning_finishes()
{
time_t elapsed;
int need_wait;
+ if (aclk_use_new_cloud_arch)
+ return 0;
+
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
@@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popocorn finished");
- if (unlikely(!query_threads->thread_list))
- aclk_query_threads_start(query_threads, client);
- queue_connect_payloads();
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
@@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
error("Preparing to Gracefully Shutdown the ACLK");
aclk_queue_lock();
aclk_queue_flush();
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+ if (aclk_use_new_cloud_arch)
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+ else
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
@@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt;
+ json_object *lwt = NULL;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
@@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (aclk_use_new_cloud_arch)
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+ else
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
continue;
@@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+ if (aclk_use_new_cloud_arch) {
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+ } else {
+ lwt = aclk_generate_disconnect(NULL);
+ mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+ mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ }
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- json_object_put(lwt);
+ if (aclk_use_new_cloud_arch)
+ freez((char *)mqtt_conn_params.will_msg);
+ else
+ json_object_put(lwt);
if (!ret) {
info("MQTTWSS connection succeeded");
@@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
+#ifdef ACLK_NEWARCH_DEVMODE
+ aclk_use_new_cloud_arch = 1;
+#endif
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_stats_thread *stats_thread = NULL;
@@ -642,7 +710,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -666,8 +734,14 @@ void *aclk_main(void *ptr)
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+ if (wait_popcorning_finishes())
goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (!aclk_use_new_cloud_arch)
+ queue_connect_payloads();
if (!handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
@@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
+
+/* Queue a cloud notification about the state of `host`.
+ *
+ * Does nothing unless the link is connected and the new cloud architecture
+ * is in use. Depending on the get_node_id() result for the host:
+ *   > 0  the lookup itself failed - the update is dropped,
+ *   < 0  no node_id is known yet - a REGISTER_NODE query is queued,
+ *   == 0 a node_id exists - a NODE_STATE_UPDATE query is queued with
+ *        `live` set to `cmd`.
+ * The claim id is always copied from localhost under its aclk state lock.
+ */
+void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+{
+    if (!(aclk_connected && aclk_use_new_cloud_arch))
+        return;
+
+    uuid_t node_id;
+    int rc = get_node_id(&host->host_uuid, &node_id);
+
+    if (rc > 0) {
+        // this means we were not able to check if node_id already present
+        error("Unable to check for node_id. Ignoring the host state update.");
+        return;
+    }
+
+    if (rc < 0) {
+        // node_id not found
+        aclk_query_t reg = aclk_query_new(REGISTER_NODE);
+
+        rrdhost_aclk_state_lock(localhost);
+        reg->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+        rrdhost_aclk_state_unlock(localhost);
+
+        reg->data.node_creation.hostname = strdupz(host->hostname);
+        reg->data.node_creation.machine_guid = strdupz(host->machine_guid);
+        reg->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded
+
+        aclk_queue_query(reg);
+        return;
+    }
+
+    aclk_query_t upd = aclk_query_new(NODE_STATE_UPDATE);
+
+    rrdhost_aclk_state_lock(localhost);
+    upd->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+    rrdhost_aclk_state_unlock(localhost);
+
+    // textual node_id; buffer sized for an unparsed UUID (UUID_STR_LEN)
+    upd->data.node_update.node_id = mallocz(UUID_STR_LEN);
+    uuid_unparse_lower(node_id, (char*)upd->data.node_update.node_id);
+
+    upd->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+    upd->data.node_update.live = cmd;
+    upd->data.node_update.queryable = 1;
+    upd->data.node_update.session_id = aclk_session_newarch;
+
+    aclk_queue_query(upd);
+}
+
+/* Walk the list returned by get_node_list() and queue one query per entry:
+ * a NODE_STATE_UPDATE for entries that already carry a node_id, or a
+ * REGISTER_NODE for entries that do not. The list is terminated by an entry
+ * whose host_id is the null UUID.
+ *
+ * Ownership: for REGISTER_NODE the entry's hostname pointer is handed over
+ * to the query (aclk_query_free releases it); for NODE_STATE_UPDATE the
+ * hostname is not used and is released here.
+ * NOTE(review): releasing the array itself assumes get_node_list() returns
+ * a heap allocation owned by the caller - confirm against its definition.
+ */
+void aclk_send_node_instances(void)
+{
+    struct node_instance_list *list_head = get_node_list();
+    if (unlikely(!list_head)) {
+        error_report("Failure to get_node_list from DB!");
+        return;
+    }
+
+    for (struct node_instance_list *list = list_head; !uuid_is_null(list->host_id); list++) {
+        if (!uuid_is_null(list->node_id)) {
+            aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+            rrdhost_aclk_state_lock(localhost);
+            query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            query->data.node_update.live = list->live;
+            query->data.node_update.hops = list->hops;
+            query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
+            query->data.node_update.queryable = 1;
+            query->data.node_update.session_id = aclk_session_newarch;
+            aclk_queue_query(query);
+            // hostname is not transferred to the query on this path
+            freez((void *)list->hostname);
+        } else {
+            aclk_query_t create_query = aclk_query_new(REGISTER_NODE);
+            rrdhost_aclk_state_lock(localhost);
+            create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops
+            create_query->data.node_creation.hostname = list->hostname; // ownership moves to the query
+            create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
+            aclk_queue_query(create_query);
+        }
+    }
+
+    freez(list_head);
+}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index ab5332dc14..18cdbd248c 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -43,4 +43,8 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void ng_aclk_host_state_update(RRDHOST *host, int cmd);
+
+void aclk_send_node_instances(void);
+
#endif /* ACLK_H */
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index edbe02be34..4838f4b7f4 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -146,6 +146,19 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
error_report("No usable aclk_del_collector implementation");
}
+/* Dispatch a host state update to whichever ACLK implementation is compiled
+ * in and selected at runtime through `aclk_ng`. Reports an error when no
+ * compiled-in implementation matches the current selection.
+ */
+void aclk_host_state_update(RRDHOST *host, int connect)
+{
+#ifdef ACLK_NG
+    if (aclk_ng) {
+        // a void function must not `return <expression>;` in ISO C
+        ng_aclk_host_state_update(host, connect);
+        return;
+    }
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng) {
+        legacy_aclk_host_state_update(host, connect);
+        return;
+    }
+#endif
+    error_report("Couldn't use any version of aclk_host_state_update");
+}
+
#endif /* ENABLE_ACLK */
struct label *add_aclk_host_labels(struct label *label) {
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index b76530c5ea..b0e9a075b4 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -35,6 +35,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_host_state_update(RRDHOST *host, int connect);
+
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 3e2f88e468..2bf60532ac 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -55,11 +55,33 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char
return t;
}
+/* Resolve a textual node_id to its RRDHOST.
+ *
+ * The string is parsed as a UUID, mapped to a host UUID via get_host_id(),
+ * and the host is then looked up by the lower-case textual form of that
+ * host UUID. Returns NULL when the string does not parse or get_host_id()
+ * reports a non-zero result; otherwise returns whatever
+ * rrdhost_find_by_guid() yields.
+ */
+static RRDHOST *node_id_2_rrdhost(const char *node_id)
+{
+    uuid_t node_uuid;
+    if (uuid_parse(node_id, node_uuid) != 0) {
+        error("Couldn't parse UUID %s", node_id);
+        return NULL;
+    }
+
+    uuid_t host_uuid;
+    int rc = get_host_id(&node_uuid, &host_uuid);
+    if (rc != 0) {
+        error("node not found rc=%d", rc);
+        return NULL;
+    }
+
+    char host_guid[UUID_STR_LEN];
+    uuid_unparse_lower(host_uuid, host_guid);
+
+    return rrdhost_find_by_guid(host_guid, 0);
+}
+
+#define NODE_ID_QUERY "/node/"
+// TODO this function should be quarantined and written nicely
+// lots of skeletons from initial ACLK Legacy impl.
+// quick and dirty from the start
static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
{
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
+ RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
@@ -76,6 +98,24 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
+ if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+ char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+ char nodeid[UUID_STR_LEN];
+ if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
+ error("URL requests node_id but there is not enough chars following");
+ retval = 1;
+ goto cleanup;
+ }
+ strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
+
+ query_host = node_id_2_rrdhost(nodeid);
+ if (!query_host) {
+ error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid);
+ retval = 1;
+ goto cleanup;
+ }
+ }
+
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@@ -86,7 +126,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
mysep = strrchr(query->data.http_api_v2.query, '/');
// execute the query
- t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop");
+ t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -187,6 +227,22 @@ static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query)
return 0;
}
+/* Query handler for REGISTER_NODE: passes the query's node_creation payload
+ * to aclk_generate_node_registration(). Always returns 0.
+ *
+ * TODO create a pending registrations list with some timeouts to detect
+ * registration requests that go unanswered from the cloud
+ */
+static int register_node(mqtt_wss_client client, aclk_query_t query)
+{
+    node_instance_creation_t *creation = &query->data.node_creation;
+
+    aclk_generate_node_registration(client, creation);
+
+    return 0;
+}
+
+/* Query handler for NODE_STATE_UPDATE: passes the query's node_update
+ * payload to aclk_generate_node_state_update(). Always returns 0.
+ *
+ * TODO create a pending registrations list with some timeouts to detect
+ * registration requests that go unanswered from the cloud
+ * NOTE(review): this TODO is worded like the one on register_node - confirm
+ * it is meant to apply to state updates as well.
+ */
+static int node_state_update(mqtt_wss_client client, aclk_query_t query)
+{
+    node_instance_connection_t *update = &query->data.node_update;
+
+    aclk_generate_node_state_update(client, update);
+
+    return 0;
+}
+
aclk_query_handler aclk_query_handlers[] = {
{ .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
{ .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
@@ -194,6 +250,8 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
{ .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
+ { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index c9461b2338..baca4a2f5d 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -114,6 +114,17 @@ void aclk_query_free(aclk_query_t query)
if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
json_object_put(query->data.alarm_update);
+ if (query->type == NODE_STATE_UPDATE) {
+ freez((void*)query->data.node_update.claim_id);
+ freez((void*)query->data.node_update.node_id);
+ }
+
+ if (query->type == REGISTER_NODE) {
+ freez((void*)query->data.node_creation.claim_id);
+ freez((void*)query->data.node_creation.hostname);
+ freez((void*)query->data.node_creation.machine_guid);
+ }
+
freez(query->dedup_id);
freez(query->callback_topic);
freez(query->msg_id);
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 050dc7d224..cbc31ae3cc 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -5,6 +5,7 @@
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
typedef enum {
UNKNOWN,
@@ -13,7 +14,9 @@ typedef enum {
HTTP_API_V2,
CHART_NEW,
CHART_DEL,
- ALARM_STATE_UPDATE
+ ALARM_STATE_UPDATE,
+ REGISTER_NODE,
+ NODE_STATE_UPDATE
} aclk_query_type_t;
struct aclk_query_metadata {
@@ -55,6 +58,8 @@ struct aclk_query {
struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_query_chart_add_del chart_add_del;
+ node_instance_creation_t node_creation;
+ node_instance_connection_t node_update;
json_object *alarm_update;
} data;
};
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index ef83461a35..26e8fdc51b 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -7,7 +7,7 @@
#include "aclk.h"
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
#define ACLK_V_COMPRESSION 2
@@ -91,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
{
const char *start, *end;
+ // TODO better check of URL
if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
errno = 0;
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
@@ -120,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
- HTTP_CHECK_AGENT_INITIALIZED();
+ if (!aclk_use_new_cloud_arch) {
+ HTTP_CHECK_AGENT_INITIALIZED();
+ }
aclk_query_t query;
@@ -256,3 +259,57 @@ err_cleanup_nojson:
return 1;
}
+
+/* Dispatch a message received on a new-cloud-architecture topic.
+ *
+ * @param message_type  last component of the MQTT topic, NUL terminated
+ * @param msg           raw payload (length-delimited, not a C string)
+ * @param msg_len       number of bytes in `msg`
+ *
+ * Handled types:
+ *   "cmd"                      - forwarded to aclk_handle_cloud_message()
+ *   "CreateNodeInstanceResult" - stores the node_id for the host and queues
+ *                                a NODE_STATE_UPDATE for it
+ *   "SendNodeInstances"        - calls aclk_send_node_instances()
+ * Anything else is logged as unknown.
+ */
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+    // TODO do the look up table with hashes to optimize when there are more
+    // than few
+    if (!strcmp(message_type, "cmd")) {
+        // aclk_handle_cloud_message() takes a C string with no length, while
+        // `msg` is only known to be msg_len bytes long: hand it a terminated
+        // private copy instead of casting away const on the raw payload.
+        char *str = mallocz(msg_len + 1);
+        memcpy(str, msg, msg_len);
+        str[msg_len] = 0;
+        aclk_handle_cloud_message(str);
+        freez(str);
+        return;
+    }
+    if (!strcmp(message_type, "CreateNodeInstanceResult")) {
+        node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+        if (!res.machine_guid || !res.node_id) {
+            error("Error parsing CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez((void *)res.node_id);
+            return;
+        }
+
+        debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+
+        uuid_t host_id, node_id;
+        if (uuid_parse(res.machine_guid, host_id)) {
+            error("Error parsing machine_guid provided by CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez((void *)res.node_id);
+            return;
+        }
+        if (uuid_parse(res.node_id, node_id)) {
+            error("Error parsing node_id provided by CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez((void *)res.node_id);
+            return;
+        }
+        update_node_id(&host_id, &node_id);
+
+        aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+        query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+        rrdhost_aclk_state_lock(localhost);
+        query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+        rrdhost_aclk_state_unlock(localhost);
+
+        RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+        query->data.node_update.live = 0;
+
+        if (host) {
+            // not all host must have RRDHOST struct created for them
+            // if they never connected during runtime of agent
+            if (host == localhost) {
+                query->data.node_update.live = 1;
+                query->data.node_update.hops = 0;
+            } else {
+                // a host counts as live while it has a receiver attached
+                netdata_mutex_lock(&host->receiver_lock);
+                query->data.node_update.live = (host->receiver != NULL);
+                netdata_mutex_unlock(&host->receiver_lock);
+            }
+        }
+
+        query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+        query->data.node_update.queryable = 1;
+        query->data.node_update.session_id = aclk_session_newarch;
+        aclk_queue_query(query);
+        freez(res.machine_guid);
+        return;
+    }
+    if (!strcmp(message_type, "SendNodeInstances")) {
+        debug(D_ACLK, "Got SendNodeInstances");
+        aclk_send_node_instances();
+        return;
+    }
+
+    error ("Unknown new cloud arch message type received \"%s\"", message_type);
+}
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 21c202dee2..98024d5d4e 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -10,4 +10,6 @@
int aclk_handle_cloud_message(char *payload);
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
+
#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index e53d966146..2a0fdd5e33 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -36,6 +36,37 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
#endif
}
+/* Publish a binary payload with QoS 1 on the topic registered for `subtopic`.
+ *
+ * @param client    MQTT-over-WSS client used for publishing
+ * @param msg       payload bytes
+ * @param msg_len   payload length in bytes
+ * @param subtopic  topic id resolved through aclk_get_topic()
+ * @param msgname   message name, used only for the conversation log file
+ *                  name when ACLK_LOG_CONVERSATION_DIR is defined
+ * @return the packet id reported by mqtt_wss_publish_pid(), or 0 when the
+ *         topic could not be resolved
+ */
+static uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+#ifndef ACLK_LOG_CONVERSATION_DIR
+    UNUSED(msgname);
+#endif
+    // start from 0 (the "nothing sent" value used below) so the result is
+    // defined even if the publish call leaves packet_id untouched
+    // NOTE(review): whether it can do so is not visible here
+    uint16_t packet_id = 0;
+    const char *topic = aclk_get_topic(subtopic);
+
+    if (unlikely(!topic)) {
+        error("Couldn't get topic. Aborting message send.");
+        return 0;
+    }
+
+    mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+    aclk_stats_msg_published(packet_id);
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+    char filename[FN_MAX_LEN];
+    snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
+    // payload is binary, so open the log in binary mode
+    FILE *fptr = fopen(filename, "wb");
+    if (fptr) {
+        fwrite(msg, msg_len, 1, fptr);
+        fclose(fptr);
+    }
+#endif
+
+    return packet_id;
+}
+
static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
@@ -372,6 +403,85 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
return pid;
}
+// new protobuf msgs
+/* Build and publish an UpdateAgentConnection message (lwt = 0) on the
+ * ACLK_TOPICID_AGENT_CONN topic.
+ *
+ * @param client     MQTT-over-WSS client used for publishing
+ * @param reachable  any non-zero value is sent as 1, zero as 0
+ * @return the value returned by aclk_send_bin_message_subtopic_pid(), or 0
+ *         when localhost has no claim id or the payload could not be built
+ *
+ * The claim id is read, and the payload generated, while holding the
+ * localhost aclk state lock.
+ */
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
+    update_agent_connection_t conn = {
+        .session_id = aclk_session_newarch,
+        .lwt = 0,
+        .reachable = (reachable != 0)
+    };
+    size_t payload_len;
+    char *payload;
+
+    rrdhost_aclk_state_lock(localhost);
+    if (unlikely(!localhost->aclk_state.claimed_id)) {
+        error("Internal error. Should not come here if not claimed");
+        rrdhost_aclk_state_unlock(localhost);
+        return 0;
+    }
+    conn.claim_id = localhost->aclk_state.claimed_id;
+    payload = generate_update_agent_connection(&payload_len, &conn);
+    rrdhost_aclk_state_unlock(localhost);
+
+    uint16_t packet_id = 0;
+    if (payload) {
+        packet_id = aclk_send_bin_message_subtopic_pid(client, payload, payload_len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
+        freez(payload);
+    } else {
+        error("Error generating agent::v1::UpdateAgentConnection payload");
+    }
+
+    return packet_id;
+}
+
+/* Build the UpdateAgentConnection payload with reachable = 0 and lwt = 1.
+ *
+ * @param size  receives the payload length, as written by
+ *              generate_update_agent_connection()
+ * @return the generated payload, or NULL when localhost has no claim id or
+ *         generation failed; `size` is not written by this function when no
+ *         claim id is present
+ *
+ * The claim id is read, and the payload generated, while holding the
+ * localhost aclk state lock.
+ */
+char *aclk_generate_lwt(size_t *size) {
+    update_agent_connection_t conn = {
+        .session_id = aclk_session_newarch,
+        .lwt = 1,
+        .reachable = 0
+    };
+    char *payload;
+
+    rrdhost_aclk_state_lock(localhost);
+    if (unlikely(!localhost->aclk_state.claimed_id)) {
+        error("Internal error. Should not come here if not claimed");
+        rrdhost_aclk_state_unlock(localhost);
+        return NULL;
+    }
+    conn.claim_id = localhost->aclk_state.claimed_id;
+    payload = generate_update_agent_connection(size, &conn);
+    rrdhost_aclk_state_unlock(localhost);
+
+    if (payload == NULL)
+        error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
+
+    return payload;
+}
+
+void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_crea