diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-07-28 10:29:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-28 10:29:42 +0200 |
commit | 221fd512873613a10b3d95b25a8a4d542b2c4801 (patch) | |
tree | 42897125a2299a58fa473ce20174f35acde2f488 /aclk | |
parent | fcd57482933fb7508211362371eeb0f288218246 (diff) |
Query queue only for queries (#13431)
simplify and clean up
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 38 | ||||
-rw-r--r-- | aclk/aclk.h | 11 | ||||
-rw-r--r-- | aclk/aclk_alarm_api.c | 26 | ||||
-rw-r--r-- | aclk/aclk_charts_api.c | 49 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.c | 14 | ||||
-rw-r--r-- | aclk/aclk_query.c | 68 | ||||
-rw-r--r-- | aclk/aclk_query.h | 2 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 35 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 36 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 17 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 23 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.cc | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.h | 4 |
14 files changed, 77 insertions, 253 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 7b3641b1e2..c546739014 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd) } if (ret < 0) { // node_id not found - aclk_query_t create_query; - create_query = aclk_query_new(REGISTER_NODE); + size_t payload_len; + rrdhost_aclk_state_lock(localhost); node_instance_creation_t node_instance_creation = { .claim_id = localhost->aclk_state.claimed_id, @@ -751,16 +751,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd) .hostname = host->hostname, .machine_guid = host->machine_guid }; - create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; - create_query->data.bin_payload.msg_name = "CreateNodeInstance"; + info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); - aclk_queue_query(create_query); + aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); return; } - aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .hops = host->system_info->hops, .live = cmd, @@ -781,15 +779,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + size_t payload_len; + char *payload = generate_node_instance_connection(&payload_len, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); freez((void*)node_state_update.node_id); - query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; - query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; - aclk_queue_query(query); + aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); } void aclk_send_node_instances() @@ -802,7 +799,6 @@ void aclk_send_node_instances() } while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { - aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .live = list->live, .hops = list->hops, @@ -827,34 +823,30 @@ void aclk_send_node_instances() rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + size_t payload_len; + char *payload = generate_node_instance_connection(&payload_len, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); freez((void*)node_state_update.node_id); - query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; - query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; - aclk_queue_query(query); + aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); } else { - aclk_query_t create_query; - create_query = aclk_query_new(REGISTER_NODE); node_instance_creation_t node_instance_creation = { .hops = list->hops, .hostname = list->hostname, }; node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); - create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; - create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - node_instance_creation.claim_id = localhost->aclk_state.claimed_id, - create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id; + size_t payload_len; + char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); freez(node_instance_creation.machine_guid); - aclk_queue_query(create_query); + aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); } freez(list->hostname); diff --git a/aclk/aclk.h b/aclk/aclk.h index 5065ac2bfc..6a0f2ad7a3 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -34,6 +34,17 @@ void aclk_send_node_instances(void); void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); +#define GENERATE_AND_SEND_PAYLOAD(topic, msg_name, generator_fnc, generator_data...) \ + size_t payload_len; \ + char *payload = generator_fnc(&payload_len, generator_data); \ + if (unlikely(payload == NULL)) { \ + error("Failed to generate payload (%s)", __FUNCTION__); \ + return; \ + } \ + aclk_send_bin_msg(payload, payload_len, topic, msg_name); \ + if (!use_mqtt_5) \ + freez(payload); + char *ng_aclk_state(void); char *ng_aclk_state_json(void); diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index a181eb291f..7e75bf38a5 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -10,38 +10,20 @@ void aclk_send_alarm_log_health(struct alarm_log_health *log_health) { - aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); - query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); - query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; - query->data.bin_payload.msg_name = "AlarmLogHealth"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_HEALTH, "AlarmLogHealth", generate_alarm_log_health, log_health); } void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) { - size_t payload_size; - char *payload = generate_alarm_log_entry(&payload_size, log_entry); - - aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); - - if (!use_mqtt_5) - freez(payload); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry", generate_alarm_log_entry, log_entry); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) { - aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG); - query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg); - query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG; - query->data.bin_payload.msg_name = "ProvideAlarmConfiguration"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_CONFIG, "ProvideAlarmConfiguration", generate_provide_alarm_configuration, cfg); } void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot) { - aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT); - query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot); - query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT; - query->data.bin_payload.msg_name = "AlarmSnapshot"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_SNAPSHOT, "AlarmSnapshot", generate_alarm_snapshot_bin, snapshot); } diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c index 51d8dad58c..a82ee8d881 100644 --- a/aclk/aclk_charts_api.c +++ b/aclk/aclk_charts_api.c @@ -3,75 +3,46 @@ #include "aclk_query_queue.h" +#include "aclk.h" + #define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated" void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) { - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_updated, payloads, payload_sizes, new_positions); } void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) { - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_chart_dimensions_updated, payloads, payload_sizes, new_positions); } void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) { - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_and_dimensions_updated, payloads, payload_sizes, is_dim, new_positions, batch_id); } void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size) { - aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED; - query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size); - query->data.bin_payload.msg_name = "ChartConfigsUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_CONFIGS_UPDATED, "ChartConfigsUpdated", generate_chart_configs_updated, config_list, list_size); } void aclk_chart_reset(chart_reset_t reset) { - aclk_query_t query = aclk_query_new(CHART_RESET); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET; - query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset); - query->data.bin_payload.msg_name = "ResetChartMessages"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_RESET, "ResetChartMessages", generate_reset_chart_messages, reset); } void aclk_retention_updated(struct retention_updated *data) { - aclk_query_t query = aclk_query_new(RETENTION_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED; - query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data); - query->data.bin_payload.msg_name = "RetentionUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_RETENTION_UPDATED, "RetentionUpdated", generate_retention_updated, data); } void aclk_update_node_info(struct update_node_info *info) { - aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; - query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); - query->data.bin_payload.msg_name = "UpdateNodeInfo"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_INFO, "UpdateNodeInfo", generate_update_node_info_message, info); } void aclk_update_node_collectors(struct update_node_collectors *collectors) { - aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; - query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); - query->data.bin_payload.msg_name = "UpdateNodeCollectors"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_COLLECTORS, "UpdateNodeCollectors", generate_update_node_collectors_message, collectors); } diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c index f17d3cabd9..a30d7a9d20 100644 --- a/aclk/aclk_contexts_api.c +++ b/aclk/aclk_contexts_api.c @@ -4,20 +4,14 @@ #include "aclk_contexts_api.h" +#include "aclk.h" + void aclk_send_contexts_snapshot(contexts_snapshot_t data) { - aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); - query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT; - query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size); - query->data.bin_payload.msg_name = "ContextsSnapshot"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_SNAPSHOT, "ContextsSnapshot", contexts_snapshot_2bin, data); } void aclk_send_contexts_updated(contexts_updated_t data) { - aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); - query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED; - query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size); - query->data.bin_payload.msg_name = "ContextsUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); + GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_UPDATED, "ContextsUpdated", contexts_updated_2bin, data); } diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 981c01965a..aa198e0aee 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -84,7 +84,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; - buffer_strcat(log_buffer, query->data.http_api_v2.query); + buffer_strcat(log_buffer, query->http_api_v2.query); size_t size = 0; size_t sent = 0; w->tv_in = query->created_tv; @@ -102,8 +102,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } RRDHOST *temp_host = NULL; - if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { - char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); + if (!strncmp(query->http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { error_report(CLOUD_EMSG_MALFORMED_NODE_ID); @@ -127,14 +127,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } } - char *mysep = strchr(query->data.http_api_v2.query, '?'); + char *mysep = strchr(query->http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); *mysep = '\0'; } else - url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + url_decode_r(w->decoded_query_string, query->http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); - mysep = strrchr(query->data.http_api_v2.query, '/'); + mysep = strrchr(query->http_api_v2.query, '/'); if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -151,7 +151,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used - if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { + if ((start = strstr((char *)query->http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { start += strlen(WEB_HDR_ACCEPT_ENC); end = strstr(start, "\x0D\x0A"); start = strstr(start, "gzip"); @@ -256,57 +256,16 @@ cleanup: return retval; } -static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - // this will be simplified when legacy support is removed - aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); - return 0; -} - -const char *aclk_query_get_name(aclk_query_type_t qt) -{ - switch (qt) { - case HTTP_API_V2: return "http_api_request_v2"; - case REGISTER_NODE: return "register_node"; - case NODE_STATE_UPDATE: return "node_state_update"; - case CHART_DIMS_UPDATE: return "chart_and_dim_update"; - case CHART_CONFIG_UPDATED: return "chart_config_updated"; - case CHART_RESET: return "reset_chart_messages"; - case RETENTION_UPDATED: return "update_retention_info"; - case UPDATE_NODE_INFO: return "update_node_info"; - case ALARM_LOG_HEALTH: return "alarm_log_health"; - case ALARM_PROVIDE_CFG: return "provide_alarm_config"; - case ALARM_SNAPSHOT: return "alarm_snapshot"; - case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; - case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; - default: - error_report("Unknown query type used %d", (int) qt); - return "unknown"; - } -} - static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { - if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { - error_report("Unknown query in query queue. %u", query->type); - aclk_query_free(query); - return; - } - - worker_is_busy(query->type); - if (query->type == HTTP_API_V2) { - debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); - http_api_v2(query_thr, query); - } else { - debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); - send_bin_msg(query_thr, query); - } + worker_is_busy(0); + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[query_thr->idx]++; - aclk_metrics_per_sample.queries_per_type[query->type]++; ACLK_STATS_UNLOCK; } @@ -326,11 +285,10 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) return 0; } -static void worker_aclk_register(void) { +static void worker_aclk_register(void) +{ worker_register("ACLKQUERY"); - for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { - worker_register_job_name(i, aclk_query_get_name(i)); - } + worker_register_job_name(0, "http query"); } /** diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index f86754a2a6..18618f16f4 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -31,6 +31,4 @@ struct aclk_query_threads { void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); -const char *aclk_query_get_name(aclk_query_type_t qt); - #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 01b20d23f3..794a8ca348 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -95,41 +95,16 @@ void aclk_queue_flush(void) }; } -aclk_query_t aclk_query_new(aclk_query_type_t type) +aclk_query_t aclk_query_new() { - aclk_query_t query = callocz(1, sizeof(struct aclk_query)); - query->type = type; - return query; + return callocz(1, sizeof(struct aclk_query)); } void aclk_query_free(aclk_query_t query) { - switch (query->type) { - case HTTP_API_V2: - freez(query->data.http_api_v2.payload); - if (query->data.http_api_v2.query != query->dedup_id) - freez(query->data.http_api_v2.query); - break; - - case NODE_STATE_UPDATE: - case REGISTER_NODE: - case CHART_DIMS_UPDATE: - case CHART_CONFIG_UPDATED: - case CHART_RESET: - case RETENTION_UPDATED: - case UPDATE_NODE_INFO: - case ALARM_LOG_HEALTH: - case ALARM_PROVIDE_CFG: - case ALARM_SNAPSHOT: - case UPDATE_NODE_COLLECTORS: - case PROTO_BIN_MESSAGE: - if (!use_mqtt_5) - freez(query->data.bin_payload.payload); - break; - - default: - break; - } + freez(query->http_api_v2.payload); + if (query->http_api_v2.query != query->dedup_id) + freez(query->http_api_v2.query); freez(query->dedup_id); freez(query->callback_topic); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index ab94b63848..514295807b 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -9,24 +9,6 @@ #include "aclk_util.h" -typedef enum { - UNKNOWN = 0, - HTTP_API_V2, - REGISTER_NODE, - NODE_STATE_UPDATE, - CHART_DIMS_UPDATE, - CHART_CONFIG_UPDATED, - CHART_RESET, - RETENTION_UPDATED, - UPDATE_NODE_INFO, - ALARM_LOG_HEALTH, - ALARM_PROVIDE_CFG, - ALARM_SNAPSHOT, - UPDATE_NODE_COLLECTORS, - PROTO_BIN_MESSAGE, - ACLK_QUERY_TYPE_COUNT // always keep this as last -} aclk_query_type_t; - struct aclk_query_http_api_v2 { char *payload; char *query; @@ -41,8 +23,6 @@ struct aclk_bin_payload { typedef struct aclk_query *aclk_query_t; struct aclk_query { - aclk_query_type_t type; - // dedup_id is used to deduplicate queries in the list // if type and dedup_id is the same message is deduplicated // set dedup_id to NULL to never deduplicate the message @@ -59,13 +39,11 @@ struct aclk_query { // TODO maybe remove? int version; - union { - struct aclk_query_http_api_v2 http_api_v2; - struct aclk_bin_payload bin_payload; - } data; + + struct aclk_query_http_api_v2 http_api_v2; }; -aclk_query_t aclk_query_new(aclk_query_type_t type); +aclk_query_t aclk_query_new(); void aclk_query_free(aclk_query_t query); int aclk_queue_query(aclk_query_t query); @@ -75,12 +53,4 @@ void aclk_queue_flush(void); void aclk_queue_lock(void); void aclk_queue_unlock(void); -#define QUEUE_IF_PAYLOAD_PRESENT(query) \ - if (likely(query->data.bin_payload.payload)) { \ - aclk_queue_query(query); \ - } else { \ - error("Failed to generate payload (%s)", __FUNCTION__); \ - aclk_query_free(query); \ - } - #endif /* NETDATA_ACLK_QUERY_QUEUE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index e6ed332cc5..0e83dcac5a 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" #include "aclk.h" +#include "aclk_tx_msgs.h" #include "schema-wrappers/proto_2_json.h" @@ -131,14 +132,14 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent return 1; } - query = aclk_query_new(HTTP_API_V2); + query = aclk_query_new(); - if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) { + if (unlikely(aclk_extract_v2_data(raw_payload, &query->http_api_v2.payload))) { error("Error extracting payload expected after the JSON dictionary."); goto error; } - if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) { + if (unlikely(aclk_v2_payload_get_query(query->http_api_v2.payload, &query->dedup_id))) { error("Could not extract payload from query"); goto error; } @@ -158,7 +159,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent query->timeout = cloud_to_agent->timeout; // for clarity and code readability as when we process the request // it would be strange to get URL from `dedup_id` - query->data.http_api_v2.query = query->dedup_id; + query->http_api_v2.query = query->dedup_id; query->msg_id = cloud_to_agent->msg_id; aclk_queue_query(query); return 0; @@ -265,7 +266,6 @@ int create_node_instance_result(const char *msg, size_t msg_len) } update_node_id(&host_id, &node_id); - aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .hops = 1, .live = 0, @@ -298,15 +298,14 @@ int create_node_instance_result(const char *msg, size_t msg_len) }; node_state_update.capabilities = caps; + size_t payload_len; rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + char *payload = generate_node_instance_connection(&payload_len, &node_state_update); rrdhost_aclk_state_unlock(localhost); - query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; - query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_send_bin_msg(payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); - aclk_queue_query(query); freez(res.node_id); freez(res.machine_guid); return 0; diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 241e9b724d..d5d629651c 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -118,28 +118,6 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT]; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s", - "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) - dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - - } else - rrdset_next(st); - - for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) - rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]); - - rrdset_done(st); -} - static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = { "other", "info", @@ -351,7 +329,6 @@ void *aclk_stats_main_thread(void *ptr) #endif aclk_stats_cloud_req(&per_sample); - aclk_stats_cloud_req_type(&per_sample); aclk_stats_cloud_req_http_type(&per_sample); aclk_stats_query_threads(aclk_queries_per_thread_sample); diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index bec9ac2476..a45169cc71 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -51,9 +51,6 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_req_recvd; volatile uint32_t cloud_req_err; - // query types. - volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT]; - // HTTP-specific request types. volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT]; diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc index b04c9d20cc..570cccc8b1 100644 --- a/aclk/schema-wrappers/context.cc +++ b/aclk/schema-wrappers/context.cc @@ -57,7 +57,7 @@ void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct fill_ctx_updated(ctx, ctx_update); } -char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) +char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot) { ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); @@ -109,7 +109,7 @@ void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct con fill_ctx_updated(ctx, ctx_update); } -char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) +char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated) { ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h index cbb7701a81..5596b25b49 100644 --- a/aclk/schema-wrappers/context.h +++ b/aclk/schema-wrappers/context.h @@ -36,14 +36,14 @@ contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); -char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); +char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot); // ContextS Updated related contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); void contexts_updated_delete(contexts_updated_t ctxs_updated); void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); -char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); +char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated); #ifdef __cplusplus |