summaryrefslogtreecommitdiffstats
path: root/aclk/schema-wrappers
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/schema-wrappers')
-rw-r--r--aclk/schema-wrappers/context.cc125
-rw-r--r--aclk/schema-wrappers/context.h53
-rw-r--r--aclk/schema-wrappers/context_stream.cc42
-rw-r--r--aclk/schema-wrappers/context_stream.h36
-rw-r--r--aclk/schema-wrappers/node_connection.cc9
-rw-r--r--aclk/schema-wrappers/node_connection.h3
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc12
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
8 files changed, 282 insertions, 0 deletions
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
new file mode 100644
index 0000000000..b04c9d20cc
--- /dev/null
+++ b/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
new file mode 100644
index 0000000000..cbb7701a81
--- /dev/null
+++ b/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 0000000000..3bb1956cb2
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 0000000000..8c691d2cc1
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 0a4c8ece1b..a6ca8ef984 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_seconds(tv.tv_sec);
timestamp->set_nanos(tv.tv_usec * 1000);
+ if (data->capabilities) {
+ struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index 3fd2072134..c27729d15c 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -3,6 +3,8 @@
#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#include "capability.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -17,6 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
+ struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
index 2fd2bb3179..0e473eb6c4 100644
--- a/aclk/schema-wrappers/proto_2_json.cc
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -11,6 +11,8 @@
#include "proto/nodeinstance/connection/v1/connection.pb.h"
#include "proto/nodeinstance/create/v1/creation.pb.h"
#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
#include "libnetdata/libnetdata.h"
@@ -45,6 +47,12 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new alarms::v1::AlarmSnapshot;
if (!strcmp(msgname, "AlarmLogEntry"))
return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
//rx side
if (!strcmp(msgname, "CreateNodeInstanceResult"))
@@ -67,6 +75,10 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new alarms::v1::SendAlarmSnapshot;
if (!strcmp(msgname, "DisconnectReq"))
return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
return NULL;
}
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a3248a69b6..26412cacc7 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -14,5 +14,7 @@
#include "alarm_stream.h"
#include "node_info.h"
#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
#endif /* SCHEMA_WRAPPERS_H */