summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2021-10-06 20:55:31 +0300
committerGitHub <noreply@github.com>2021-10-06 20:55:31 +0300
commit12f16063f51e51a8e3c0e0ae727a634258b95219 (patch)
tree24a7eae9d61f6f5009a9d6d1e1b5d7e2870d64ca /aclk
parentaf93cc31eda9a2b7058c4b02d8f984331e5f544b (diff)
Enable additional functionality for the new cloud architecture (#11579)
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c6
-rw-r--r--aclk/aclk_charts_api.c4
-rw-r--r--aclk/aclk_rx_msgs.c67
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc4
-rw-r--r--aclk/schema-wrappers/chart_stream.cc2
5 files changed, 76 insertions, 7 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index cc17a21650..6e89facd81 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -973,7 +973,7 @@ void ng_aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_lock(localhost);
create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded
+ create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
create_query->data.node_creation.hostname = strdupz(host->hostname);
create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
aclk_queue_query(create_query);
@@ -981,7 +981,7 @@ void ng_aclk_host_state_update(RRDHOST *host, int cmd)
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ query->data.node_update.hops = (uint32_t) host->system_info->hops;
rrdhost_aclk_state_lock(localhost);
query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
rrdhost_aclk_state_unlock(localhost);
@@ -1020,7 +1020,7 @@ void aclk_send_node_instances()
rrdhost_aclk_state_lock(localhost);
create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
rrdhost_aclk_state_unlock(localhost);
- create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops
+ create_query->data.node_creation.hops = list->hops;
create_query->data.node_creation.hostname = list->hostname;
create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
index 4d5b353b9d..4e1c466e87 100644
--- a/aclk/aclk_charts_api.c
+++ b/aclk/aclk_charts_api.c
@@ -16,6 +16,7 @@ void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_
void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
{
aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
QUEUE_IF_PAYLOAD_PRESENT(query);
@@ -24,6 +25,7 @@ void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_m
void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
{
aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
QUEUE_IF_PAYLOAD_PRESENT(query);
@@ -32,6 +34,7 @@ void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int
void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
{
aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
query->data.bin_payload.msg_name = "ChartConfigsUpdated";
QUEUE_IF_PAYLOAD_PRESENT(query);
@@ -40,6 +43,7 @@ void aclk_chart_config_updated(struct chart_config_updated *config_list, int lis
void aclk_chart_reset(chart_reset_t reset)
{
aclk_query_t query = aclk_query_new(CHART_RESET);
+ query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
query->data.bin_payload.msg_name = "ResetChartMessages";
QUEUE_IF_PAYLOAD_PRESENT(query);
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 10378e5309..5bb5d1ff7a 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -330,6 +330,73 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
return;
}
+ if (!strcmp(message_type, "StreamChartsAndDimensions")) {
+ stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ chart_batch_id = res.batch_id;
+ aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
+ chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "UpdateChartConfigs")) {
+ struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+ if (!res.claim_id || !res.node_id || !res.hashes)
+ error("Error parsing UpdateChartConfigs msg");
+ else
+ aclk_get_chart_config(res.hashes);
+ destroy_update_chart_config(&res);
+ return;
+ }
+ if (!strcmp(message_type, "StartAlarmStreaming")) {
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
+ freez(res.node_id);
+ return;
+ }
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return;
+ }
+ if (!strcmp(message_type, "SendAlarmLogHealth")) {
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return;
+ }
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return;
+ }
+ if (!strcmp(message_type, "SendAlarmConfiguration")) {
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
+ return;
+ }
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return;
+ }
error ("Unknown new cloud arch message type received \"%s\"", message_type);
}
#endif
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
index c74e053675..98d9d842c8 100644
--- a/aclk/schema-wrappers/alarm_stream.cc
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -97,8 +97,8 @@ static alarmstream::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_s
void destroy_alarm_log_entry(struct alarm_log_entry *entry)
{
- freez(entry->node_id);
- freez(entry->claim_id);
+ //freez(entry->node_id);
+ //freez(entry->claim_id);
freez(entry->chart);
freez(entry->name);
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
index 8a43a03752..7d820e533b 100644
--- a/aclk/schema-wrappers/chart_stream.cc
+++ b/aclk/schema-wrappers/chart_stream.cc
@@ -75,8 +75,6 @@ void chart_instance_updated_destroy(struct chart_instance_updated *instance)
{
freez((char*)instance->id);
freez((char*)instance->claim_id);
- freez((char*)instance->node_id);
- freez((char*)instance->name);
free_label_list(instance->label_head);