summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2021-04-21 15:41:19 +0200
committerGitHub <noreply@github.com>2021-04-21 15:41:19 +0200
commitb6d729f96a0b1e17abffb2cf69d1ff29e62004f4 (patch)
treed23b94f3464f330402efc719a6e0b5a8ecb40a46 /aclk
parentf9fbde67a4fad334806e78866acd6a22ad1f350b (diff)
remove vneg from ACLK-NG (#10980)
removes obsolete version negotiation from ACLK-NG
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c4
-rw-r--r--aclk/aclk.h24
-rw-r--r--aclk/aclk_query.c17
-rw-r--r--aclk/aclk_rx_msgs.c88
-rw-r--r--aclk/aclk_rx_msgs.h1
-rw-r--r--aclk/aclk_tx_msgs.c48
-rw-r--r--aclk/aclk_tx_msgs.h2
7 files changed, 10 insertions, 174 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 644e98d692..64e7d237ef 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -43,8 +43,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_shared_state aclk_shared_state = {
.agent_state = AGENT_INITIALIZING,
.last_popcorn_interrupt = 0,
- .version_neg = 0,
- .version_neg_wait_till = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
@@ -338,7 +336,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- aclk_hello_msg(client);
+
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
error("Sending `connect` payload immediately as popcorning was finished already.");
diff --git a/aclk/aclk.h b/aclk/aclk.h
index ed8cb636ab..5c37162183 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -9,23 +9,8 @@ typedef struct aclk_rrdhost_state {
#include "../daemon/common.h"
#include "aclk_util.h"
-// minimum and maximum supported version of ACLK
-// in this version of agent
-#define ACLK_VERSION_MIN 2
-#define ACLK_VERSION_MAX 2
-
-// Version negotiation messages have they own versioning
-// this is also used for LWT message as we set that up
-// before version negotiation
-#define ACLK_VERSION_NEG_VERSION 1
-
-// Maximum time to wait for version negotiation before aborting
-// and defaulting to oldest supported version
-#define VERSION_NEG_TIMEOUT 3
-
-#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
-#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
-#endif
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
// Define ACLK Feature Version Boundaries Here
#define ACLK_V_COMPRESSION 2
@@ -70,11 +55,6 @@ extern struct aclk_shared_state {
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
- // read only while ACLK connected
- // protect by lock otherwise
- int version_neg;
- usec_t version_neg_wait_till;
-
// To wait for `disconnect` message PUBACK
// when shuting down
// at the same time if > 0 we know link is
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 2581fd32de..3e2f88e468 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -235,23 +235,6 @@ void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(!aclk_shared_state.version_neg)) {
- if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
- ACLK_SHARED_STATE_UNLOCK;
- info("Waiting for ACLK Version Negotiation message from Cloud");
- sleep(1);
- continue;
- }
- errno = 0;
- error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
- " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
- aclk_shared_state.version_neg = ACLK_VERSION_MIN;
-// When ACLK v3 is implemented you will need this
-// aclk_set_rx_handlers(aclk_shared_state.version_neg);
- }
- ACLK_SHARED_STATE_UNLOCK;
-
aclk_query_process_msgs(info);
QUERY_THREAD_LOCK;
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index fcb8d99688..3d3ab5e2ca 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -166,81 +166,6 @@ error:
return 1;
}
-// This handles `version` message from cloud used to negotiate
-// protocol version we will use
-static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
-{
- UNUSED(raw_payload);
- int version = -1;
- errno = 0;
-
- if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
- error(
- "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
- ACLK_VERSION_NEG_VERSION,
- cloud_to_agent->version);
- return 1;
- }
- if (unlikely(!cloud_to_agent->min_version)) {
- error("Min version missing or 0");
- return 1;
- }
- if (unlikely(!cloud_to_agent->max_version)) {
- error("Max version missing or 0");
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
- error(
- "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
- cloud_to_agent->min_version);
- return 1;
- }
-
- if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
- error(
- "Agent too old for this cloud. Minimum version required by cloud %d."
- " Maximum version supported by this agent %d.",
- cloud_to_agent->min_version, ACLK_VERSION_MAX);
- aclk_kill_link = 1;
- aclk_disable_runtime = 1;
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
- error(
- "Cloud version is too old for this agent. Maximum version supported by cloud %d."
- " Minimum (oldest) version supported by this agent %d.",
- cloud_to_agent->max_version, ACLK_VERSION_MIN);
- aclk_kill_link = 1;
- return 1;
- }
-
- version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
-
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
- errno = 0;
- error("The \"version\" message came too late ignoring.");
- goto err_cleanup;
- }
- if (unlikely(aclk_shared_state.version_neg)) {
- errno = 0;
- error("Version has already been set to %d", aclk_shared_state.version_neg);
- goto err_cleanup;
- }
- aclk_shared_state.version_neg = version;
- ACLK_SHARED_STATE_UNLOCK;
-
- info("Choosing version %d of ACLK", version);
-
- aclk_set_rx_handlers(version);
-
- return 0;
-
-err_cleanup:
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
-}
-
typedef struct aclk_incoming_msg_type{
char *name;
int(*fnc)(struct aclk_request *, char *);
@@ -248,20 +173,11 @@ typedef struct aclk_incoming_msg_type{
aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-void aclk_set_rx_handlers(int version)
-{
-// ACLK_NG ACLK version support starts at 2
-// TODO ACLK v3
- UNUSED(version);
- aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-}
-
int aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
@@ -295,10 +211,6 @@ int aclk_handle_cloud_message(char *payload)
goto err_cleanup;
}
- if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
- error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
- goto err_cleanup;
- }
for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index c9f0bd37a6..e24252bee3 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -9,6 +9,5 @@
#include "libnetdata/libnetdata.h"
int aclk_handle_cloud_message(char *payload);
-void aclk_set_rx_handlers(int version);
#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index a96c752854..144008e4dc 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -211,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -253,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
// session.
if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
@@ -277,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
buffer_free(local_buffer);
}
-void aclk_hello_msg(mqtt_wss_client client)
-{
- json_object *tmp, *msg;
-
- char *msg_id = create_uuid();
-
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- ACLK_SHARED_STATE_UNLOCK;
-
- //Hello message is versioned separatelly from the rest of the protocol
- msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
-
- tmp = json_object_new_int(ACLK_VERSION_MIN);
- json_object_object_add(msg, "min-version", tmp);
-
- tmp = json_object_new_int(ACLK_VERSION_MAX);
- json_object_object_add(msg, "max-version", tmp);
-
-#ifdef ACLK_NG
- tmp = json_object_new_string("Next Generation");
-#else
- tmp = json_object_new_string("Legacy");
-#endif
- json_object_object_add(msg, "aclk-implementation", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
-
- json_object_put(msg);
- freez(msg_id);
-}
-
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -352,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
return;
}
- msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg);
+ msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(msg, "payload", payload);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
@@ -364,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
{
// we create header here on purpose (and not send message with it already as `msg` param)
- // one is version_neg is guaranteed to be done here
- // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
+ // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
// send message with timestamps already to Query Queue they would be incorrect at time
// when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg);
+ json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(obj, "payload", msg);
aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index cb4d44c96e..50c9816961 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -10,8 +10,6 @@
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
-void aclk_hello_msg(mqtt_wss_client client);
-
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);