summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-07-28 10:29:42 +0200
committerGitHub <noreply@github.com>2022-07-28 10:29:42 +0200
commit221fd512873613a10b3d95b25a8a4d542b2c4801 (patch)
tree42897125a2299a58fa473ce20174f35acde2f488 /aclk
parentfcd57482933fb7508211362371eeb0f288218246 (diff)
Query queue only for queries (#13431)
simplify and clean up
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c38
-rw-r--r--aclk/aclk.h11
-rw-r--r--aclk/aclk_alarm_api.c26
-rw-r--r--aclk/aclk_charts_api.c49
-rw-r--r--aclk/aclk_contexts_api.c14
-rw-r--r--aclk/aclk_query.c68
-rw-r--r--aclk/aclk_query.h2
-rw-r--r--aclk/aclk_query_queue.c35
-rw-r--r--aclk/aclk_query_queue.h36
-rw-r--r--aclk/aclk_rx_msgs.c17
-rw-r--r--aclk/aclk_stats.c23
-rw-r--r--aclk/aclk_stats.h3
-rw-r--r--aclk/schema-wrappers/context.cc4
-rw-r--r--aclk/schema-wrappers/context.h4
14 files changed, 77 insertions, 253 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7b3641b1e2..c546739014 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
}
if (ret < 0) {
// node_id not found
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
+ size_t payload_len;
+
rrdhost_aclk_state_lock(localhost);
node_instance_creation_t node_instance_creation = {
.claim_id = localhost->aclk_state.claimed_id,
@@ -751,16 +751,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
.hostname = host->hostname,
.machine_guid = host->machine_guid
};
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
return;
}
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = host->system_info->hops,
.live = cmd,
@@ -781,15 +779,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ size_t payload_len;
+ char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
freez((void*)node_state_update.node_id);
- query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
- aclk_queue_query(query);
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
}
void aclk_send_node_instances()
@@ -802,7 +799,6 @@ void aclk_send_node_instances()
}
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.live = list->live,
.hops = list->hops,
@@ -827,34 +823,30 @@ void aclk_send_node_instances()
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ size_t payload_len;
+ char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
freez((void*)node_state_update.node_id);
- query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
- aclk_queue_query(query);
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
} else {
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
node_instance_creation_t node_instance_creation = {
.hops = list->hops,
.hostname = list->hostname,
};
node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id;
+ size_t payload_len;
+ char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
freez(node_instance_creation.machine_guid);
- aclk_queue_query(create_query);
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
}
freez(list->hostname);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 5065ac2bfc..6a0f2ad7a3 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -34,6 +34,17 @@ void aclk_send_node_instances(void);
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
+#define GENERATE_AND_SEND_PAYLOAD(topic, msg_name, generator_fnc, generator_data...) \
+ size_t payload_len; \
+ char *payload = generator_fnc(&payload_len, generator_data); \
+ if (unlikely(payload == NULL)) { \
+ error("Failed to generate payload (%s)", __FUNCTION__); \
+ return; \
+ } \
+ aclk_send_bin_msg(payload, payload_len, topic, msg_name); \
+ if (!use_mqtt_5) \
+ freez(payload);
+
char *ng_aclk_state(void);
char *ng_aclk_state_json(void);
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index a181eb291f..7e75bf38a5 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -10,38 +10,20 @@
void aclk_send_alarm_log_health(struct alarm_log_health *log_health)
{
- aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH);
- query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health);
- query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH;
- query->data.bin_payload.msg_name = "AlarmLogHealth";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_HEALTH, "AlarmLogHealth", generate_alarm_log_health, log_health);
}
void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
{
- size_t payload_size;
- char *payload = generate_alarm_log_entry(&payload_size, log_entry);
-
- aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
-
- if (!use_mqtt_5)
- freez(payload);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry", generate_alarm_log_entry, log_entry);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
{
- aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG);
- query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg);
- query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG;
- query->data.bin_payload.msg_name = "ProvideAlarmConfiguration";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_CONFIG, "ProvideAlarmConfiguration", generate_provide_alarm_configuration, cfg);
}
void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot)
{
- aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT);
- query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot);
- query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT;
- query->data.bin_payload.msg_name = "AlarmSnapshot";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_SNAPSHOT, "AlarmSnapshot", generate_alarm_snapshot_bin, snapshot);
}
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
index 51d8dad58c..a82ee8d881 100644
--- a/aclk/aclk_charts_api.c
+++ b/aclk/aclk_charts_api.c
@@ -3,75 +3,46 @@
#include "aclk_query_queue.h"
+#include "aclk.h"
+
#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_updated, payloads, payload_sizes, new_positions);
}
void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
- query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_chart_dimensions_updated, payloads, payload_sizes, new_positions);
}
void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
- query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_and_dimensions_updated, payloads, payload_sizes, is_dim, new_positions, batch_id);
}
void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
{
- aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
- query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
- query->data.bin_payload.msg_name = "ChartConfigsUpdated";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_CONFIGS_UPDATED, "ChartConfigsUpdated", generate_chart_configs_updated, config_list, list_size);
}
void aclk_chart_reset(chart_reset_t reset)
{
- aclk_query_t query = aclk_query_new(CHART_RESET);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
- query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
- query->data.bin_payload.msg_name = "ResetChartMessages";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_RESET, "ResetChartMessages", generate_reset_chart_messages, reset);
}
void aclk_retention_updated(struct retention_updated *data)
{
- aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
- query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
- query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
- query->data.bin_payload.msg_name = "RetentionUpdated";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_RETENTION_UPDATED, "RetentionUpdated", generate_retention_updated, data);
}
void aclk_update_node_info(struct update_node_info *info)
{
- aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
- query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
- query->data.bin_payload.msg_name = "UpdateNodeInfo";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_INFO, "UpdateNodeInfo", generate_update_node_info_message, info);
}
void aclk_update_node_collectors(struct update_node_collectors *collectors)
{
- aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
- query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
- query->data.bin_payload.msg_name = "UpdateNodeCollectors";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_COLLECTORS, "UpdateNodeCollectors", generate_update_node_collectors_message, collectors);
}
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
index f17d3cabd9..a30d7a9d20 100644
--- a/aclk/aclk_contexts_api.c
+++ b/aclk/aclk_contexts_api.c
@@ -4,20 +4,14 @@
#include "aclk_contexts_api.h"
+#include "aclk.h"
+
void aclk_send_contexts_snapshot(contexts_snapshot_t data)
{
- aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
- query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
- query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
- query->data.bin_payload.msg_name = "ContextsSnapshot";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_SNAPSHOT, "ContextsSnapshot", contexts_snapshot_2bin, data);
}
void aclk_send_contexts_updated(contexts_updated_t data)
{
- aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
- query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
- query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
- query->data.bin_payload.msg_name = "ContextsUpdated";
- QUEUE_IF_PAYLOAD_PRESENT(query);
+ GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_UPDATED, "ContextsUpdated", contexts_updated_2bin, data);
}
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 981c01965a..aa198e0aee 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -84,7 +84,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
- buffer_strcat(log_buffer, query->data.http_api_v2.query);
+ buffer_strcat(log_buffer, query->http_api_v2.query);
size_t size = 0;
size_t sent = 0;
w->tv_in = query->created_tv;
@@ -102,8 +102,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
RRDHOST *temp_host = NULL;
- if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
- char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+ if (!strncmp(query->http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+ char *node_uuid = query->http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
@@ -127,14 +127,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
}
- char *mysep = strchr(query->data.http_api_v2.query, '?');
+ char *mysep = strchr(query->http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
*mysep = '\0';
} else
- url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ url_decode_r(w->decoded_query_string, query->http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
- mysep = strrchr(query->data.http_api_v2.query, '/');
+ mysep = strrchr(query->http_api_v2.query, '/');
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
@@ -151,7 +151,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
- if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
+ if ((start = strstr((char *)query->http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
start += strlen(WEB_HDR_ACCEPT_ENC);
end = strstr(start, "\x0D\x0A");
start = strstr(start, "gzip");
@@ -256,57 +256,16 @@ cleanup:
return retval;
}
-static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- // this will be simplified when legacy support is removed
- aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
- return 0;
-}
-
-const char *aclk_query_get_name(aclk_query_type_t qt)
-{
- switch (qt) {
- case HTTP_API_V2: return "http_api_request_v2";
- case REGISTER_NODE: return "register_node";
- case NODE_STATE_UPDATE: return "node_state_update";
- case CHART_DIMS_UPDATE: return "chart_and_dim_update";
- case CHART_CONFIG_UPDATED: return "chart_config_updated";
- case CHART_RESET: return "reset_chart_messages";
- case RETENTION_UPDATED: return "update_retention_info";
- case UPDATE_NODE_INFO: return "update_node_info";
- case ALARM_LOG_HEALTH: return "alarm_log_health";
- case ALARM_PROVIDE_CFG: return "provide_alarm_config";
- case ALARM_SNAPSHOT: return "alarm_snapshot";
- case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
- case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
- default:
- error_report("Unknown query type used %d", (int) qt);
- return "unknown";
- }
-}
-
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
- if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
- error_report("Unknown query in query queue. %u", query->type);
- aclk_query_free(query);
- return;
- }
-
- worker_is_busy(query->type);
- if (query->type == HTTP_API_V2) {
- debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
- http_api_v2(query_thr, query);
- } else {
- debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
- send_bin_msg(query_thr, query);
- }
+ worker_is_busy(0);
+ debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+ http_api_v2(query_thr, query);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
- aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
@@ -326,11 +285,10 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
return 0;
}
-static void worker_aclk_register(void) {
+static void worker_aclk_register(void)
+{
worker_register("ACLKQUERY");
- for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
- worker_register_job_name(i, aclk_query_get_name(i));
- }
+ worker_register_job_name(0, "http query");
}
/**
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index f86754a2a6..18618f16f4 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -31,6 +31,4 @@ struct aclk_query_threads {
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
-const char *aclk_query_get_name(aclk_query_type_t qt);
-
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 01b20d23f3..794a8ca348 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -95,41 +95,16 @@ void aclk_queue_flush(void)
};
}
-aclk_query_t aclk_query_new(aclk_query_type_t type)
+aclk_query_t aclk_query_new()
{
- aclk_query_t query = callocz(1, sizeof(struct aclk_query));
- query->type = type;
- return query;
+ return callocz(1, sizeof(struct aclk_query));
}
void aclk_query_free(aclk_query_t query)
{
- switch (query->type) {
- case HTTP_API_V2:
- freez(query->data.http_api_v2.payload);
- if (query->data.http_api_v2.query != query->dedup_id)
- freez(query->data.http_api_v2.query);
- break;
-
- case NODE_STATE_UPDATE:
- case REGISTER_NODE:
- case CHART_DIMS_UPDATE:
- case CHART_CONFIG_UPDATED:
- case CHART_RESET:
- case RETENTION_UPDATED:
- case UPDATE_NODE_INFO:
- case ALARM_LOG_HEALTH:
- case ALARM_PROVIDE_CFG:
- case ALARM_SNAPSHOT:
- case UPDATE_NODE_COLLECTORS:
- case PROTO_BIN_MESSAGE:
- if (!use_mqtt_5)
- freez(query->data.bin_payload.payload);
- break;
-
- default:
- break;
- }
+ freez(query->http_api_v2.payload);
+ if (query->http_api_v2.query != query->dedup_id)
+ freez(query->http_api_v2.query);
freez(query->dedup_id);
freez(query->callback_topic);
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index ab94b63848..514295807b 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -9,24 +9,6 @@
#include "aclk_util.h"
-typedef enum {
- UNKNOWN = 0,
- HTTP_API_V2,
- REGISTER_NODE,
- NODE_STATE_UPDATE,
- CHART_DIMS_UPDATE,
- CHART_CONFIG_UPDATED,
- CHART_RESET,
- RETENTION_UPDATED,
- UPDATE_NODE_INFO,
- ALARM_LOG_HEALTH,
- ALARM_PROVIDE_CFG,
- ALARM_SNAPSHOT,
- UPDATE_NODE_COLLECTORS,
- PROTO_BIN_MESSAGE,
- ACLK_QUERY_TYPE_COUNT // always keep this as last
-} aclk_query_type_t;
-
struct aclk_query_http_api_v2 {
char *payload;
char *query;
@@ -41,8 +23,6 @@ struct aclk_bin_payload {
typedef struct aclk_query *aclk_query_t;
struct aclk_query {
- aclk_query_type_t type;
-
// dedup_id is used to deduplicate queries in the list
// if type and dedup_id is the same message is deduplicated
// set dedup_id to NULL to never deduplicate the message
@@ -59,13 +39,11 @@ struct aclk_query {
// TODO maybe remove?
int version;
- union {
- struct aclk_query_http_api_v2 http_api_v2;
- struct aclk_bin_payload bin_payload;
- } data;
+
+ struct aclk_query_http_api_v2 http_api_v2;
};
-aclk_query_t aclk_query_new(aclk_query_type_t type);
+aclk_query_t aclk_query_new();
void aclk_query_free(aclk_query_t query);
int aclk_queue_query(aclk_query_t query);
@@ -75,12 +53,4 @@ void aclk_queue_flush(void);
void aclk_queue_lock(void);
void aclk_queue_unlock(void);
-#define QUEUE_IF_PAYLOAD_PRESENT(query) \
- if (likely(query->data.bin_payload.payload)) { \
- aclk_queue_query(query); \
- } else { \
- error("Failed to generate payload (%s)", __FUNCTION__); \
- aclk_query_free(query); \
- }
-
#endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index e6ed332cc5..0e83dcac5a 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "aclk_tx_msgs.h"
#include "schema-wrappers/proto_2_json.h"
@@ -131,14 +132,14 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
return 1;
}
- query = aclk_query_new(HTTP_API_V2);
+ query = aclk_query_new();
- if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
+ if (unlikely(aclk_extract_v2_data(raw_payload, &query->http_api_v2.payload))) {
error("Error extracting payload expected after the JSON dictionary.");
goto error;
}
- if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
+ if (unlikely(aclk_v2_payload_get_query(query->http_api_v2.payload, &query->dedup_id))) {
error("Could not extract payload from query");
goto error;
}
@@ -158,7 +159,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
query->timeout = cloud_to_agent->timeout;
// for clarity and code readability as when we process the request
// it would be strange to get URL from `dedup_id`
- query->data.http_api_v2.query = query->dedup_id;
+ query->http_api_v2.query = query->dedup_id;
query->msg_id = cloud_to_agent->msg_id;
aclk_queue_query(query);
return 0;
@@ -265,7 +266,6 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
update_node_id(&host_id, &node_id);
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = 1,
.live = 0,
@@ -298,15 +298,14 @@ int create_node_instance_result(const char *msg, size_t msg_len)
};
node_state_update.capabilities = caps;
+ size_t payload_len;
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_send_bin_msg(payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
- aclk_queue_query(query);
freez(res.node_id);
freez(res.machine_guid);
return 0;
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 241e9b724d..d5d629651c 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -118,28 +118,6 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT];
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s",
- "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
- dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
-
- } else
- rrdset_next(st);
-
- for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
- rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]);
-
- rrdset_done(st);
-}
-
static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
"other",
"info",
@@ -351,7 +329,6 @@ void *aclk_stats_main_thread(void *ptr)
#endif
aclk_stats_cloud_req(&per_sample);
- aclk_stats_cloud_req_type(&per_sample);
aclk_stats_cloud_req_http_type(&per_sample);
aclk_stats_query_threads(aclk_queries_per_thread_sample);
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index bec9ac2476..a45169cc71 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -51,9 +51,6 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_req_recvd;
volatile uint32_t cloud_req_err;
- // query types.
- volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT];
-
// HTTP-specific request types.
volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
index b04c9d20cc..570cccc8b1 100644
--- a/aclk/schema-wrappers/context.cc
+++ b/aclk/schema-wrappers/context.cc
@@ -57,7 +57,7 @@ void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct
fill_ctx_updated(ctx, ctx_update);
}
-char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot)
{
ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
*len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
@@ -109,7 +109,7 @@ void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct con
fill_ctx_updated(ctx, ctx_update);
}
-char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated)
{
ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
*len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
index cbb7701a81..5596b25b49 100644
--- a/aclk/schema-wrappers/context.h
+++ b/aclk/schema-wrappers/context.h
@@ -36,14 +36,14 @@ contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node
void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
-char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot);
// ContextS Updated related
contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
void contexts_updated_delete(contexts_updated_t ctxs_updated);
void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
-char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated);
#ifdef __cplusplus