summary | refs | log | tree | commit | diff | stats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- aclk/aclk.c 198
1 files changed, 173 insertions, 25 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7e8c1c32e0..3cb25a67d3 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
aclk_handle_cloud_message(cmsg);
}
+
+/*
+ * MQTT receive callback used with the new cloud architecture.
+ *
+ * The message type is taken from the last path segment of `topic` and is
+ * handed, together with the raw payload, to aclk_handle_new_cloud_msg().
+ * The message is dropped when a shutdown message id is already recorded in
+ * aclk_shared_state, when `topic` contains no '/', or when nothing follows
+ * the last '/'.
+ *
+ * When ACLK_LOG_CONVERSATION_DIR is defined, the raw payload is additionally
+ * dumped into a per-message file in that directory. This logging is best
+ * effort: failures are reported but never prevent message handling.
+ *
+ * @param topic  topic the message arrived on
+ * @param msg    raw message payload
+ * @param msglen payload length in bytes
+ * @param qos    QoS of the message (only used for debug output here)
+ */
+static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
+{
+    // Only reported here; msglen is passed on unchanged by this function.
+    if (msglen > RX_MSGLEN_MAX)
+        error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+    debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+    if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+        error("Link is shutting down. Ignoring message.");
+        return;
+    }
+
+    // the message type is whatever follows the last '/' of the topic
+    const char *msgtype = strrchr(topic, '/');
+    if (unlikely(!msgtype)) {
+        error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+    msgtype++;
+    if (unlikely(!*msgtype)) {
+        error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+    char filename[FN_MAX_LEN];
+    int logfd;
+    snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+    logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+    if (logfd < 0) {
+        error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+    } else {
+        // only touch the descriptor when open() succeeded
+        if (write(logfd, msg, msglen) != (ssize_t)msglen)
+            error("Error writing ACLK Conversation logfile \"%s\" for RX message.", filename);
+        close(logfd);
+    }
+#endif
+
+    aclk_handle_new_cloud_msg(msgtype, msg, msglen);
+}
+
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
@@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- // TODO global vars?
- usec_t now = now_realtime_usec();
- aclk_session_sec = now / USEC_PER_SEC;
- aclk_session_us = now % USEC_PER_SEC;
-
const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
@@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
+ if (aclk_use_new_cloud_arch) {
+ topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+ }
+
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
+ if (!aclk_use_new_cloud_arch) {
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
+ error("Sending `connect` payload immediately as popcorning was finished already.");
+ queue_connect_payloads();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ } else {
+ aclk_send_agent_connection_update(client, 1);
}
- ACLK_SHARED_STATE_UNLOCK;
}
/* Waits until agent is ready or needs to exit
@@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
-static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+static int wait_popcorning_finishes()
{
time_t elapsed;
int need_wait;
+ if (aclk_use_new_cloud_arch)
+ return 0;
+
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
@@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popocorn finished");
- if (unlikely(!query_threads->thread_list))
- aclk_query_threads_start(query_threads, client);
- queue_connect_payloads();
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
@@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
error("Preparing to Gracefully Shutdown the ACLK");
aclk_queue_lock();
aclk_queue_flush();
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+ if (aclk_use_new_cloud_arch)
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+ else
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
@@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt;
+ json_object *lwt = NULL;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
@@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (aclk_use_new_cloud_arch)
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+ else
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
continue;
@@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+ if (aclk_use_new_cloud_arch) {
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+ } else {
+ lwt = aclk_generate_disconnect(NULL);
+ mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+ mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ }
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- json_object_put(lwt);
+ if (aclk_use_new_cloud_arch)
+ freez((char *)mqtt_conn_params.will_msg);
+ else
+ json_object_put(lwt);
if (!ret) {
info("MQTTWSS connection succeeded");
@@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
+#ifdef ACLK_NEWARCH_DEVMODE
+ aclk_use_new_cloud_arch = 1;
+#endif
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_stats_thread *stats_thread = NULL;
@@ -642,7 +710,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -666,8 +734,14 @@ void *aclk_main(void *ptr)
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+ if (wait_popcorning_finishes())
goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (!aclk_use_new_cloud_arch)
+ queue_connect_payloads();
if (!handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
@@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
+
+/*
+ * Queues a cloud notification about the state of `host` (new cloud
+ * architecture only).
+ *
+ * Nothing is queued unless the ACLK is connected and the new cloud
+ * architecture is in use. A host for which no node_id is found gets a
+ * REGISTER_NODE query; a host that already has a node_id gets a
+ * NODE_STATE_UPDATE query whose `live` field is set to `cmd`.
+ *
+ * @param host host whose state is being reported
+ * @param cmd  value stored in the `live` field of the state update
+ */
+void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+{
+    uuid_t node_id;
+
+    if (!(aclk_connected && aclk_use_new_cloud_arch))
+        return;
+
+    int lookup = get_node_id(&host->host_uuid, &node_id);
+
+    if (lookup > 0) {
+        // positive result: we could not even check whether a node_id exists
+        error("Unable to check for node_id. Ignoring the host state update.");
+        return;
+    }
+
+    if (lookup < 0) {
+        // negative result: no node_id found, so ask for the node to be registered
+        aclk_query_t registration = aclk_query_new(REGISTER_NODE);
+
+        registration->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded
+        registration->data.node_creation.hostname = strdupz(host->hostname);
+        registration->data.node_creation.machine_guid = strdupz(host->machine_guid);
+
+        // claimed_id is read while holding the localhost ACLK state lock
+        rrdhost_aclk_state_lock(localhost);
+        registration->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+        rrdhost_aclk_state_unlock(localhost);
+
+        aclk_queue_query(registration);
+        return;
+    }
+
+    // node_id is known: report the current state of the node
+    aclk_query_t update = aclk_query_new(NODE_STATE_UPDATE);
+
+    char *node_id_str = mallocz(UUID_STR_LEN);
+    uuid_unparse_lower(node_id, node_id_str);
+    update->data.node_update.node_id = node_id_str;
+
+    update->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+    update->data.node_update.live = cmd;
+    update->data.node_update.queryable = 1;
+    update->data.node_update.session_id = aclk_session_newarch;
+
+    rrdhost_aclk_state_lock(localhost);
+    update->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+    rrdhost_aclk_state_unlock(localhost);
+
+    aclk_queue_query(update);
+}
+
+/*
+ * Walks the node list returned by get_node_list() and queues one query per
+ * entry: NODE_STATE_UPDATE for entries that already carry a node_id,
+ * REGISTER_NODE for those that do not. The walk stops at the first entry
+ * whose host_id is the null UUID.
+ *
+ * NOTE(review): the cleanup below assumes get_node_list() returns a
+ * heap-allocated array with heap-allocated hostnames whose ownership passes
+ * to the caller -- confirm against its implementation.
+ */
+void aclk_send_node_instances()
+{
+    struct node_instance_list *list_head = get_node_list();
+    struct node_instance_list *list = list_head;
+
+    // without a list there is nothing to walk; avoid dereferencing NULL
+    if (unlikely(!list)) {
+        error("Failure to get_node_list from DB!");
+        return;
+    }
+
+    while (!uuid_is_null(list->host_id)) {
+        if (!uuid_is_null(list->node_id)) {
+            aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+            rrdhost_aclk_state_lock(localhost);
+            query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            query->data.node_update.live = list->live;
+            query->data.node_update.hops = list->hops;
+            query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
+            query->data.node_update.queryable = 1;
+            query->data.node_update.session_id = aclk_session_newarch;
+            // the hostname is not handed to the state update query, so it
+            // has to be released here
+            freez(list->hostname);
+            aclk_queue_query(query);
+        } else {
+            aclk_query_t create_query;
+            create_query = aclk_query_new(REGISTER_NODE);
+            rrdhost_aclk_state_lock(localhost);
+            create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops
+            // the hostname pointer is handed over to the query as-is (not copied)
+            create_query->data.node_creation.hostname = list->hostname;
+            create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
+            aclk_queue_query(create_query);
+        }
+
+        list++;
+    }
+
+    // release the array itself; the iterator has moved past its start
+    freez(list_head);
+}