diff options
Diffstat (limited to 'aclk/schema-wrappers')
-rw-r--r-- | aclk/schema-wrappers/context.cc | 125 | ||||
-rw-r--r-- | aclk/schema-wrappers/context.h | 53 | ||||
-rw-r--r-- | aclk/schema-wrappers/context_stream.cc | 42 | ||||
-rw-r--r-- | aclk/schema-wrappers/context_stream.h | 36 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.cc | 9 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.h | 3 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 12 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 2 |
8 files changed, 282 insertions, 0 deletions
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc new file mode 100644 index 0000000000..b04c9d20cc --- /dev/null +++ b/aclk/schema-wrappers/context.cc @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +#include "context.h" + +using namespace context::v1; + +// ContextsSnapshot +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version) +{ + ContextsSnapshot *ctxs_snap = new ContextsSnapshot; + + if (ctxs_snap == NULL) + fatal("Cannot allocate ContextsSnapshot object. OOM"); + + ctxs_snap->set_claim_id(claim_id); + ctxs_snap->set_node_id(node_id); + ctxs_snap->set_version(version); + + return ctxs_snap; +} + +void contexts_snapshot_delete(contexts_snapshot_t snapshot) +{ + delete (ContextsSnapshot *)snapshot; +} + +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version) +{ + ((ContextsSnapshot *)ctxs_snapshot)->set_version(version); +} + +static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx) +{ + ctx->set_id(c_ctx->id); + ctx->set_version(c_ctx->version); + ctx->set_first_entry(c_ctx->first_entry); + ctx->set_last_entry(c_ctx->last_entry); + ctx->set_deleted(c_ctx->deleted); + ctx->set_title(c_ctx->title); + ctx->set_priority(c_ctx->priority); + ctx->set_chart_type(c_ctx->chart_type); + ctx->set_units(c_ctx->units); + ctx->set_family(c_ctx->family); +} + +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + ContextUpdated *ctx = ctxs_snap->add_contexts(); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); + char *bin = (char*)mallocz(*len); + if (!ctxs_snap->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_snap; + return NULL; + } + + delete ctxs_snap; + return bin; +} + +// ContextsUpdated +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at) +{ + ContextsUpdated *ctxs_updated = new ContextsUpdated; + + if (ctxs_updated == NULL) + fatal("Cannot allocate ContextsUpdated object. OOM"); + + ctxs_updated->set_claim_id(claim_id); + ctxs_updated->set_node_id(node_id); + ctxs_updated->set_version_hash(version_hash); + ctxs_updated->set_created_at(created_at); + + return ctxs_updated; +} + +void contexts_updated_delete(contexts_updated_t ctxs_updated) +{ + delete (ContextsUpdated *)ctxs_updated; +} + +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash) +{ + ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash); +} + +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + ContextUpdated *ctx = ctxs_update->add_contextupdates(); + + if (ctx == NULL) + fatal("Cannot allocate ContextUpdated object. OOM"); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); + char *bin = (char*)mallocz(*len); + if (!ctxs_update->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_update; + return NULL; + } + + delete ctxs_update; + return bin; +} diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h new file mode 100644 index 0000000000..cbb7701a81 --- /dev/null +++ b/aclk/schema-wrappers/context.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_H + +#include <stdint.h> +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* contexts_updated_t; +typedef void* contexts_snapshot_t; + +struct context_updated { + // context id + const char *id; + + uint64_t version; + + uint64_t first_entry; + uint64_t last_entry; + + int deleted; + + const char *title; + uint64_t priority; + const char *chart_type; + const char *units; + const char *family; +}; + +// ContextS Snapshot related +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version); +void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); + +// ContextS Updated related +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); +void contexts_updated_delete(contexts_updated_t ctxs_updated); +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */ diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc new file mode 100644 index 0000000000..3bb1956cb2 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.cc @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/stream.pb.h" + +#include "context_stream.h" + +#include "libnetdata/libnetdata.h" + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len) +{ + context::v1::StopStreamingContexts msg; + + struct stop_streaming_ctxs *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + + return res; +} + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len) +{ + context::v1::ContextsCheckpoint msg; + + struct ctxs_checkpoint *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + res->version_hash = msg.version_hash(); + + return res; +} diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h new file mode 100644 index 0000000000..8c691d2cc1 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.h @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct stop_streaming_ctxs { + char *claim_id; + char *node_id; + // we omit reason as there is only one defined at this point + // as soon as there is more than one defined in StopStreaminContextsReason + // we should add it + // 0 - RATE_LIMIT_EXCEEDED +}; + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len); + +struct ctxs_checkpoint { + char *claim_id; + char *node_id; + + uint64_t version_hash; +}; + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len); + + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */ diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc index 0a4c8ece1b..a6ca8ef984 100644 --- a/aclk/schema-wrappers/node_connection.cc +++ b/aclk/schema-wrappers/node_connection.cc @@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect timestamp->set_seconds(tv.tv_sec); timestamp->set_nanos(tv.tv_usec * 1000); + if (data->capabilities) { + struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + *len = PROTO_COMPAT_MSG_SIZE(msg); char *bin = (char*)malloc(*len); if (bin) diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h index 3fd2072134..c27729d15c 100644 --- a/aclk/schema-wrappers/node_connection.h +++ b/aclk/schema-wrappers/node_connection.h @@ -3,6 +3,8 @@ #ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H #define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H +#include "capability.h" + #ifdef __cplusplus extern "C" { #endif @@ -17,6 +19,7 @@ typedef struct { int64_t session_id; int32_t hops; + struct capability *capabilities; } node_instance_connection_t; char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 2fd2bb3179..0e473eb6c4 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -11,6 +11,8 @@ #include "proto/nodeinstance/connection/v1/connection.pb.h" #include "proto/nodeinstance/create/v1/creation.pb.h" #include "proto/nodeinstance/info/v1/info.pb.h" +#include "proto/context/v1/stream.pb.h" +#include "proto/context/v1/context.pb.h" #include "libnetdata/libnetdata.h" @@ -45,6 +47,12 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new alarms::v1::AlarmSnapshot; if (!strcmp(msgname, "AlarmLogEntry")) return new alarms::v1::AlarmLogEntry; + if (!strcmp(msgname, "UpdateNodeCollectors")) + return new nodeinstance::info::v1::UpdateNodeCollectors; + if (!strcmp(msgname, "ContextsUpdated")) + return new context::v1::ContextsUpdated; + if (!strcmp(msgname, "ContextsSnapshot")) + return new context::v1::ContextsSnapshot; //rx side if (!strcmp(msgname, "CreateNodeInstanceResult")) @@ -67,6 +75,10 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new alarms::v1::SendAlarmSnapshot; if (!strcmp(msgname, "DisconnectReq")) return new agent::v1::DisconnectReq; + if (!strcmp(msgname, "ContextsCheckpoint")) + return new context::v1::ContextsCheckpoint; + if (!strcmp(msgname, "StopStreamingContexts")) + return new context::v1::StopStreamingContexts; return NULL; } diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a3248a69b6..26412cacc7 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,7 @@ #include "alarm_stream.h" #include "node_info.h" #include "capability.h" +#include "context_stream.h" +#include "context.h" #endif /* SCHEMA_WRAPPERS_H */ |