diff options
Diffstat (limited to 'aclk')
m--------- | aclk/aclk-schemas | 0 | ||||
-rw-r--r-- | aclk/aclk.c | 32 | ||||
-rw-r--r-- | aclk/aclk_api.c | 1 | ||||
-rw-r--r-- | aclk/aclk_api.h | 1 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.c | 23 | ||||
-rw-r--r-- | aclk/aclk_contexts_api.h | 12 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 9 | ||||
-rw-r--r-- | aclk/aclk_query.c | 1 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 1 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 1 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 47 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 1 | ||||
-rw-r--r-- | aclk/aclk_util.c | 4 | ||||
-rw-r--r-- | aclk/aclk_util.h | 4 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.cc | 125 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.h | 53 | ||||
-rw-r--r-- | aclk/schema-wrappers/context_stream.cc | 42 | ||||
-rw-r--r-- | aclk/schema-wrappers/context_stream.h | 36 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.cc | 9 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.h | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 12 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 2 |
22 files changed, 411 insertions, 8 deletions
diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas -Subproject fa46ccca237a9bdb613b3b1f2809a25b7b45c7c +Subproject 3252118bd547640251356629f0df05eaf952ac3 diff --git a/aclk/aclk.c b/aclk/aclk.c index 4477f1bb5f..7b3641b1e2 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -141,12 +141,12 @@ static int wait_till_cloud_enabled() static int wait_till_agent_claimed(void) { //TODO prevent malloc and freez - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) return 1; - agent_id = is_agent_claimed(); + agent_id = get_agent_claimid(); } freez(agent_id); return 0; @@ -769,6 +769,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd) }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); + + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -801,6 +811,20 @@ void aclk_send_node_instances() }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); + + char host_id[UUID_STR_LEN]; + uuid_unparse_lower(list->host_id, host_id); + + RRDHOST *host = rrdhost_find_by_guid(host_id, 0); + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -913,7 +937,7 @@ char *ng_aclk_state(void) ); buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { @@ -1079,7 +1103,7 @@ char *ng_aclk_state_json(void) json_object_array_add(grp, tmp); json_object_object_add(msg, "protocols-supported", grp); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); tmp = json_object_new_boolean(agent_id != NULL); json_object_object_add(msg, "agent-claimed", tmp); diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 9446b407f2..141d267af2 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -16,6 +16,7 @@ int aclk_disable_runtime = 0; int aclk_stats_enabled; int use_mqtt_5 = 0; +int aclk_ctx_based = 0; #define ACLK_IMPL_KEY_NAME "aclk implementation" diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index 8bf7c72913..36a6d603f3 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -20,6 +20,7 @@ extern int aclk_stats_enabled; extern int aclk_alert_reloaded; extern int use_mqtt_5; +extern int aclk_ctx_based; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c new file mode 100644 index 0000000000..f17d3cabd9 --- /dev/null +++ b/aclk/aclk_contexts_api.c @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" + +#include "aclk_contexts_api.h" + +void aclk_send_contexts_snapshot(contexts_snapshot_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT; + query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_send_contexts_updated(contexts_updated_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED; + query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h new file mode 100644 index 0000000000..46b916d22f --- /dev/null +++ b/aclk/aclk_contexts_api.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_CONTEXTS_API_H +#define ACLK_CONTEXTS_API_H + +#include "schema-wrappers/schema_wrappers.h" + + +void aclk_send_contexts_snapshot(contexts_snapshot_t data); +void aclk_send_contexts_updated(contexts_updated_t data); + +#endif /* ACLK_CONTEXTS_API_H */ + diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index 6ce217a3ac..b7bf173c4a 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -493,7 +493,7 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p unsigned char *challenge; int challenge_bytes; - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); return 1; @@ -836,7 +836,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { req.request_type = HTTP_REQ_GET; - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); @@ -844,7 +844,10 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + if (rrdcontext_enabled) + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + else + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); freez(agent_id); diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 84dcc111a8..981c01965a 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -278,6 +278,7 @@ const char *aclk_query_get_name(aclk_query_type_t qt) case ALARM_PROVIDE_CFG: return "provide_alarm_config"; case ALARM_SNAPSHOT: return "alarm_snapshot"; case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; default: error_report("Unknown query type used %d", (int) qt); return "unknown"; diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 61484288e9..01b20d23f3 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -122,6 +122,7 @@ void aclk_query_free(aclk_query_t query) case ALARM_PROVIDE_CFG: case ALARM_SNAPSHOT: case UPDATE_NODE_COLLECTORS: + case PROTO_BIN_MESSAGE: if (!use_mqtt_5) freez(query->data.bin_payload.payload); break; diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 1f03d54fd7..ab94b63848 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -23,6 +23,7 @@ typedef enum { ALARM_PROVIDE_CFG, ALARM_SNAPSHOT, UPDATE_NODE_COLLECTORS, + PROTO_BIN_MESSAGE, ACLK_QUERY_TYPE_COUNT // always keep this as last } aclk_query_type_t; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 4afe6335a8..e6ed332cc5 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -289,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -313,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { + aclk_ctx_based = 0; stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); if (!res.claim_id || !res.node_id) { error("Error parsing StreamChartsAndDimensions msg"); @@ -426,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len) return 0; } +int contexts_checkpoint(const char *msg, size_t msg_len) +{ + aclk_ctx_based = 1; + + struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_checkpoint_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + +int stop_streaming_contexts(const char *msg, size_t msg_len) +{ + if (!aclk_ctx_based) { + error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message"); + return 1; + } + + struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_stop_streaming_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -444,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, + { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, + { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 6b87bba759..822a90fa25 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -248,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, #endif { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, { .name = NULL, .version = 0, .enabled = 0 } }; diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 6412fcd0a2..ec021aec55 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -124,6 +124,8 @@ struct topic_name { { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, + { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, + { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } }; @@ -147,6 +149,8 @@ enum aclk_topics compulsory_topics[] = { ACLK_TOPICID_ALARM_CONFIG, ACLK_TOPICID_ALARM_SNAPSHOT, ACLK_TOPICID_NODE_COLLECTORS, + ACLK_TOPICID_CTXS_SNAPSHOT, + ACLK_TOPICID_CTXS_UPDATED, ACLK_TOPICID_UNKNOWN }; diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 186abd8a08..ed715e0466 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -88,7 +88,9 @@ enum aclk_topics { ACLK_TOPICID_ALARM_HEALTH = 15, ACLK_TOPICID_ALARM_CONFIG = 16, ACLK_TOPICID_ALARM_SNAPSHOT = 17, - ACLK_TOPICID_NODE_COLLECTORS = 18 + ACLK_TOPICID_NODE_COLLECTORS = 18, + ACLK_TOPICID_CTXS_SNAPSHOT = 19, + ACLK_TOPICID_CTXS_UPDATED = 20 }; const char *aclk_get_topic(enum aclk_topics topic); diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc new file mode 100644 index 0000000000..b04c9d20cc --- /dev/null +++ b/aclk/schema-wrappers/context.cc @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +#include "context.h" + +using namespace context::v1; + +// ContextsSnapshot +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version) +{ + ContextsSnapshot *ctxs_snap = new ContextsSnapshot; + + if (ctxs_snap == NULL) + fatal("Cannot allocate ContextsSnapshot object. OOM"); + + ctxs_snap->set_claim_id(claim_id); + ctxs_snap->set_node_id(node_id); + ctxs_snap->set_version(version); + + return ctxs_snap; +} + +void contexts_snapshot_delete(contexts_snapshot_t snapshot) +{ + delete (ContextsSnapshot *)snapshot; +} + +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version) +{ + ((ContextsSnapshot *)ctxs_snapshot)->set_version(version); +} + +static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx) +{ + ctx->set_id(c_ctx->id); + ctx->set_version(c_ctx->version); + ctx->set_first_entry(c_ctx->first_entry); + ctx->set_last_entry(c_ctx->last_entry); + ctx->set_deleted(c_ctx->deleted); + ctx->set_title(c_ctx->title); + ctx->set_priority(c_ctx->priority); + ctx->set_chart_type(c_ctx->chart_type); + ctx->set_units(c_ctx->units); + ctx->set_family(c_ctx->family); +} + +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + ContextUpdated *ctx = ctxs_snap->add_contexts(); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); + char *bin = (char*)mallocz(*len); + if (!ctxs_snap->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_snap; + return NULL; + } + + delete ctxs_snap; + return bin; +} + +// ContextsUpdated +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at) +{ + ContextsUpdated *ctxs_updated = new ContextsUpdated; + + if (ctxs_updated == NULL) + fatal("Cannot allocate ContextsUpdated object. OOM"); + + ctxs_updated->set_claim_id(claim_id); + ctxs_updated->set_node_id(node_id); + ctxs_updated->set_version_hash(version_hash); + ctxs_updated->set_created_at(created_at); + + return ctxs_updated; +} + +void contexts_updated_delete(contexts_updated_t ctxs_updated) +{ + delete (ContextsUpdated *)ctxs_updated; +} + +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash) +{ + ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash); +} + +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + ContextUpdated *ctx = ctxs_update->add_contextupdates(); + + if (ctx == NULL) + fatal("Cannot allocate ContextUpdated object. OOM"); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); + char *bin = (char*)mallocz(*len); + if (!ctxs_update->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_update; + return NULL; + } + + delete ctxs_update; + return bin; +} diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h new file mode 100644 index 0000000000..cbb7701a81 --- /dev/null +++ b/aclk/schema-wrappers/context.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_H + +#include <stdint.h> +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* contexts_updated_t; +typedef void* contexts_snapshot_t; + +struct context_updated { + // context id + const char *id; + + uint64_t version; + + uint64_t first_entry; + uint64_t last_entry; + + int deleted; + + const char *title; + uint64_t priority; + const char *chart_type; + const char *units; + const char *family; +}; + +// ContextS Snapshot related +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version); +void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); + +// ContextS Updated related +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); +void contexts_updated_delete(contexts_updated_t ctxs_updated); +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */ diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc new file mode 100644 index 0000000000..3bb1956cb2 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.cc @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/stream.pb.h" + +#include "context_stream.h" + +#include "libnetdata/libnetdata.h" + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len) +{ + context::v1::StopStreamingContexts msg; + + struct stop_streaming_ctxs *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + + return res; +} + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len) +{ + context::v1::ContextsCheckpoint msg; + + struct ctxs_checkpoint *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + res->version_hash = msg.version_hash(); + + return res; +} diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h new file mode 100644 index 0000000000..8c691d2cc1 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.h @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct stop_streaming_ctxs { + char *claim_id; + char *node_id; + // we omit reason as there is only one defined at this point + // as soon as there is more than one defined in StopStreaminContextsReason + // we should add it + // 0 - RATE_LIMIT_EXCEEDED +}; + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len); + +struct ctxs_checkpoint { + char *claim_id; + char *node_id; + + uint64_t version_hash; +}; + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len); + + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */ diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc index 0a4c8ece1b..a6ca8ef984 100644 --- a/aclk/schema-wrappers/node_connection.cc +++ b/aclk/schema-wrappers/node_connection.cc @@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect timestamp->set_seconds(tv.tv_sec); timestamp->set_nanos(tv.tv_usec * 1000); + if (data->capabilities) { + struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + *len = PROTO_COMPAT_MSG_SIZE(msg); char *bin = (char*)malloc(*len); if (bin) diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h index 3fd2072134..c27729d15c 100644 --- a/aclk/schema-wrappers/node_connection.h +++ b/aclk/schema-wrappers/node_connection.h @@ -3,6 +3,8 @@ #ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H #define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H +#include "capability.h" + #ifdef __cplusplus extern "C" { #endif @@ -17,6 +19,7 @@ typedef struct { int64_t session_id; int32_t hops; + struct capability *capabilities; } node_instance_connection_t; char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 2fd2bb3179..0e473eb6c4 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -11,6 +11,8 @@ #include "proto/nodeinstance/connection/v1/connection.pb.h" #include "proto/nodeinstance/create/v1/creation.pb.h" #include "proto/nodeinstance/info/v1/info.pb.h" +#include "proto/context/v1/stream.pb.h" +#include "proto/context/v1/context.pb.h" #include "libnetdata/libnetdata.h" @@ -45,6 +47,12 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new alarms::v1::AlarmSnapshot; if (!strcmp(msgname, "AlarmLogEntry")) return new alarms::v1::AlarmLogEntry; + if (!strcmp(msgname, "UpdateNodeCollectors")) + return new nodeinstance::info::v1::UpdateNodeCollectors; + if (!strcmp(msgname, "ContextsUpdated")) + return new context::v1::ContextsUpdated; + if (!strcmp(msgname, "ContextsSnapshot")) + return new context::v1::ContextsSnapshot; //rx side if (!strcmp(msgname, "CreateNodeInstanceResult")) @@ -67,6 +75,10 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new alarms::v1::SendAlarmSnapshot; if (!strcmp(msgname, "DisconnectReq")) return new agent::v1::DisconnectReq; + if (!strcmp(msgname, "ContextsCheckpoint")) + return new context::v1::ContextsCheckpoint; + if (!strcmp(msgname, "StopStreamingContexts")) + return new context::v1::StopStreamingContexts; return NULL; } diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a3248a69b6..26412cacc7 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,7 @@ #include "alarm_stream.h" #include "node_info.h" #include "capability.h" +#include "context_stream.h" +#include "context.h" #endif /* SCHEMA_WRAPPERS_H */ |