diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2022-11-16 04:05:33 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-15 23:05:33 +0200 |
commit | f289ba344990ae1f16f6019c72de1e52dcf260bf (patch) | |
tree | 3ba507d9d104bf7cbce257baa76ae3f7301177da /aclk | |
parent | 224b051a2b2bab39a4b536e531ab9ca590bf31bb (diff) |
Add 'funcs' capability (#13992)
* cleanup capas + add func capa
* make it const
* fixes
* freez
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 21 | ||||
-rw-r--r-- | aclk/aclk_capas.c | 47 | ||||
-rw-r--r-- | aclk/aclk_capas.h | 14 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 12 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 14 | ||||
-rw-r--r-- | aclk/schema-wrappers/capability.cc | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/capability.h | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/connection.cc | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/connection.h | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.cc | 2 | ||||
-rw-r--r-- | aclk/schema-wrappers/node_connection.h | 2 |
11 files changed, 78 insertions, 42 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 9a0ffc0700..3b035b849d 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -13,6 +13,7 @@ #include "aclk_rx_msgs.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" +#include "aclk_capas.h" #include "aclk_proxy.h" @@ -779,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_agent_capas(); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -825,14 +819,7 @@ void aclk_send_node_instances() uuid_unparse_lower(list->host_id, host_id); RRDHOST *host = rrdhost_find_by_guid(host_id); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -841,6 +828,8 @@ void aclk_send_node_instances() info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + + freez((void*)node_state_update.capabilities); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c new file mode 100644 index 0000000000..df9d18f638 --- /dev/null +++ b/aclk/aclk_capas.c @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_capas.h" + +#include "ml/ml.h" + +const struct capability *aclk_get_agent_capas() +{ + static struct capability agent_capabilities[] = { + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = 0, .enabled = 0 }, + { .name = "mc", .version = 0, .enabled = 0 }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 1, .enabled = 1 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + agent_capabilities[2].version = ml_capable() ? 1 : 0; + agent_capabilities[2].enabled = ml_enabled(localhost); + + agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; + agent_capabilities[3].enabled = enable_metric_correlations; + + return agent_capabilities; +} + +struct capability *aclk_get_node_instance_capas(RRDHOST *host) +{ + struct capability ni_caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, + { .name = "mc", + .version = enable_metric_correlations ? metric_correlations_version : 0, + .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 0, .enabled = 0 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) { + ni_caps[4].version = 1; + ni_caps[4].enabled = 1; + } + + struct capability *ret = mallocz(sizeof(ni_caps)); + memcpy(ret, ni_caps, sizeof(ni_caps)); + return ret; +} diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h new file mode 100644 index 0000000000..c39a197b8f --- /dev/null +++ b/aclk/aclk_capas.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_CAPAS_H +#define ACLK_CAPAS_H + +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" + +#include "schema-wrappers/capability.h" + +const struct capability *aclk_get_agent_capas(); +struct capability *aclk_get_node_instance_capas(RRDHOST *host); + +#endif /* ACLK_CAPAS_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 6a1fc0534a..83bc5508be 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); + freez((void *)node_state_update.capabilities); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index e46a6395e4..532b964ad3 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_util.h" #include "aclk_stats.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -211,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable size_t len; uint16_t pid; - struct capability agent_capabilities[] = { - { .name = "json", .version = 2, .enabled = 0 }, - { .name = "proto", .version = 1, .enabled = 1 }, -#ifdef ENABLE_ML - { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, -#endif - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - update_agent_connection_t conn = { .reachable = (reachable ? 1 : 0), .lwt = 0, .session_id = aclk_session_newarch, - .capabilities = agent_capabilities + .capabilities = aclk_get_agent_capas() }; rrdhost_aclk_state_lock(localhost); diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc index 769806f90b..af45740a9d 100644 --- a/aclk/schema-wrappers/capability.cc +++ b/aclk/schema-wrappers/capability.cc @@ -4,7 +4,7 @@ #include "capability.h" -void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) { +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) { proto_capa->set_name(c_capa->name); proto_capa->set_enabled(c_capa->enabled); proto_capa->set_version(c_capa->version); diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h index 9517a87163..c6085a44b8 100644 --- a/aclk/schema-wrappers/capability.h +++ b/aclk/schema-wrappers/capability.h @@ -18,7 +18,7 @@ struct capability { #include "proto/aclk/v1/lib.pb.h" -void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa); +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa); #endif #endif /* ACLK_SCHEMA_CAPABILITY_H */ diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc index 8be6b54d70..20b40ece20 100644 --- a/aclk/schema-wrappers/connection.cc +++ b/aclk/schema-wrappers/connection.cc @@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio timestamp->set_nanos(tv.tv_usec * 1000); if (data->capabilities) { - struct capability *capa = data->capabilities; + const struct capability *capa = data->capabilities; while (capa->name) { aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities(); capability_set(proto_capa, capa); diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h index fcbe6bd595..0356c7d78e 100644 --- a/aclk/schema-wrappers/connection.h +++ b/aclk/schema-wrappers/connection.h @@ -17,7 +17,7 @@ typedef struct { unsigned int lwt:1; - struct capability *capabilities; + const struct capability *capabilities; // TODO in future optional fields // > 15 optional fields: diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc index 18f5cc6e1d..db1fa6449c 100644 --- a/aclk/schema-wrappers/node_connection.cc +++ b/aclk/schema-wrappers/node_connection.cc @@ -29,7 +29,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect timestamp->set_nanos(tv.tv_usec * 1000); if (data->capabilities) { - struct capability *capa = data->capabilities; + const struct capability *capa = data->capabilities; while (capa->name) { aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); capability_set(proto_capa, capa); diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h index c27729d15c..dac0d8fe09 100644 --- a/aclk/schema-wrappers/node_connection.h +++ b/aclk/schema-wrappers/node_connection.h @@ -19,7 +19,7 @@ typedef struct { int64_t session_id; int32_t hops; - struct capability *capabilities; + const struct capability *capabilities; } node_instance_connection_t; char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); |