// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_util.h"
#include "aclk_proxy.h"
#include "daemon/common.h"
usec_t aclk_session_newarch = 0;
aclk_env_t *aclk_env = NULL;
int chart_batch_id;
aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
if (!strcmp(str, "json")) {
return ACLK_ENC_JSON;
}
if (!strcmp(str, "proto")) {
return ACLK_ENC_PROTO;
}
return ACLK_ENC_UNKNOWN;
}
aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
if (!strcmp(str, "MQTTv3")) {
return ACLK_TRP_MQTT_3_1_1;
}
if (!strcmp(str, "MQTTv5")) {
return ACLK_TRP_MQTT_5;
}
return ACLK_TRP_UNKNOWN;
}
void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
freez(trp_desc->endpoint);
}
void aclk_env_t_destroy(aclk_env_t *env) {
freez(env->auth_endpoint);
if (env->transports) {
for (size_t i = 0; i < env->transport_count; i++) {
if(env->transports[i]) {
aclk_transport_desc_t_destroy(env->transports[i]);
freez(env->transports[i]);
env->transports[i] = NULL;
}
}
freez(env->transports);
}
if (env->capabilities) {
for (size_t i = 0; i < env->capability_count; i++)
freez(env->capabilities[i]);
freez(env->capabilities);
}
}
int aclk_env_has_capa(const char *capa)
{
for (int i = 0; i < (int) aclk_env->capability_count; i++) {
if (!strcasecmp(capa, aclk_env->capabilities[i]))
return 1;
}
return 0;
}
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#endif
#define ACLK_TOPIC_PREFIX "/agent/"
struct aclk_topic {
enum aclk_topics topic_id;
// as received from cloud - we keep this for
// eventual topic list update when claim_id changes
char *topic_recvd;
// constructed topic
char *topic;
};
// This helps to cache finalized topics (assembled with claim_id)
// to not have to alloc or create buffer and construct topic every
// time message is sent as in old ACLK
static struct aclk_topic **aclk_topic_cache = NULL;
static size_t aclk_topic_cache_items = 0;
void free_topic_cache(void)
{
if (aclk_topic_cache) {
for (size_t i = 0; i < aclk_topic_cache_items; i++) {
freez(aclk_topic_cache[i]->topic);
freez(aclk_topic_cache[i]->topic_recvd);
freez(aclk_topic_cache[i]);
}
freez(aclk_topic_cache);
aclk_topic_cache = NULL;
aclk_topic_cache_items = 0;
}
}
#define JSON_TOPIC_KEY_TOPIC "topic"
#define JSON_TOPIC_KEY_NAME "name"
struct topic_name {
enum aclk_topics id;
// cloud name - how is it called
// in answer to /password endpoint
const char *name;
} topic_names[] = {
{ .id = ACLK_TOPICID_CHART, .name = "chart" },
{ .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
{ .id = ACLK_TOPICID_METADATA, .name = "meta" },
{ .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
{ .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
{ .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
{ .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
{ .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
{ .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" },
{ .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" },
{ .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
{ .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
{ .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" },
{ .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" },
{ .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" },
{ .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
{ .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
{ .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
{ .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
{ .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
{ .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
};
enum aclk_topics compulsory_topics[] = {
// TODO remove old topics once not needed anymore
ACLK_TOPICID_CHART, //TODO from legacy
ACLK_TOPICID_ALARMS, //TODO from legacy
ACLK_TOPICID_METADATA, //TODO from legacy
ACLK_TOPICID_COMMAND,
ACLK_TOPICID_AGENT_CONN,
ACLK_TOPICID_CMD_NG_V1,
ACLK_TOPICID_CREATE_NODE,
ACLK_TOPICID_NODE_CONN,
ACLK_TOPICID_CHART_DIMS,
ACLK_TOPICID_CHART_CONFIGS_UPDATED,
ACLK_TOPICID_CHART_RESET,
ACLK_TOPICID_RETENTION_UPDATED,
ACLK_TOPICID_NODE_INFO,
ACLK_TOPICID_ALARM_LOG,
ACLK_TOPICID_ALARM_HEALTH,
ACLK_TOPICID_ALARM_CONFIG,
ACLK_TOPICID_ALARM_SNAPSHOT,
ACLK_TOPICID_NODE_COLLECTORS,
ACLK_TOPICID_CTXS_SNAPSHOT,
ACLK_TOPICID_CTXS_UPDATED,
ACLK_TOPICID_UNKNOWN
};
static enum aclk_topics topic_name_to_id(const *topic = topic_names;
while (topic->name) {
if (!strcmp(topic->name, name)) {
return topic->id;
}
topic++;
}
return ACLK_TOPICID_UNKNOWN;
}
static const char *topic_id_to_name(enum aclk_topics tid) {
struct topic_name *topic = topic_names;
while (topic->name) {
if (topic->id == tid)
return topic->name;
topic++;
}
return "unknown";
}
#define CLAIM_ID_REPLACE_TAG "#{claim_id}"
static void topic_generate_final(struct aclk_topic *t) {
char *dest;
char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
if (!replace_tag)
return;
rrdhost_aclk_state_lock(localhost);
if (unlikely(!localhost->aclk_state.claimed_id)) {
error("This should never be called if agent not claimed");
rrdhost_aclk_state_unlock(localhost);
return;
}
t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen