diff options
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 82 | ||||
-rw-r--r-- | aclk/aclk_charts_api.c | 77 | ||||
-rw-r--r-- | aclk/aclk_charts_api.h | 22 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.c | 18 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.h | 2 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 5 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 39 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_config.cc | 105 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_config.h | 50 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_stream.cc | 337 | ||||
-rw-r--r-- | aclk/schema-wrappers/chart_stream.h | 121 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 16 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 2 |
14 files changed, 34 insertions, 844 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 63aa4ad386..8b1dca8ca7 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -788,7 +788,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) { .name = "proto", .version = 1, .enabled = 1 }, { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = "ctx", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; node_state_update.capabilities = caps; @@ -834,7 +834,7 @@ void aclk_send_node_instances() { .name = "proto", .version = 1, .enabled = 1 }, { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = "ctx", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; node_state_update.capabilities = caps; @@ -905,38 +905,6 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) status.last_submitted_sequence_id ); } - -static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) { - buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); - return; - } - buffer_sprintf(wb, - "\n\t\tUpdates: %d" - "\n\t\tBatch ID: %"PRIu64 - "\n\t\tMin Seq ID: %"PRIu64 - "\n\t\tMax Seq ID: %"PRIu64 - "\n\t\tPending Min Seq ID: %"PRIu64 - "\n\t\tPending Max Seq ID: %"PRIu64 - "\n\t\tSent Min Seq ID: %"PRIu64 - "\n\t\tSent Max Seq ID: %"PRIu64 - "\n\t\tAcked Min Seq ID: %"PRIu64 - "\n\t\tAcked Max Seq ID: %"PRIu64, - stats->updates, - stats->batch_id, - stats->min_seqid, - stats->max_seqid, - stats->min_seqid_pend, - stats->max_seqid_pend, - stats->min_seqid_sent, - stats->max_seqid_sent, - stats->min_seqid_ack, - stats->max_seqid_ack - ); - freez(stats); -} #endif /* ENABLE_ACLK */ char *aclk_state(void) @@ -1018,9 +986,6 @@ char *aclk_state(void) buffer_strcat(wb, "\n\tAlert Streaming Status:"); fill_alert_status_for_host(wb, host); - - buffer_strcat(wb, "\n\tChart Streaming Status:"); - fill_chart_status_for_host(wb, host); } rrd_unlock(); } @@ -1058,45 +1023,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) json_object_object_add(obj, "last-submitted-seq-id", tmp); } -static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) - return; - - json_object *tmp = json_object_new_int(stats->updates); - json_object_object_add(obj, "updates", tmp); - - tmp = json_object_new_int(stats->batch_id); - json_object_object_add(obj, "batch-id", tmp); - - tmp = json_object_new_int(stats->min_seqid); - json_object_object_add(obj, "min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid); - json_object_object_add(obj, "max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_pend); - json_object_object_add(obj, "pending-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_pend); - json_object_object_add(obj, "pending-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_sent); - json_object_object_add(obj, "sent-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_sent); - json_object_object_add(obj, "sent-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_ack); - json_object_object_add(obj, "acked-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_ack); - json_object_object_add(obj, "acked-max-seq-id", tmp); - - freez(stats); -} - static json_object *timestamp_to_json(const time_t *t) { struct tm *tmptr, tmbuf; @@ -1215,10 +1141,6 @@ char *aclk_state_json(void) fill_alert_status_for_host_json(tmp, host); json_object_object_add(nodeinstance, "alert-sync-status", tmp); - tmp = json_object_new_object(); - fill_chart_status_for_host_json(tmp, host); - json_object_object_add(nodeinstance, "chart-sync-status", tmp); - json_object_array_add(grp, nodeinstance); } rrd_unlock(); diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c deleted file mode 100644 index 51d8dad58c..0000000000 --- a/aclk/aclk_charts_api.c +++ /dev/null @@ -1,77 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include "aclk_charts_api.h" - -#include "aclk_query_queue.h" - -#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated" - -void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size) -{ - aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED; - query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size); - query->data.bin_payload.msg_name = "ChartConfigsUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_reset(chart_reset_t reset) -{ - aclk_query_t query = aclk_query_new(CHART_RESET); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET; - query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset); - query->data.bin_payload.msg_name = "ResetChartMessages"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_retention_updated(struct retention_updated *data) -{ - aclk_query_t query = aclk_query_new(RETENTION_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED; - query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data); - query->data.bin_payload.msg_name = "RetentionUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_update_node_info(struct update_node_info *info) -{ - aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; - query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); - query->data.bin_payload.msg_name = "UpdateNodeInfo"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_update_node_collectors(struct update_node_collectors *collectors) -{ - aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; - query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); - query->data.bin_payload.msg_name = "UpdateNodeCollectors"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h deleted file mode 100644 index 71f07dd338..0000000000 --- a/aclk/aclk_charts_api.h +++ /dev/null @@ -1,22 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#ifndef ACLK_CHARTS_H -#define ACLK_CHARTS_H - -#include "../daemon/common.h" -#include "schema-wrappers/schema_wrappers.h" - -void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); - -void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size); - -void aclk_chart_reset(chart_reset_t reset); - -void aclk_retention_updated(struct retention_updated *data); - -void aclk_update_node_info(struct update_node_info *info); - -void aclk_update_node_collectors(struct update_node_collectors *collectors); - -#endif /* ACLK_CHARTS_H */ diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c index f17d3cabd9..f3344935e2 100644 --- a/aclk/aclk_contexts_api.c +++ b/aclk/aclk_contexts_api.c @@ -21,3 +21,21 @@ void aclk_send_contexts_updated(contexts_updated_t data) query->data.bin_payload.msg_name = "ContextsUpdated"; QUEUE_IF_PAYLOAD_PRESENT(query); } + +void aclk_update_node_collectors(struct update_node_collectors *collectors) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; + query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); + query->data.bin_payload.msg_name = "UpdateNodeCollectors"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_update_node_info(struct update_node_info *info) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; + query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); + query->data.bin_payload.msg_name = "UpdateNodeInfo"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h index 46b916d22f..f0b5ec77e5 100644 --- a/aclk/aclk_contexts_api.h +++ b/aclk/aclk_contexts_api.h @@ -7,6 +7,8 @@ void aclk_send_contexts_snapshot(contexts_snapshot_t data); void aclk_send_contexts_updated(contexts_updated_t data); +void aclk_update_node_collectors(struct update_node_collectors *collectors); +void aclk_update_node_info(struct update_node_info *info); #endif /* ACLK_CONTEXTS_API_H */ diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index b7bf173c4a..e1b7a5123d 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -844,10 +844,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } - if (rrdcontext_enabled) - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); - else - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); freez(agent_id); diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index b42f010f9e..4a5ddc0d2d 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -293,7 +293,7 @@ int create_node_instance_result(const char *msg, size_t msg_len) { .name = "proto", .version = 1, .enabled = 1 }, { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = "ctx", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; node_state_update.capabilities = caps; @@ -322,44 +322,25 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { - aclk_ctx_based = 0; - stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - chart_batch_id = res.batch_id; - aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensions msg"); return 0; } int charts_and_dimensions_ack(const char *msg, size_t msg_len) { - chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensionsAck msg"); return 0; } int update_chart_configs(const char *msg, size_t msg_len) { - struct update_chart_config res = parse_update_chart_config(msg, msg_len); - if (!res.claim_id || !res.node_id || !res.hashes) - error("Error parsing UpdateChartConfigs msg"); - else - aclk_get_chart_config(res.hashes); - destroy_update_chart_config(&res); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete UpdateChartConfigs msg"); return 0; } diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index b483e5221d..9bf8529ef8 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -225,7 +225,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, #endif { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = "ctx", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc deleted file mode 100644 index 87e34e0dfc..0000000000 --- a/aclk/schema-wrappers/chart_config.cc +++ /dev/null @@ -1,105 +0,0 @@ -#include "chart_config.h" - -#include "proto/chart/v1/config.pb.h" - -#include "libnetdata/libnetdata.h" - -#include "schema_wrapper_utils.h" - -void destroy_update_chart_config(struct update_chart_config *cfg) -{ - freez(cfg->claim_id); - freez(cfg->node_id); - freez(cfg->hashes); -} - -void destroy_chart_config_updated(struct chart_config_updated *cfg) -{ - freez(cfg->type); - freez(cfg->family); - freez(cfg->context); - freez(cfg->title); - freez(cfg->plugin); - freez(cfg->module); - freez(cfg->units); - freez(cfg->config_hash); -} - -struct update_chart_config parse_update_chart_config(const char *data, size_t len) -{ - chart::v1::UpdateChartConfigs cfgs; - update_chart_config res; - memset(&res, 0, sizeof(res)); - - if (!cfgs.ParseFromArray(data, len)) - return res; - - res.claim_id = strdupz(cfgs.claim_id().c_str()); - res.node_id = strdupz(cfgs.node_id().c_str()); - - // to not do bazillion tiny allocations for individual strings - // we calculate how much memory we will need for all of them - // and allocate at once - int hash_count = cfgs.config_hashes_size(); - size_t total_strlen = 0; - for (int i = 0; i < hash_count; i++) - total_strlen += cfgs.config_hashes(i).length(); - total_strlen += hash_count; //null bytes - - res.hashes = (char**)callocz( 1, - (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end - total_strlen //strings themselves incl. 1 null byte each - ); - - char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *); - // now copy them strings - // null bytes handled by callocz - for (int i = 0; i < hash_count; i++) { - strcpy(dest, cfgs.config_hashes(i).c_str()); - res.hashes[i] = dest; - dest += strlen(dest) + 1 /* end string null */; - } - - return res; -} - -char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size) -{ - chart::v1::ChartConfigsUpdated configs; - for (int i = 0; i < list_size; i++) { - chart::v1::ChartConfigUpdated *config = configs.add_configs(); - config->set_type(config_list[i].type); - if (config_list[i].family) - config->set_family(config_list[i].family); - config->set_context(config_list[i].context); - config->set_title(config_list[i].title); - config->set_priority(config_list[i].priority); - config->set_plugin(config_list[i].plugin); - - if (config_list[i].module) - config->set_module(config_list[i].module); - - switch (config_list[i].chart_type) { - case RRDSET_TYPE_LINE: - config->set_chart_type(chart::v1::LINE); - break; - case RRDSET_TYPE_AREA: - config->set_chart_type(chart::v1::AREA); - break; - case RRDSET_TYPE_STACKED: - config->set_chart_type(chart::v1::STACKED); - break; - default: - return NULL; - } - - config->set_units(config_list[i].units); - config->set_config_hash(config_list[i].config_hash); - } - - *len = PROTO_COMPAT_MSG_SIZE(configs); - char *bin = (char*)mallocz(*len); - configs.SerializeToArray(bin, *len); - - return bin; -} diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h deleted file mode 100644 index f08f76b618..0000000000 --- a/aclk/schema-wrappers/chart_config.h +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H -#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H - -#include <stdlib.h> - -#include "database/rrd.h" - -#ifdef __cplusplus -extern "C" { -#endif - -struct update_chart_config { - char *claim_id; - char *node_id; - char **hashes; -}; - -enum chart_config_chart_type { - LINE, - AREA, - STACKED -}; - -struct chart_config_updated { - char *type; - char *family; - char *context; - char *title; - uint64_t priority; - char *plugin; - char *module; - RRDSET_TYPE chart_type; - char *units; - char *config_hash; -}; - -void destroy_update_chart_config(struct update_chart_config *cfg); -void destroy_chart_config_updated(struct chart_config_updated *cfg); - -struct update_chart_config parse_update_chart_config(const char *data, size_t len); - -char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size); - -#ifdef __cplusplus -} -#endif - -#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */ diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc deleted file mode 100644 index 54c9407586..0000000000 --- a/aclk/schema-wrappers/chart_stream.cc +++ /dev/null @@ -1,337 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "aclk/aclk_util.h" - -#include "proto/chart/v1/stream.pb.h" -#include "chart_stream.h" - -#include "schema_wrapper_utils.h" - -#include <sys/time.h> -#include <stdlib.h> - -stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len) -{ - chart::v1::StreamChartsAndDimensions msg; - stream_charts_and_dims_t res; - memset(&res, 0, sizeof(res)); - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.seq_id = msg.sequence_id(); - res.batch_id = msg.batch_id(); - set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at); - - return res; -} - -chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len) -{ - chart::v1::ChartsAndDimensionsAck msg; - chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 }; - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.last_seq_id = msg.last_sequence_id(); - - return res; -} - -char *generate_reset_chart_messages(size_t *len, chart_reset_t reset) -{ - chart::v1::ResetChartMessages msg; - - msg.set_claim_id(reset.claim_id); - msg.set_node_id(reset.node_id); - switch (reset.reason) { - case DB_EMPTY: - msg.set_reason(chart::v1::ResetReason::DB_EMPTY); - break; - case SEQ_ID_NOT_EXISTS: - msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS); - break; - case TIMESTAMP_MISMATCH: - msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH); - break; - default: - return NULL; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); - if (bin) - msg.SerializeToArray(bin, *len); - - return bin; -} - -void chart_instance_updated_destroy(struct chart_instance_updated *instance) -{ - freez((char*)instance->id); - freez((char*)instance->claim_id); - - rrdlabels_destroy(instance->chart_labels); - - freez((char*)instance->config_hash); -} - -static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update) -{ - google::protobuf::Map<std::string, std::string> *map; - aclk_lib::v1::ACLKMessagePosition *pos; - - chart->set_id(update->id); - chart->set_claim_id(update->claim_id); - chart->set_node_id(update->node_id); - chart->set_name(update->name); - - map = chart->mutable_chart_labels(); - rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map); - - switch (update->memory_mode) { - case RRD_MEMORY_MODE_NONE: - chart->set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - chart->set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - chart->set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMORY_MODE_SAVE: - chart->set_memory_mode(chart::v1::SAVE); - break; - case RRD_MEMORY_MODE_ALLOC: - chart->set_memory_mode(chart::v1::ALLOC); - break; - case RRD_MEMORY_MODE_DBENGINE: - chart->set_memory_mode(chart::v1::DB_ENGINE); - break; - default: - return 1; - break; - } - - chart->set_update_every_interval(update->update_every); - chart->set_config_hash(update->config_hash); - - pos = chart->mutable_position(); - pos->set_sequence_id(update->position.sequence_id); - pos->set_previous_sequence_id(update->position.previous_sequence_id); - set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim) -{ - aclk_lib::v1::ACLKMessagePosition *pos; - - dim->set_id(c_dim->id); - dim->set_chart_id(c_dim->chart_id); - dim->set_node_id(c_dim->node_id); - dim->set_claim_id(c_dim->claim_id); - dim->set_name(c_dim->name); - - set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at()); - set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp()); - - pos = dim->mutable_position(); - pos->set_sequence_id(c_dim->position.sequence_id); - pos->set_previous_sequence_id(c_dim->position.previous_sequence_id); - set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - chart::v1::ChartInstanceUpdated db_chart; - chart::v1::ChartDimensionUpdated db_dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - msg.set_batch_id(batch_id); - - for (int i = 0; payloads[i]; i++) { - if (is_dim[i]) { - if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_dim.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions(); - *dim = db_dim; - } else { - if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_chart.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartInstanceUpdated *chart = msg.add_charts(); - *chart = db_chart; - } - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartInstanceUpdated db_msg; - chart::v1::ChartInstanceUpdated *chart; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart = msg.add_charts(); - *chart = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartDimensionUpdated db_msg; - chart::v1::ChartDimensionUpdated *dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - dim = msg.add_dimensions(); - *dim = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update) -{ - chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated(); - - if (set_chart_instance_updated(chart, update)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(chart); - char *bin = (char*)mallocz(*len); - chart->SerializeToArray(bin, *len); - - delete chart; - return bin; -} - -char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim) -{ - chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated(); - - if (set_chart_dim_updated(proto_dim, dim)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim); - char *bin = (char*)mallocz(*len); - proto_dim->SerializeToArray(bin, *len); - - delete proto_dim; - return bin; -} - -using namespace google::protobuf; - -char *generate_retention_updated(size_t *len, struct retention_updated *data) -{ - chart::v1::RetentionUpdated msg; - - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - - switch (data->memory_mode) { - case RRD_MEMORY_MODE_NONE: - msg.set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - msg.set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - msg.set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMOR |