diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2021-08-06 13:50:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-06 13:50:19 +0200 |
commit | 14ce65525275fa8760fa4f20e10b5a4e5c34ccde (patch) | |
tree | 99c387d8a093b05cc7a89a1fad81514924bdccaa | |
parent | 597763dd0ae60dd89af211267a17644e00114537 (diff) |
New Cloud chart related parsers and generators (#11393)
* adds message generators parsers and handlers for upcoming Chart stream implementation
-rw-r--r-- | CMakeLists.txt | 1 | ||||
-rw-r--r-- | Makefile.am | 38 | ||||
m--------- | aclk/aclk-schemas | 0 | ||||
-rw-r--r-- | aclk/aclk_charts_api.c | 61 | ||||
-rw-r--r-- | aclk/aclk_charts_api.h | 18 | ||||
-rw-r--r-- | aclk/aclk_query.c | 29 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 35 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 16 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 2 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 3 | ||||
-rw-r--r-- | aclk/aclk_util.c | 28 | ||||
-rw-r--r-- | aclk/aclk_util.h | 24 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_config.cc | 105 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_config.h | 50 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_stream.cc | 344 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_stream.h | 121 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrapper_utils.cc | 15 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrapper_utils.h | 7 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 2 |
19 files changed, 860 insertions, 39 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index b4425e6fc8..0ba7caf17e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -805,6 +805,7 @@ set(ACLK_NG_FILES aclk/schema-wrappers/node_creation.cc aclk/schema-wrappers/node_creation.h aclk/schema-wrappers/schema_wrappers.h + aclk/schema-wrappers/schema_wrapper_utils.cc aclk/schema-wrappers/schema_wrapper_utils.h ) diff --git a/Makefile.am b/Makefile.am index e03261b7ed..690b03b35d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -567,6 +567,8 @@ ACLK_NG_FILES = \ aclk/aclk_rx_msgs.h \ aclk/https_client.c \ aclk/https_client.h \ + aclk/aclk_charts_api.c \ + aclk/aclk_charts_api.h \ mqtt_websockets/src/mqtt_wss_client.c \ mqtt_websockets/src/include/mqtt_wss_client.h \ mqtt_websockets/src/mqtt_wss_log.c \ @@ -584,7 +586,12 @@ ACLK_NG_FILES = \ aclk/schema-wrappers/node_connection.h \ aclk/schema-wrappers/node_creation.cc \ aclk/schema-wrappers/node_creation.h \ + aclk/schema-wrappers/chart_stream.cc \ + aclk/schema-wrappers/chart_stream.h \ + aclk/schema-wrappers/chart_config.cc \ + aclk/schema-wrappers/chart_config.h \ aclk/schema-wrappers/schema_wrappers.h \ + aclk/schema-wrappers/schema_wrapper_utils.cc \ aclk/schema-wrappers/schema_wrapper_utils.h \ $(NULL) @@ -594,10 +601,21 @@ ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \ + aclk/aclk-schemas/proto/chart/v1/stream.pb.cc \ + aclk/aclk-schemas/proto/chart/v1/stream.pb.h \ + aclk/aclk-schemas/proto/chart/v1/instance.pb.cc \ + aclk/aclk-schemas/proto/chart/v1/instance.pb.h \ + aclk/aclk-schemas/proto/chart/v1/dimension.pb.cc \ + aclk/aclk-schemas/proto/chart/v1/dimension.pb.h \ + aclk/aclk-schemas/proto/chart/v1/config.pb.cc \ + aclk/aclk-schemas/proto/chart/v1/config.pb.h \ + aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \ + aclk/aclk-schemas/proto/aclk/v1/lib.pb.h \ $(NULL) BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES) +CLEANFILES += $(ACLK_NG_PROTO_BUILT_FILES) aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto @@ -611,6 +629,26 @@ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ +aclk/aclk-schemas/proto/chart/v1/stream.pb.cc \ +aclk/aclk-schemas/proto/chart/v1/stream.pb.h: aclk/aclk-schemas/proto/chart/v1/stream.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/chart/v1/instance.pb.cc \ +aclk/aclk-schemas/proto/chart/v1/instance.pb.h: aclk/aclk-schemas/proto/chart/v1/instance.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/chart/v1/dimension.pb.cc \ +aclk/aclk-schemas/proto/chart/v1/dimension.pb.h: aclk/aclk-schemas/proto/chart/v1/dimension.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/chart/v1/config.pb.cc \ +aclk/aclk-schemas/proto/chart/v1/config.pb.h: aclk/aclk-schemas/proto/chart/v1/config.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + +aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \ +aclk/aclk-schemas/proto/aclk/v1/lib.pb.h: aclk/aclk-schemas/proto/aclk/v1/lib.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + endif #ACLK_NG if ENABLE_ACLK diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas -Subproject b5fef3f3a84e6a5013b36b906f4677012c73441 +Subproject a0adf5b1e026ee8339d56cfa27af95bb26b5317 diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c new file mode 100644 index 0000000000..9dabf82041 --- /dev/null +++ b/aclk/aclk_charts_api.c @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "aclk_charts_api.h" + +#include "aclk_query_queue.h" + +#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated" + +void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) +{ + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} + +void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) +{ + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} + +void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) +{ + aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); + query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id); + query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} + +void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size) +{ + aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED); + query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size); + query->data.bin_payload.msg_name = "ChartConfigsUpdated"; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} + +void aclk_chart_reset(chart_reset_t reset) +{ + aclk_query_t query = aclk_query_new(CHART_RESET); + query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset); + query->data.bin_payload.msg_name = "ResetChartMessages"; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} + +void aclk_retention_updated(struct retention_updated *data) +{ + aclk_query_t query = aclk_query_new(RETENTION_UPDATED); + query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED; + query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data); + query->data.bin_payload.msg_name = "RetentionUpdated"; + if (query->data.bin_payload.payload) + aclk_queue_query(query); +} diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h new file mode 100644 index 0000000000..c2db9d4979 --- /dev/null +++ b/aclk/aclk_charts_api.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_CHARTS_H +#define ACLK_CHARTS_H + +#include "../daemon/common.h" +#include "schema-wrappers/schema_wrappers.h" + +void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); +void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); +void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); + +void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size); + +void aclk_chart_reset(chart_reset_t reset); + +void aclk_retention_updated(struct retention_updated *data); + +#endif /* ACLK_CHARTS_H */ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 07b3537b01..99af385c40 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -273,16 +273,27 @@ static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t q return 0; } +static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) +{ + // this will be simplified when legacy support is removed + aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); + return 0; +} + aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, - { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, - { .type = UNKNOWN, .name = NULL, .fnc = NULL } + { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, + { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, + { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata }, + { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, + { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, + { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, + { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, + { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg }, + { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg }, + { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg }, + { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg }, + { .type = UNKNOWN, .name = NULL, .fnc = NULL } }; diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 1aefdd7711..10db9ec4f3 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -139,27 +139,42 @@ aclk_query_t aclk_query_new(aclk_query_type_t type) void aclk_query_free(aclk_query_t query) { - if (query->type == HTTP_API_V2) { + switch (query->type) { + case HTTP_API_V2: freez(query->data.http_api_v2.payload); if (query->data.http_api_v2.query != query->dedup_id) freez(query->data.http_api_v2.query); - } + break; - if (query->type == CHART_NEW) + case CHART_NEW: freez(query->data.chart_add_del.chart_name); - - if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update) - json_object_put(query->data.alarm_update); - - if (query->type == NODE_STATE_UPDATE) { + break; + + case ALARM_STATE_UPDATE: + if (query->data.alarm_update) + json_object_put(query->data.alarm_update); + break; + + case NODE_STATE_UPDATE: freez((void*)query->data.node_update.claim_id); freez((void*)query->data.node_update.node_id); - } + break; - if (query->type == REGISTER_NODE) { + case REGISTER_NODE: freez((void*)query->data.node_creation.claim_id); freez((void*)query->data.node_creation.hostname); freez((void*)query->data.node_creation.machine_guid); + break; + + case CHART_DIMS_UPDATE: + case CHART_CONFIG_UPDATED: + case CHART_RESET: + case RETENTION_UPDATED: + freez(query->data.bin_payload.payload); + break; + + default: + break; } freez(query->dedup_id); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 58d5dd0407..dd4f85750a 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -7,6 +7,8 @@ #include "daemon/common.h" #include "schema-wrappers/schema_wrappers.h" +#include "aclk_util.h" + typedef enum { UNKNOWN, METADATA_INFO, @@ -16,7 +18,11 @@ typedef enum { CHART_DEL, ALARM_STATE_UPDATE, REGISTER_NODE, - NODE_STATE_UPDATE + NODE_STATE_UPDATE, + CHART_DIMS_UPDATE, + CHART_CONFIG_UPDATED, + CHART_RESET, + RETENTION_UPDATED } aclk_query_type_t; struct aclk_query_metadata { @@ -34,6 +40,13 @@ struct aclk_query_http_api_v2 { char *query; }; +struct aclk_bin_payload { + char *payload; + size_t size; + enum aclk_topics topic; + const char *msg_name; +}; + typedef struct aclk_query *aclk_query_t; struct aclk_query { aclk_query_type_t type; @@ -61,6 +74,7 @@ struct aclk_query { struct aclk_query_chart_add_del chart_add_del; node_instance_creation_t node_creation; node_instance_connection_t node_update; + struct aclk_bin_payload bin_payload; json_object *alarm_update; } data; }; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 2a0fdd5e33..0a69808562 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -36,7 +36,7 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, #endif } -static uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR UNUSED(msgname); diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index e4445f4427..4b661049fe 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -7,6 +7,9 @@ #include "daemon/common.h" #include "mqtt_wss_client.h" #include "schema-wrappers/schema_wrappers.h" +#include "aclk_util.h" + +uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 2f0035d825..5b964fea61 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -13,6 +13,8 @@ int aclk_use_new_cloud_arch = 0; usec_t aclk_session_newarch = 0; +int chart_batch_id; + aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { if (!strcmp(str, "json")) { return ACLK_ENC_JSON; @@ -110,15 +112,19 @@ struct topic_name { // in answer to /password endpoint const char *name; } topic_names[] = { - { .id = ACLK_TOPICID_CHART, .name = "chart" }, - { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, - { .id = ACLK_TOPICID_METADATA, .name = "meta" }, - { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, - { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" }, - { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" }, - { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" }, - { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" }, - { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" }, + { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" }, + { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" }, + { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" }, + { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" }, + { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" }, + { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, + { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } }; enum aclk_topics compulsory_topics_legacy[] = { @@ -139,6 +145,10 @@ enum aclk_topics compulsory_topics_new_cloud_arch[] = { ACLK_TOPICID_CMD_NG_V1, ACLK_TOPICID_CREATE_NODE, ACLK_TOPICID_NODE_CONN, + ACLK_TOPICID_CHART_DIMS, + ACLK_TOPICID_CHART_CONFIGS_UPDATED, + ACLK_TOPICID_CHART_RESET, + ACLK_TOPICID_RETENTION_UPDATED, ACLK_TOPICID_UNKNOWN }; diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 04897be809..6ce0074338 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -11,6 +11,8 @@ extern int aclk_use_new_cloud_arch; extern usec_t aclk_session_newarch; +extern int chart_batch_id; + typedef enum { ACLK_ENC_UNKNOWN = 0, ACLK_ENC_JSON, @@ -54,15 +56,19 @@ void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc); void aclk_env_t_destroy(aclk_env_t *env); enum aclk_topics { - ACLK_TOPICID_UNKNOWN = 0, - ACLK_TOPICID_CHART = 1, - ACLK_TOPICID_ALARMS = 2, - ACLK_TOPICID_METADATA = 3, - ACLK_TOPICID_COMMAND = 4, - ACLK_TOPICID_AGENT_CONN = 5, - ACLK_TOPICID_CMD_NG_V1 = 6, - ACLK_TOPICID_CREATE_NODE = 7, - ACLK_TOPICID_NODE_CONN = 8 + ACLK_TOPICID_UNKNOWN = 0, + ACLK_TOPICID_CHART = 1, + ACLK_TOPICID_ALARMS = 2, + ACLK_TOPICID_METADATA = 3, + ACLK_TOPICID_COMMAND = 4, + ACLK_TOPICID_AGENT_CONN = 5, + ACLK_TOPICID_CMD_NG_V1 = 6, + ACLK_TOPICID_CREATE_NODE = 7, + ACLK_TOPICID_NODE_CONN = 8, + ACLK_TOPICID_CHART_DIMS = 9, + ACLK_TOPICID_CHART_CONFIGS_UPDATED = 10, + ACLK_TOPICID_CHART_RESET = 11, + ACLK_TOPICID_RETENTION_UPDATED = 12 }; const char *aclk_get_topic(enum aclk_topics topic); diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc new file mode 100644 index 0000000000..87e34e0dfc --- /dev/null +++ b/aclk/schema-wrappers/chart_config.cc @@ -0,0 +1,105 @@ +#include "chart_config.h" + +#include "proto/chart/v1/config.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +void destroy_update_chart_config(struct update_chart_config *cfg) +{ + freez(cfg->claim_id); + freez(cfg->node_id); + freez(cfg->hashes); +} + +void destroy_chart_config_updated(struct chart_config_updated *cfg) +{ + freez(cfg->type); + freez(cfg->family); + freez(cfg->context); + freez(cfg->title); + freez(cfg->plugin); + freez(cfg->module); + freez(cfg->units); + freez(cfg->config_hash); +} + +struct update_chart_config parse_update_chart_config(const char *data, size_t len) +{ + chart::v1::UpdateChartConfigs cfgs; + update_chart_config res; + memset(&res, 0, sizeof(res)); + + if (!cfgs.ParseFromArray(data, len)) + return res; + + res.claim_id = strdupz(cfgs.claim_id().c_str()); + res.node_id = strdupz(cfgs.node_id().c_str()); + + // to not do bazillion tiny allocations for individual strings + // we calculate how much memory we will need for all of them + // and allocate at once + int hash_count = cfgs.config_hashes_size(); + size_t total_strlen = 0; + for (int i = 0; i < hash_count; i++) + total_strlen += cfgs.config_hashes(i).length(); + total_strlen += hash_count; //null bytes + + res.hashes = (char**)callocz( 1, + (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end + total_strlen //strings themselves incl. 1 null byte each + ); + + char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *); + // now copy them strings + // null bytes handled by callocz + for (int i = 0; i < hash_count; i++) { + strcpy(dest, cfgs.config_hashes(i).c_str()); + res.hashes[i] = dest; + dest += strlen(dest) + 1 /* end string null */; + } + + return res; +} + +char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size) +{ + chart::v1::ChartConfigsUpdated configs; + for (int i = 0; i < list_size; i++) { + chart::v1::ChartConfigUpdated *config = configs.add_configs(); + config->set_type(config_list[i].type); + if (config_list[i].family) + config->set_family(config_list[i].family); + config->set_context(config_list[i].context); + config->set_title(config_list[i].title); + config->set_priority(config_list[i].priority); + config->set_plugin(config_list[i].plugin); + + if (config_list[i].module) + config->set_module(config_list[i].module); + + switch (config_list[i].chart_type) { + case RRDSET_TYPE_LINE: + config->set_chart_type(chart::v1::LINE); + break; + case RRDSET_TYPE_AREA: + config->set_chart_type(chart::v1::AREA); + break; + case RRDSET_TYPE_STACKED: + config->set_chart_type(chart::v1::STACKED); + break; + default: + return NULL; + } + + config->set_units(config_list[i].units); + config->set_config_hash(config_list[i].config_hash); + } + + *len = PROTO_COMPAT_MSG_SIZE(configs); + char *bin = (char*)mallocz(*len); + configs.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h new file mode 100644 index 0000000000..f08f76b618 --- /dev/null +++ b/aclk/schema-wrappers/chart_config.h @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H +#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H + +#include <stdlib.h> + +#include "database/rrd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct update_chart_config { + char *claim_id; + char *node_id; + char **hashes; +}; + +enum chart_config_chart_type { + LINE, + AREA, + STACKED +}; + +struct chart_config_updated { + char *type; + char *family; + char *context; + char *title; + uint64_t priority; + char *plugin; + char *module; + RRDSET_TYPE chart_type; + char *units; + char *config_hash; +}; + +void destroy_update_chart_config(struct update_chart_config *cfg); +void destroy_chart_config_updated(struct chart_config_updated *cfg); + +struct update_chart_config parse_update_chart_config(const char *data, size_t len); + +char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */ diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc new file mode 100644 index 0000000000..8a43a03752 --- /dev/null +++ b/aclk/schema-wrappers/chart_stream.cc @@ -0,0 +1,344 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk/aclk_util.h" + +#include "proto/chart/v1/stream.pb.h" +#include "chart_stream.h" + +#include "schema_wrapper_utils.h" + +#include <sys/time.h> +#include <stdlib.h> + +stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len) +{ + chart::v1::StreamChartsAndDimensions msg; + stream_charts_and_dims_t res; + memset(&res, 0, sizeof(res)); + + if (!msg.ParseFromArray(data, len)) + return res; + + res.node_id = strdup(msg.node_id().c_str()); + res.claim_id = strdup(msg.claim_id().c_str()); + res.seq_id = msg.sequence_id(); + res.batch_id = msg.batch_id(); + set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at); + + return res; +} + +chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len) +{ + chart::v1::ChartsAndDimensionsAck msg; + chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 }; + + if (!msg.ParseFromArray(data, len)) + return res; + + res.node_id = strdup(msg.node_id().c_str()); + res.claim_id = strdup(msg.claim_id().c_str()); + res.last_seq_id = msg.last_sequence_id(); + + return res; +} + +char *generate_reset_chart_messages(size_t *len, chart_reset_t reset) +{ + chart::v1::ResetChartMessages msg; + + msg.set_claim_id(reset.claim_id); + msg.set_node_id(reset.node_id); + switch (reset.reason) { + case DB_EMPTY: + msg.set_reason(chart::v1::ResetReason::DB_EMPTY); + break; + case SEQ_ID_NOT_EXISTS: + msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS); + break; + case TIMESTAMP_MISMATCH: + msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH); + break; + default: + return NULL; + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)malloc(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} + +void chart_instance_updated_destroy(struct chart_instance_updated *instance) +{ + freez((char*)instance->id); + freez((char*)instance->claim_id); + freez((char*)instance->node_id); + freez((char*)instance->name); + + free_label_list(instance->label_head); + + freez((char*)instance->config_hash); +} + +static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update) +{ + google::protobuf::Map<std::string, std::string> *map; + aclk_lib::v1::ACLKMessagePosition *pos; + struct label *label; + + chart->set_id(update->id); + chart->set_claim_id(update->claim_id); + chart->set_node_id(update->node_id); + chart->set_name(update->name); + + map = chart->mutable_chart_labels(); + label = update->label_head; + while (label) { + map->insert({label->key, label->value}); + label = label->next; + } + + switch (update->memory_mode) { + case RRD_MEMORY_MODE_NONE: + chart->set_memory_mode(chart::v1::NONE); + break; + case RRD_MEMORY_MODE_RAM: + chart->set_memory_mode(chart::v1::RAM); + break; + case RRD_MEMORY_MODE_MAP: + chart->set_memory_mode(chart::v1::MAP); + break; + case RRD_MEMORY_MODE_SAVE: + chart->set_memory_mode(chart::v1::SAVE); + break; + case RRD_MEMORY_MODE_ALLOC: + chart->set_memory_mode(chart::v1::ALLOC); + break; + case RRD_MEMORY_MODE_DBENGINE: + chart->set_memory_mode(chart::v1::DB_ENGINE); + break; + default: + return 1; + break; + } + + chart->set_update_every_interval(update->update_every); + chart->set_config_hash(update->config_hash); + + pos = chart->mutable_position(); + pos->set_sequence_id(update->position.sequence_id); + pos->set_previous_sequence_id(update->position.previous_sequence_id); + set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); + + return 0; +} + +static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim) +{ + aclk_lib::v1::ACLKMessagePosition *pos; + + dim->set_id(c_dim->id); + dim->set_chart_id(c_dim->chart_id); + dim->set_node_id(c_dim->node_id); + dim->set_claim_id(c_dim->claim_id); + dim->set_name(c_dim->name); + + set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at()); + set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp()); + + pos = dim->mutable_position(); + pos->set_sequence_id(c_dim->position.sequence_id); + pos->set_previous_sequence_id(c_dim->position.previous_sequence_id); + set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); + + return 0; +} + +char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) +{ + chart::v1::ChartsAndDimensionsUpdated msg; + chart::v1::ChartInstanceUpdated db_chart; + chart::v1::ChartDimensionUpdated db_dim; + aclk_lib::v1::ACLKMessagePosition *pos; + + msg.set_batch_id(batch_id); + + for (int i = 0; payloads[i]; i++) { + if (is_dim[i]) { + if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) { + error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); + return NULL; + } + + pos = db_dim.mutable_position(); + pos->set_sequence_id(new_positions[i] |