summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c109
1 files changed, 90 insertions, 19 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 66ae4dd9ab..a6c0d9fb6d 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -9,6 +9,7 @@
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
+#include "https_client.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
@@ -29,6 +30,8 @@ int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
+aclk_env_t *aclk_env = NULL;
+
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -138,8 +141,7 @@ static int wait_till_agent_claimed(void)
*/
static int wait_till_agent_claim_ready()
{
- int port;
- char *hostname = NULL;
+ url_t url;
while (!netdata_exit) {
if (wait_till_agent_claimed())
return 1;
@@ -154,15 +156,14 @@ static int wait_till_agent_claim_ready()
// We just check configuration is valid here
// TODO make it without malloc/free
- if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
error("Agent is claimed but the configuration is invalid, please fix");
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
sleep(5);
continue;
}
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
if (!load_private_key()) {
sleep(5);
@@ -420,6 +421,23 @@ static int aclk_block_till_recon_allowed() {
return 0;
}
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 3
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ return i;
+ }
+ }
+ return -1;
+}
+#endif
+
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
* @return 0 - Successfull Connection,
@@ -434,10 +452,14 @@ static int aclk_block_till_recon_allowed() {
#endif
static int aclk_attempt_to_connect(mqtt_wss_client client)
{
- char *aclk_hostname = NULL;
- int aclk_port;
+ int ret;
+
+ url_t base_url;
#ifndef ACLK_DISABLE_CHALLENGE
+ url_t auth_url;
+ url_t mqtt_url;
+
char *mqtt_otp_user = NULL;
char *mqtt_otp_pass = NULL;
#endif
@@ -455,9 +477,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
info("Attempting connection now");
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &base_url)) {
error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
continue;
}
@@ -473,29 +497,72 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.will_flags = MQTT_WSS_PUB_QOS2,
.keep_alive = 60
};
+
#ifndef ACLK_DISABLE_CHALLENGE
- aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if (ret) {
+ error("Failed to Get ACLK environment");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ // TODO check success
+ aclk_get_mqtt_otp(aclk_private_key, &mqtt_otp_user, &mqtt_otp_pass, &auth_url);
+ url_t_destroy(&auth_url);
+
mqtt_conn_params.clientid = mqtt_otp_user;
mqtt_conn_params.username = mqtt_otp_user;
mqtt_conn_params.password = mqtt_otp_pass;
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
-
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
- if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
- json_object_put(lwt);
- freez(aclk_hostname);
- aclk_hostname = NULL;
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+#endif
+
+ json_object_put(lwt);
+
+ if (!ret) {
info("MQTTWSS connection succeeded");
mqtt_connected_actions(client);
return 0;
}
- freez(aclk_hostname);
- aclk_hostname = NULL;
- json_object_put(lwt);
error("Connect failed\n");
}
@@ -597,6 +664,10 @@ exit_full:
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}