summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-07-28 23:59:32 +0300
committerGitHub <noreply@github.com>2022-07-28 23:59:32 +0300
commited52c959de2e0232170087a3e743562522c1ebfa (patch)
treef86c43d6c626bdecb579209b62e6ddb75f9f70f3 /aclk
parent9a6fd6366fafc5585b3fe9d2be2258df4738050d (diff)
Revert "Query queue only for queries" (#13452)
Revert "Query queue only for queries (#13431)" This reverts commit 221fd512873613a10b3d95b25a8a4d542b2c4801.
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c38
-rw-r--r--aclk/aclk.h11
-rw-r--r--aclk/aclk_alarm_api.c26
-rw-r--r--aclk/aclk_charts_api.c49
-rw-r--r--aclk/aclk_contexts_api.c14
-rw-r--r--aclk/aclk_query.c68
-rw-r--r--aclk/aclk_query.h2
-rw-r--r--aclk/aclk_query_queue.c35
-rw-r--r--aclk/aclk_query_queue.h36
-rw-r--r--aclk/aclk_rx_msgs.c17
-rw-r--r--aclk/aclk_stats.c23
-rw-r--r--aclk/aclk_stats.h3
-rw-r--r--aclk/schema-wrappers/context.cc4
-rw-r--r--aclk/schema-wrappers/context.h4
14 files changed, 253 insertions, 77 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index c546739014..7b3641b1e2 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
}
if (ret < 0) {
// node_id not found
- size_t payload_len;
-
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
rrdhost_aclk_state_lock(localhost);
node_instance_creation_t node_instance_creation = {
.claim_id = localhost->aclk_state.claimed_id,
@@ -751,14 +751,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
.hostname = host->hostname,
.machine_guid = host->machine_guid
};
- char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
-
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+ aclk_queue_query(create_query);
return;
}
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = host->system_info->hops,
.live = cmd,
@@ -779,14 +781,15 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
freez((void*)node_state_update.node_id);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
}
void aclk_send_node_instances()
@@ -799,6 +802,7 @@ void aclk_send_node_instances()
}
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.live = list->live,
.hops = list->hops,
@@ -823,30 +827,34 @@ void aclk_send_node_instances()
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
freez((void*)node_state_update.node_id);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
} else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
node_instance_creation_t node_instance_creation = {
.hops = list->hops,
.hostname = list->hostname,
};
node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- node_instance_creation.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
freez(node_instance_creation.machine_guid);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+ aclk_queue_query(create_query);
}
freez(list->hostname);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 6a0f2ad7a3..5065ac2bfc 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -34,17 +34,6 @@ void aclk_send_node_instances(void);
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
-#define GENERATE_AND_SEND_PAYLOAD(topic, msg_name, generator_fnc, generator_data...) \
- size_t payload_len; \
- char *payload = generator_fnc(&payload_len, generator_data); \
- if (unlikely(payload == NULL)) { \
- error("Failed to generate payload (%s)", __FUNCTION__); \
- return; \
- } \
- aclk_send_bin_msg(payload, payload_len, topic, msg_name); \
- if (!use_mqtt_5) \
- freez(payload);
-
char *ng_aclk_state(void);
char *ng_aclk_state_json(void);
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index 7e75bf38a5..a181eb291f 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -10,20 +10,38 @@
void aclk_send_alarm_log_health(struct alarm_log_health *log_health)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_HEALTH, "AlarmLogHealth", generate_alarm_log_health, log_health);
+ aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH);
+ query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH;
+ query->data.bin_payload.msg_name = "AlarmLogHealth";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry", generate_alarm_log_entry, log_entry);
+ size_t payload_size;
+ char *payload = generate_alarm_log_entry(&payload_size, log_entry);
+
+ aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
+
+ if (!use_mqtt_5)
+ freez(payload);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_CONFIG, "ProvideAlarmConfiguration", generate_provide_alarm_configuration, cfg);
+ aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG);
+ query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG;
+ query->data.bin_payload.msg_name = "ProvideAlarmConfiguration";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_SNAPSHOT, "AlarmSnapshot", generate_alarm_snapshot_bin, snapshot);
+ aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT);
+ query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT;
+ query->data.bin_payload.msg_name = "AlarmSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
index a82ee8d881..51d8dad58c 100644
--- a/aclk/aclk_charts_api.c
+++ b/aclk/aclk_charts_api.c
@@ -3,46 +3,75 @@
#include "aclk_query_queue.h"
-#include "aclk.h"
-
#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_updated, payloads, payload_sizes, new_positions);
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_chart_dimensions_updated, payloads, payload_sizes, new_positions);
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
+ query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_and_dimensions_updated, payloads, payload_sizes, is_dim, new_positions, batch_id);
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
+ query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_CONFIGS_UPDATED, "ChartConfigsUpdated", generate_chart_configs_updated, config_list, list_size);
+ aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
+ query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
+ query->data.bin_payload.msg_name = "ChartConfigsUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_chart_reset(chart_reset_t reset)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_RESET, "ResetChartMessages", generate_reset_chart_messages, reset);
+ aclk_query_t query = aclk_query_new(CHART_RESET);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
+ query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
+ query->data.bin_payload.msg_name = "ResetChartMessages";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_retention_updated(struct retention_updated *data)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_RETENTION_UPDATED, "RetentionUpdated", generate_retention_updated, data);
+ aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
+ query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
+ query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
+ query->data.bin_payload.msg_name = "RetentionUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_update_node_info(struct update_node_info *info)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_INFO, "UpdateNodeInfo", generate_update_node_info_message, info);
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
+ query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
+ query->data.bin_payload.msg_name = "UpdateNodeInfo";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_update_node_collectors(struct update_node_collectors *collectors)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_COLLECTORS, "UpdateNodeCollectors", generate_update_node_collectors_message, collectors);
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
+ query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
+ query->data.bin_payload.msg_name = "UpdateNodeCollectors";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
index a30d7a9d20..f17d3cabd9 100644
--- a/aclk/aclk_contexts_api.c
+++ b/aclk/aclk_contexts_api.c
@@ -4,14 +4,20 @@
#include "aclk_contexts_api.h"
-#include "aclk.h"
-
void aclk_send_contexts_snapshot(contexts_snapshot_t data)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_SNAPSHOT, "ContextsSnapshot", contexts_snapshot_2bin, data);
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
+ query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
void aclk_send_contexts_updated(contexts_updated_t data)
{
- GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_UPDATED, "ContextsUpdated", contexts_updated_2bin, data);
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
+ query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
}
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index aa198e0aee..981c01965a 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -84,7 +84,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
- buffer_strcat(log_buffer, query->http_api_v2.query);
+ buffer_strcat(log_buffer, query->data.http_api_v2.query);
size_t size = 0;
size_t sent = 0;
w->tv_in = query->created_tv;
@@ -102,8 +102,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
RRDHOST *temp_host = NULL;
- if (!strncmp(query->http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
- char *node_uuid = query->http_api_v2.query + strlen(NODE_ID_QUERY);
+ if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+ char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
@@ -127,14 +127,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
}
- char *mysep = strchr(query->http_api_v2.query, '?');
+ char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
*mysep = '\0';
} else
- url_decode_r(w->decoded_query_string, query->http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
- mysep = strrchr(query->http_api_v2.query, '/');
+ mysep = strrchr(query->data.http_api_v2.query, '/');
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
@@ -151,7 +151,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
- if ((start = strstr((char *)query->http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
+ if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
start += strlen(WEB_HDR_ACCEPT_ENC);
end = strstr(start, "\x0D\x0A");
start = strstr(start, "gzip");
@@ -256,16 +256,57 @@ cleanup:
return retval;
}
+static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
+{
+ // this will be simplified when legacy support is removed
+ aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
+ return 0;
+}
+
+const char *aclk_query_get_name(aclk_query_type_t qt)
+{
+ switch (qt) {
+ case HTTP_API_V2: return "http_api_request_v2";
+ case REGISTER_NODE: return "register_node";
+ case NODE_STATE_UPDATE: return "node_state_update";
+ case CHART_DIMS_UPDATE: return "chart_and_dim_update";
+ case CHART_CONFIG_UPDATED: return "chart_config_updated";
+ case CHART_RESET: return "reset_chart_messages";
+ case RETENTION_UPDATED: return "update_retention_info";
+ case UPDATE_NODE_INFO: return "update_node_info";
+ case ALARM_LOG_HEALTH: return "alarm_log_health";
+ case ALARM_PROVIDE_CFG: return "provide_alarm_config";
+ case ALARM_SNAPSHOT: return "alarm_snapshot";
+ case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
+ case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
+ default:
+ error_report("Unknown query type used %d", (int) qt);
+ return "unknown";
+ }
+}
+
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
- worker_is_busy(0);
- debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
- http_api_v2(query_thr, query);
+ if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
+ error_report("Unknown query in query queue. %u", query->type);
+ aclk_query_free(query);
+ return;
+ }
+
+ worker_is_busy(query->type);
+ if (query->type == HTTP_API_V2) {
+ debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+ http_api_v2(query_thr, query);
+ } else {
+ debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
+ send_bin_msg(query_thr, query);
+ }
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
@@ -285,10 +326,11 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
return 0;
}
-static void worker_aclk_register(void)
-{
+static void worker_aclk_register(void) {
worker_register("ACLKQUERY");
- worker_register_job_name(0, "http query");
+ for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
+ worker_register_job_name(i, aclk_query_get_name(i));
+ }
}
/**
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index 18618f16f4..f86754a2a6 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -31,4 +31,6 @@ struct aclk_query_threads {
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+const char *aclk_query_get_name(aclk_query_type_t qt);
+
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 794a8ca348..01b20d23f3 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -95,16 +95,41 @@ void aclk_queue_flush(void)
};
}
-aclk_query_t aclk_query_new()
+aclk_query_t aclk_query_new(aclk_query_type_t type)
{
- return callocz(1, sizeof(struct aclk_query));
+ aclk_query_t query = callocz(1, sizeof(struct aclk_query));
+ query->type = type;
+ return query;
}
void aclk_query_free(aclk_query_t query)
{
- freez(query->http_api_v2.payload);
- if (query->http_api_v2.query != query->dedup_id)
- freez(query->http_api_v2.query);
+ switch (query->type) {
+ case HTTP_API_V2:
+ freez(query->data.http_api_v2.payload);
+ if (query->data.http_api_v2.query != query->dedup_id)
+ freez(query->data.http_api_v2.query);
+ break;
+
+ case NODE_STATE_UPDATE:
+ case REGISTER_NODE:
+ case CHART_DIMS_UPDATE:
+ case CHART_CONFIG_UPDATED:
+ case CHART_RESET:
+ case RETENTION_UPDATED:
+ case UPDATE_NODE_INFO:
+ case ALARM_LOG_HEALTH:
+ case ALARM_PROVIDE_CFG:
+ case ALARM_SNAPSHOT:
+ case UPDATE_NODE_COLLECTORS:
+ case PROTO_BIN_MESSAGE:
+ if (!use_mqtt_5)
+ freez(query->data.bin_payload.payload);
+ break;
+
+ default:
+ break;
+ }
freez(query->dedup_id);
freez(query->callback_topic);
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 514295807b..ab94b63848 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -9,6 +9,24 @@
#include "aclk_util.h"
+typedef enum {
+ UNKNOWN = 0,
+ HTTP_API_V2,
+ REGISTER_NODE,
+ NODE_STATE_UPDATE,
+ CHART_DIMS_UPDATE,
+ CHART_CONFIG_UPDATED,
+ CHART_RESET,
+ RETENTION_UPDATED,
+ UPDATE_NODE_INFO,
+ ALARM_LOG_HEALTH,
+ ALARM_PROVIDE_CFG,
+ ALARM_SNAPSHOT,
+ UPDATE_NODE_COLLECTORS,
+ PROTO_BIN_MESSAGE,
+ ACLK_QUERY_TYPE_COUNT // always keep this as last
+} aclk_query_type_t;
+
struct aclk_query_http_api_v2 {
char *payload;
char *query;
@@ -23,6 +41,8 @@ struct aclk_bin_payload {
typedef struct aclk_query *aclk_query_t;
struct aclk_query {
+ aclk_query_type_t type;
+
// dedup_id is used to deduplicate queries in the list
// if type and dedup_id is the same message is deduplicated
// set dedup_id to NULL to never deduplicate the message
@@ -39,11 +59,13 @@ struct aclk_query {
// TODO maybe remove?
int version;
-
- struct aclk_query_http_api_v2 http_api_v2;
+ union {
+ struct aclk_query_http_api_v2 http_api_v2;
+ struct aclk_bin_payload bin_payload;
+ } data;
};
-aclk_query_t aclk_query_new();
+aclk_query_t aclk_query_new(aclk_query_type_t type);
void aclk_query_free(aclk_query_t query);
int aclk_queue_query(aclk_query_t query);
@@ -53,4 +75,12 @@ void aclk_queue_flush(void);
void aclk_queue_lock(void);
void aclk_queue_unlock(void);
+#define QUEUE_IF_PAYLOAD_PRESENT(query) \
+ if (likely(query->data.bin_payload.payload)) { \
+ aclk_queue_query(query); \
+ } else { \
+ error("Failed to generate payload (%s)", __FUNCTION__); \
+ aclk_query_free(query); \
+ }
+
#endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 0e83dcac5a..e6ed332cc5 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -5,7 +5,6 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
-#include "aclk_tx_msgs.h"
#include "schema-wrappers/proto_2_json.h"
@@ -132,14 +131,14 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
return 1;
}
- query = aclk_query_new();
+ query = aclk_query_new(HTTP_API_V2);
- if (unlikely(aclk_extract_v2_data(raw_payload, &query->http_api_v2.payload))) {
+ if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
error("Error extracting payload expected after the JSON dictionary.");
goto error;
}
- if (unlikely(aclk_v2_payload_get_query(query->http_api_v2.payload, &query->dedup_id))) {
+ if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
error("Could not extract payload from query");
goto error;
}
@@ -159,7 +158,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
query->timeout = cloud_to_agent->timeout;
// for clarity and code readability as when we process the request
// it would be strange to get URL from `dedup_id`
- query->http_api_v2.query = query->dedup_id;
+ query->data.http_api_v2.query = query->dedup_id;
query->msg_id = cloud_to_agent->msg_id;
aclk_queue_query(query);
return 0;
@@ -266,6 +265,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
update_node_id(&host_id, &node_id);
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = 1,
.live = 0,
@@ -298,14 +298,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
};
node_state_update.capabilities = caps;
- size_t payload_len;
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- aclk_send_bin_msg(payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
freez(res.node_id);
freez(res.machine_guid);
return 0;
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index d5d629651c..241e9b724d 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -118,6 +118,28 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
+static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]);
+
+ rrdset_done(st);
+}
+
static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
"other",
"info",
@@ -329,6 +351,7 @@ void *aclk_stats_main_thread(void *ptr)
#endif
aclk_stats_cloud_req(&per_sample);
+ aclk_stats_cloud_req_type(&per_sample);
aclk_stats_cloud_req_http_type(&per_sample);
aclk_stats_query_threads(aclk_queries_per_thread_sample);
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index a45169cc71..bec9ac2476 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -51,6 +51,9 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_req_recvd;
volatile uint32_t cloud_req_err;
+ // query types.
+ volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT];
+
// HTTP-specific request types.
volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
index 570cccc8b1..b04c9d20cc 100644
--- a/aclk/schema-wrappers/context.cc
+++ b/aclk/schema-wrappers/context.cc
@@ -57,7 +57,7 @@ void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct
fill_ctx_updated(ctx, ctx_update);
}
-char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot)
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
{
ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
*len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
@@ -109,7 +109,7 @@ void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct con
fill_ctx_updated(ctx, ctx_update);
}
-char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated)
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
{
ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
*len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
index 5596b25b49..cbb7701a81 100644
--- a/aclk/schema-wrappers/context.h
+++ b/aclk/schema-wrappers/context.h
@@ -36,14 +36,14 @@ contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node
void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
-char *contexts_snapshot_2bin(size_t *len, contexts_snapshot_t ctxs_snapshot);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
// ContextS Updated related
contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
void contexts_updated_delete(contexts_updated_t ctxs_updated);
void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
-char *contexts_updated_2bin(size_t *len, contexts_updated_t ctxs_updated);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);