diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-07-28 23:59:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-28 23:59:32 +0300 |
commit | ed52c959de2e0232170087a3e743562522c1ebfa (patch) | |
tree | f86c43d6c626bdecb579209b62e6ddb75f9f70f3 /aclk/aclk.c | |
parent | 9a6fd6366fafc5585b3fe9d2be2258df4738050d (diff) |
Revert "Query queue only for queries" (#13452)
Revert "Query queue only for queries (#13431)"
This reverts commit 221fd512873613a10b3d95b25a8a4d542b2c4801.
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index c546739014..7b3641b1e2 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd) } if (ret < 0) { // node_id not found - size_t payload_len; - + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); rrdhost_aclk_state_lock(localhost); node_instance_creation_t node_instance_creation = { .claim_id = localhost->aclk_state.claimed_id, @@ -751,14 +751,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd) .hostname = host->hostname, .machine_guid = host->machine_guid }; - char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + aclk_queue_query(create_query); return; } + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .hops = host->system_info->hops, .live = cmd, @@ -779,14 +781,15 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_connection(&payload_len, &node_state_update); + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); freez((void*)node_state_update.node_id); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); } void aclk_send_node_instances() @@ -799,6 +802,7 @@ void aclk_send_node_instances() } while (!uuid_is_null(list->host_id)) { if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); node_instance_connection_t node_state_update = { .live = list->live, .hops = list->hops, @@ -823,30 +827,34 @@ void aclk_send_node_instances() rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_connection(&payload_len, &node_state_update); + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); freez((void*)node_state_update.node_id); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection"); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); node_instance_creation_t node_instance_creation = { .hops = list->hops, .hostname = list->hostname, }; node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; rrdhost_aclk_state_lock(localhost); - node_instance_creation.claim_id = localhost->aclk_state.claimed_id; - size_t payload_len; - char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); freez(node_instance_creation.machine_guid); - aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance"); + aclk_queue_query(create_query); } freez(list->hostname); |