diff options
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 61 |
1 files changed, 59 insertions, 2 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index ef83461a35..26e8fdc51b 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -7,7 +7,7 @@ #include "aclk.h" #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" -#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /" #define ACLK_V_COMPRESSION 2 @@ -91,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur { const char *start, *end; + // TODO better check of URL if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { errno = 0; error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); @@ -120,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - HTTP_CHECK_AGENT_INITIALIZED(); + if (!aclk_use_new_cloud_arch) { + HTTP_CHECK_AGENT_INITIALIZED(); + } aclk_query_t query; @@ -256,3 +259,57 @@ err_cleanup_nojson: return 1; } + +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +{ + // TODO do the look up table with hashes to optimize when there are more + // than few + if (!strcmp(message_type, "cmd")) { + aclk_handle_cloud_message((char *)msg); + return; + } + if (!strcmp(message_type, "CreateNodeInstanceResult")) { + node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); + debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); + uuid_t host_id, node_id; + uuid_parse(res.machine_guid, host_id); + uuid_parse(res.node_id, node_id); + update_node_id(&host_id, &node_id); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + query->data.node_update.live = 0; + + if (host) { + // not all host must have RRDHOST struct created for them + // if they never connected during runtime of agent + if (host == localhost) { + query->data.node_update.live = 1; + query->data.node_update.hops = 0; + } else { + netdata_mutex_lock(&host->receiver_lock); + query->data.node_update.live = (host->receiver != NULL); + netdata_mutex_unlock(&host->receiver_lock); + } + } + + query->data.node_update.node_id = res.node_id; // aclk_query_free will free it + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + freez(res.machine_guid); + return; + } + if (!strcmp(message_type, "SendNodeInstances")) { + debug(D_ACLK, "Got SendNodeInstances"); + aclk_send_node_instances(); + return; + } + + error ("Unknown new cloud arch message type received \"%s\"", message_type); +} |