summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2022-11-16 04:05:33 +0700
committerGitHub <noreply@github.com>2022-11-15 23:05:33 +0200
commitf289ba344990ae1f16f6019c72de1e52dcf260bf (patch)
tree3ba507d9d104bf7cbce257baa76ae3f7301177da /aclk
parent224b051a2b2bab39a4b536e531ab9ca590bf31bb (diff)
Add 'funcs' capability (#13992)
* cleanup capas + add func capa * make it const * fixes * freez
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c21
-rw-r--r--aclk/aclk_capas.c47
-rw-r--r--aclk/aclk_capas.h14
-rw-r--r--aclk/aclk_rx_msgs.c12
-rw-r--r--aclk/aclk_tx_msgs.c14
-rw-r--r--aclk/schema-wrappers/capability.cc2
-rw-r--r--aclk/schema-wrappers/capability.h2
-rw-r--r--aclk/schema-wrappers/connection.cc2
-rw-r--r--aclk/schema-wrappers/connection.h2
-rw-r--r--aclk/schema-wrappers/node_connection.cc2
-rw-r--r--aclk/schema-wrappers/node_connection.h2
11 files changed, 78 insertions, 42 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 9a0ffc0700..3b035b849d 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -13,6 +13,7 @@
#include "aclk_rx_msgs.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
#include "aclk_proxy.h"
@@ -779,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_agent_capas();
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -825,14 +819,7 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->host_id, host_id);
RRDHOST *host = rrdhost_find_by_guid(host_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -841,6 +828,8 @@ void aclk_send_node_instances()
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+
+ freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
new file mode 100644
index 0000000000..df9d18f638
--- /dev/null
+++ b/aclk/aclk_capas.c
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_capas.h"
+
+#include "ml/ml.h"
+
+const struct capability *aclk_get_agent_capas()
+{
+ static struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = 0, .enabled = 0 },
+ { .name = "mc", .version = 0, .enabled = 0 },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 1, .enabled = 1 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ agent_capabilities[2].version = ml_capable() ? 1 : 0;
+ agent_capabilities[2].enabled = ml_enabled(localhost);
+
+ agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
+ agent_capabilities[3].enabled = enable_metric_correlations;
+
+ return agent_capabilities;
+}
+
+struct capability *aclk_get_node_instance_capas(RRDHOST *host)
+{
+ struct capability ni_caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) },
+ { .name = "mc",
+ .version = enable_metric_correlations ? metric_correlations_version : 0,
+ .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 0, .enabled = 0 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+ ni_caps[4].version = 1;
+ ni_caps[4].enabled = 1;
+ }
+
+ struct capability *ret = mallocz(sizeof(ni_caps));
+ memcpy(ret, ni_caps, sizeof(ni_caps));
+ return ret;
+}
diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h
new file mode 100644
index 0000000000..c39a197b8f
--- /dev/null
+++ b/aclk/aclk_capas.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_CAPAS_H
+#define ACLK_CAPAS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#include "schema-wrappers/capability.h"
+
+const struct capability *aclk_get_agent_capas();
+struct capability *aclk_get_node_instance_capas(RRDHOST *host);
+
+#endif /* ACLK_CAPAS_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 6a1fc0534a..83bc5508be 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
+ freez((void *)node_state_update.capabilities);
+
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index e46a6395e4..532b964ad3 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_util.h"
#include "aclk_stats.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -211,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
size_t len;
uint16_t pid;
- struct capability agent_capabilities[] = {
- { .name = "json", .version = 2, .enabled = 0 },
- { .name = "proto", .version = 1, .enabled = 1 },
-#ifdef ENABLE_ML
- { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
-#endif
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = 1 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
-
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
.session_id = aclk_session_newarch,
- .capabilities = agent_capabilities
+ .capabilities = aclk_get_agent_capas()
};
rrdhost_aclk_state_lock(localhost);
diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc
index 769806f90b..af45740a9d 100644
--- a/aclk/schema-wrappers/capability.cc
+++ b/aclk/schema-wrappers/capability.cc
@@ -4,7 +4,7 @@
#include "capability.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) {
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
proto_capa->set_name(c_capa->name);
proto_capa->set_enabled(c_capa->enabled);
proto_capa->set_version(c_capa->version);
diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h
index 9517a87163..c6085a44b8 100644
--- a/aclk/schema-wrappers/capability.h
+++ b/aclk/schema-wrappers/capability.h
@@ -18,7 +18,7 @@ struct capability {
#include "proto/aclk/v1/lib.pb.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa);
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
#endif
#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
index 8be6b54d70..20b40ece20 100644
--- a/aclk/schema-wrappers/connection.cc
+++ b/aclk/schema-wrappers/connection.cc
@@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
capability_set(proto_capa, capa);
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
index fcbe6bd595..0356c7d78e 100644
--- a/aclk/schema-wrappers/connection.h
+++ b/aclk/schema-wrappers/connection.h
@@ -17,7 +17,7 @@ typedef struct {
unsigned int lwt:1;
- struct capability *capabilities;
+ const struct capability *capabilities;
// TODO in future optional fields
// > 15 optional fields:
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 18f5cc6e1d..db1fa6449c 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -29,7 +29,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
capability_set(proto_capa, capa);
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index c27729d15c..dac0d8fe09 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -19,7 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
- struct capability *capabilities;
+ const struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);