diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-01-04 11:01:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-04 11:01:31 +0100 |
commit | fa0045202fe4438fc090479913dc34338c3b5d11 (patch) | |
tree | 10f158f5bf3a63a77e4cb8c618b1267496fb7193 /aclk | |
parent | 5736b4bcb179896280cb579ee1dbe0cfa464064c (diff) |
Optimize rx msg name resolution (#11811)
* optimize rx msg name resolution
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 4 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 383 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 1 |
3 files changed, 237 insertions, 151 deletions
```diff
diff --git a/aclk/aclk.c b/aclk/aclk.c
index d5f7818c2f..8a2710aaea 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -763,6 +763,10 @@ void *aclk_main(void *ptr)
         return NULL;
     }
 
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+    aclk_init_rx_msg_handlers();
+#endif
+
     // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
     // as it must notify the far end that it shutdown gracefully and avoid the LWT.
     netdata_thread_disable_cancelability();
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index abbfdca0ee..ee9085565f 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -261,180 +261,261 @@ err_cleanup_nojson:
 }
 
 #ifdef ENABLE_NEW_CLOUD_PROTOCOL
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+typedef uint32_t simple_hash_t;
+typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
+
+int handle_old_proto_cmd(const char *msg, size_t msg_len)
 {
-    // TODO do the look up table with hashes to optimize when there are more
-    // than few
-    if (!strcmp(message_type, "cmd")) {
-        // msg is binary payload in all other cases
-        // however in this message from old legacy cloud
-        // we have to convert it to C string
-        char *str = mallocz(msg_len+1);
-        memcpy(str, msg, msg_len);
-        str[msg_len] = 0;
-        aclk_handle_cloud_message(str);
-        freez(str);
-        return;
-    }
-    if (!strcmp(message_type, "CreateNodeInstanceResult")) {
-        node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
-        if (!res.machine_guid || !res.node_id) {
-            error_report("Error parsing CreateNodeInstanceResult");
-            freez(res.machine_guid);
-            freez(res.node_id);
-            return;
-        }
+    // msg is binary payload in all other cases
+    // however in this message from old legacy cloud
+    // we have to convert it to C string
+    char *str = mallocz(msg_len+1);
+    memcpy(str, msg, msg_len);
+    str[msg_len] = 0;
+    aclk_handle_cloud_message(str);
+    freez(str);
+    return 0;
+}
 
-        debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+int create_node_instance_result(const char *msg, size_t msg_len)
+{
+    node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+    if (!res.machine_guid || !res.node_id) {
+        error_report("Error parsing CreateNodeInstanceResult");
+        freez(res.machine_guid);
+        freez(res.node_id);
+        return 1;
+    }
 
-        uuid_t host_id, node_id;
-        if (uuid_parse(res.machine_guid, host_id)) {
-            error("Error parsing machine_guid provided by CreateNodeInstanceResult");
-            freez(res.machine_guid);
-            freez(res.node_id);
-            return;
-        }
-        if (uuid_parse(res.node_id, node_id)) {
-            error("Error parsing node_id provided by CreateNodeInstanceResult");
-            freez(res.machine_guid);
-            freez(res.node_id);
-            return;
-        }
-        update_node_id(&host_id, &node_id);
-
-        aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
-        query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
-        rrdhost_aclk_state_lock(localhost);
-        query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
-        rrdhost_aclk_state_unlock(localhost);
-
-        RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
-        query->data.node_update.live = 0;
-
-        if (host) {
-            // not all host must have RRDHOST struct created for them
-            // if they never connected during runtime of agent
-            if (host == localhost) {
-                query->data.node_update.live = 1;
-                query->data.node_update.hops = 0;
-            } else {
-                netdata_mutex_lock(&host->receiver_lock);
-                query->data.node_update.live = (host->receiver != NULL);
-                netdata_mutex_unlock(&host->receiver_lock);
-                query->data.node_update.hops = host->system_info->hops;
-            }
-        }
+    debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
 
-        query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
-        query->data.node_update.queryable = 1;
-        query->data.node_update.session_id = aclk_session_newarch;
-        aclk_queue_query(query);
+    uuid_t host_id, node_id;
+    if (uuid_parse(res.machine_guid, host_id)) {
+        error("Error parsing machine_guid provided by CreateNodeInstanceResult");
         freez(res.machine_guid);
-        return;
+        freez(res.node_id);
+        return 1;
     }
-    if (!strcmp(message_type, "SendNodeInstances")) {
-        debug(D_ACLK, "Got SendNodeInstances");
-        aclk_send_node_instances();
-        return;
+    if (uuid_parse(res.node_id, node_id)) {
+        error("Error parsing node_id provided by CreateNodeInstanceResult");
+        freez(res.machine_guid);
+        freez(res.node_id);
+        return 1;
     }
-
-    if (!strcmp(message_type, "StreamChartsAndDimensions")) {
-        stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
-        if (!res.claim_id || !res.node_id) {
-            error("Error parsing StreamChartsAndDimensions msg");
-            freez(res.claim_id);
-            freez(res.node_id);
-            return;
+    update_node_id(&host_id, &node_id);
+
+    aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+    query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+    rrdhost_aclk_state_lock(localhost);
+    query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+    rrdhost_aclk_state_unlock(localhost);
+
+    RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+    query->data.node_update.live = 0;
+
+    if (host) {
+        // not all host must have RRDHOST struct created for them
+        // if they never connected during runtime of agent
+        if (host == localhost) {
+            query->data.node_update.live = 1;
+            query->data.node_update.hops = 0;
+        } else {
+            netdata_mutex_lock(&host->receiver_lock);
+            query->data.node_update.live = (host->receiver != NULL);
+            netdata_mutex_unlock(&host->receiver_lock);
+            query->data.node_update.hops = host->system_info->hops;
         }
-        chart_batch_id = res.batch_id;
-        aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+    }
+
+    query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+    query->data.node_update.queryable = 1;
+    query->data.node_update.session_id = aclk_session_newarch;
+    aclk_queue_query(query);
+    freez(res.machine_guid);
+    return 0;
+}
+
+int send_node_instances(const char *msg, size_t msg_len)
+{
+    UNUSED(msg);
+    UNUSED(msg_len);
+    aclk_send_node_instances();
+    return 0;
+}
+
+int stream_charts_and_dimensions(const char *msg, size_t msg_len)
+{
+    stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+    if (!res.claim_id || !res.node_id) {
+        error("Error parsing StreamChartsAndDimensions msg");
         freez(res.claim_id);
         freez(res.node_id);
-        return;
+        return 1;
     }
-    if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
-        chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
-        if (!res.claim_id || !res.node_id) {
-            error("Error parsing StreamChartsAndDimensions msg");
-            freez(res.claim_id);
-            freez(res.node_id);
-            return;
-        }
-        aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+    chart_batch_id = res.batch_id;
+    aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+    freez(res.claim_id);
+    freez(res.node_id);
+    return 0;
+}
+
+int charts_and_dimensions_ack(const char *msg, size_t msg_len)
+{
+    chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+    if (!res.claim_id || !res.node_id) {
+        error("Error parsing StreamChartsAndDimensions msg");
         freez(res.claim_id);
         freez(res.node_id);
-        return;
-    }
-    if (!strcmp(message_type, "UpdateChartConfigs")) {
-        struct update_chart_config res = parse_update_chart_config(msg, msg_len);
-        if (!res.claim_id || !res.node_id || !res.hashes)
-            error("Error parsing UpdateChartConfigs msg");
-        else
-            aclk_get_chart_config(res.hashes);
-        destroy_update_chart_config(&res);
-        return;
+        return 1;
     }
-    if (!strcmp(message_type, "StartAlarmStreaming")) {
-        struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
-        if (!res.node_id || !res.batch_id) {
-            error("Error parsing StartAlarmStreaming");
-            freez(res.node_id);
-            return;
-        }
-        aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+    aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+    freez(res.claim_id);
+    freez(res.node_id);
+    return 0;
+}
+
+int update_chart_configs(const char *msg, size_t msg_len)
+{
+    struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+    if (!res.claim_id || !res.node_id || !res.hashes)
+        error("Error parsing UpdateChartConfigs msg");
+    else
+        aclk_get_chart_config(res.hashes);
+    destroy_update_chart_config(&res);
+    return 0;
+}
+
+int start_alarm_streaming(const char *msg, size_t msg_len)
+{
+    struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+    if (!res.node_id || !res.batch_id) {
+        error("Error parsing StartAlarmStreaming");
         freez(res.node_id);
-        return;
+        return 1;
     }
-    if (!strcmp(message_type, "SendAlarmLogHealth")) {
-        char *node_id = parse_send_alarm_log_health(msg, msg_len);
-        if (!node_id) {
-            error("Error parsing SendAlarmLogHealth");
-            return;
-        }
-        aclk_send_alarm_health_log(node_id);
-        freez(node_id);
-        return;
+    aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+    freez(res.node_id);
+    return 0;
+}
+
+int send_alarm_log_health(const char *msg, size_t msg_len)
+{
+    char *node_id = parse_send_alarm_log_health(msg, msg_len);
+    if (!node_id) {
+        error("Error parsing SendAlarmLogHealth");
+        return 1;
     }
-    if (!strcmp(message_type, "SendAlarmConfiguration")) {
-        char *config_hash = parse_send_alarm_configuration(msg, msg_len);
-        if (!config_hash || !*config_hash) {
-            error("Error parsing SendAlarmConfiguration");
-            freez(config_hash);
-            return;
-        }
-        aclk_send_alarm_configuration(config_hash);
+    aclk_send_alarm_health_log(node_id);
+    freez(node_id);
+    return 0;
+}
+
+int send_alarm_configuration(const char *msg, size_t msg_len)
+{
+    char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+    if (!config_hash || !*config_hash) {
+        error("Error parsing SendAlarmConfiguration");
         freez(config_hash);
-        return;
+        return 1;
     }
-    if (!strcmp(message_type, "SendAlarmSnapshot")) {
-        struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
-        if (!sas->node_id || !sas->claim_id) {
-            error("Error parsing SendAlarmSnapshot");
-            destroy_send_alarm_snapshot(sas);
-            return;
-        }
-        aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+    aclk_send_alarm_configuration(config_hash);
+    freez(config_hash);
+    return 0;
+}
+
+int send_alarm_snapshot(const char *msg, size_t msg_len)
+{
+    struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+    if (!sas->node_id || !sas->claim_id) {
+        error("Error parsing SendAlarmSnapshot");
         destroy_send_alarm_snapshot(sas);
-        return;
+        return 1;
     }
-    if (!strcmp(message_type, "DisconnectReq")) {
-        struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
-        if (!cmd)
-            return;
-        if (cmd->permaban) {
-            error ("Cloud Banned This Agent!");
-            aclk_disable_runtime = 1;
-        }
-        info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
-        if (cmd->reconnect_after_s > 0) {
-            aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
-            info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+    aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+    destroy_send_alarm_snapshot(sas);
+    return 0;
+}
+
+int handle_disconnect_req(const char *msg, size_t msg_len)
+{
+    struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+    if (!cmd)
+        return 1;
+    if (cmd->permaban) {
+        error("Cloud Banned This Agent!");
+        aclk_disable_runtime = 1;
+    }
+    info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+    if (cmd->reconnect_after_s > 0) {
+        aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+        info(
+            "Cloud asks not to reconnect for %u seconds. We shall honor that request",
+            (unsigned int)cmd->reconnect_after_s);
+    }
+    disconnect_req = 1;
+    freez(cmd->error_description);
+    freez(cmd);
+    return 0;
+}
+
+typedef struct {
+    const char *name;
+    simple_hash_t name_hash;
+    rx_msg_handler fnc;
+} new_cloud_rx_msg_t;
+
+new_cloud_rx_msg_t rx_msgs[] = {
+    { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd },
+    { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result },
+    { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances },
+    { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions },
+    { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
+    { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
+    { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
+    { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+    { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
+    { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
+    { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+    { .name = NULL, .name_hash = 0, .fnc = NULL },
+};
+
+new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
+{
+    // we can afford to not compare strings after hash match
+    // because we check for collisions at initialization in
+    // aclk_init_rx_msg_handlers()
+    for (int i = 0; rx_msgs[i].fnc; i++) {
+        if (rx_msgs[i].name_hash == hash)
+            return &rx_msgs[i];
+    }
+    return NULL;
+}
+
+void aclk_init_rx_msg_handlers(void)
+{
+    for (int i = 0; rx_msgs[i].fnc; i++) {
+        simple_hash_t hash = simple_hash(rx_msgs[i].name);
+        new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
+        if (unlikely(hdl)) {
+            // the list of message names changes only by changing
+            // the source code, therefore fatal is appropriate
+            fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name);
         }
-        disconnect_req = 1;
-        freez(cmd->error_description);
-        freez(cmd);
+        rx_msgs[i].name_hash = hash;
+    }
+}
+
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+    new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
+    debug(D_ACLK, "Got message named '%s' from cloud", message_type);
+    if (unlikely(!msg_descriptor)) {
+        error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+        return;
+    }
+    if (msg_descriptor->fnc(msg, msg_len)) {
+        error("Error processing message of type '%s'", message_type);
         return;
     }
-    error ("Unknown new cloud arch message type received \"%s\"", message_type);
 }
 #endif
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 074dc004ae..9e7377e9e1 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -11,6 +11,7 @@
 int aclk_handle_cloud_message(char *payload);
 
 #ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void aclk_init_rx_msg_handlers(void);
 void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
 #endif
 
```