diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-09-08 11:14:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-08 11:14:53 +0200 |
commit | ae7a9aa7ed8109eda7f681fa7583dbb22dce0172 (patch) | |
tree | 1321b576124eb542d60f526b8e360b03637f13da /aclk | |
parent | 1982291959543f50d00af820701ace3a1495bd82 (diff) |
ACLK Version Negotiation (#9819)
* implements version negotiation for ACLK
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_common.c | 6 | ||||
-rw-r--r-- | aclk/aclk_common.h | 25 | ||||
-rw-r--r-- | aclk/aclk_query.c | 23 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 262 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 7 | ||||
-rw-r--r-- | aclk/mqtt.c | 4 | ||||
-rw-r--r-- | aclk/mqtt.h | 2 |
7 files changed, 259 insertions, 70 deletions
diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c index 4ef5b97617..d172f27c38 100644 --- a/aclk/aclk_common.c +++ b/aclk/aclk_common.c @@ -4,10 +4,14 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +int aclk_disable_runtime = 0; + struct aclk_shared_state aclk_shared_state = { .metadata_submitted = ACLK_METADATA_REQUIRED, .agent_state = AGENT_INITIALIZING, - .last_popcorn_interrupt = 0 + .last_popcorn_interrupt = 0, + .version_neg = 0, + .version_neg_wait_till = 0 }; struct { diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h index 62295fdf41..0d85a6db9d 100644 --- a/aclk/aclk_common.h +++ b/aclk/aclk_common.h @@ -7,6 +7,24 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) +// minimum and maximum supported version of ACLK +// in this version of agent +#define ACLK_VERSION_MIN 1 +#define ACLK_VERSION_MAX 1 + +// Version negotiation messages have they own versioning +// this is also used for LWT message as we set that up +// before version negotiation +#define ACLK_VERSION_NEG_VERSION 1 + +// Maximum time to wait for version negotiation before aborting +// and defaulting to oldest supported version +#define VERSION_NEG_TIMEOUT 3 + +#if ACLK_VERSION_MIN > ACLK_VERSION_MAX +#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" +#endif + typedef enum aclk_cmd { ACLK_CMD_CLOUD, ACLK_CMD_ONCONNECT, @@ -31,6 +49,11 @@ extern struct aclk_shared_state { ACLK_METADATA_STATE metadata_submitted; ACLK_AGENT_STATE agent_state; time_t last_popcorn_interrupt; + + // read only while ACLK connected + // protect by lock otherwise + int version_neg; + usec_t version_neg_wait_till; } aclk_shared_state; typedef enum aclk_proxy_type { @@ -53,4 +76,6 @@ void safe_log_proxy_censor(char *proxy); int aclk_decode_base_url(char *url, char **aclk_hostname, char **aclk_port); const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); +extern int aclk_disable_runtime; + #endif //ACLK_COMMON_H diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 9971ea8cb9..cf9a63ff6a 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -348,7 +348,7 @@ static int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0); + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); @@ -537,16 +537,31 @@ void *aclk_query_main_thread(void *ptr) } while (!netdata_exit) { + if(aclk_disable_runtime) { + sleep(1); + continue; + } ACLK_SHARED_STATE_LOCK; - if (unlikely(!aclk_shared_state.metadata_submitted)) { - ACLK_SHARED_STATE_UNLOCK; + if (unlikely(!aclk_shared_state.version_neg)) { + if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + ACLK_SHARED_STATE_UNLOCK; + info("Waiting for ACLK Version Negotiation message from Cloud"); + sleep(1); + continue; + } + errno = 0; + error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); + aclk_shared_state.version_neg = ACLK_VERSION_MIN; + } + if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) { if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + ACLK_SHARED_STATE_UNLOCK; errno = 0; error("ACLK failed to queue on_connect command"); sleep(1); continue; } - ACLK_SHARED_STATE_LOCK; aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED; } ACLK_SHARED_STATE_UNLOCK; diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index 97ced1c19d..ee64b1faf9 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -100,6 +100,15 @@ int cloud_to_agent_parse(JSON_ENTRY *e) data->version = e->data.number; break; } + if (!strcmp(e->name, "min-version")) { + data->min_version = e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = e->data.number; + break; + } + break; case JSON_BOOLEAN: @@ -513,7 +522,7 @@ static void aclk_graceful_disconnect() // Send a graceful disconnect message BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0); + aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n"); aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); buffer_free(b); @@ -820,12 +829,36 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num) int rc; aclk_connecting = 1; create_publish_base_topic(); + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = 0; + ACLK_SHARED_STATE_UNLOCK; rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password); if (unlikely(rc)) { error("Failed to initialize the agent cloud link library"); } } +// Sends "hello" message to negotiate ACLK version with cloud +static inline void aclk_hello_msg() +{ + BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + + char *msg_id = create_uuid(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + ACLK_SHARED_STATE_UNLOCK; + + //Hello message is versioned separatelly from the rest of the protocol + aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); + buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); + aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); + freez(msg_id); + buffer_free(buf); +} + /** * Main agent cloud link thread * @@ -932,6 +965,11 @@ void *aclk_main(void *ptr) /* size_t write_q, write_q_bytes, read_q; lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + if (aclk_disable_runtime && !aclk_connected) { + sleep(1); + continue; + } + if (aclk_kill_link) { // User has reloaded the claiming state aclk_kill_link = 0; aclk_graceful_disconnect(); @@ -978,9 +1016,9 @@ void *aclk_main(void *ptr) stress_counter = 0; }*/ - // TODO: Move to on-connect if (unlikely(!aclk_subscribed)) { aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); + aclk_hello_msg(); } if (unlikely(!query_threads.thread_list)) { @@ -1117,6 +1155,7 @@ void aclk_connect() aclk_connected = 1; aclk_reconnect_delay(0); + QUERY_THREAD_WAKEUP; return; } @@ -1138,7 +1177,7 @@ void aclk_disconnect() aclk_force_reconnect = 1; } -inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us) +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version) { uuid_t uuid; char uuid_str[36 + 1]; @@ -1164,9 +1203,9 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts "\t\"connect\": %ld,\n" "\t\"connect-offset-usec\": %llu,\n" "\t\"version\": %d", - type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION); + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version); - debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs); + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs); } @@ -1194,9 +1233,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0); + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); @@ -1239,9 +1278,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted) // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "update", msg_id, 0, 0); + aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); @@ -1341,7 +1380,7 @@ int aclk_send_single_chart(char *hostname, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id, 0, 0); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); rrdset2json(st, local_buffer, NULL, NULL, 1); @@ -1418,7 +1457,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -1443,75 +1482,180 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) /* * Parse the incoming payload and queue a command if valid */ -int aclk_handle_cloud_request(char *payload) +static int aclk_handle_cloud_request(struct aclk_request *cloud_to_agent) { - struct aclk_request cloud_to_agent = { - .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0 - }; - - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_recvd++; - ACLK_STATS_UNLOCK; - } - + errno = 0; ACLK_SHARED_STATE_LOCK; if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { - debug(D_ACLK, "Ignoring cloud request; agent not in stable state"); + debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state"); ACLK_SHARED_STATE_UNLOCK; - return 0; + return 1; } ACLK_SHARED_STATE_UNLOCK; - if (unlikely(!payload)) { - debug(D_ACLK, "ACLK incoming message is empty"); - return 0; + if (unlikely(cloud_to_agent->version != aclk_shared_state.version_neg)) { + error("Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, aclk_shared_state.version_neg); + return 1; } - debug(D_ACLK, "ACLK incoming message (%s)", payload); + if (unlikely(!cloud_to_agent->payload)) { + error("payload missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("callback_topic missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("msg_id missing"); + return 1; + } - int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + + // Note: the payload comes from the callback and it will be automatically freed + return 0; +} + +// This handles `version` message from cloud used to negotiate +// protocol version we will use +static int aclk_handle_version_response(struct aclk_request *cloud_to_agent) +{ + int version = -1; + errno = 0; + + if(unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { + error("Unsuported version of \"version\" message from cloud. Expected %d, Got %d", ACLK_VERSION_NEG_VERSION, cloud_to_agent->version); + return 1; + } + if(unlikely(!cloud_to_agent->min_version)) { + error("Min version missing or 0"); + return 1; + } + if(unlikely(!cloud_to_agent->max_version)) { + error("Max version missing or 0"); + return 1; + } + if(unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { + error("Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, cloud_to_agent->min_version); + return 1; + } - if (unlikely( - JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id || - !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION || - strcmp(cloud_to_agent.type_id, "http"))) { - if (JSON_OK != rc) - error("Malformed json request (%s)", payload); + if(unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { + error("Agent too old for this cloud. Minimum version required by cloud %d. Maximum version supported by this agent %d.", cloud_to_agent->min_version, ACLK_VERSION_MAX); + aclk_kill_link = 1; + aclk_disable_runtime = 1; + return 1; + } + if(unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { + error("Cloud version is too old for this agent. Maximum version supported by cloud %d. Minimum (oldest) version supported by this agent %d.", cloud_to_agent->max_version, ACLK_VERSION_MIN); + aclk_kill_link = 1; + return 1; + } + + version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - if (cloud_to_agent.version > ACLK_VERSION) - error("Unsupported version in JSON request %d", cloud_to_agent.version); + ACLK_SHARED_STATE_LOCK; + if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + errno = 0; + error("The \"version\" message came too late ignoring."); + goto err_cleanup; + } + if (unlikely(aclk_shared_state.version_neg)) { + errno = 0; + error("Version has already been set to %d", aclk_shared_state.version_neg); + goto err_cleanup; + } + aclk_shared_state.version_neg = version; + ACLK_SHARED_STATE_UNLOCK; - if (cloud_to_agent.payload) - freez(cloud_to_agent.payload); + info("Choosing version %d of ACLK", version); - if (cloud_to_agent.type_id) - freez(cloud_to_agent.type_id); + return 0; - if (cloud_to_agent.msg_id) - freez(cloud_to_agent.msg_id); +err_cleanup: + ACLK_SHARED_STATE_UNLOCK; + return 1; +} - if (cloud_to_agent.callback_topic) - freez(cloud_to_agent.callback_topic); +struct { + char *name; + int(*fnc)(struct aclk_request *cloud_to_agent); +} aclk_incoming_msg_types[] = { + { .name = "http", .fnc = aclk_handle_cloud_request }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_err++; - ACLK_STATS_UNLOCK; - } +int aclk_handle_cloud_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); - return 1; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; } - // Checked to be "http", not needed anymore - if (likely(cloud_to_agent.type_id)) { - freez(cloud_to_agent.type_id); - cloud_to_agent.type_id = NULL; + if (unlikely(!payload)) { + errno = 0; + error("ACLK incoming message is empty"); + goto err_cleanup_nojson; } - if (unlikely(aclk_queue_query(cloud_to_agent.callback_topic, NULL, cloud_to_agent.msg_id, cloud_to_agent.payload, 0, 0, ACLK_CMD_CLOUD))) - debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload); + debug(D_ACLK, "ACLK incoming message (%s)", payload); - // Note: the payload comes from the callback and it will be automatically freed - return 0; + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + errno = 0; + error("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + errno = 0; + error("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + for (int i = 0; aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { + if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent))) { + // in case of success handler is supposed to clean up after itself + // or as in the case of aclk_handle_cloud_request take + // ownership of the pointers (done to avoid copying) + // see what `aclk_queue_query` parameter `internal` does + freez(cloud_to_agent.type_id); + return 0; + } + goto err_cleanup; + } + } + + errno = 0; + error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + +err_cleanup_nojson: + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + + return 1; } diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index 54399f855b..9651070ebe 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -7,7 +7,6 @@ #include "mqtt.h" #include "aclk_common.h" -#define ACLK_VERSION 1 #define ACLK_THREAD_NAME "ACLK_Query" #define ACLK_CHART_TOPIC "outbound/meta" #define ACLK_ALARMS_TOPIC "outbound/alarms" @@ -35,6 +34,8 @@ struct aclk_request { char *callback_topic; char *payload; int version; + int min_version; + int max_version; }; typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; @@ -72,8 +73,8 @@ char *create_publish_base_topic(); int aclk_send_single_chart(char *host, char *chart); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us); -int aclk_handle_cloud_request(char *payload); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); +int aclk_handle_cloud_message(char *payload); void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name); void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name); void aclk_alarm_reload(); diff --git a/aclk/mqtt.c b/aclk/mqtt.c index 26164bbd92..7973627610 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -26,7 +26,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu UNUSED(mosq); UNUSED(obj); - aclk_handle_cloud_request(msg->payload); + aclk_handle_cloud_message(msg->payload); } void publish_callback(struct mosquitto *mosq, void *obj, int rc) @@ -306,7 +306,7 @@ int _link_set_lwt(char *sub_topic, int qos) usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC); + aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION); buffer_strcat(b, ", \"payload\": \"unexpected\" }"); rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); buffer_free(b); diff --git a/aclk/mqtt.h b/aclk/mqtt.h index 3b2b41f269..53fe79d768 100644 --- a/aclk/mqtt.h +++ b/aclk/mqtt.h @@ -19,7 +19,7 @@ const char *_link_strerror(int rc); int _link_set_lwt(char *topic, int qos); -int aclk_handle_cloud_request(char *); +int aclk_handle_cloud_message(char *); extern char *get_topic(char *sub_topic, char *final_topic, int max_size); #endif //NETDATA_MQTT_H |