diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-05-11 08:34:29 +0200 |
---|---|---|
committer | James Mills <prologic@shortcircuit.net.au> | 2020-05-11 16:37:27 +1000 |
commit | aa3ec552c896aebafd03b9d2c1864272dcb34749 (patch) | |
tree | 02f7cd95ed84d888c27fb4bfb55df2b251b97b7b /aclk | |
parent | fd05e1d87751ecaa45ebd3aed2499435b1627cea (diff) |
Enable support for Netdata Cloud.
This PR merges the feature-branch to make the cloud live. It contains the following work:
Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com(opens in new tab)>
Co-authored-by: Jacek Kolasa <jacek.kolasa@gmail.com(opens in new tab)>
Co-authored-by: Austin S. Hemmelgarn <austin@netdata.cloud(opens in new tab)>
Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)>
Co-authored-by: Markos Fountoulakis <44345837+mfundul@users.noreply.github.com(opens in new tab)>
Co-authored-by: Timotej S <6674623+underhood@users.noreply.github.com(opens in new tab)>
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com(opens in new tab)>
* dashboard with new navbars, v1.0-alpha.9: PR #8478
* dashboard v1.0.11: netdata/dashboard#76
Co-authored-by: Jacek Kolasa <jacek.kolasa@gmail.com(opens in new tab)>
* Added installer code to bundle JSON-c if it's not present. PR #8836
Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)>
* Fix claiming config PR #8843
* Adds JSON-c as hard dep. for ACLK PR #8838
* Fix SSL renegotiation errors in old versions of openssl. PR #8840. Also - we have a transient problem with opensuse CI so this PR disables them with a commit from @prologic.
Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)>
* Fix claiming error handling PR #8850
* Added CI to verify JSON-C bundling code in installer PR #8853
* Make cloud-enabled flag in web/api/v1/info be independent of ACLK build success PR #8866
* Reduce ACLK_STABLE_TIMEOUT from 10 to 3 seconds PR #8871
* remove old-cloud related UI from old dashboard (accessible now via /old suffix) PR #8858
* dashboard v1.0.13 PR #8870
* dashboard v1.0.14 PR #8904
* Provide feedback on proxy setting changes PR #8895
* Change the name of the connect message to update during an ongoing session PR #8927
* Fetch active alarms from alarm_log PR #8944
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_lws_wss_client.c | 46 | ||||
-rw-r--r-- | aclk/aclk_lws_wss_client.h | 1 | ||||
-rw-r--r-- | aclk/agent_cloud_link.c | 93 | ||||
-rw-r--r-- | aclk/agent_cloud_link.h | 2 | ||||
-rw-r--r-- | aclk/mqtt.c | 5 |
5 files changed, 98 insertions, 49 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c index 168d866b32..97aa337390 100644 --- a/aclk/aclk_lws_wss_client.c +++ b/aclk/aclk_lws_wss_client.c @@ -152,7 +152,6 @@ static void aclk_lws_wss_log_divert(int level, const char *line) static int aclk_lws_wss_client_init( char *target_hostname, int target_port) { static int lws_logging_initialized = 0; - struct lws_context_creation_info info; if (unlikely(!lws_logging_initialized)) { lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); @@ -167,14 +166,6 @@ static int aclk_lws_wss_client_init( char *target_hostname, int target_port) engine_instance->host = target_hostname; engine_instance->port = target_port; - memset(&info, 0, sizeof(struct lws_context_creation_info)); - info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - info.port = CONTEXT_PORT_NO_LISTEN; - info.protocols = protocols; - - engine_instance->lws_context = lws_create_context(&info); - if (!engine_instance->lws_context) - goto failure_cleanup_2; aclk_lws_mutex_init(&engine_instance->write_buf_mutex); aclk_lws_mutex_init(&engine_instance->read_buf_mutex); @@ -186,18 +177,27 @@ static int aclk_lws_wss_client_init( char *target_hostname, int target_port) return 0; failure_cleanup: - lws_context_destroy(engine_instance->lws_context); -failure_cleanup_2: freez(engine_instance); return 1; } -void aclk_lws_wss_client_destroy() +void aclk_lws_wss_destroy_context() { - if (engine_instance == NULL) + if (!engine_instance) + return; + if (!engine_instance->lws_context) return; lws_context_destroy(engine_instance->lws_context); engine_instance->lws_context = NULL; +} + + +void aclk_lws_wss_client_destroy() +{ + if (engine_instance == NULL) + return; + + aclk_lws_wss_destroy_context(); engine_instance->lws_wsi = NULL; aclk_lws_wss_clear_io_buffers(engine_instance); @@ -267,7 +267,25 @@ int aclk_lws_wss_connect(char *host, int port) int n; if (!engine_instance) { - return aclk_lws_wss_client_init(host, port); + if (aclk_lws_wss_client_init(host, port)) + return 1; // Propagate failure + } + + if (!engine_instance->lws_context) + { + // First time through (on this connection), create the context + struct lws_context_creation_info info; + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + engine_instance->lws_context = lws_create_context(&info); + if (!engine_instance->lws_context) + { + error("Failed to create lws_context, ACLK will not function"); + return 1; + } + return 0; // PROTOCOL_INIT callback will call again. } diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h index 26a7865393..584a3cf4f0 100644 --- a/aclk/aclk_lws_wss_client.h +++ b/aclk/aclk_lws_wss_client.h @@ -70,6 +70,7 @@ struct aclk_lws_wss_engine_instance { }; void aclk_lws_wss_client_destroy(); +void aclk_lws_wss_destroy_context(); int aclk_lws_wss_connect(char *host, int port); diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index d3bf881a9c..4750967987 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -23,6 +23,7 @@ static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_connected = 0; // Exposed in the web-api +int aclk_force_reconnect = 0; // Indication from lower layers usec_t aclk_session_us = 0; // Used by the mqtt layer time_t aclk_session_sec = 0; // Used by the mqtt layer @@ -47,7 +48,7 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); - +void aclk_lws_wss_destroy_context(); /* * Maintain a list of collectors and chart count * If all the charts of a collector are deleted @@ -149,7 +150,7 @@ static RSA *aclk_private_key = NULL; static int create_private_key() { char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir); + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); long bytes_read; char *private_key = read_by_filename(filename, &bytes_read); @@ -1336,59 +1337,84 @@ void *aclk_main(void *ptr) struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct netdata_static_thread *query_thread; - if (!netdata_cloud_setting) { - info("Killing ACLK thread -> cloud functionality has been disabled"); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; - } + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif info("Waiting for netdata to be ready"); while (!netdata_ready) { sleep_usec(USEC_PER_MS * 300); } + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) { + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + } + last_init_sequence = now_realtime_sec(); query_thread = NULL; char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. char *aclk_port = NULL; uint32_t port_num = 0; - char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", DEFAULT_CLOUD_BASE_URL); - if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { - error("Configuration error - cannot use agent cloud link"); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; - } - port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value - info("Waiting for netdata to be claimed"); while(1) { while (likely(!is_agent_claimed())) { - sleep_usec(USEC_PER_SEC * 5); + sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) goto exited; } - if (!create_private_key() && !_mqtt_lib_init()) - break; - - if (netdata_exit) + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); goto exited; + } + if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { + error("Agent is claimed but the configuration is invalid, please fix"); + } + else + { + port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value + if (!create_private_key() && !_mqtt_lib_init()) + break; + } - sleep_usec(USEC_PER_SEC * 60); + for (int i=0; i<60; i++) { + if (netdata_exit) + goto exited; + + sleep_usec(USEC_PER_SEC * 1); + } } + create_publish_base_topic(); usec_t reconnect_expiry = 0; // In usecs - netdata_thread_disable_cancelability(); - while (!netdata_exit) { static int first_init = 0; size_t write_q, write_q_bytes, read_q; lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + + if (aclk_force_reconnect) { + aclk_lws_wss_destroy_context(); + aclk_force_reconnect = 0; + } //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu", // first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q); - if (unlikely(!netdata_exit && !aclk_connected)) { + if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { if (unlikely(!first_init)) { aclk_try_to_connect(aclk_hostname, aclk_port, port_num); first_init = 1; @@ -1414,7 +1440,7 @@ void *aclk_main(void *ptr) } _link_event_loop(); - if (unlikely(!aclk_connected)) + if (unlikely(!aclk_connected || aclk_force_reconnect)) continue; /*static int stress_counter = 0; if (write_q_bytes==0 && stress_counter ++ >5) @@ -1550,6 +1576,7 @@ void aclk_disconnect() waiting_init = 1; aclk_connected = 0; aclk_connecting = 0; + aclk_force_reconnect = 1; } void aclk_shutdown() @@ -1598,6 +1625,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts * alarm_log * active alarms */ +void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); void aclk_send_alarm_metadata() { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1618,17 +1646,18 @@ void aclk_send_alarm_metadata() aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us); buffer_strcat(local_buffer, ",\n\t\"payload\": "); + buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); health_alarms2json(localhost, local_buffer, 1); debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); + // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); + // health_alarm_log2json(localhost, local_buffer, 0); + // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); + buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); + health_active_log_alarms_2json(localhost, local_buffer); + //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); - buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); - health_alarm_log2json(localhost, local_buffer, 0); - debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); - buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); - health_alarms_values2json(localhost, local_buffer, 0); - debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer, "\n}\n}"); aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); @@ -1657,7 +1686,7 @@ int aclk_send_info_metadata() // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (aclk_metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect", msg_id, 0, 0); + aclk_create_header(local_buffer, "update", msg_id, 0, 0); else aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us); buffer_strcat(local_buffer, ",\n\t\"payload\": "); diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index a3722b82ae..29871cc89d 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -25,7 +25,7 @@ #define ACLK_MAX_TOPIC 255 #define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now -#define ACLK_STABLE_TIMEOUT 10 // Minimum delay to mark AGENT as stable +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable #define ACLK_DEFAULT_PORT 9002 #define ACLK_DEFAULT_HOST "localhost" diff --git a/aclk/mqtt.c b/aclk/mqtt.c index b070f7fb09..8beb4b6766 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -29,7 +29,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc) UNUSED(mosq); UNUSED(obj); UNUSED(rc); - + info("Publish_callback: mid=%d", rc); // TODO: link this with a msg_id so it can be traced return; } @@ -219,7 +219,8 @@ void aclk_lws_connection_data_received() void aclk_lws_connection_closed() { - aclk_disconnect(NULL); + aclk_disconnect(); + } |