summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-09-08 11:14:53 +0200
committerGitHub <noreply@github.com>2020-09-08 11:14:53 +0200
commitae7a9aa7ed8109eda7f681fa7583dbb22dce0172 (patch)
tree1321b576124eb542d60f526b8e360b03637f13da /aclk
parent1982291959543f50d00af820701ace3a1495bd82 (diff)
ACLK Version Negotiation (#9819)
* implements version negotiation for ACLK
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_common.c6
-rw-r--r--aclk/aclk_common.h25
-rw-r--r--aclk/aclk_query.c23
-rw-r--r--aclk/agent_cloud_link.c262
-rw-r--r--aclk/agent_cloud_link.h7
-rw-r--r--aclk/mqtt.c4
-rw-r--r--aclk/mqtt.h2
7 files changed, 259 insertions, 70 deletions
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c
index 4ef5b97617..d172f27c38 100644
--- a/aclk/aclk_common.c
+++ b/aclk/aclk_common.c
@@ -4,10 +4,14 @@
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+int aclk_disable_runtime = 0;
+
struct aclk_shared_state aclk_shared_state = {
.metadata_submitted = ACLK_METADATA_REQUIRED,
.agent_state = AGENT_INITIALIZING,
- .last_popcorn_interrupt = 0
+ .last_popcorn_interrupt = 0,
+ .version_neg = 0,
+ .version_neg_wait_till = 0
};
struct {
diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h
index 62295fdf41..0d85a6db9d 100644
--- a/aclk/aclk_common.h
+++ b/aclk/aclk_common.h
@@ -7,6 +7,24 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+// minimum and maximum supported version of ACLK
+// in this version of agent
+#define ACLK_VERSION_MIN 1
+#define ACLK_VERSION_MAX 1
+
+// Version negotiation messages have they own versioning
+// this is also used for LWT message as we set that up
+// before version negotiation
+#define ACLK_VERSION_NEG_VERSION 1
+
+// Maximum time to wait for version negotiation before aborting
+// and defaulting to oldest supported version
+#define VERSION_NEG_TIMEOUT 3
+
+#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
+#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
+#endif
+
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
ACLK_CMD_ONCONNECT,
@@ -31,6 +49,11 @@ extern struct aclk_shared_state {
ACLK_METADATA_STATE metadata_submitted;
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
+
+ // read only while ACLK connected
+ // protect by lock otherwise
+ int version_neg;
+ usec_t version_neg_wait_till;
} aclk_shared_state;
typedef enum aclk_proxy_type {
@@ -53,4 +76,6 @@ void safe_log_proxy_censor(char *proxy);
int aclk_decode_base_url(char *url, char **aclk_hostname, char **aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+extern int aclk_disable_runtime;
+
#endif //ACLK_COMMON_H
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 9971ea8cb9..cf9a63ff6a 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -348,7 +348,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
@@ -537,16 +537,31 @@ void *aclk_query_main_thread(void *ptr)
}
while (!netdata_exit) {
+ if(aclk_disable_runtime) {
+ sleep(1);
+ continue;
+ }
ACLK_SHARED_STATE_LOCK;
- if (unlikely(!aclk_shared_state.metadata_submitted)) {
- ACLK_SHARED_STATE_UNLOCK;
+ if (unlikely(!aclk_shared_state.version_neg)) {
+ if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+ ACLK_SHARED_STATE_UNLOCK;
+ info("Waiting for ACLK Version Negotiation message from Cloud");
+ sleep(1);
+ continue;
+ }
+ errno = 0;
+ error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+ " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
+ aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+ }
+ if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) {
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ ACLK_SHARED_STATE_UNLOCK;
errno = 0;
error("ACLK failed to queue on_connect command");
sleep(1);
continue;
}
- ACLK_SHARED_STATE_LOCK;
aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED;
}
ACLK_SHARED_STATE_UNLOCK;
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index 97ced1c19d..ee64b1faf9 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -100,6 +100,15 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
data->version = e->data.number;
break;
}
+ if (!strcmp(e->name, "min-version")) {
+ data->min_version = e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "max-version")) {
+ data->max_version = e->data.number;
+ break;
+ }
+
break;
case JSON_BOOLEAN:
@@ -513,7 +522,7 @@ static void aclk_graceful_disconnect()
// Send a graceful disconnect message
BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0);
+ aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
buffer_free(b);
@@ -820,12 +829,36 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
int rc;
aclk_connecting = 1;
create_publish_base_topic();
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.version_neg = 0;
+ aclk_shared_state.version_neg_wait_till = 0;
+ ACLK_SHARED_STATE_UNLOCK;
rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
if (unlikely(rc)) {
error("Failed to initialize the agent cloud link library");
}
}
+// Sends "hello" message to negotiate ACLK version with cloud
+static inline void aclk_hello_msg()
+{
+ BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+
+ char *msg_id = create_uuid();
+
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.version_neg = 0;
+ aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ //Hello message is versioned separatelly from the rest of the protocol
+ aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
+ buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
+ aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
+ freez(msg_id);
+ buffer_free(buf);
+}
+
/**
* Main agent cloud link thread
*
@@ -932,6 +965,11 @@ void *aclk_main(void *ptr)
/* size_t write_q, write_q_bytes, read_q;
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
+ if (aclk_disable_runtime && !aclk_connected) {
+ sleep(1);
+ continue;
+ }
+
if (aclk_kill_link) { // User has reloaded the claiming state
aclk_kill_link = 0;
aclk_graceful_disconnect();
@@ -978,9 +1016,9 @@ void *aclk_main(void *ptr)
stress_counter = 0;
}*/
- // TODO: Move to on-connect
if (unlikely(!aclk_subscribed)) {
aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
+ aclk_hello_msg();
}
if (unlikely(!query_threads.thread_list)) {
@@ -1117,6 +1155,7 @@ void aclk_connect()
aclk_connected = 1;
aclk_reconnect_delay(0);
+
QUERY_THREAD_WAKEUP;
return;
}
@@ -1138,7 +1177,7 @@ void aclk_disconnect()
aclk_force_reconnect = 1;
}
-inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
+inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
{
uuid_t uuid;
char uuid_str[36 + 1];
@@ -1164,9 +1203,9 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
"\t\"connect\": %ld,\n"
"\t\"connect-offset-usec\": %llu,\n"
"\t\"version\": %d",
- type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
+ type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);
- debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
+ debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
}
@@ -1194,9 +1233,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
@@ -1239,9 +1278,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "update", msg_id, 0, 0);
+ aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
@@ -1341,7 +1380,7 @@ int aclk_send_single_chart(char *hostname, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
rrdset2json(st, local_buffer, NULL, NULL, 1);
@@ -1418,7 +1457,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
@@ -1443,75 +1482,180 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
/*
* Parse the incoming payload and queue a command if valid
*/
-int aclk_handle_cloud_request(char *payload)
+static int aclk_handle_cloud_request(struct aclk_request *cloud_to_agent)
{
- struct aclk_request cloud_to_agent = {
- .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
- };
-
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
+ errno = 0;
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
- debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
+ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");
ACLK_SHARED_STATE_UNLOCK;
- return 0;
+ return 1;
}
ACLK_SHARED_STATE_UNLOCK;
- if (unlikely(!payload)) {
- debug(D_ACLK, "ACLK incoming message is empty");
- return 0;
+ if (unlikely(cloud_to_agent->version != aclk_shared_state.version_neg)) {
+ error("Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, aclk_shared_state.version_neg);
+ return 1;
}
- debug(D_ACLK, "ACLK incoming message (%s)", payload);
+ if (unlikely(!cloud_to_agent->payload)) {
+ error("payload missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("callback_topic missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("msg_id missing");
+ return 1;
+ }
- int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+ if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
+ debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+
+ // Note: the payload comes from the callback and it will be automatically freed
+ return 0;
+}
+
+// This handles `version` message from cloud used to negotiate
+// protocol version we will use
+static int aclk_handle_version_response(struct aclk_request *cloud_to_agent)
+{
+ int version = -1;
+ errno = 0;
+
+ if(unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
+ error("Unsuported version of \"version\" message from cloud. Expected %d, Got %d", ACLK_VERSION_NEG_VERSION, cloud_to_agent->version);
+ return 1;
+ }
+ if(unlikely(!cloud_to_agent->min_version)) {
+ error("Min version missing or 0");
+ return 1;
+ }
+ if(unlikely(!cloud_to_agent->max_version)) {
+ error("Max version missing or 0");
+ return 1;
+ }
+ if(unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
+ error("Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, cloud_to_agent->min_version);
+ return 1;
+ }
- if (unlikely(
- JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
- !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
- strcmp(cloud_to_agent.type_id, "http"))) {
- if (JSON_OK != rc)
- error("Malformed json request (%s)", payload);
+ if(unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
+ error("Agent too old for this cloud. Minimum version required by cloud %d. Maximum version supported by this agent %d.", cloud_to_agent->min_version, ACLK_VERSION_MAX);
+ aclk_kill_link = 1;
+ aclk_disable_runtime = 1;
+ return 1;
+ }
+ if(unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
+ error("Cloud version is too old for this agent. Maximum version supported by cloud %d. Minimum (oldest) version supported by this agent %d.", cloud_to_agent->max_version, ACLK_VERSION_MIN);
+ aclk_kill_link = 1;
+ return 1;
+ }
+
+ version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
- if (cloud_to_agent.version > ACLK_VERSION)
- error("Unsupported version in JSON request %d", cloud_to_agent.version);
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ errno = 0;
+ error("The \"version\" message came too late ignoring.");
+ goto err_cleanup;
+ }
+ if (unlikely(aclk_shared_state.version_neg)) {
+ errno = 0;
+ error("Version has already been set to %d", aclk_shared_state.version_neg);
+ goto err_cleanup;
+ }
+ aclk_shared_state.version_neg = version;
+ ACLK_SHARED_STATE_UNLOCK;
- if (cloud_to_agent.payload)
- freez(cloud_to_agent.payload);
+ info("Choosing version %d of ACLK", version);
- if (cloud_to_agent.type_id)
- freez(cloud_to_agent.type_id);
+ return 0;
- if (cloud_to_agent.msg_id)
- freez(cloud_to_agent.msg_id);
+err_cleanup:
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+}
- if (cloud_to_agent.callback_topic)
- freez(cloud_to_agent.callback_topic);
+struct {
+ char *name;
+ int(*fnc)(struct aclk_request *cloud_to_agent);
+} aclk_incoming_msg_types[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
+int aclk_handle_cloud_message(char *payload)
+{
+ struct aclk_request cloud_to_agent;
+ memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- return 1;
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
}
- // Checked to be "http", not needed anymore
- if (likely(cloud_to_agent.type_id)) {
- freez(cloud_to_agent.type_id);
- cloud_to_agent.type_id = NULL;
+ if (unlikely(!payload)) {
+ errno = 0;
+ error("ACLK incoming message is empty");
+ goto err_cleanup_nojson;
}
- if (unlikely(aclk_queue_query(cloud_to_agent.callback_topic, NULL, cloud_to_agent.msg_id, cloud_to_agent.payload, 0, 0, ACLK_CMD_CLOUD)))
- debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
+ debug(D_ACLK, "ACLK incoming message (%s)", payload);
- // Note: the payload comes from the callback and it will be automatically freed
- return 0;
+ int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+ if (unlikely(rc != JSON_OK)) {
+ errno = 0;
+ error("Malformed json request (%s)", payload);
+ goto err_cleanup;
+ }
+
+ if (!cloud_to_agent.type_id) {
+ errno = 0;
+ error("Cloud message is missing compulsory key \"type\"");
+ goto err_cleanup;
+ }
+
+ for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent))) {
+ // in case of success handler is supposed to clean up after itself
+ // or as in the case of aclk_handle_cloud_request take
+ // ownership of the pointers (done to avoid copying)
+ // see what `aclk_queue_query` parameter `internal` does
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
+ goto err_cleanup;
+ }
+ }
+
+ errno = 0;
+ error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+
+err_cleanup:
+ if (cloud_to_agent.payload)
+ freez(cloud_to_agent.payload);
+ if (cloud_to_agent.type_id)
+ freez(cloud_to_agent.type_id);
+ if (cloud_to_agent.msg_id)
+ freez(cloud_to_agent.msg_id);
+ if (cloud_to_agent.callback_topic)
+ freez(cloud_to_agent.callback_topic);
+
+err_cleanup_nojson:
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return 1;
}
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index 54399f855b..9651070ebe 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -7,7 +7,6 @@
#include "mqtt.h"
#include "aclk_common.h"
-#define ACLK_VERSION 1
#define ACLK_THREAD_NAME "ACLK_Query"
#define ACLK_CHART_TOPIC "outbound/meta"
#define ACLK_ALARMS_TOPIC "outbound/alarms"
@@ -35,6 +34,8 @@ struct aclk_request {
char *callback_topic;
char *payload;
int version;
+ int min_version;
+ int max_version;
};
typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
@@ -72,8 +73,8 @@ char *create_publish_base_topic();
int aclk_send_single_chart(char *host, char *chart);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us);
-int aclk_handle_cloud_request(char *payload);
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
+int aclk_handle_cloud_message(char *payload);
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
void aclk_alarm_reload();
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index 26164bbd92..7973627610 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -26,7 +26,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu
UNUSED(mosq);
UNUSED(obj);
- aclk_handle_cloud_request(msg->payload);
+ aclk_handle_cloud_message(msg->payload);
}
void publish_callback(struct mosquitto *mosq, void *obj, int rc)
@@ -306,7 +306,7 @@ int _link_set_lwt(char *sub_topic, int qos)
usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC);
+ aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION);
buffer_strcat(b, ", \"payload\": \"unexpected\" }");
rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
buffer_free(b);
diff --git a/aclk/mqtt.h b/aclk/mqtt.h
index 3b2b41f269..53fe79d768 100644
--- a/aclk/mqtt.h
+++ b/aclk/mqtt.h
@@ -19,7 +19,7 @@ const char *_link_strerror(int rc);
int _link_set_lwt(char *topic, int qos);
-int aclk_handle_cloud_request(char *);
+int aclk_handle_cloud_message(char *);
extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
#endif //NETDATA_MQTT_H