summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
m---------aclk/aclk-schemas0
-rw-r--r--aclk/aclk.c32
-rw-r--r--aclk/aclk_api.c1
-rw-r--r--aclk/aclk_api.h1
-rw-r--r--aclk/aclk_contexts_api.c23
-rw-r--r--aclk/aclk_contexts_api.h12
-rw-r--r--aclk/aclk_otp.c9
-rw-r--r--aclk/aclk_query.c1
-rw-r--r--aclk/aclk_query_queue.c1
-rw-r--r--aclk/aclk_query_queue.h1
-rw-r--r--aclk/aclk_rx_msgs.c47
-rw-r--r--aclk/aclk_tx_msgs.c1
-rw-r--r--aclk/aclk_util.c4
-rw-r--r--aclk/aclk_util.h4
-rw-r--r--aclk/schema-wrappers/context.cc125
-rw-r--r--aclk/schema-wrappers/context.h53
-rw-r--r--aclk/schema-wrappers/context_stream.cc42
-rw-r--r--aclk/schema-wrappers/context_stream.h36
-rw-r--r--aclk/schema-wrappers/node_connection.cc9
-rw-r--r--aclk/schema-wrappers/node_connection.h3
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc12
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
22 files changed, 411 insertions, 8 deletions
diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas
-Subproject fa46ccca237a9bdb613b3b1f2809a25b7b45c7c
+Subproject 3252118bd547640251356629f0df05eaf952ac3
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 4477f1bb5f..7b3641b1e2 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -141,12 +141,12 @@ static int wait_till_cloud_enabled()
static int wait_till_agent_claimed(void)
{
//TODO prevent malloc and freez
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
return 1;
- agent_id = is_agent_claimed();
+ agent_id = get_agent_claimid();
}
freez(agent_id);
return 0;
@@ -769,6 +769,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
+
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -801,6 +811,20 @@ void aclk_send_node_instances()
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
+
+ char host_id[UUID_STR_LEN];
+ uuid_unparse_lower(list->host_id, host_id);
+
+ RRDHOST *host = rrdhost_find_by_guid(host_id, 0);
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -913,7 +937,7 @@ char *ng_aclk_state(void)
);
buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL)
buffer_strcat(wb, "No\n");
else {
@@ -1079,7 +1103,7 @@ char *ng_aclk_state_json(void)
json_object_array_add(grp, tmp);
json_object_object_add(msg, "protocols-supported", grp);
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
tmp = json_object_new_boolean(agent_id != NULL);
json_object_object_add(msg, "agent-claimed", tmp);
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 9446b407f2..141d267af2 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -16,6 +16,7 @@ int aclk_disable_runtime = 0;
int aclk_stats_enabled;
int use_mqtt_5 = 0;
+int aclk_ctx_based = 0;
#define ACLK_IMPL_KEY_NAME "aclk implementation"
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index 8bf7c72913..36a6d603f3 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -20,6 +20,7 @@ extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
extern int use_mqtt_5;
+extern int aclk_ctx_based;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
new file mode 100644
index 0000000000..f17d3cabd9
--- /dev/null
+++ b/aclk/aclk_contexts_api.c
@@ -0,0 +1,23 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query_queue.h"
+
+#include "aclk_contexts_api.h"
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
+ query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_contexts_updated(contexts_updated_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
+ query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h
new file mode 100644
index 0000000000..46b916d22f
--- /dev/null
+++ b/aclk/aclk_contexts_api.h
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_CONTEXTS_API_H
+#define ACLK_CONTEXTS_API_H
+
+#include "schema-wrappers/schema_wrappers.h"
+
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data);
+void aclk_send_contexts_updated(contexts_updated_t data);
+
+#endif /* ACLK_CONTEXTS_API_H */
+
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index 6ce217a3ac..b7bf173c4a 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -493,7 +493,7 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p
unsigned char *challenge;
int challenge_bytes;
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL) {
error("Agent was not claimed - cannot perform challenge/response");
return 1;
@@ -836,7 +836,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
req.request_type = HTTP_REQ_GET;
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL)
{
error("Agent was not claimed - cannot perform challenge/response");
@@ -844,7 +844,10 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ if (rrdcontext_enabled)
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ else
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
freez(agent_id);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 84dcc111a8..981c01965a 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -278,6 +278,7 @@ const char *aclk_query_get_name(aclk_query_type_t qt)
case ALARM_PROVIDE_CFG: return "provide_alarm_config";
case ALARM_SNAPSHOT: return "alarm_snapshot";
case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
+ case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
default:
error_report("Unknown query type used %d", (int) qt);
return "unknown";
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 61484288e9..01b20d23f3 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -122,6 +122,7 @@ void aclk_query_free(aclk_query_t query)
case ALARM_PROVIDE_CFG:
case ALARM_SNAPSHOT:
case UPDATE_NODE_COLLECTORS:
+ case PROTO_BIN_MESSAGE:
if (!use_mqtt_5)
freez(query->data.bin_payload.payload);
break;
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 1f03d54fd7..ab94b63848 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -23,6 +23,7 @@ typedef enum {
ALARM_PROVIDE_CFG,
ALARM_SNAPSHOT,
UPDATE_NODE_COLLECTORS,
+ PROTO_BIN_MESSAGE,
ACLK_QUERY_TYPE_COUNT // always keep this as last
} aclk_query_type_t;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 4afe6335a8..e6ed332cc5 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -289,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -313,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len)
int stream_charts_and_dimensions(const char *msg, size_t msg_len)
{
+ aclk_ctx_based = 0;
stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
if (!res.claim_id || !res.node_id) {
error("Error parsing StreamChartsAndDimensions msg");
@@ -426,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len)
return 0;
}
+int contexts_checkpoint(const char *msg, size_t msg_len)
+{
+ aclk_ctx_based = 1;
+
+ struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_checkpoint_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
+int stop_streaming_contexts(const char *msg, size_t msg_len)
+{
+ if (!aclk_ctx_based) {
+ error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message");
+ return 1;
+ }
+
+ struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_stop_streaming_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
typedef struct {
const char *name;
simple_hash_t name_hash;
@@ -444,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
{ .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
+ { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
{ .name = NULL, .name_hash = 0, .fnc = NULL },
};
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 6b87bba759..822a90fa25 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -248,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
{ .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
#endif
{ .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
{ .name = NULL, .version = 0, .enabled = 0 }
};
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 6412fcd0a2..ec021aec55 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -124,6 +124,8 @@ struct topic_name {
{ .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
{ .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
{ .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
+ { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
+ { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
{ .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
};
@@ -147,6 +149,8 @@ enum aclk_topics compulsory_topics[] = {
ACLK_TOPICID_ALARM_CONFIG,
ACLK_TOPICID_ALARM_SNAPSHOT,
ACLK_TOPICID_NODE_COLLECTORS,
+ ACLK_TOPICID_CTXS_SNAPSHOT,
+ ACLK_TOPICID_CTXS_UPDATED,
ACLK_TOPICID_UNKNOWN
};
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 186abd8a08..ed715e0466 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -88,7 +88,9 @@ enum aclk_topics {
ACLK_TOPICID_ALARM_HEALTH = 15,
ACLK_TOPICID_ALARM_CONFIG = 16,
ACLK_TOPICID_ALARM_SNAPSHOT = 17,
- ACLK_TOPICID_NODE_COLLECTORS = 18
+ ACLK_TOPICID_NODE_COLLECTORS = 18,
+ ACLK_TOPICID_CTXS_SNAPSHOT = 19,
+ ACLK_TOPICID_CTXS_UPDATED = 20
};
const char *aclk_get_topic(enum aclk_topics topic);
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
new file mode 100644
index 0000000000..b04c9d20cc
--- /dev/null
+++ b/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
new file mode 100644
index 0000000000..cbb7701a81
--- /dev/null
+++ b/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 0000000000..3bb1956cb2
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 0000000000..8c691d2cc1
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 0a4c8ece1b..a6ca8ef984 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_seconds(tv.tv_sec);
timestamp->set_nanos(tv.tv_usec * 1000);
+ if (data->capabilities) {
+ struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index 3fd2072134..c27729d15c 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -3,6 +3,8 @@
#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#include "capability.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -17,6 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
+ struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
index 2fd2bb3179..0e473eb6c4 100644
--- a/aclk/schema-wrappers/proto_2_json.cc
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -11,6 +11,8 @@
#include "proto/nodeinstance/connection/v1/connection.pb.h"
#include "proto/nodeinstance/create/v1/creation.pb.h"
#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
#include "libnetdata/libnetdata.h"
@@ -45,6 +47,12 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new alarms::v1::AlarmSnapshot;
if (!strcmp(msgname, "AlarmLogEntry"))
return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
//rx side
if (!strcmp(msgname, "CreateNodeInstanceResult"))
@@ -67,6 +75,10 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new alarms::v1::SendAlarmSnapshot;
if (!strcmp(msgname, "DisconnectReq"))
return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
return NULL;
}
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a3248a69b6..26412cacc7 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -14,5 +14,7 @@
#include "alarm_stream.h"
#include "node_info.h"
#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
#endif /* SCHEMA_WRAPPERS_H */