diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-07-28 23:59:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-28 23:59:32 +0300 |
commit | ed52c959de2e0232170087a3e743562522c1ebfa (patch) | |
tree | f86c43d6c626bdecb579209b62e6ddb75f9f70f3 /aclk | |
parent | 9a6fd6366fafc5585b3fe9d2be2258df4738050d (diff) |
Revert "Query queue only for queries" (#13452)
Revert "Query queue only for queries (#13431)"
This reverts commit 221fd512873613a10b3d95b25a8a4d542b2c4801.
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 38 | ||||
-rw-r--r-- | aclk/aclk.h | 11 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 26 | ||||
-rw-r--r-- | aclk/aclk_charts_api.c | 49 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.c | 14 | ||||
-rw-r--r-- | aclk/aclk_query.c | 68 | ||||
-rw-r--r-- | aclk/aclk_query.h | 2 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 35 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 36 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 17 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 23 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.cc | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.h | 4 |
14 files changed, 253 insertions, 77 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index c546739014..7b3641b1e2 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd) } if (ret < 0) { // node_id not found - size_t payload_len; - + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); rrdhost_aclk_state_lock(localhost); node_instance_creation_t node_instance_creation = { .claim_id = localhost->aclk_state.claimed_id, @@ -751,14 +751,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd) .hostname = host->hostname, .machine_guid = host->machine_guid }; - char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + aclk_queue_query(create_query); return; } + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .hops = host->system_info->hops, .live = cmd, @@ -779,14 +781,15 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_connection(&payload_len, &node_state_update); + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); freez((void*)node_state_update.node_id); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); } void aclk_send_node_instances() @@ -799,6 +802,7 @@ void aclk_send_node_instances() } while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .live = list->live, .hops = list->hops, @@ -823,30 +827,34 @@ void aclk_send_node_instances() rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_connection(&payload_len, &node_state_update); + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); freez((void*)node_state_update.node_id); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); node_instance_creation_t node_instance_creation = { .hops = list->hops, .hostname = list->hostname, }; node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - node_instance_creation.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); freez(node_instance_creation.machine_guid); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + aclk_queue_query(create_query); } freez(list->hostname); diff --git a/aclk/aclk.h b/aclk/aclk.h index 6a0f2ad7a3..5065ac2bfc 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -34,17 +34,6 @@ void aclk_send_node_instances(void); void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); -#define GENERATE_AND_SEND_PAYLOAD(topic, msg_name, generator_fnc, generator_data...) \ - size_t payload_len; \ - char *payload = generator_fnc(&payload_len, generator_data); \ - if (unlikely(payload == NULL)) { \ - error("Failed to generate payload (%s)", __FUNCTION__); \ - return; \ - } \ - aclk_send_bin_msg(payload, payload_len, topic, msg_name); \ - if (!use_mqtt_5) \ - freez(payload); - char *ng_aclk_state(void); char *ng_aclk_state_json(void); diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index 7e75bf38a5..a181eb291f 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -10,20 +10,38 @@ void aclk_send_alarm_log_health(struct alarm_log_health *log_health) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_HEALTH, "AlarmLogHealth", generate_alarm_log_health, log_health); + aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); + query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; + query->data.bin_payload.msg_name = "AlarmLogHealth"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry", generate_alarm_log_entry, log_entry); + size_t payload_size; + char *payload = generate_alarm_log_entry(&payload_size, log_entry); + + aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); + + if (!use_mqtt_5) + freez(payload); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_CONFIG, "ProvideAlarmConfiguration", generate_provide_alarm_configuration, cfg); + aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG); + query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG; + query->data.bin_payload.msg_name = "ProvideAlarmConfiguration"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_SNAPSHOT, "AlarmSnapshot", generate_alarm_snapshot_bin, snapshot); + aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT); + query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT; + query->data.bin_payload.msg_name = "AlarmSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); } diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c index a82ee8d881..51d8dad58c 100644 --- a/aclk/aclk_charts_api.c +++ b/aclk/aclk_charts_api.c @@ -3,46 +3,75 @@ #include "aclk_query_queue.h" -#include "aclk.h" - #define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated" void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_updated, payloads, payload_sizes, new_positions); + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_chart_dimensions_updated, payloads, payload_sizes, new_positions); + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; + query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_and_dimensions_updated, payloads, payload_sizes, is_dim, new_positions, batch_id); + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; + query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_CONFIGS_UPDATED, "ChartConfigsUpdated", generate_chart_configs_updated, config_list, list_size); + aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED); + query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED; + query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size); + query->data.bin_payload.msg_name = "ChartConfigsUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_chart_reset(chart_reset_t reset) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_RESET, "ResetChartMessages", generate_reset_chart_messages, reset); + aclk_query_t query = aclk_query_new(CHART_RESET); + query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET; + query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset); + query->data.bin_payload.msg_name = "ResetChartMessages"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_retention_updated(struct retention_updated *data) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_RETENTION_UPDATED, "RetentionUpdated", generate_retention_updated, data); + aclk_query_t query = aclk_query_new(RETENTION_UPDATED); + query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED; + query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data); + query->data.bin_payload.msg_name = "RetentionUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_update_node_info(struct update_node_info *info) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_INFO, "UpdateNodeInfo", generate_update_node_info_message, info); + aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; + query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); + query->data.bin_payload.msg_name = "UpdateNodeInfo"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_update_node_collectors(struct update_node_collectors *collectors) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_COLLECTORS, "UpdateNodeCollectors", generate_update_node_collectors_message, collectors); + aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; + query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); + query->data.bin_payload.msg_name = "UpdateNodeCollectors"; + QUEUE_IF_PAYLOAD_PRESENT(query); } diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c index a30d7a9d20..f17d3cabd9 100644 --- a/aclk/aclk_contexts_api.c +++ b/aclk/aclk_contexts_api.c @@ -4,14 +4,20 @@ #include "aclk_contexts_api.h" -#include "aclk.h" - void aclk_send_contexts_snapshot(contexts_snapshot_t data) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_SNAPSHOT, "ContextsSnapshot", contexts_snapshot_2bin, data); + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT; + query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); } void aclk_send_contexts_updated(contexts_updated_t data) { - GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_UPDATED, "ContextsUpdated", contexts_updated_2bin, data); + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED; + query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); } diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index aa198e0aee..981c01965a 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -84,7 +84,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; - buffer_strcat(log_buffer, query->http_api_v2.query); + buffer_strcat(log_buffer, query->data.http_api_v2.query); size_t size = 0; size_t sent = 0; w->tv_in = query->created_tv; @@ -102,8 +102,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } RRDHOST *temp_host = NULL; - if (!strncmp(query->http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { - char *node_uuid = query->http_api_v2.query + strlen(NODE_ID_QUERY); + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { error_report(CLOUD_EMSG_MALFORMED_NODE_ID); @@ -127,14 +127,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } } - char *mysep = strchr(query->http_api_v2.query, '?'); + char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); *mysep = '\0'; } else - url_decode_r(w->decoded_query_string, query->http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); - mysep = strrchr(query->http_api_v2.query, '/'); + mysep = strrchr(query->data.http_api_v2.query, '/'); if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -151,7 +151,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used - if ((start = strstr((char *)query->http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { + if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { start += strlen(WEB_HDR_ACCEPT_ENC); end = strstr(start, "\x0D\x0A"); start = strstr(start, "gzip"); @@ -256,16 +256,57 @@ cleanup: return retval; } +static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) +{ + // this will be simplified when legacy support is removed + aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); + return 0; +} + +const char *aclk_query_get_name(aclk_query_type_t qt) +{ + switch (qt) { + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_LOG_HEALTH: return "alarm_log_health"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + default: + error_report("Unknown query type used %d", (int) qt); + return "unknown"; + } +} + static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { - worker_is_busy(0); - debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); - http_api_v2(query_thr, query); + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + aclk_query_free(query); + return; + } + + worker_is_busy(query->type); + if (query->type == HTTP_API_V2) { + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); + } else { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + send_bin_msg(query_thr, query); + } if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; ACLK_STATS_UNLOCK; } @@ -285,10 +326,11 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) return 0; } -static void worker_aclk_register(void) -{ +static void worker_aclk_register(void) { worker_register("ACLKQUERY"); - worker_register_job_name(0, "http query"); + for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { + worker_register_job_name(i, aclk_query_get_name(i)); + } } /** diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index 18618f16f4..f86754a2a6 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -31,4 +31,6 @@ struct aclk_query_threads { void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +const char *aclk_query_get_name(aclk_query_type_t qt); + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 794a8ca348..01b20d23f3 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -95,16 +95,41 @@ void aclk_queue_flush(void) }; } -aclk_query_t aclk_query_new() +aclk_query_t aclk_query_new(aclk_query_type_t type) { - return callocz(1, sizeof(struct aclk_query)); + aclk_query_t query = callocz(1, sizeof(struct aclk_query)); + query->type = type; + return query; } void aclk_query_free(aclk_query_t query) { - freez(query->http_api_v2.payload); - if (query->http_api_v2.query != query->dedup_id) - freez(query->http_api_v2.query); + switch (query->type) { + case HTTP_API_V2: + freez(query->data.http_api_v2.payload); + if (query->data.http_api_v2.query != query->dedup_id) + freez(query->data.http_api_v2.query); + break; + + case NODE_STATE_UPDATE: + case REGISTER_NODE: + case CHART_DIMS_UPDATE: + case CHART_CONFIG_UPDATED: + case CHART_RESET: + case RETENTION_UPDATED: + case UPDATE_NODE_INFO: + case ALARM_LOG_HEALTH: + case ALARM_PROVIDE_CFG: + case ALARM_SNAPSHOT: + case UPDATE_NODE_COLLECTORS: + case PROTO_BIN_MESSAGE: + if (!use_mqtt_5) + freez(query->data.bin_payload.payload); + break; + + default: + break; + } freez(query->dedup_id); freez(query->callback_topic); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 514295807b..ab94b63848 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -9,6 +9,24 @@ #include "aclk_util.h" +typedef enum { + UNKNOWN = 0, + HTTP_API_V2, + REGISTER_NODE, + NODE_STATE_UPDATE, + CHART_DIMS_UPDATE, + CHART_CONFIG_UPDATED, + CHART_RESET, + RETENTION_UPDATED, + UPDATE_NODE_INFO, + ALARM_LOG_HEALTH, + ALARM_PROVIDE_CFG, + ALARM_SNAPSHOT, + UPDATE_NODE_COLLECTORS, + PROTO_BIN_MESSAGE, + ACLK_QUERY_TYPE_COUNT // always keep this as last +} aclk_query_type_t; + struct aclk_query_http_api_v2 { char *payload; char *query; @@ -23,6 +41,8 @@ struct aclk_bin_payload { typedef struct aclk_query *aclk_query_t; struct aclk_query { + aclk_query_type_t type; + // dedup_id is used to deduplicate queries in the list // if type and dedup_id is the same message is deduplicated // set dedup_id to NULL to never deduplicate the message @@ -39,11 +59,13 @@ struct aclk_query { // TODO maybe remove? int version; - - struct aclk_query_http_api_v2 http_api_v2; + union { + struct aclk_query_http_api_v2 http_api_v2; + struct aclk_bin_payload bin_payload; + } data; }; -aclk_query_t aclk_query_new(); +aclk_query_t aclk_query_new(aclk_query_type_t type); void aclk_query_free(aclk_query_t query); int aclk_queue_query(aclk_query_t query); @@ -53,4 +75,12 @@ void aclk_queue_flush(void); void aclk_queue_lock(void); void aclk_queue_unlock(void); +#define QUEUE_IF_PAYLOAD_PRESENT(query) \ + if (likely(query->data.bin_payload.payload)) { \ + aclk_queue_query(query); \ + } else { \ + error("Failed to generate payload (%s)", __FUNCTION__); \ + aclk_query_free(query); \ + } + #endif /* NETDATA_ACLK_QUERY_QUEUE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 0e83dcac5a..e6ed332cc5 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -5,7 +5,6 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" #include "aclk.h" -#include "aclk_tx_msgs.h" #include "schema-wrappers/proto_2_json.h" @@ -132,14 +131,14 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent return 1; } - query = aclk_query_new(); + query = aclk_query_new(HTTP_API_V2); - if (unlikely(aclk_extract_v2_data(raw_payload, &query->http_api_v2.payload))) { + if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) { error("Error extracting payload expected after the JSON dictionary."); goto error; } - if (unlikely(aclk_v2_payload_get_query(query->http_api_v2.payload, &query->dedup_id))) { + if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) { error("Could not extract payload from query"); goto error; } @@ -159,7 +158,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent query->timeout = cloud_to_agent->timeout; // for clarity and code readability as when we process the request // it would be strange to get URL from `dedup_id` - query->http_api_v2.query = query->dedup_id; + query->data.http_api_v2.query = query->dedup_id; query->msg_id = cloud_to_agent->msg_id; aclk_queue_query(query); return 0; @@ -266,6 +265,7 @@ int create_node_instance_result(const char *msg, size_t msg_len) } update_node_id(&host_id, &node_id); + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .hops = 1, .live = 0, @@ -298,14 +298,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) }; node_state_update.capabilities = caps; - size_t payload_len; rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - char *payload = generate_node_instance_connection(&payload_len, &node_state_update); + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - aclk_send_bin_msg(payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); freez(res.node_id); freez(res.machine_guid); return 0; diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index d5d629651c..241e9b724d 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -118,6 +118,28 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } +static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s", + "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + + } else + rrdset_next(st); + + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]); + + rrdset_done(st); +} + static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = { "other", "info", @@ -329,6 +351,7 @@ void *aclk_stats_main_thread(void *ptr) #endif aclk_stats_cloud_req(&per_sample); + aclk_stats_cloud_req_type(&per_sample); aclk_stats_cloud_req_http_type(&per_sample); aclk_stats_query_threads(aclk_queries_per_thread_sample); diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index a45169cc71..bec9ac2476 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -51,6 +51,9 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_req_recvd; volatile uint32_t cloud_req_err; + // query types. + volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT]; + // HTTP-specific request types. volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT]; diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc index 570cccc8b1..b04c9d20cc 100644 --- a/aclk/schema-wrappers/context.cc +++ b/aclk/schema-wrappers/context.cc @@ -57,7 +57,7 @@ void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct fill_ctx_updated(ctx, ctx_update); } -char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot) +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) { ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); @@ -109,7 +109,7 @@ void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct con fill_ctx_updated(ctx, ctx_update); } -char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated) +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) { ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h index 5596b25b49..cbb7701a81 100644 --- a/aclk/schema-wrappers/context.h +++ b/aclk/schema-wrappers/context.h @@ -36,14 +36,14 @@ contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); -char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot); +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); // ContextS Updated related contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); void contexts_updated_delete(contexts_updated_t ctxs_updated); void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); -char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated); +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); |