summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 20:52:35 +0300
committerGitHub <noreply@github.com>2023-06-19 20:52:35 +0300
commit0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch)
tree641fcb81e9c84e08fbe08ca80776c6b593b218ba /aclk
parent35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff)
/api/v2/nodes and streaming function (#15168)
* dummy streaming function * expose global functions upstream * separate function for pushing global functions * add missing conditions * allow streaming function to run async * started internal API for functions * cache host retention and expose it to /api/v2/nodes * internal API for function table fields; more progress on streaming status * abstracted and unified rrdhost status * port old coverity warning fix - although it is not needed * add ML information to rrdhost status * add ML capability to streaming to signal the transmission of ML information; added ML information to host status * protect host->receiver * count metrics and instances per host * exposed all inbound and outbound streaming * fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754 * update ML dummy * added all fields * added streaming group by and cleaned up accepted values by cloud * removed type * Revert "removed type" This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e. * added context to db summary * new /api/v2/nodes schema * added ML type * change default function charts * log to trace new capa * add more debug * removed debugging code * retry on receive interrupted read; respect sender reconnect delay in all cases * set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver * fix infinite loop * send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time * global cloud_status() call * cloud should be a section, since it will contain error information * put cloud capabilities into cloud * aclk status in /api/v2 agents sections * keep aclk_connection_counter * updates on /api/v2/nodes * final /api/v2/nodes and addition of /api/v2/nodes_instances * parametrize all /api/v2/xxx output to control which info is outputed per endpoint * always accept nodes selector * st needs to be per instance, not per node * fix merging of contexts; fix cups plugin priorities * add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q * give each libuv worker a unique id * aclk http_api_v2 version 4
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c135
-rw-r--r--aclk/aclk.h33
-rw-r--r--aclk/aclk_capas.c11
-rw-r--r--aclk/aclk_otp.c8
4 files changed, 166 insertions, 21 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 399bc9876b..d7ed3315f7 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -155,7 +155,7 @@ biofailed:
static int wait_till_cloud_enabled()
{
info("Waiting for Cloud to be enabled");
- while (!netdata_cloud_setting) {
+ while (!netdata_cloud_enabled) {
sleep_usec(USEC_PER_SEC * 1);
if (!service_running(SERVICE_ACLK))
return 1;
@@ -489,6 +489,74 @@ static int aclk_get_transport_idx(aclk_env_t *env) {
}
#endif
+ACLK_STATUS aclk_status = ACLK_STATUS_INITIALIZING;
+
+const char *aclk_status_to_string(void) {
+ switch(aclk_status) {
+ case ACLK_STATUS_CONNECTED:
+ return "connected";
+
+ case ACLK_STATUS_INITIALIZING:
+ return "initializing";
+
+ case ACLK_STATUS_DISABLED:
+ return "disabled";
+
+ case ACLK_STATUS_NO_CLOUD_URL:
+ return "no_cloud_url";
+
+ case ACLK_STATUS_INVALID_CLOUD_URL:
+ return "invalid_cloud_url";
+
+ case ACLK_STATUS_NOT_CLAIMED:
+ return "not_claimed";
+
+ case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE:
+ return "env_endpoint_unreachable";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_200:
+ return "env_response_not_200";
+
+ case ACLK_STATUS_ENV_RESPONSE_EMPTY:
+ return "env_response_empty";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_JSON:
+ return "env_response_not_json";
+
+ case ACLK_STATUS_ENV_FAILED:
+ return "env_failed";
+
+ case ACLK_STATUS_BLOCKED:
+ return "blocked";
+
+ case ACLK_STATUS_NO_OLD_PROTOCOL:
+ return "no_old_protocol";
+
+ case ACLK_STATUS_NO_PROTOCOL_CAPABILITY:
+ return "no_protocol_capability";
+
+ case ACLK_STATUS_INVALID_ENV_AUTH_URL:
+ return "invalid_env_auth_url";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX:
+ return "invalid_env_transport_idx";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL:
+ return "invalid_env_transport_url";
+
+ case ACLK_STATUS_INVALID_OTP:
+ return "invalid_otp";
+
+ case ACLK_STATUS_NO_LWT_TOPIC:
+ return "no_lwt_topic";
+
+ default:
+ return "unknown";
+ }
+}
+
+const char *aclk_cloud_base_url = NULL;
+
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
* @return 0 - Successful Connection,
@@ -513,18 +581,22 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
#endif
while (service_running(SERVICE_ACLK)) {
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
- if (cloud_base_url == NULL) {
+ aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (aclk_cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
+ aclk_status = ACLK_STATUS_NO_CLOUD_URL;
return -1;
}
- if (aclk_block_till_recon_allowed())
+ if (aclk_block_till_recon_allowed()) {
+ aclk_status = ACLK_STATUS_BLOCKED;
return 1;
+ }
info("Attempting connection now");
memset(&base_url, 0, sizeof(url_t));
- if (url_parse(cloud_base_url, &base_url)) {
+ if (url_parse(aclk_cloud_base_url, &base_url)) {
+ aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
url_t_destroy(&base_url);
@@ -554,21 +626,57 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
url_t_destroy(&base_url);
- if (ret) {
- error_report("Failed to Get ACLK environment");
- // delay handled by aclk_block_till_recon_allowed
- continue;
+ if(ret) switch(ret) {
+ case 1:
+ aclk_status = ACLK_STATUS_NOT_CLAIMED;
+ error_report("Failed to Get ACLK environment (agent is not claimed)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 2:
+ aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE;
+ error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 3:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200;
+ error_report("Failed to Get ACLK environment (ENV response code is not 200)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 4:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY;
+ error_report("Failed to Get ACLK environment (ENV response is empty)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 5:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON;
+ error_report("Failed to Get ACLK environment (ENV response is not JSON)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ default:
+ aclk_status = ACLK_STATUS_ENV_FAILED;
+ error_report("Failed to Get ACLK environment (unknown error)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
}
- if (!service_running(SERVICE_ACLK))
+ if (!service_running(SERVICE_ACLK)) {
+ aclk_status = ACLK_STATUS_DISABLED;
return 1;
+ }
if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL;
error_report("This agent can only use the new cloud protocol but cloud requested old one.");
continue;
}
if (!aclk_env_has_capa("proto")) {
+ aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY;
error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
@@ -576,6 +684,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL;
error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
url_t_destroy(&auth_url);
continue;
@@ -584,6 +693,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
url_t_destroy(&auth_url);
if (ret) {
+ aclk_status = ACLK_STATUS_INVALID_OTP;
error_report("Error passing Challenge/Response to get OTP");
continue;
}
@@ -593,6 +703,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
+ aclk_status = ACLK_STATUS_NO_LWT_TOPIC;
error_report("Couldn't get LWT topic. Will not send LWT.");
continue;
}
@@ -600,12 +711,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// Do the MQTT connection
ret = aclk_get_transport_idx(aclk_env);
if (ret < 0) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX;
error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
continue;
}
memset(&mqtt_url, 0, sizeof(url_t));
if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL;
error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
url_t_destroy(&mqtt_url);
continue;
@@ -638,6 +751,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
info("ACLK connection successfully established");
+ aclk_status = ACLK_STATUS_CONNECTED;
log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
@@ -646,6 +760,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
error_report("Connect failed");
}
+ aclk_status = ACLK_STATUS_DISABLED;
return 1;
}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index bd8375fb53..17353be29d 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -13,17 +13,50 @@
#define ACLK_PUBACKS_CONN_STABLE 3
#endif /* ENABLE_ACLK */
+typedef enum __attribute__((packed)) {
+ ACLK_STATUS_CONNECTED = 0,
+ ACLK_STATUS_INITIALIZING,
+ ACLK_STATUS_DISABLED,
+ ACLK_STATUS_NO_CLOUD_URL,
+ ACLK_STATUS_INVALID_CLOUD_URL,
+ ACLK_STATUS_NOT_CLAIMED,
+ ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE,
+ ACLK_STATUS_ENV_RESPONSE_NOT_200,
+ ACLK_STATUS_ENV_RESPONSE_EMPTY,
+ ACLK_STATUS_ENV_RESPONSE_NOT_JSON,
+ ACLK_STATUS_ENV_FAILED,
+ ACLK_STATUS_BLOCKED,
+ ACLK_STATUS_NO_OLD_PROTOCOL,
+ ACLK_STATUS_NO_PROTOCOL_CAPABILITY,
+ ACLK_STATUS_INVALID_ENV_AUTH_URL,
+ ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX,
+ ACLK_STATUS_INVALID_ENV_TRANSPORT_URL,
+ ACLK_STATUS_INVALID_OTP,
+ ACLK_STATUS_NO_LWT_TOPIC,
+} ACLK_STATUS;
+
+extern ACLK_STATUS aclk_status;
+extern const char *aclk_cloud_base_url;
+const char *aclk_status_to_string(void);
+
extern int aclk_connected;
extern int aclk_ctx_based;
extern int aclk_disable_runtime;
extern int aclk_stats_enabled;
extern int aclk_kill_link;
+extern time_t last_conn_time_mqtt;
+extern time_t last_conn_time_appl;
+extern time_t last_disconnect_time;
+extern time_t next_connection_attempt;
+extern float last_backoff_value;
+
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern time_t aclk_block_until;
+extern int aclk_connection_counter;
extern int disconnect_req;
#ifdef ENABLE_ACLK
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
index b38a928a55..13ae3441ac 100644
--- a/aclk/aclk_capas.c
+++ b/aclk/aclk_capas.c
@@ -13,7 +13,7 @@ const struct capability *aclk_get_agent_capas()
{ .name = "mc", .version = 0, .enabled = 0 },
{ .name = "ctx", .version = 1, .enabled = 1 },
{ .name = "funcs", .version = 1, .enabled = 1 },
- { .name = "http_api_v2", .version = 3, .enabled = 1 },
+ { .name = "http_api_v2", .version = 4, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = 0 },
{ .name = "req_cancel", .version = 1, .enabled = 1 },
{ .name = NULL, .version = 0, .enabled = 0 }
@@ -31,6 +31,8 @@ const struct capability *aclk_get_agent_capas()
struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{
+ bool functions = (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)));
+
struct capability ni_caps[] = {
{ .name = "proto", .version = 1, .enabled = 1 },
{ .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) },
@@ -38,7 +40,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
.version = enable_metric_correlations ? metric_correlations_version : 0,
.enabled = enable_metric_correlations },
{ .name = "ctx", .version = 1, .enabled = 1 },
- { .name = "funcs", .version = 0, .enabled = 0 },
+ { .name = "funcs", .version = functions ? 1 : 0, .enabled = functions ? 1 : 0 },
{ .name = "http_api_v2", .version = 3, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = host->health.health_enabled },
{ .name = "req_cancel", .version = 1, .enabled = 1 },
@@ -48,10 +50,5 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
struct capability *ret = mallocz(sizeof(ni_caps));
memcpy(ret, ni_caps, sizeof(ni_caps));
- if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) {
- ret[4].version = 1;
- ret[4].enabled = 1;
- }
-
return ret;
}
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index 66d751be6c..a0a85b6c2c 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -846,7 +846,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
error("Error trying to contact env endpoint");
https_req_response_free(&resp);
buffer_free(buf);
- return 1;
+ return 2;
}
if (resp.http_code != 200) {
error("The HTTP code not 200 OK (Got %d)", resp.http_code);
@@ -854,21 +854,21 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
aclk_parse_otp_error(resp.payload);
https_req_response_free(&resp);
buffer_free(buf);
- return 1;
+ return 3;
}
if (!resp.payload || !resp.payload_size) {
error("Unexpected empty payload as response to /env call");
https_req_response_free(&resp);
buffer_free(buf);
- return 1;
+ return 4;
}
if (parse_json_env(resp.payload, env)) {
error ("error parsing /env message");
https_req_response_free(&resp);
buffer_free(buf);
- return 1;
+ return 5;
}
info("Getting Cloud /env successful");