diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2021-04-21 15:41:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-21 15:41:19 +0200 |
commit | b6d729f96a0b1e17abffb2cf69d1ff29e62004f4 (patch) | |
tree | d23b94f3464f330402efc719a6e0b5a8ecb40a46 /aclk | |
parent | f9fbde67a4fad334806e78866acd6a22ad1f350b (diff) |
remove vneg from ACLK-NG (#10980)
removes obsolete version negotiation from ACLK-NG
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 4 | ||||
-rw-r--r-- | aclk/aclk.h | 24 | ||||
-rw-r--r-- | aclk/aclk_query.c | 17 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 88 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 1 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 48 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 2 |
7 files changed, 10 insertions, 174 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 644e98d692..64e7d237ef 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -43,8 +43,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_shared_state aclk_shared_state = { .agent_state = AGENT_INITIALIZING, .last_popcorn_interrupt = 0, - .version_neg = 0, - .version_neg_wait_till = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; @@ -338,7 +336,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - aclk_hello_msg(client); + ACLK_SHARED_STATE_LOCK; if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { error("Sending `connect` payload immediately as popcorning was finished already."); diff --git a/aclk/aclk.h b/aclk/aclk.h index ed8cb636ab..5c37162183 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -9,23 +9,8 @@ typedef struct aclk_rrdhost_state { #include "../daemon/common.h" #include "aclk_util.h" -// minimum and maximum supported version of ACLK -// in this version of agent -#define ACLK_VERSION_MIN 2 -#define ACLK_VERSION_MAX 2 - -// Version negotiation messages have they own versioning -// this is also used for LWT message as we set that up -// before version negotiation -#define ACLK_VERSION_NEG_VERSION 1 - -// Maximum time to wait for version negotiation before aborting -// and defaulting to oldest supported version -#define VERSION_NEG_TIMEOUT 3 - -#if ACLK_VERSION_MIN > ACLK_VERSION_MAX -#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" -#endif +// version for aclk legacy (old cloud arch) +#define ACLK_VERSION 2 // Define ACLK Feature Version Boundaries Here #define ACLK_V_COMPRESSION 2 @@ -70,11 +55,6 @@ extern struct aclk_shared_state { ACLK_AGENT_STATE agent_state; time_t last_popcorn_interrupt; - // read only while ACLK connected - // protect by lock otherwise - int version_neg; - usec_t version_neg_wait_till; - // To wait for `disconnect` message PUBACK // when shuting down // at the same time if > 0 we know link is diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 2581fd32de..3e2f88e468 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -235,23 +235,6 @@ void *aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (unlikely(!aclk_shared_state.version_neg)) { - if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - ACLK_SHARED_STATE_UNLOCK; - info("Waiting for ACLK Version Negotiation message from Cloud"); - sleep(1); - continue; - } - errno = 0; - error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." - " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - aclk_shared_state.version_neg = ACLK_VERSION_MIN; -// When ACLK v3 is implemented you will need this -// aclk_set_rx_handlers(aclk_shared_state.version_neg); - } - ACLK_SHARED_STATE_UNLOCK; - aclk_query_process_msgs(info); QUERY_THREAD_LOCK; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index fcb8d99688..3d3ab5e2ca 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -166,81 +166,6 @@ error: return 1; } -// This handles `version` message from cloud used to negotiate -// protocol version we will use -static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) -{ - UNUSED(raw_payload); - int version = -1; - errno = 0; - - if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { - error( - "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", - ACLK_VERSION_NEG_VERSION, - cloud_to_agent->version); - return 1; - } - if (unlikely(!cloud_to_agent->min_version)) { - error("Min version missing or 0"); - return 1; - } - if (unlikely(!cloud_to_agent->max_version)) { - error("Max version missing or 0"); - return 1; - } - if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { - error( - "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, - cloud_to_agent->min_version); - return 1; - } - - if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { - error( - "Agent too old for this cloud. Minimum version required by cloud %d." - " Maximum version supported by this agent %d.", - cloud_to_agent->min_version, ACLK_VERSION_MAX); - aclk_kill_link = 1; - aclk_disable_runtime = 1; - return 1; - } - if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { - error( - "Cloud version is too old for this agent. Maximum version supported by cloud %d." - " Minimum (oldest) version supported by this agent %d.", - cloud_to_agent->max_version, ACLK_VERSION_MIN); - aclk_kill_link = 1; - return 1; - } - - version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - - ACLK_SHARED_STATE_LOCK; - if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { - errno = 0; - error("The \"version\" message came too late ignoring."); - goto err_cleanup; - } - if (unlikely(aclk_shared_state.version_neg)) { - errno = 0; - error("Version has already been set to %d", aclk_shared_state.version_neg); - goto err_cleanup; - } - aclk_shared_state.version_neg = version; - ACLK_SHARED_STATE_UNLOCK; - - info("Choosing version %d of ACLK", version); - - aclk_set_rx_handlers(version); - - return 0; - -err_cleanup: - ACLK_SHARED_STATE_UNLOCK; - return 1; -} - typedef struct aclk_incoming_msg_type{ char *name; int(*fnc)(struct aclk_request *, char *); @@ -248,20 +173,11 @@ typedef struct aclk_incoming_msg_type{ aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, - { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression; -void aclk_set_rx_handlers(int version) -{ -// ACLK_NG ACLK version support starts at 2 -// TODO ACLK v3 - UNUSED(version); - aclk_incoming_msg_types = aclk_incoming_msg_types_compression; -} - int aclk_handle_cloud_message(char *payload) { struct aclk_request cloud_to_agent; @@ -295,10 +211,6 @@ int aclk_handle_cloud_message(char *payload) goto err_cleanup; } - if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { - error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); - goto err_cleanup; - } for (int i = 0; aclk_incoming_msg_types[i].name; i++) { if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index c9f0bd37a6..e24252bee3 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -9,6 +9,5 @@ #include "libnetdata/libnetdata.h" int aclk_handle_cloud_message(char *payload); -void aclk_set_rx_handlers(int version); #endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index a96c752854..144008e4dc 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -211,9 +211,9 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRD // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -253,9 +253,9 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) // session. if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); payload = json_object_new_object(); json_object_object_add(msg, "payload", payload); @@ -277,39 +277,6 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) buffer_free(local_buffer); } -void aclk_hello_msg(mqtt_wss_client client) -{ - json_object *tmp, *msg; - - char *msg_id = create_uuid(); - - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; - - //Hello message is versioned separatelly from the rest of the protocol - msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); - - tmp = json_object_new_int(ACLK_VERSION_MIN); - json_object_object_add(msg, "min-version", tmp); - - tmp = json_object_new_int(ACLK_VERSION_MAX); - json_object_object_add(msg, "max-version", tmp); - -#ifdef ACLK_NG - tmp = json_object_new_string("Next Generation"); -#else - tmp = json_object_new_string("Legacy"); -#endif - json_object_object_add(msg, "aclk-implementation", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); -} - void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -352,7 +319,7 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) return; } - msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg); + msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); json_object_object_add(msg, "payload", payload); aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); @@ -364,11 +331,10 @@ void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) { // we create header here on purpose (and not send message with it already as `msg` param) - // one is version_neg is guaranteed to be done here - // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy + // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy // send message with timestamps already to Query Queue they would be incorrect at time // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg); + json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); json_object_object_add(obj, "payload", msg); aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index cb4d44c96e..50c9816961 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -10,8 +10,6 @@ void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); -void aclk_hello_msg(mqtt_wss_client client); - void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); |