summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--Makefile.am2
-rw-r--r--aclk/aclk.c21
-rw-r--r--aclk/aclk_capas.c47
-rw-r--r--aclk/aclk_capas.h14
-rw-r--r--aclk/aclk_rx_msgs.c12
-rw-r--r--aclk/aclk_tx_msgs.c14
-rw-r--r--aclk/schema-wrappers/capability.cc2
-rw-r--r--aclk/schema-wrappers/capability.h2
-rw-r--r--aclk/schema-wrappers/connection.cc2
-rw-r--r--aclk/schema-wrappers/connection.h2
-rw-r--r--aclk/schema-wrappers/node_connection.cc2
-rw-r--r--aclk/schema-wrappers/node_connection.h2
-rw-r--r--database/sqlite/sqlite_aclk_node.c11
14 files changed, 85 insertions, 50 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 182a808614..8c75cfd39c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -898,6 +898,8 @@ set(ACLK_FILES
aclk/aclk_alarm_api.h
aclk/aclk_contexts_api.c
aclk/aclk_contexts_api.h
+ aclk/aclk_capas.c
+ aclk/aclk_capas.h
aclk/schema-wrappers/connection.cc
aclk/schema-wrappers/connection.h
aclk/schema-wrappers/node_connection.cc
diff --git a/Makefile.am b/Makefile.am
index d1af8ad45b..95a2e28f46 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -690,6 +690,8 @@ ACLK_FILES = \
aclk/aclk_alarm_api.h \
aclk/aclk_contexts_api.c \
aclk/aclk_contexts_api.h \
+ aclk/aclk_capas.c \
+ aclk/aclk_capas.h \
aclk/helpers/mqtt_wss_pal.h \
aclk/helpers/ringbuffer_pal.h \
aclk/schema-wrappers/connection.cc \
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 9a0ffc0700..3b035b849d 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -13,6 +13,7 @@
#include "aclk_rx_msgs.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
#include "aclk_proxy.h"
@@ -779,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_agent_capas();
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -825,14 +819,7 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->host_id, host_id);
RRDHOST *host = rrdhost_find_by_guid(host_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -841,6 +828,8 @@ void aclk_send_node_instances()
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+
+ freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
new file mode 100644
index 0000000000..df9d18f638
--- /dev/null
+++ b/aclk/aclk_capas.c
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_capas.h"
+
+#include "ml/ml.h"
+
+const struct capability *aclk_get_agent_capas()
+{
+ static struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = 0, .enabled = 0 },
+ { .name = "mc", .version = 0, .enabled = 0 },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 1, .enabled = 1 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ agent_capabilities[2].version = ml_capable() ? 1 : 0;
+ agent_capabilities[2].enabled = ml_enabled(localhost);
+
+ agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
+ agent_capabilities[3].enabled = enable_metric_correlations;
+
+ return agent_capabilities;
+}
+
+struct capability *aclk_get_node_instance_capas(RRDHOST *host)
+{
+ struct capability ni_caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) },
+ { .name = "mc",
+ .version = enable_metric_correlations ? metric_correlations_version : 0,
+ .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 0, .enabled = 0 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+ ni_caps[4].version = 1;
+ ni_caps[4].enabled = 1;
+ }
+
+ struct capability *ret = mallocz(sizeof(ni_caps));
+ memcpy(ret, ni_caps, sizeof(ni_caps));
+ return ret;
+}
diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h
new file mode 100644
index 0000000000..c39a197b8f
--- /dev/null
+++ b/aclk/aclk_capas.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_CAPAS_H
+#define ACLK_CAPAS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#include "schema-wrappers/capability.h"
+
+const struct capability *aclk_get_agent_capas();
+struct capability *aclk_get_node_instance_capas(RRDHOST *host);
+
+#endif /* ACLK_CAPAS_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 6a1fc0534a..83bc5508be 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
+ freez((void *)node_state_update.capabilities);
+
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index e46a6395e4..532b964ad3 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_util.h"
#include "aclk_stats.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -211,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
size_t len;
uint16_t pid;
- struct capability agent_capabilities[] = {
- { .name = "json", .version = 2, .enabled = 0 },
- { .name = "proto", .version = 1, .enabled = 1 },
-#ifdef ENABLE_ML
- { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
-#endif
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
-
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
.session_id = aclk_session_newarch,
- .capabilities = agent_capabilities
+ .capabilities = aclk_get_agent_capas()
};
rrdhost_aclk_state_lock(localhost);
diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc
index 769806f90b..af45740a9d 100644
--- a/aclk/schema-wrappers/capability.cc
+++ b/aclk/schema-wrappers/capability.cc
@@ -4,7 +4,7 @@
#include "capability.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) {
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
proto_capa->set_name(c_capa->name);
proto_capa->set_enabled(c_capa->enabled);
proto_capa->set_version(c_capa->version);
diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h
index 9517a87163..c6085a44b8 100644
--- a/aclk/schema-wrappers/capability.h
+++ b/aclk/schema-wrappers/capability.h
@@ -18,7 +18,7 @@ struct capability {
#include "proto/aclk/v1/lib.pb.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa);
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
#endif
#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
index 8be6b54d70..20b40ece20 100644
--- a/aclk/schema-wrappers/connection.cc
+++ b/aclk/schema-wrappers/connection.cc
@@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
capability_set(proto_capa, capa);
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
index fcbe6bd595..0356c7d78e 100644
--- a/aclk/schema-wrappers/connection.h
+++ b/aclk/schema-wrappers/connection.h
@@ -17,7 +17,7 @@ typedef struct {
unsigned int lwt:1;
- struct capability *capabilities;
+ const struct capability *capabilities;
// TODO in future optional fields
// > 15 optional fields:
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 18f5cc6e1d..db1fa6449c 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -29,7 +29,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
capability_set(proto_capa, capa);
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index c27729d15c..dac0d8fe09 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -19,7 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
- struct capability *capabilities;
+ const struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c
index 3f4ed7e5f2..afe774997d 100644
--- a/database/sqlite/sqlite_aclk_node.c
+++ b/database/sqlite/sqlite_aclk_node.c
@@ -4,6 +4,7 @@
#include "sqlite_aclk_node.h"
#include "../../aclk/aclk_contexts_api.h"
+#include "../../aclk/aclk_capas.h"
#ifdef ENABLE_ACLK
DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
@@ -71,14 +72,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.ml_info.ml_capable = ml_capable(localhost);
node_info.ml_info.ml_enabled = ml_enabled(wc->host);
- struct capability instance_caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(wc->host) },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_info.node_instance_capabilities = instance_caps;
+ node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host);
now_realtime_timeval(&node_info.updated_at);
@@ -126,6 +120,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
rrd_unlock();
freez(node_info.claim_id);
+ freez(node_info.node_instance_capabilities);
freez(host_version);
wc->node_collectors_send = now_realtime_sec();