diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 459d657fb9..cc17a21650 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -223,7 +223,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int aclk_handle_cloud_message(cmsg); } - +#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos) { if (msglen > RX_MSGLEN_MAX) @@ -261,6 +261,7 @@ static void msg_callback_new(const char *topic, const void *msg, size_t msglen, aclk_handle_new_cloud_msg(msgtype, msg, msglen); } +#endif static void puback_callback(uint16_t packet_id) { @@ -352,6 +353,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) { topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) @@ -359,21 +361,26 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); } +#endif aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!aclk_use_new_cloud_arch) { +#endif ACLK_SHARED_STATE_LOCK; if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { error("Sending `connect` payload immediately as popcorning was finished already."); queue_connect_payloads(); } ACLK_SHARED_STATE_UNLOCK; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL } else { aclk_send_agent_connection_update(client, 1); } +#endif } /* Waits until agent is ready or needs to exit @@ -416,9 +423,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client) error("Preparing to Gracefully Shutdown the ACLK"); aclk_queue_lock(); aclk_queue_flush(); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); else +#endif aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); time_t t = now_monotonic_sec(); @@ -626,13 +635,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (aclk_use_new_cloud_arch) { mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); } else { +#endif lwt = aclk_generate_disconnect(NULL); mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL } +#endif #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -675,7 +688,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) */ void *aclk_main(void *ptr) { -#ifdef ACLK_NEWARCH_DEVMODE +#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE) aclk_use_new_cloud_arch = 1; #endif struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; @@ -711,7 +724,11 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) { +#else + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { +#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -1028,6 +1045,11 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Implementation: Next Generation\n" +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + "New Cloud Protocol Support: Yes\n" +#else + "New Cloud Protocol Support: No\n" +#endif "Claimed: " ); @@ -1039,7 +1061,7 @@ char *ng_aclk_state(void) freez(agent_id); } - buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No"); + buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); ret = strdupz(buffer_tostring(wb)); buffer_free(wb); @@ -1056,6 +1078,13 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string("Next Generation"); json_object_object_add(msg, "aclk-implementation", tmp); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + tmp = json_object_new_boolean(1); +#else + tmp = json_object_new_boolean(0); +#endif + json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + char *agent_id = is_agent_claimed(); tmp = json_object_new_boolean(agent_id != NULL); json_object_object_add(msg, "agent-claimed", tmp); @@ -1070,6 +1099,9 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); + tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy"); + json_object_object_add(msg, "used-cloud-protocol", tmp); + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; |