summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2021-08-06 13:50:19 +0200
committerGitHub <noreply@github.com>2021-08-06 13:50:19 +0200
commit14ce65525275fa8760fa4f20e10b5a4e5c34ccde (patch)
tree99c387d8a093b05cc7a89a1fad81514924bdccaa
parent597763dd0ae60dd89af211267a17644e00114537 (diff)
New Cloud chart related parsers and generators (#11393)
* adds message generators parsers and handlers for upcoming Chart stream implementation
-rw-r--r--CMakeLists.txt1
-rw-r--r--Makefile.am38
m---------aclk/aclk-schemas0
-rw-r--r--aclk/aclk_charts_api.c61
-rw-r--r--aclk/aclk_charts_api.h18
-rw-r--r--aclk/aclk_query.c29
-rw-r--r--aclk/aclk_query_queue.c35
-rw-r--r--aclk/aclk_query_queue.h16
-rw-r--r--aclk/aclk_tx_msgs.c2
-rw-r--r--aclk/aclk_tx_msgs.h3
-rw-r--r--aclk/aclk_util.c28
-rw-r--r--aclk/aclk_util.h24
-rw-r--r--aclk/schema-wrappers/chart_config.cc105
-rw-r--r--aclk/schema-wrappers/chart_config.h50
-rw-r--r--aclk/schema-wrappers/chart_stream.cc344
-rw-r--r--aclk/schema-wrappers/chart_stream.h121
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.cc15
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h7
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
19 files changed, 860 insertions, 39 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b4425e6fc8..0ba7caf17e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -805,6 +805,7 @@ set(ACLK_NG_FILES
aclk/schema-wrappers/node_creation.cc
aclk/schema-wrappers/node_creation.h
aclk/schema-wrappers/schema_wrappers.h
+ aclk/schema-wrappers/schema_wrapper_utils.cc
aclk/schema-wrappers/schema_wrapper_utils.h
)
diff --git a/Makefile.am b/Makefile.am
index e03261b7ed..690b03b35d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -567,6 +567,8 @@ ACLK_NG_FILES = \
aclk/aclk_rx_msgs.h \
aclk/https_client.c \
aclk/https_client.h \
+ aclk/aclk_charts_api.c \
+ aclk/aclk_charts_api.h \
mqtt_websockets/src/mqtt_wss_client.c \
mqtt_websockets/src/include/mqtt_wss_client.h \
mqtt_websockets/src/mqtt_wss_log.c \
@@ -584,7 +586,12 @@ ACLK_NG_FILES = \
aclk/schema-wrappers/node_connection.h \
aclk/schema-wrappers/node_creation.cc \
aclk/schema-wrappers/node_creation.h \
+ aclk/schema-wrappers/chart_stream.cc \
+ aclk/schema-wrappers/chart_stream.h \
+ aclk/schema-wrappers/chart_config.cc \
+ aclk/schema-wrappers/chart_config.h \
aclk/schema-wrappers/schema_wrappers.h \
+ aclk/schema-wrappers/schema_wrapper_utils.cc \
aclk/schema-wrappers/schema_wrapper_utils.h \
$(NULL)
@@ -594,10 +601,21 @@ ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \
aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \
+ aclk/aclk-schemas/proto/chart/v1/stream.pb.cc \
+ aclk/aclk-schemas/proto/chart/v1/stream.pb.h \
+ aclk/aclk-schemas/proto/chart/v1/instance.pb.cc \
+ aclk/aclk-schemas/proto/chart/v1/instance.pb.h \
+ aclk/aclk-schemas/proto/chart/v1/dimension.pb.cc \
+ aclk/aclk-schemas/proto/chart/v1/dimension.pb.h \
+ aclk/aclk-schemas/proto/chart/v1/config.pb.cc \
+ aclk/aclk-schemas/proto/chart/v1/config.pb.h \
+ aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \
+ aclk/aclk-schemas/proto/aclk/v1/lib.pb.h \
$(NULL)
BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
+CLEANFILES += $(ACLK_NG_PROTO_BUILT_FILES)
aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto
@@ -611,6 +629,26 @@ aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+aclk/aclk-schemas/proto/chart/v1/stream.pb.cc \
+aclk/aclk-schemas/proto/chart/v1/stream.pb.h: aclk/aclk-schemas/proto/chart/v1/stream.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/chart/v1/instance.pb.cc \
+aclk/aclk-schemas/proto/chart/v1/instance.pb.h: aclk/aclk-schemas/proto/chart/v1/instance.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/chart/v1/dimension.pb.cc \
+aclk/aclk-schemas/proto/chart/v1/dimension.pb.h: aclk/aclk-schemas/proto/chart/v1/dimension.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/chart/v1/config.pb.cc \
+aclk/aclk-schemas/proto/chart/v1/config.pb.h: aclk/aclk-schemas/proto/chart/v1/config.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \
+aclk/aclk-schemas/proto/aclk/v1/lib.pb.h: aclk/aclk-schemas/proto/aclk/v1/lib.proto
+ $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
endif #ACLK_NG
if ENABLE_ACLK
diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas
-Subproject b5fef3f3a84e6a5013b36b906f4677012c73441
+Subproject a0adf5b1e026ee8339d56cfa27af95bb26b5317
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
new file mode 100644
index 0000000000..9dabf82041
--- /dev/null
+++ b/aclk/aclk_charts_api.c
@@ -0,0 +1,61 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "aclk_charts_api.h"
+
+#include "aclk_query_queue.h"
+
+#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
+
+void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
+
+void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
+
+void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
+{
+ aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+ query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
+ query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
+
+void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
+{
+ aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
+ query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
+ query->data.bin_payload.msg_name = "ChartConfigsUpdated";
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
+
+void aclk_chart_reset(chart_reset_t reset)
+{
+ aclk_query_t query = aclk_query_new(CHART_RESET);
+ query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
+ query->data.bin_payload.msg_name = "ResetChartMessages";
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
+
+void aclk_retention_updated(struct retention_updated *data)
+{
+ aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
+ query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
+ query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
+ query->data.bin_payload.msg_name = "RetentionUpdated";
+ if (query->data.bin_payload.payload)
+ aclk_queue_query(query);
+}
diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h
new file mode 100644
index 0000000000..c2db9d4979
--- /dev/null
+++ b/aclk/aclk_charts_api.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_CHARTS_H
+#define ACLK_CHARTS_H
+
+#include "../daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
+void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
+void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id);
+
+void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size);
+
+void aclk_chart_reset(chart_reset_t reset);
+
+void aclk_retention_updated(struct retention_updated *data);
+
+#endif /* ACLK_CHARTS_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 07b3537b01..99af385c40 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -273,16 +273,27 @@ static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t q
return 0;
}
+static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
+{
+ // this will be simplified when legacy support is removed
+ aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
+ return 0;
+}
+
aclk_query_handler aclk_query_handlers[] = {
- { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
- { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
- { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
- { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
- { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
- { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
- { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
- { .type = UNKNOWN, .name = NULL, .fnc = NULL }
+ { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
+ { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
+ { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
+ { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
+ { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
+ { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
+ { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
+ { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg },
+ { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg },
+ { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg },
+ { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg },
+ { .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 1aefdd7711..10db9ec4f3 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -139,27 +139,42 @@ aclk_query_t aclk_query_new(aclk_query_type_t type)
void aclk_query_free(aclk_query_t query)
{
- if (query->type == HTTP_API_V2) {
+ switch (query->type) {
+ case HTTP_API_V2:
freez(query->data.http_api_v2.payload);
if (query->data.http_api_v2.query != query->dedup_id)
freez(query->data.http_api_v2.query);
- }
+ break;
- if (query->type == CHART_NEW)
+ case CHART_NEW:
freez(query->data.chart_add_del.chart_name);
-
- if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
- json_object_put(query->data.alarm_update);
-
- if (query->type == NODE_STATE_UPDATE) {
+ break;
+
+ case ALARM_STATE_UPDATE:
+ if (query->data.alarm_update)
+ json_object_put(query->data.alarm_update);
+ break;
+
+ case NODE_STATE_UPDATE:
freez((void*)query->data.node_update.claim_id);
freez((void*)query->data.node_update.node_id);
- }
+ break;
- if (query->type == REGISTER_NODE) {
+ case REGISTER_NODE:
freez((void*)query->data.node_creation.claim_id);
freez((void*)query->data.node_creation.hostname);
freez((void*)query->data.node_creation.machine_guid);
+ break;
+
+ case CHART_DIMS_UPDATE:
+ case CHART_CONFIG_UPDATED:
+ case CHART_RESET:
+ case RETENTION_UPDATED:
+ freez(query->data.bin_payload.payload);
+ break;
+
+ default:
+ break;
}
freez(query->dedup_id);
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 58d5dd0407..dd4f85750a 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -7,6 +7,8 @@
#include "daemon/common.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_util.h"
+
typedef enum {
UNKNOWN,
METADATA_INFO,
@@ -16,7 +18,11 @@ typedef enum {
CHART_DEL,
ALARM_STATE_UPDATE,
REGISTER_NODE,
- NODE_STATE_UPDATE
+ NODE_STATE_UPDATE,
+ CHART_DIMS_UPDATE,
+ CHART_CONFIG_UPDATED,
+ CHART_RESET,
+ RETENTION_UPDATED
} aclk_query_type_t;
struct aclk_query_metadata {
@@ -34,6 +40,13 @@ struct aclk_query_http_api_v2 {
char *query;
};
+struct aclk_bin_payload {
+ char *payload;
+ size_t size;
+ enum aclk_topics topic;
+ const char *msg_name;
+};
+
typedef struct aclk_query *aclk_query_t;
struct aclk_query {
aclk_query_type_t type;
@@ -61,6 +74,7 @@ struct aclk_query {
struct aclk_query_chart_add_del chart_add_del;
node_instance_creation_t node_creation;
node_instance_connection_t node_update;
+ struct aclk_bin_payload bin_payload;
json_object *alarm_update;
} data;
};
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 2a0fdd5e33..0a69808562 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -36,7 +36,7 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
#endif
}
-static uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
UNUSED(msgname);
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index e4445f4427..4b661049fe 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -7,6 +7,9 @@
#include "daemon/common.h"
#include "mqtt_wss_client.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_util.h"
+
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 2f0035d825..5b964fea61 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -13,6 +13,8 @@
int aclk_use_new_cloud_arch = 0;
usec_t aclk_session_newarch = 0;
+int chart_batch_id;
+
aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
if (!strcmp(str, "json")) {
return ACLK_ENC_JSON;
@@ -110,15 +112,19 @@ struct topic_name {
// in answer to /password endpoint
const char *name;
} topic_names[] = {
- { .id = ACLK_TOPICID_CHART, .name = "chart" },
- { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
- { .id = ACLK_TOPICID_METADATA, .name = "meta" },
- { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
- { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
- { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
- { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
- { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
- { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+ { .id = ACLK_TOPICID_CHART, .name = "chart" },
+ { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
+ { .id = ACLK_TOPICID_METADATA, .name = "meta" },
+ { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
+ { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
+ { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
+ { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
+ { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
+ { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" },
+ { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" },
+ { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
+ { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
+ { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
};
enum aclk_topics compulsory_topics_legacy[] = {
@@ -139,6 +145,10 @@ enum aclk_topics compulsory_topics_new_cloud_arch[] = {
ACLK_TOPICID_CMD_NG_V1,
ACLK_TOPICID_CREATE_NODE,
ACLK_TOPICID_NODE_CONN,
+ ACLK_TOPICID_CHART_DIMS,
+ ACLK_TOPICID_CHART_CONFIGS_UPDATED,
+ ACLK_TOPICID_CHART_RESET,
+ ACLK_TOPICID_RETENTION_UPDATED,
ACLK_TOPICID_UNKNOWN
};
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 04897be809..6ce0074338 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -11,6 +11,8 @@
extern int aclk_use_new_cloud_arch;
extern usec_t aclk_session_newarch;
+extern int chart_batch_id;
+
typedef enum {
ACLK_ENC_UNKNOWN = 0,
ACLK_ENC_JSON,
@@ -54,15 +56,19 @@ void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
void aclk_env_t_destroy(aclk_env_t *env);
enum aclk_topics {
- ACLK_TOPICID_UNKNOWN = 0,
- ACLK_TOPICID_CHART = 1,
- ACLK_TOPICID_ALARMS = 2,
- ACLK_TOPICID_METADATA = 3,
- ACLK_TOPICID_COMMAND = 4,
- ACLK_TOPICID_AGENT_CONN = 5,
- ACLK_TOPICID_CMD_NG_V1 = 6,
- ACLK_TOPICID_CREATE_NODE = 7,
- ACLK_TOPICID_NODE_CONN = 8
+ ACLK_TOPICID_UNKNOWN = 0,
+ ACLK_TOPICID_CHART = 1,
+ ACLK_TOPICID_ALARMS = 2,
+ ACLK_TOPICID_METADATA = 3,
+ ACLK_TOPICID_COMMAND = 4,
+ ACLK_TOPICID_AGENT_CONN = 5,
+ ACLK_TOPICID_CMD_NG_V1 = 6,
+ ACLK_TOPICID_CREATE_NODE = 7,
+ ACLK_TOPICID_NODE_CONN = 8,
+ ACLK_TOPICID_CHART_DIMS = 9,
+ ACLK_TOPICID_CHART_CONFIGS_UPDATED = 10,
+ ACLK_TOPICID_CHART_RESET = 11,
+ ACLK_TOPICID_RETENTION_UPDATED = 12
};
const char *aclk_get_topic(enum aclk_topics topic);
diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc
new file mode 100644
index 0000000000..87e34e0dfc
--- /dev/null
+++ b/aclk/schema-wrappers/chart_config.cc
@@ -0,0 +1,105 @@
+#include "chart_config.h"
+
+#include "proto/chart/v1/config.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+void destroy_update_chart_config(struct update_chart_config *cfg)
+{
+ freez(cfg->claim_id);
+ freez(cfg->node_id);
+ freez(cfg->hashes);
+}
+
+void destroy_chart_config_updated(struct chart_config_updated *cfg)
+{
+ freez(cfg->type);
+ freez(cfg->family);
+ freez(cfg->context);
+ freez(cfg->title);
+ freez(cfg->plugin);
+ freez(cfg->module);
+ freez(cfg->units);
+ freez(cfg->config_hash);
+}
+
+struct update_chart_config parse_update_chart_config(const char *data, size_t len)
+{
+ chart::v1::UpdateChartConfigs cfgs;
+ update_chart_config res;
+ memset(&res, 0, sizeof(res));
+
+ if (!cfgs.ParseFromArray(data, len))
+ return res;
+
+ res.claim_id = strdupz(cfgs.claim_id().c_str());
+ res.node_id = strdupz(cfgs.node_id().c_str());
+
+ // to not do bazillion tiny allocations for individual strings
+ // we calculate how much memory we will need for all of them
+ // and allocate at once
+ int hash_count = cfgs.config_hashes_size();
+ size_t total_strlen = 0;
+ for (int i = 0; i < hash_count; i++)
+ total_strlen += cfgs.config_hashes(i).length();
+ total_strlen += hash_count; //null bytes
+
+ res.hashes = (char**)callocz( 1,
+ (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end
+ total_strlen //strings themselves incl. 1 null byte each
+ );
+
+ char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *);
+ // now copy them strings
+ // null bytes handled by callocz
+ for (int i = 0; i < hash_count; i++) {
+ strcpy(dest, cfgs.config_hashes(i).c_str());
+ res.hashes[i] = dest;
+ dest += strlen(dest) + 1 /* end string null */;
+ }
+
+ return res;
+}
+
+char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size)
+{
+ chart::v1::ChartConfigsUpdated configs;
+ for (int i = 0; i < list_size; i++) {
+ chart::v1::ChartConfigUpdated *config = configs.add_configs();
+ config->set_type(config_list[i].type);
+ if (config_list[i].family)
+ config->set_family(config_list[i].family);
+ config->set_context(config_list[i].context);
+ config->set_title(config_list[i].title);
+ config->set_priority(config_list[i].priority);
+ config->set_plugin(config_list[i].plugin);
+
+ if (config_list[i].module)
+ config->set_module(config_list[i].module);
+
+ switch (config_list[i].chart_type) {
+ case RRDSET_TYPE_LINE:
+ config->set_chart_type(chart::v1::LINE);
+ break;
+ case RRDSET_TYPE_AREA:
+ config->set_chart_type(chart::v1::AREA);
+ break;
+ case RRDSET_TYPE_STACKED:
+ config->set_chart_type(chart::v1::STACKED);
+ break;
+ default:
+ return NULL;
+ }
+
+ config->set_units(config_list[i].units);
+ config->set_config_hash(config_list[i].config_hash);
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(configs);
+ char *bin = (char*)mallocz(*len);
+ configs.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h
new file mode 100644
index 0000000000..f08f76b618
--- /dev/null
+++ b/aclk/schema-wrappers/chart_config.h
@@ -0,0 +1,50 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct update_chart_config {
+ char *claim_id;
+ char *node_id;
+ char **hashes;
+};
+
+enum chart_config_chart_type {
+ LINE,
+ AREA,
+ STACKED
+};
+
+struct chart_config_updated {
+ char *type;
+ char *family;
+ char *context;
+ char *title;
+ uint64_t priority;
+ char *plugin;
+ char *module;
+ RRDSET_TYPE chart_type;
+ char *units;
+ char *config_hash;
+};
+
+void destroy_update_chart_config(struct update_chart_config *cfg);
+void destroy_chart_config_updated(struct chart_config_updated *cfg);
+
+struct update_chart_config parse_update_chart_config(const char *data, size_t len);
+
+char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
new file mode 100644
index 0000000000..8a43a03752
--- /dev/null
+++ b/aclk/schema-wrappers/chart_stream.cc
@@ -0,0 +1,344 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk/aclk_util.h"
+
+#include "proto/chart/v1/stream.pb.h"
+#include "chart_stream.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len)
+{
+ chart::v1::StreamChartsAndDimensions msg;
+ stream_charts_and_dims_t res;
+ memset(&res, 0, sizeof(res));
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdup(msg.node_id().c_str());
+ res.claim_id = strdup(msg.claim_id().c_str());
+ res.seq_id = msg.sequence_id();
+ res.batch_id = msg.batch_id();
+ set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at);
+
+ return res;
+}
+
+chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len)
+{
+ chart::v1::ChartsAndDimensionsAck msg;
+ chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 };
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdup(msg.node_id().c_str());
+ res.claim_id = strdup(msg.claim_id().c_str());
+ res.last_seq_id = msg.last_sequence_id();
+
+ return res;
+}
+
+char *generate_reset_chart_messages(size_t *len, chart_reset_t reset)
+{
+ chart::v1::ResetChartMessages msg;
+
+ msg.set_claim_id(reset.claim_id);
+ msg.set_node_id(reset.node_id);
+ switch (reset.reason) {
+ case DB_EMPTY:
+ msg.set_reason(chart::v1::ResetReason::DB_EMPTY);
+ break;
+ case SEQ_ID_NOT_EXISTS:
+ msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS);
+ break;
+ case TIMESTAMP_MISMATCH:
+ msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH);
+ break;
+ default:
+ return NULL;
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+void chart_instance_updated_destroy(struct chart_instance_updated *instance)
+{
+ freez((char*)instance->id);
+ freez((char*)instance->claim_id);
+ freez((char*)instance->node_id);
+ freez((char*)instance->name);
+
+ free_label_list(instance->label_head);
+
+ freez((char*)instance->config_hash);
+}
+
+static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update)
+{
+ google::protobuf::Map<std::string, std::string> *map;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+ struct label *label;
+
+ chart->set_id(update->id);
+ chart->set_claim_id(update->claim_id);
+ chart->set_node_id(update->node_id);
+ chart->set_name(update->name);
+
+ map = chart->mutable_chart_labels();
+ label = update->label_head;
+ while (label) {
+ map->insert({label->key, label->value});
+ label = label->next;
+ }
+
+ switch (update->memory_mode) {
+ case RRD_MEMORY_MODE_NONE:
+ chart->set_memory_mode(chart::v1::NONE);
+ break;
+ case RRD_MEMORY_MODE_RAM:
+ chart->set_memory_mode(chart::v1::RAM);
+ break;
+ case RRD_MEMORY_MODE_MAP:
+ chart->set_memory_mode(chart::v1::MAP);
+ break;
+ case RRD_MEMORY_MODE_SAVE:
+ chart->set_memory_mode(chart::v1::SAVE);
+ break;
+ case RRD_MEMORY_MODE_ALLOC:
+ chart->set_memory_mode(chart::v1::ALLOC);
+ break;
+ case RRD_MEMORY_MODE_DBENGINE:
+ chart->set_memory_mode(chart::v1::DB_ENGINE);
+ break;
+ default:
+ return 1;
+ break;
+ }
+
+ chart->set_update_every_interval(update->update_every);
+ chart->set_config_hash(update->config_hash);
+
+ pos = chart->mutable_position();
+ pos->set_sequence_id(update->position.sequence_id);
+ pos->set_previous_sequence_id(update->position.previous_sequence_id);
+ set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ return 0;
+}
+
+static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim)
+{
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ dim->set_id(c_dim->id);
+ dim->set_chart_id(c_dim->chart_id);
+ dim->set_node_id(c_dim->node_id);
+ dim->set_claim_id(c_dim->claim_id);
+ dim->set_name(c_dim->name);
+
+ set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at());
+ set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp());
+
+ pos = dim->mutable_position();
+ pos->set_sequence_id(c_dim->position.sequence_id);
+ pos->set_previous_sequence_id(c_dim->position.previous_sequence_id);
+ set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ return 0;
+}
+
+char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
+{
+ chart::v1::ChartsAndDimensionsUpdated msg;
+ chart::v1::ChartInstanceUpdated db_chart;
+ chart::v1::ChartDimensionUpdated db_dim;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ msg.set_batch_id(batch_id);
+
+ for (int i = 0; payloads[i]; i++) {
+ if (is_dim[i]) {
+ if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) {
+ error("[ACLK] Could not parse chart::v1::chart_dimension_updated");
+ return NULL;
+ }
+
+ pos = db_dim.mutable_position();
+ pos->set_sequence_id(new_positions[i]