summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c44
-rw-r--r--aclk/aclk.h2
-rw-r--r--aclk/aclk_otp.c2
-rw-r--r--aclk/aclk_util.c11
-rw-r--r--aclk/aclk_util.h3
5 files changed, 49 insertions, 13 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index c7cab3ae06..801fcaa5d2 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -30,8 +30,6 @@ int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
time_t aclk_block_until = 0;
-aclk_env_t *aclk_env = NULL;
-
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -183,7 +181,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
-static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
@@ -227,7 +225,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
+static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
if (msglen > RX_MSGLEN_MAX)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -264,7 +262,14 @@ static void msg_callback_new(const char *topic, const void *msg, size_t msglen,
aclk_handle_new_cloud_msg(msgtype, msg, msglen);
}
-#endif
+
+static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
+ if (aclk_use_new_cloud_arch)
+ msg_callback_new_protocol(topic, msg, msglen, qos);
+ else
+ msg_callback_old_protocol(topic, msg, msglen, qos);
+}
+#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
static void puback_callback(uint16_t packet_id)
{
@@ -600,6 +605,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
+ aclk_use_new_cloud_arch = 1;
+ info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE.");
+#else
+ aclk_use_new_cloud_arch = 0;
+#endif
+
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
aclk_env_t_destroy(aclk_env);
@@ -618,6 +630,21 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
+#ifndef ACLK_NEWARCH_DEVMODE
+ if (aclk_env->encoding == ACLK_ENC_PROTO) {
+#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+ error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
+ continue;
+#endif
+ if (!aclk_env_has_capa("proto")) {
+ error ("Can't encoding=proto without at least \"proto\" capability.");
+ continue;
+ }
+ info("Switching ACLK to new protobuf protocol. Due to /env response.");
+ aclk_use_new_cloud_arch = 1;
+ }
+#endif
+
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
@@ -716,9 +743,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
- aclk_use_new_cloud_arch = 1;
-#endif
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_stats_thread *stats_thread = NULL;
@@ -753,9 +777,9 @@ void *aclk_main(void *ptr)
goto exit;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
-#else
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+#else
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
diff --git a/aclk/aclk.h b/aclk/aclk.h
index e5944c04bc..87fe9d6622 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -14,8 +14,6 @@ extern time_t aclk_block_until;
extern int disconnect_req;
-extern aclk_env_t *aclk_env;
-
void *aclk_main(void *ptr);
extern netdata_mutex_t aclk_shared_state_mutex;
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index bb463cbc79..4248fc1082 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -842,7 +842,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
freez(agent_id);
req.host = (char*)aclk_hostname;
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 88accc5636..81410c880c 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -7,6 +7,8 @@
int aclk_use_new_cloud_arch = 0;
usec_t aclk_session_newarch = 0;
+aclk_env_t *aclk_env = NULL;
+
int chart_batch_id;
aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
@@ -51,6 +53,15 @@ void aclk_env_t_destroy(aclk_env_t *env) {
}
}
+int aclk_env_has_capa(const char *capa)
+{
+ for (int i = 0; i < aclk_env->capability_count; i++) {
+ if (!strcasecmp(capa, aclk_env->capabilities[i]))
+ return 1;
+ }
+ return 0;
+}
+
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 750463b1c3..07de5c58a3 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -49,11 +49,14 @@ typedef struct {
aclk_backoff_t backoff;
} aclk_env_t;
+extern aclk_env_t *aclk_env;
+
aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str);
aclk_transport_type_t aclk_transport_type_t_from_str(const char *str);
void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
void aclk_env_t_destroy(aclk_env_t *env);
+int aclk_env_has_capa(const char *capa);
enum aclk_topics {
ACLK_TOPICID_UNKNOWN = 0,