diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-06-19 20:52:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-19 20:52:35 +0300 |
commit | 0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch) | |
tree | 641fcb81e9c84e08fbe08ca80776c6b593b218ba /aclk | |
parent | 35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff) |
/api/v2/nodes and streaming function (#15168)
* dummy streaming function
* expose global functions upstream
* separate function for pushing global functions
* add missing conditions
* allow streaming function to run async
* started internal API for functions
* cache host retention and expose it to /api/v2/nodes
* internal API for function table fields; more progress on streaming status
* abstracted and unified rrdhost status
* port old coverity warning fix - although it is not needed
* add ML information to rrdhost status
* add ML capability to streaming to signal the transmission of ML information; added ML information to host status
* protect host->receiver
* count metrics and instances per host
* exposed all inbound and outbound streaming
* fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754
* update ML dummy
* added all fields
* added streaming group by and cleaned up accepted values by cloud
* removed type
* Revert "removed type"
This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e.
* added context to db summary
* new /api/v2/nodes schema
* added ML type
* change default function charts
* log to trace new capa
* add more debug
* removed debugging code
* retry on receive interrupted read; respect sender reconnect delay in all cases
* set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver
* fix infinite loop
* send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time
* global cloud_status() call
* cloud should be a section, since it will contain error information
* put cloud capabilities into cloud
* aclk status in /api/v2 agents sections
* keep aclk_connection_counter
* updates on /api/v2/nodes
* final /api/v2/nodes and addition of /api/v2/nodes_instances
* parametrize all /api/v2/xxx output to control which info is outputed per endpoint
* always accept nodes selector
* st needs to be per instance, not per node
* fix merging of contexts; fix cups plugin priorities
* add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q
* give each libuv worker a unique id
* aclk http_api_v2 version 4
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 135 | ||||
-rw-r--r-- | aclk/aclk.h | 33 | ||||
-rw-r--r-- | aclk/aclk_capas.c | 11 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 8 |
4 files changed, 166 insertions, 21 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 399bc9876b..d7ed3315f7 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -155,7 +155,7 @@ biofailed: static int wait_till_cloud_enabled() { info("Waiting for Cloud to be enabled"); - while (!netdata_cloud_setting) { + while (!netdata_cloud_enabled) { sleep_usec(USEC_PER_SEC * 1); if (!service_running(SERVICE_ACLK)) return 1; @@ -489,6 +489,74 @@ static int aclk_get_transport_idx(aclk_env_t *env) { } #endif +ACLK_STATUS aclk_status = ACLK_STATUS_INITIALIZING; + +const char *aclk_status_to_string(void) { + switch(aclk_status) { + case ACLK_STATUS_CONNECTED: + return "connected"; + + case ACLK_STATUS_INITIALIZING: + return "initializing"; + + case ACLK_STATUS_DISABLED: + return "disabled"; + + case ACLK_STATUS_NO_CLOUD_URL: + return "no_cloud_url"; + + case ACLK_STATUS_INVALID_CLOUD_URL: + return "invalid_cloud_url"; + + case ACLK_STATUS_NOT_CLAIMED: + return "not_claimed"; + + case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE: + return "env_endpoint_unreachable"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_200: + return "env_response_not_200"; + + case ACLK_STATUS_ENV_RESPONSE_EMPTY: + return "env_response_empty"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_JSON: + return "env_response_not_json"; + + case ACLK_STATUS_ENV_FAILED: + return "env_failed"; + + case ACLK_STATUS_BLOCKED: + return "blocked"; + + case ACLK_STATUS_NO_OLD_PROTOCOL: + return "no_old_protocol"; + + case ACLK_STATUS_NO_PROTOCOL_CAPABILITY: + return "no_protocol_capability"; + + case ACLK_STATUS_INVALID_ENV_AUTH_URL: + return "invalid_env_auth_url"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX: + return "invalid_env_transport_idx"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL: + return "invalid_env_transport_url"; + + case ACLK_STATUS_INVALID_OTP: + return "invalid_otp"; + + case ACLK_STATUS_NO_LWT_TOPIC: + return "no_lwt_topic"; + + default: + return "unknown"; + } +} + +const char *aclk_cloud_base_url = NULL; + /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client * @return 0 - Successful Connection, @@ -513,18 +581,22 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) #endif while (service_running(SERVICE_ACLK)) { - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); - if (cloud_base_url == NULL) { + aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (aclk_cloud_base_url == NULL) { error_report("Do not move the cloud base url out of post_conf_load!!"); + aclk_status = ACLK_STATUS_NO_CLOUD_URL; return -1; } - if (aclk_block_till_recon_allowed()) + if (aclk_block_till_recon_allowed()) { + aclk_status = ACLK_STATUS_BLOCKED; return 1; + } info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); - if (url_parse(cloud_base_url, &base_url)) { + if (url_parse(aclk_cloud_base_url, &base_url)) { + aclk_status = ACLK_STATUS_INVALID_CLOUD_URL; error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); @@ -554,21 +626,57 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); - if (ret) { - error_report("Failed to Get ACLK environment"); - // delay handled by aclk_block_till_recon_allowed - continue; + if(ret) switch(ret) { + case 1: + aclk_status = ACLK_STATUS_NOT_CLAIMED; + error_report("Failed to Get ACLK environment (agent is not claimed)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 2: + aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE; + error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 3: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200; + error_report("Failed to Get ACLK environment (ENV response code is not 200)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 4: + aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY; + error_report("Failed to Get ACLK environment (ENV response is empty)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 5: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON; + error_report("Failed to Get ACLK environment (ENV response is not JSON)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + default: + aclk_status = ACLK_STATUS_ENV_FAILED; + error_report("Failed to Get ACLK environment (unknown error)"); + // delay handled by aclk_block_till_recon_allowed + continue; } - if (!service_running(SERVICE_ACLK)) + if (!service_running(SERVICE_ACLK)) { + aclk_status = ACLK_STATUS_DISABLED; return 1; + } if (aclk_env->encoding != ACLK_ENC_PROTO) { + aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL; error_report("This agent can only use the new cloud protocol but cloud requested old one."); continue; } if (!aclk_env_has_capa("proto")) { + aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY; error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } @@ -576,6 +684,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL; error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; @@ -584,6 +693,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { + aclk_status = ACLK_STATUS_INVALID_OTP; error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -593,6 +703,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { + aclk_status = ACLK_STATUS_NO_LWT_TOPIC; error_report("Couldn't get LWT topic. Will not send LWT."); continue; } @@ -600,12 +711,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX; error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL; error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; @@ -638,6 +751,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (!ret) { last_conn_time_mqtt = now_realtime_sec(); info("ACLK connection successfully established"); + aclk_status = ACLK_STATUS_CONNECTED; log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; @@ -646,6 +760,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) error_report("Connect failed"); } + aclk_status = ACLK_STATUS_DISABLED; return 1; } diff --git a/aclk/aclk.h b/aclk/aclk.h index bd8375fb53..17353be29d 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -13,17 +13,50 @@ #define ACLK_PUBACKS_CONN_STABLE 3 #endif /* ENABLE_ACLK */ +typedef enum __attribute__((packed)) { + ACLK_STATUS_CONNECTED = 0, + ACLK_STATUS_INITIALIZING, + ACLK_STATUS_DISABLED, + ACLK_STATUS_NO_CLOUD_URL, + ACLK_STATUS_INVALID_CLOUD_URL, + ACLK_STATUS_NOT_CLAIMED, + ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE, + ACLK_STATUS_ENV_RESPONSE_NOT_200, + ACLK_STATUS_ENV_RESPONSE_EMPTY, + ACLK_STATUS_ENV_RESPONSE_NOT_JSON, + ACLK_STATUS_ENV_FAILED, + ACLK_STATUS_BLOCKED, + ACLK_STATUS_NO_OLD_PROTOCOL, + ACLK_STATUS_NO_PROTOCOL_CAPABILITY, + ACLK_STATUS_INVALID_ENV_AUTH_URL, + ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX, + ACLK_STATUS_INVALID_ENV_TRANSPORT_URL, + ACLK_STATUS_INVALID_OTP, + ACLK_STATUS_NO_LWT_TOPIC, +} ACLK_STATUS; + +extern ACLK_STATUS aclk_status; +extern const char *aclk_cloud_base_url; +const char *aclk_status_to_string(void); + extern int aclk_connected; extern int aclk_ctx_based; extern int aclk_disable_runtime; extern int aclk_stats_enabled; extern int aclk_kill_link; +extern time_t last_conn_time_mqtt; +extern time_t last_conn_time_appl; +extern time_t last_disconnect_time; +extern time_t next_connection_attempt; +extern float last_backoff_value; + extern usec_t aclk_session_us; extern time_t aclk_session_sec; extern time_t aclk_block_until; +extern int aclk_connection_counter; extern int disconnect_req; #ifdef ENABLE_ACLK diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c index b38a928a55..13ae3441ac 100644 --- a/aclk/aclk_capas.c +++ b/aclk/aclk_capas.c @@ -13,7 +13,7 @@ const struct capability *aclk_get_agent_capas() { .name = "mc", .version = 0, .enabled = 0 }, { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 1, .enabled = 1 }, - { .name = "http_api_v2", .version = 3, .enabled = 1 }, + { .name = "http_api_v2", .version = 4, .enabled = 1 }, { .name = "health", .version = 1, .enabled = 0 }, { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } @@ -31,6 +31,8 @@ const struct capability *aclk_get_agent_capas() struct capability *aclk_get_node_instance_capas(RRDHOST *host) { + bool functions = (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))); + struct capability ni_caps[] = { { .name = "proto", .version = 1, .enabled = 1 }, { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, @@ -38,7 +40,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = "funcs", .version = 0, .enabled = 0 }, + { .name = "funcs", .version = functions ? 1 : 0, .enabled = functions ? 1 : 0 }, { .name = "http_api_v2", .version = 3, .enabled = 1 }, { .name = "health", .version = 1, .enabled = host->health.health_enabled }, { .name = "req_cancel", .version = 1, .enabled = 1 }, @@ -48,10 +50,5 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) struct capability *ret = mallocz(sizeof(ni_caps)); memcpy(ret, ni_caps, sizeof(ni_caps)); - if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) { - ret[4].version = 1; - ret[4].enabled = 1; - } - return ret; } diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index 66d751be6c..a0a85b6c2c 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -846,7 +846,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { error("Error trying to contact env endpoint"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 2; } if (resp.http_code != 200) { error("The HTTP code not 200 OK (Got %d)", resp.http_code); @@ -854,21 +854,21 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { aclk_parse_otp_error(resp.payload); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 3; } if (!resp.payload || !resp.payload_size) { error("Unexpected empty payload as response to /env call"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 4; } if (parse_json_env(resp.payload, env)) { error ("error parsing /env message"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 5; } info("Getting Cloud /env successful"); |