diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 109 |
1 files changed, 90 insertions, 19 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 66ae4dd9ab..a6c0d9fb6d 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -9,6 +9,7 @@ #include "aclk_util.h" #include "aclk_rx_msgs.h" #include "aclk_collector_list.h" +#include "https_client.h" #ifdef ACLK_LOG_CONVERSATION_DIR #include <sys/types.h> @@ -29,6 +30,8 @@ int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer +aclk_env_t *aclk_env = NULL; + mqtt_wss_client mqttwss_client; netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; @@ -138,8 +141,7 @@ static int wait_till_agent_claimed(void) */ static int wait_till_agent_claim_ready() { - int port; - char *hostname = NULL; + url_t url; while (!netdata_exit) { if (wait_till_agent_claimed()) return 1; @@ -154,15 +156,14 @@ static int wait_till_agent_claim_ready() // We just check configuration is valid here // TODO make it without malloc/free - if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) { + memset(&url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &url)) { error("Agent is claimed but the configuration is invalid, please fix"); - freez(hostname); - hostname = NULL; + url_t_destroy(&url); sleep(5); continue; } - freez(hostname); - hostname = NULL; + url_t_destroy(&url); if (!load_private_key()) { sleep(5); @@ -420,6 +421,23 @@ static int aclk_block_till_recon_allowed() { return 0; } +#ifndef ACLK_DISABLE_CHALLENGE +/* Cloud returns transport list ordered with highest + * priority first. This function selects highest prio + * transport that we can actually use (support) + */ +static int aclk_get_transport_idx(aclk_env_t *env) { + for (size_t i = 0; i < env->transport_count; i++) { + // currently we support only MQTT 3 + // therefore select first transport that matches + if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + return i; + } + } + return -1; +} +#endif + /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client * @return 0 - Successfull Connection, @@ -434,10 +452,14 @@ static int aclk_block_till_recon_allowed() { #endif static int aclk_attempt_to_connect(mqtt_wss_client client) { - char *aclk_hostname = NULL; - int aclk_port; + int ret; + + url_t base_url; #ifndef ACLK_DISABLE_CHALLENGE + url_t auth_url; + url_t mqtt_url; + char *mqtt_otp_user = NULL; char *mqtt_otp_pass = NULL; #endif @@ -455,9 +477,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) return 1; info("Attempting connection now"); - if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { + memset(&base_url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &base_url)) { error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); + url_t_destroy(&base_url); continue; } @@ -473,29 +497,72 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .will_flags = MQTT_WSS_PUB_QOS2, .keep_alive = 60 }; + #ifndef ACLK_DISABLE_CHALLENGE - aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass); + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } + aclk_env = callocz(1, sizeof(aclk_env_t)); + + ret = aclk_get_env(aclk_env, base_url.host, base_url.port); + url_t_destroy(&base_url); + if (ret) { + error("Failed to Get ACLK environment"); + // delay handled by aclk_block_till_recon_allowed + continue; + } + + memset(&auth_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + url_t_destroy(&auth_url); + continue; + } + + // TODO check success + aclk_get_mqtt_otp(aclk_private_key, &mqtt_otp_user, &mqtt_otp_pass, &auth_url); + url_t_destroy(&auth_url); + mqtt_conn_params.clientid = mqtt_otp_user; mqtt_conn_params.username = mqtt_otp_user; mqtt_conn_params.password = mqtt_otp_pass; + + // Do the MQTT connection + ret = aclk_get_transport_idx(aclk_env); + if (ret < 0) { + error("Cloud /env endpoint didn't return any transport usable by this Agent."); + continue; + } + + memset(&mqtt_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + url_t_destroy(&mqtt_url); + continue; + } #endif lwt = aclk_generate_disconnect(NULL); mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); - if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) { - json_object_put(lwt); - freez(aclk_hostname); - aclk_hostname = NULL; + +#ifdef ACLK_DISABLE_CHALLENGE + ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&base_url); +#else + ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&mqtt_url); +#endif + + json_object_put(lwt); + + if (!ret) { info("MQTTWSS connection succeeded"); mqtt_connected_actions(client); return 0; } - freez(aclk_hostname); - aclk_hostname = NULL; - json_object_put(lwt); error("Connect failed\n"); } @@ -597,6 +664,10 @@ exit_full: free_topic_cache(); mqtt_wss_destroy(mqttwss_client); exit: + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } |