summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2021-04-19 17:52:40 +0200
committerGitHub <noreply@github.com>2021-04-19 17:52:40 +0200
commita3c46ef3ec305bb2fa0e8f64e87e9ac8b99d6714 (patch)
treeceb5f17a21f61461bcb848f798076069fbbc408f /aclk
parentf569beac51534fb0fe5ca0e33d2b63dbdc2d5427 (diff)
implements ACLK env endpoint (#10833)
implements /env endpoint call and parsing of the response
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c109
-rw-r--r--aclk/aclk.h2
-rw-r--r--aclk/aclk_otp.c366
-rw-r--r--aclk/aclk_otp.h5
-rw-r--r--aclk/aclk_util.c83
-rw-r--r--aclk/aclk_util.h43
-rw-r--r--aclk/https_client.c100
-rw-r--r--aclk/https_client.h16
8 files changed, 654 insertions, 70 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 66ae4dd9ab..a6c0d9fb6d 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -9,6 +9,7 @@
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
+#include "https_client.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
@@ -29,6 +30,8 @@ int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
+aclk_env_t *aclk_env = NULL;
+
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -138,8 +141,7 @@ static int wait_till_agent_claimed(void)
*/
static int wait_till_agent_claim_ready()
{
- int port;
- char *hostname = NULL;
+ url_t url;
while (!netdata_exit) {
if (wait_till_agent_claimed())
return 1;
@@ -154,15 +156,14 @@ static int wait_till_agent_claim_ready()
// We just check configuration is valid here
// TODO make it without malloc/free
- if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
error("Agent is claimed but the configuration is invalid, please fix");
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
sleep(5);
continue;
}
- freez(hostname);
- hostname = NULL;
+ url_t_destroy(&url);
if (!load_private_key()) {
sleep(5);
@@ -420,6 +421,23 @@ static int aclk_block_till_recon_allowed() {
return 0;
}
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 3
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ return i;
+ }
+ }
+ return -1;
+}
+#endif
+
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
* @return 0 - Successfull Connection,
@@ -434,10 +452,14 @@ static int aclk_block_till_recon_allowed() {
#endif
static int aclk_attempt_to_connect(mqtt_wss_client client)
{
- char *aclk_hostname = NULL;
- int aclk_port;
+ int ret;
+
+ url_t base_url;
#ifndef ACLK_DISABLE_CHALLENGE
+ url_t auth_url;
+ url_t mqtt_url;
+
char *mqtt_otp_user = NULL;
char *mqtt_otp_pass = NULL;
#endif
@@ -455,9 +477,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
info("Attempting connection now");
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &base_url)) {
error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
continue;
}
@@ -473,29 +497,72 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.will_flags = MQTT_WSS_PUB_QOS2,
.keep_alive = 60
};
+
#ifndef ACLK_DISABLE_CHALLENGE
- aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if (ret) {
+ error("Failed to Get ACLK environment");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ // TODO check success
+ aclk_get_mqtt_otp(aclk_private_key, &mqtt_otp_user, &mqtt_otp_pass, &auth_url);
+ url_t_destroy(&auth_url);
+
mqtt_conn_params.clientid = mqtt_otp_user;
mqtt_conn_params.username = mqtt_otp_user;
mqtt_conn_params.password = mqtt_otp_pass;
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
-
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
- if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
- json_object_put(lwt);
- freez(aclk_hostname);
- aclk_hostname = NULL;
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+#endif
+
+ json_object_put(lwt);
+
+ if (!ret) {
info("MQTTWSS connection succeeded");
mqtt_connected_actions(client);
return 0;
}
- freez(aclk_hostname);
- aclk_hostname = NULL;
- json_object_put(lwt);
error("Connect failed\n");
}
@@ -597,6 +664,10 @@ exit_full:
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 29626c7f46..ed8cb636ab 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -43,6 +43,8 @@ extern int aclk_connected;
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
+extern aclk_env_t *aclk_env;
+
void *aclk_main(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index b220449dc1..6c6d21f490 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -3,8 +3,6 @@
#include "aclk_otp.h"
-#include "https_client.h"
-
#include "../daemon/common.h"
#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
@@ -191,7 +189,7 @@ static int aclk_https_request(https_req_t *request, https_req_response_t *respon
}
#define OTP_URL_PREFIX "/api/v1/auth/node/"
-void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass) {
+void aclk_get_mqtt_otp(RSA *p_key, char **mqtt_usr, char **mqtt_pass, url_t *target) {
BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
https_req_t req = HTTPS_REQ_T_INITIALIZER;
@@ -205,9 +203,9 @@ void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_us
}
// GET Challenge
- req.host = aclk_hostname;
- req.port = port;
- buffer_sprintf(url, "%s%s/challenge", OTP_URL_PREFIX, agent_id);
+ req.host = target->host;
+ req.port = target->port;
+ buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id);
req.url = url->buffer;
if (aclk_https_request(&req, &resp)) {
@@ -256,7 +254,7 @@ void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_us
// POST password
req.request_type = HTTP_REQ_POST;
buffer_flush(url);
- buffer_sprintf(url, "%s%s/password", OTP_URL_PREFIX, agent_id);
+ buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id);
req.url = url->buffer;
req.payload = response_json;
req.payload_size = strlen(response_json);
@@ -298,3 +296,357 @@ cleanup:
freez(agent_id);
buffer_free(url);
}
+
+#define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \
+ if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \
+ error("value of key \"%s\" should be %s", name, #type); \
+ goto exit; \
+ }
+
+#define JSON_KEY_ENC "encoding"
+#define JSON_KEY_AUTH_ENDPOINT "authEndpoint"
+#define JSON_KEY_TRP "transports"
+#define JSON_KEY_TRP_TYPE "type"
+#define JSON_KEY_TRP_ENDPOINT "endpoint"
+#define JSON_KEY_BACKOFF "backoff"
+#define JSON_KEY_BACKOFF_BASE "base"
+#define JSON_KEY_BACKOFF_MAX "maxSeconds"
+#define JSON_KEY_BACKOFF_MIN "minSeconds"
+#define JSON_KEY_CAPS "capabilities"
+
+static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *trp) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE)
+ if (trp->type != ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " set already");
+ goto exit;
+ }
+ trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ goto exit;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT)
+ if (trp->endpoint) {
+ error(JSON_KEY_TRP_ENDPOINT " set already");
+ goto exit;
+ }
+ trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (!trp->endpoint) {
+ error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary");
+ goto exit;
+ }
+
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error ("transport type not set");
+ goto exit;
+ }
+
+ return 0;
+
+exit:
+ aclk_transport_desc_t_destroy(trp);
+ return 1;
+}
+
+static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) {
+ aclk_transport_desc_t *trp;
+ json_object *obj;
+
+ if (env->transports) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->transport_count = json_object_array_length(json_array);
+
+ env->transports = callocz(env->transport_count , sizeof(aclk_transport_desc_t *));
+
+ for (size_t i = 0; i < env->transport_count; i++) {
+ trp = callocz(1, sizeof(aclk_transport_desc_t));
+ obj = json_object_array_get_idx(json_array, i);
+ if (parse_json_env_transport(obj, trp)) {
+ error("error parsing transport idx %d", (int)i);
+ freez(trp);
+ return 1;
+ }
+ env->transports[i] = trp;
+ }
+
+ return 0;
+}
+
+#define MATCHED_CORRECT 1
+#define MATCHED_ERROR -1
+#define NOT_MATCHED 0
+static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) {
+ if (!strcmp(json_object_iter_peek_name(it), name)) {
+ if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) {
+ error("Could not parse \"%s\". Not an integer as expected.", name);
+ return MATCHED_ERROR;
+ }
+
+ *out = json_object_get_int(json_object_iter_peek_value(it));
+
+ if (*out < min || *out > max) {
+ error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max);
+ return MATCHED_ERROR;
+ }
+
+ return MATCHED_CORRECT;
+ }
+ return NOT_MATCHED;
+}
+
+static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+ int ret;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if ( (ret = parse_json_backoff_int(&it, &backoff->base, JSON_KEY_BACKOFF_BASE, 1, 10)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->max_s, JSON_KEY_BACKOFF_MAX, 500, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->min_s, JSON_KEY_BACKOFF_MIN, 0, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ return 0;
+}
+
+static int parse_json_env_caps(json_object *json, aclk_env_t *env) {
+ json_object *obj;
+ const char *str;
+
+ if (env->capabilities) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->capability_count = json_object_array_length(json);
+
+ // empty capabilities list is allowed
+ if (!env->capability_count)
+ return 0;
+
+ env->capabilities = callocz(env->capability_count , sizeof(char *));
+
+ for (size_t i = 0; i < env->capability_count; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_string) {
+ error("Capability at index %d not a string!", (int)i);
+ return 1;
+ }
+ str = json_object_get_string(obj);
+ if (!str) {
+ error("Error parsing capabilities");
+ return 1;
+ }
+ env->capabilities[i] = strdupz(str);
+ }
+
+ return 0;
+}
+
+static int parse_json_env(const char *json_str, aclk_env_t *env) {
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http respons of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT)
+ if (env->auth_endpoint) {
+ error("authEndpoint set already");
+ goto exit;
+ }
+ env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC)
+ if (env->encoding != ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " set already");
+ goto exit;
+ }
+ env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TRP)
+
+ json_object *now = json_object_iter_peek_value(&it);
+ parse_json_env_transports(now, env);
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_BACKOFF)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_object, JSON_KEY_BACKOFF)
+
+ if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) {
+ env->backoff.base = 0;
+ error("Error parsing Backoff parameters in env");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CAPS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS)
+
+ if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) {
+ error("Error parsing capabilities list");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ // Check all compulsory keys have been set
+ if (env->transport_count < 1) {
+ error("env has to return at least one transport");
+ goto exit;
+ }
+ if (!env->auth_endpoint) {
+ error(JSON_KEY_AUTH_ENDPOINT " is compulsory");
+ goto exit;
+ }
+ if (env->encoding == ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " is compulsory");
+ goto exit;
+ }
+ if (!env->backoff.base) {
+ error(JSON_KEY_BACKOFF " is compulsory");
+ goto exit;
+ }
+
+ json_object_put(json);
+ return 0;
+
+exit:
+ aclk_env_t_destroy(env);
+ json_object_put(json);
+ return 1;
+}
+
+int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
+ BUFFER *buf = buffer_create(1024);
+
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ req.request_type = HTTP_REQ_GET;
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ {
+ error("Agent was not claimed - cannot perform challenge/response");
+ buffer_free(buf);
+ return 1;
+ }
+
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json$claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ freez(agent_id);
+
+ req.host = (char*)aclk_hostname;
+ req.port = aclk_port;
+ req.url = buf->buffer;
+ if (aclk_https_request(&req, &resp)) {
+ error("Error trying to contact env endpoint");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+ if (resp.http_code != 200) {
+ error("The HTTP code not 200 OK (Got %d)", resp.http_code);
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (!resp.payload || !resp.payload_size) {
+ error("Unexpected empty payload as response to /env call");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (parse_json_env(resp.payload, env)) {
+ error ("error parsing /env message");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ info("Getting Cloud /env successful");
+
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 0;
+}
diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h
index 31e81c5a12..e5f43bd701 100644
--- a/aclk/aclk_otp.h
+++ b/aclk/aclk_otp.h
@@ -5,6 +5,9 @@
#include "../daemon/common.h"
-void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass);
+#include "https_client.h"
+
+void aclk_get_mqtt_otp(RSA *p_key, char **mqtt_usr, char **mqtt_pass, url_t *target);
+int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);
#endif /* ACLK_OTP_H */
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index d5024c5d8b..ad3a863cde 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -10,6 +10,48 @@
#define UUID_STR_LEN 37
#endif
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
+ if (!strcmp(str, "json")) {
+ return ACLK_ENC_JSON;
+ }
+ if (!strcmp(str, "proto")) {
+ return ACLK_ENC_PROTO;
+ }
+ return ACLK_ENC_UNKNOWN;
+}
+
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
+ if (!strcmp(str, "MQTTv3")) {
+ return ACLK_TRP_MQTT_3_1_1;
+ }
+ if (!strcmp(str, "MQTTv5")) {
+ return ACLK_TRP_MQTT_5;
+ }
+ return ACLK_TRP_UNKNOWN;
+}
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
+ freez(trp_desc->endpoint);
+}
+
+void aclk_env_t_destroy(aclk_env_t *env) {
+ freez(env->auth_endpoint);
+ if (env->transports) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ if(env->transports[i]) {
+ aclk_transport_desc_t_destroy(env->transports[i]);
+ env->transports[i] = NULL;
+ }
+ }
+ freez(env->transports);
+ }
+ if (env->capabilities) {
+ for (size_t i = 0; i < env->capability_count; i++)
+ freez(env->capabilities[i]);
+ freez(env->capabilities);
+ }
+}
+
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
@@ -88,47 +130,6 @@ const char *aclk_get_topic(enum aclk_topics topic)
return aclk_topic_cache[topic].topic;
}
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
-{
- int pos = 0;
- if (!strncmp("https://", url, 8)) {
- pos = 8;
- } else if (!strncmp("http://", url, 7)) {
- error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
- return 1;
- }
- int host_end = pos;
- while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
- host_end++;
- if (url[host_end] == 0) {
- *aclk_hostname = strdupz(url + pos);
- *aclk_port = 443;
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
- }
- if (url[host_end] == ':') {
- *aclk_hostname = callocz(host_end - pos + 1, 1);
- strncpy(*aclk_hostname, url + pos, host_end - pos);
- int port_end = host_end + 1;
- while (url[port_end] >= '0' && url[port_end] <= '9')
- port_end++;
- if (port_end - host_end > 6) {
- error("Port specified in %s is invalid", url);
- freez(*aclk_hostname);
- *aclk_hostname = NULL;
- return 1;
- }
- *aclk_port = atoi(&url[host_end+1]);
- }
- if (url[host_end] == '/') {
- *aclk_port = 443;
- *aclk_hostname = callocz(1, host_end - pos + 1);
- strncpy(*aclk_hostname, url+pos, host_end - pos);
- }
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
-}
-
/*
* TBEB with randomness
*
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 2a8993fbe7..a86bafbcfe 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -8,7 +8,47 @@
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
+typedef enum {
+ ACLK_ENC_UNKNOWN = 0,
+ ACLK_ENC_JSON,
+ ACLK_ENC_PROTO
+} aclk_encoding_type_t;
+
+typedef enum {
+ ACLK_TRP_UNKNOWN = 0,
+ ACLK_TRP_MQTT_3_1_1,
+ ACLK_TRP_MQTT_5
+} aclk_transport_type_t;
+
+typedef struct {
+ char *endpoint;
+ aclk_transport_type_t type;
+} aclk_transport_desc_t;
+
+typedef struct {
+ int base;
+ int max_s;
+ int min_s;
+} aclk_backoff_t;
+
+typedef struct {
+ char *auth_endpoint;
+ aclk_encoding_type_t encoding;
+
+ aclk_transport_desc_t **transports;
+ size_t transport_count;
+
+ char **capabilities;
+ size_t capability_count;
+
+ aclk_backoff_t backoff;
+} aclk_env_t;
+
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str);
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str);
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
+void aclk_env_t_destroy(aclk_env_t *env);
enum aclk_topics {
ACLK_TOPICID_CHART = 0,
@@ -47,7 +87,6 @@ const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
diff --git a/aclk/https_client.c b/aclk/https_client.c
index abe46c392c..907f512ba4 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -550,3 +550,103 @@ void https_req_response_init(https_req_response_t *res) {
res->payload = NULL;
res->payload_size = 0;
}
+
+static inline char *min_non_null(char *a, char *b) {
+ if (!a)
+ return b;
+ if (!b)
+ return a;
+ return (a < b ? a : b);
+}
+
+#define URI_PROTO_SEPARATOR "://"
+#define URL_PARSER_LOG_PREFIX "url_parser "
+
+static int parse_host_port(url_t *url) {
+ char *ptr = strrchr(url->host, ':');
+ if (ptr) {
+ size_t port_len = strlen(ptr + 1);
+ if (!port_len) {
+ error(URL_PARSER_LOG_PREFIX ": specified but no port number");
+ return 1;
+ }
+ if (port_len > 5 /* MAX port lenght is 5digit long in decimal */) {
+ error(URL_PARSER_LOG_PREFIX "port # is too long");
+ return 1;
+ }
+ *ptr = 0;
+ if (!strlen(url->host)) {
+ error(URL_PARSER_LOG_PREFIX "host empty after removing port");
+ return 1;
+ }
+ url->port = atoi (ptr + 1);
+ }
+ return 0;
+}
+
+static inline void port_by_proto(url_t *url) {
+ if (url->port)
+ return;
+ if (!url->proto)
+ return;
+ if (!strcmp(url->proto, "http")) {
+ url->port = 80;
+ return;
+ }
+ if (!strcmp(url->proto, "https")) {
+ url->port = 443;
+ return;
+ }
+}
+
+#define STRDUPZ_2PTR(dest, start, end) \
+ { \
+ dest = mallocz(1 + end - start); \
+ memcpy(dest, start, end - start); \
+ dest[end - start] = 0; \
+ }
+
+int url_parse(const char *url, url_t *parsed) {
+ const char *start = url;
+ const char *end = strstr(url, URI_PROTO_SEPARATOR);
+
+ if (end) {
+ if (end == start) {
+ error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->proto, start, end)
+ start = end + strlen(URI_PROTO_SEPARATOR);
+ }
+
+ end = strchr(start, '/');
+ if (!end)
+ end = start + strlen(start);
+
+ if (start == end) {
+ error(URL_PARSER_LOG_PREFIX "Host empty");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->host, start, end);
+
+ if (parse_host_port(parsed))
+ return 1;
+
+ if (!*end) {
+ parsed->path = strdupz("/");
+ port_by_proto(parsed);
+ return 0;
+ }
+
+ parsed->path = strdupz(end);
+ port_by_proto(parsed);
+ return 0;
+}
+
+void url_t_destroy(url_t *url) {
+ freez(url->host);
+ freez(url->path);
+ freez(url->proto);
+}
diff --git a/aclk/https_client.h b/aclk/https_client.h
index b513c6c799..f7bc3d43d6 100644
--- a/aclk/https_client.h
+++ b/aclk/https_client.h
@@ -34,6 +34,22 @@ typedef struct {
size_t payload_size;
} https_req_response_t;
+
+// Non feature complete URL parser
+// feel free to extend when needed
+// currently implements only what ACLK
+// needs
+// proto://host[:port]/path
+typedef struct {
+ char *proto;
+ char *host;
+ int port;
+ char* path;
+} url_t;
+
+int url_parse(const char *url, url_t *parsed);
+void url_t_destroy(url_t *url);
+
void https_req_response_free(https_req_response_t *res);
void https_req_response_init(https_req_response_t *res);