summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-01-04 11:01:31 +0100
committerGitHub <noreply@github.com>2022-01-04 11:01:31 +0100
commitfa0045202fe4438fc090479913dc34338c3b5d11 (patch)
tree10f158f5bf3a63a77e4cb8c618b1267496fb7193 /aclk/aclk_rx_msgs.c
parent5736b4bcb179896280cb579ee1dbe0cfa464064c (diff)
Optimize rx msg name resolution (#11811)
* optimize rx msg name resolution
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r--aclk/aclk_rx_msgs.c383
1 files changed, 232 insertions, 151 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index abbfdca0ee..ee9085565f 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -261,180 +261,261 @@ err_cleanup_nojson:
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+typedef uint32_t simple_hash_t;
+typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
+
+int handle_old_proto_cmd(const char *msg, size_t msg_len)
{
- // TODO do the look up table with hashes to optimize when there are more
- // than few
- if (!strcmp(message_type, "cmd")) {
- // msg is binary payload in all other cases
- // however in this message from old legacy cloud
- // we have to convert it to C string
- char *str = mallocz(msg_len+1);
- memcpy(str, msg, msg_len);
- str[msg_len] = 0;
- aclk_handle_cloud_message(str);
- freez(str);
- return;
- }
- if (!strcmp(message_type, "CreateNodeInstanceResult")) {
- node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
- if (!res.machine_guid || !res.node_id) {
- error_report("Error parsing CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
+ // msg is binary payload in all other cases
+ // however in this message from old legacy cloud
+ // we have to convert it to C string
+ char *str = mallocz(msg_len+1);
+ memcpy(str, msg, msg_len);
+ str[msg_len] = 0;
+ aclk_handle_cloud_message(str);
+ freez(str);
+ return 0;
+}
- debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+int create_node_instance_result(const char *msg, size_t msg_len)
+{
+ node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+ if (!res.machine_guid || !res.node_id) {
+ error_report("Error parsing CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
- uuid_t host_id, node_id;
- if (uuid_parse(res.machine_guid, host_id)) {
- error("Error parsing machine_guid provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- if (uuid_parse(res.node_id, node_id)) {
- error("Error parsing node_id provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- update_node_id(&host_id, &node_id);
-
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
- rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
- rrdhost_aclk_state_unlock(localhost);
-
- RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
- query->data.node_update.live = 0;
-
- if (host) {
- // not all host must have RRDHOST struct created for them
- // if they never connected during runtime of agent
- if (host == localhost) {
- query->data.node_update.live = 1;
- query->data.node_update.hops = 0;
- } else {
- netdata_mutex_lock(&host->receiver_lock);
- query->data.node_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
- query->data.node_update.hops = host->system_info->hops;
- }
- }
+ debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
- query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- aclk_queue_query(query);
+ uuid_t host_id, node_id;
+ if (uuid_parse(res.machine_guid, host_id)) {
+ error("Error parsing machine_guid provided by CreateNodeInstanceResult");
freez(res.machine_guid);
- return;
+ freez(res.node_id);
+ return 1;
}
- if (!strcmp(message_type, "SendNodeInstances")) {
- debug(D_ACLK, "Got SendNodeInstances");
- aclk_send_node_instances();
- return;
+ if (uuid_parse(res.node_id, node_id)) {
+ error("Error parsing node_id provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
}
-
- if (!strcmp(message_type, "StreamChartsAndDimensions")) {
- stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
+ update_node_id(&host_id, &node_id);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+ query->data.node_update.live = 0;
+
+ if (host) {
+ // not all host must have RRDHOST struct created for them
+ // if they never connected during runtime of agent
+ if (host == localhost) {
+ query->data.node_update.live = 1;
+ query->data.node_update.hops = 0;
+ } else {
+ netdata_mutex_lock(&host->receiver_lock);
+ query->data.node_update.live = (host->receiver != NULL);
+ netdata_mutex_unlock(&host->receiver_lock);
+ query->data.node_update.hops = host->system_info->hops;
}
- chart_batch_id = res.batch_id;
- aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ }
+
+ query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+ freez(res.machine_guid);
+ return 0;
+}
+
+int send_node_instances(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ aclk_send_node_instances();
+ return 0;
+}
+
+int stream_charts_and_dimensions(const char *msg, size_t msg_len)
+{
+ stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
- chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
- }
- aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ chart_batch_id = res.batch_id;
+ aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int charts_and_dimensions_ack(const char *msg, size_t msg_len)
+{
+ chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
- }
- if (!strcmp(message_type, "UpdateChartConfigs")) {
- struct update_chart_config res = parse_update_chart_config(msg, msg_len);
- if (!res.claim_id || !res.node_id || !res.hashes)
- error("Error parsing UpdateChartConfigs msg");
- else
- aclk_get_chart_config(res.hashes);
- destroy_update_chart_config(&res);
- return;
+ return 1;
}
- if (!strcmp(message_type, "StartAlarmStreaming")) {
- struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
- if (!res.node_id || !res.batch_id) {
- error("Error parsing StartAlarmStreaming");
- freez(res.node_id);
- return;
- }
- aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int update_chart_configs(const char *msg, size_t msg_len)
+{
+ struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+ if (!res.claim_id || !res.node_id || !res.hashes)
+ error("Error parsing UpdateChartConfigs msg");
+ else
+ aclk_get_chart_config(res.hashes);
+ destroy_update_chart_config(&res);
+ return 0;
+}
+
+int start_alarm_streaming(const char *msg, size_t msg_len)
+{
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmLogHealth")) {
- char *node_id = parse_send_alarm_log_health(msg, msg_len);
- if (!node_id) {
- error("Error parsing SendAlarmLogHealth");
- return;
- }
- aclk_send_alarm_health_log(node_id);
- freez(node_id);
- return;
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int send_alarm_log_health(const char *msg, size_t msg_len)
+{
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmConfiguration")) {
- char *config_hash = parse_send_alarm_configuration(msg, msg_len);
- if (!config_hash || !*config_hash) {
- error("Error parsing SendAlarmConfiguration");
- freez(config_hash);
- return;
- }
- aclk_send_alarm_configuration(config_hash);
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return 0;
+}
+
+int send_alarm_configuration(const char *msg, size_t msg_len)
+{
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
freez(config_hash);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmSnapshot")) {
- struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
- if (!sas->node_id || !sas->claim_id) {
- error("Error parsing SendAlarmSnapshot");
- destroy_send_alarm_snapshot(sas);
- return;
- }
- aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return 0;
+}
+
+int send_alarm_snapshot(const char *msg, size_t msg_len)
+{
+ struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+ if (!sas->node_id || !sas->claim_id) {
+ error("Error parsing SendAlarmSnapshot");
destroy_send_alarm_snapshot(sas);
- return;
+ return 1;
}
- if (!strcmp(message_type, "DisconnectReq")) {
- struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
- if (!cmd)
- return;
- if (cmd->permaban) {
- error ("Cloud Banned This Agent!");
- aclk_disable_runtime = 1;
- }
- info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
- if (cmd->reconnect_after_s > 0) {
- aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
- info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ destroy_send_alarm_snapshot(sas);
+ return 0;
+}
+
+int handle_disconnect_req(const char *msg, size_t msg_len)
+{
+ struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+ if (!cmd)
+ return 1;
+ if (cmd->permaban) {
+ error("Cloud Banned This Agent!");
+ aclk_disable_runtime = 1;
+ }
+ info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+ if (cmd->reconnect_after_s > 0) {
+ aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+ info(
+ "Cloud asks not to reconnect for %u seconds. We shall honor that request",
+ (unsigned int)cmd->reconnect_after_s);
+ }
+ disconnect_req = 1;
+ freez(cmd->error_description);
+ freez(cmd);
+ return 0;
+}
+
+typedef struct {
+ const char *name;
+ simple_hash_t name_hash;
+ rx_msg_handler fnc;
+} new_cloud_rx_msg_t;
+
+new_cloud_rx_msg_t rx_msgs[] = {
+ { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd },
+ { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result },
+ { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances },
+ { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions },
+ { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
+ { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
+ { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
+ { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+ { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
+ { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
+ { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = NULL, .name_hash = 0, .fnc = NULL },
+};
+
+new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
+{
+ // we can afford to not compare strings after hash match
+ // because we check for collisions at initialization in
+ // aclk_init_rx_msg_handlers()
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ if (rx_msgs[i].name_hash == hash)
+ return &rx_msgs[i];
+ }
+ return NULL;
+}
+
+void aclk_init_rx_msg_handlers(void)
+{
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ simple_hash_t hash = simple_hash(rx_msgs[i].name);
+ new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
+ if (unlikely(hdl)) {
+ // the list of message names changes only by changing
+ // the source code, therefore fatal is appropriate
+ fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name);
}
- disconnect_req = 1;
- freez(cmd->error_description);
- freez(cmd);
+ rx_msgs[i].name_hash = hash;
+ }
+}
+
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+ new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
+ debug(D_ACLK, "Got message named '%s' from cloud", message_type);
+ if (unlikely(!msg_descriptor)) {
+ error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+ return;
+ }
+ if (msg_descriptor->fnc(msg, msg_len)) {
+ error("Error processing message of type '%s'", message_type);
return;
}
- error ("Unknown new cloud arch message type received \"%s\"", message_type);
}
#endif