summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-07-28 23:59:32 +0300
committerGitHub <noreply@github.com>2022-07-28 23:59:32 +0300
commited52c959de2e0232170087a3e743562522c1ebfa (patch)
treef86c43d6c626bdecb579209b62e6ddb75f9f70f3 /aclk/aclk.c
parent9a6fd6366fafc5585b3fe9d2be2258df4738050d (diff)
Revert "Query queue only for queries" (#13452)
Revert "Query queue only for queries (#13431)" This reverts commit 221fd512873613a10b3d95b25a8a4d542b2c4801.
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c38
1 files changed, 23 insertions, 15 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index c546739014..7b3641b1e2 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
}
if (ret < 0) {
// node_id not found
- size_t payload_len;
-
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
rrdhost_aclk_state_lock(localhost);
node_instance_creation_t node_instance_creation = {
.claim_id = localhost->aclk_state.claimed_id,
@@ -751,14 +751,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
.hostname = host->hostname,
.machine_guid = host->machine_guid
};
- char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
-
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+ aclk_queue_query(create_query);
return;
}
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.hops = host->system_info->hops,
.live = cmd,
@@ -779,14 +781,15 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
freez((void*)node_state_update.node_id);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
}
void aclk_send_node_instances()
@@ -799,6 +802,7 @@ void aclk_send_node_instances()
}
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.live = list->live,
.hops = list->hops,
@@ -823,30 +827,34 @@ void aclk_send_node_instances()
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
freez((void*)node_state_update.node_id);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
} else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
node_instance_creation_t node_instance_creation = {
.hops = list->hops,
.hostname = list->hostname,
};
node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
rrdhost_aclk_state_lock(localhost);
- node_instance_creation.claim_id = localhost->aclk_state.claimed_id;
- size_t payload_len;
- char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
freez(node_instance_creation.machine_guid);
- aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+ aclk_queue_query(create_query);
}
freez(list->hostname);