summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-05-11 08:34:29 +0200
committerJames Mills <prologic@shortcircuit.net.au>2020-05-11 16:37:27 +1000
commitaa3ec552c896aebafd03b9d2c1864272dcb34749 (patch)
tree02f7cd95ed84d888c27fb4bfb55df2b251b97b7b /aclk
parentfd05e1d87751ecaa45ebd3aed2499435b1627cea (diff)
Enable support for Netdata Cloud.
This PR merges the feature-branch to make the cloud live. It contains the following work: Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com(opens in new tab)> Co-authored-by: Jacek Kolasa <jacek.kolasa@gmail.com(opens in new tab)> Co-authored-by: Austin S. Hemmelgarn <austin@netdata.cloud(opens in new tab)> Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)> Co-authored-by: Markos Fountoulakis <44345837+mfundul@users.noreply.github.com(opens in new tab)> Co-authored-by: Timotej S <6674623+underhood@users.noreply.github.com(opens in new tab)> Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com(opens in new tab)> * dashboard with new navbars, v1.0-alpha.9: PR #8478 * dashboard v1.0.11: netdata/dashboard#76 Co-authored-by: Jacek Kolasa <jacek.kolasa@gmail.com(opens in new tab)> * Added installer code to bundle JSON-c if it's not present. PR #8836 Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)> * Fix claiming config PR #8843 * Adds JSON-c as hard dep. for ACLK PR #8838 * Fix SSL renegotiation errors in old versions of openssl. PR #8840. Also - we have a transient problem with opensuse CI so this PR disables them with a commit from @prologic. Co-authored-by: James Mills <prologic@shortcircuit.net.au(opens in new tab)> * Fix claiming error handling PR #8850 * Added CI to verify JSON-C bundling code in installer PR #8853 * Make cloud-enabled flag in web/api/v1/info be independent of ACLK build success PR #8866 * Reduce ACLK_STABLE_TIMEOUT from 10 to 3 seconds PR #8871 * remove old-cloud related UI from old dashboard (accessible now via /old suffix) PR #8858 * dashboard v1.0.13 PR #8870 * dashboard v1.0.14 PR #8904 * Provide feedback on proxy setting changes PR #8895 * Change the name of the connect message to update during an ongoing session PR #8927 * Fetch active alarms from alarm_log PR #8944
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_lws_wss_client.c46
-rw-r--r--aclk/aclk_lws_wss_client.h1
-rw-r--r--aclk/agent_cloud_link.c93
-rw-r--r--aclk/agent_cloud_link.h2
-rw-r--r--aclk/mqtt.c5
5 files changed, 98 insertions, 49 deletions
diff --git a/aclk/aclk_lws_wss_client.c b/aclk/aclk_lws_wss_client.c
index 168d866b32..97aa337390 100644
--- a/aclk/aclk_lws_wss_client.c
+++ b/aclk/aclk_lws_wss_client.c
@@ -152,7 +152,6 @@ static void aclk_lws_wss_log_divert(int level, const char *line)
static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
{
static int lws_logging_initialized = 0;
- struct lws_context_creation_info info;
if (unlikely(!lws_logging_initialized)) {
lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
@@ -167,14 +166,6 @@ static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
engine_instance->host = target_hostname;
engine_instance->port = target_port;
- memset(&info, 0, sizeof(struct lws_context_creation_info));
- info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- info.port = CONTEXT_PORT_NO_LISTEN;
- info.protocols = protocols;
-
- engine_instance->lws_context = lws_create_context(&info);
- if (!engine_instance->lws_context)
- goto failure_cleanup_2;
aclk_lws_mutex_init(&engine_instance->write_buf_mutex);
aclk_lws_mutex_init(&engine_instance->read_buf_mutex);
@@ -186,18 +177,27 @@ static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
return 0;
failure_cleanup:
- lws_context_destroy(engine_instance->lws_context);
-failure_cleanup_2:
freez(engine_instance);
return 1;
}
-void aclk_lws_wss_client_destroy()
+void aclk_lws_wss_destroy_context()
{
- if (engine_instance == NULL)
+ if (!engine_instance)
+ return;
+ if (!engine_instance->lws_context)
return;
lws_context_destroy(engine_instance->lws_context);
engine_instance->lws_context = NULL;
+}
+
+
+void aclk_lws_wss_client_destroy()
+{
+ if (engine_instance == NULL)
+ return;
+
+ aclk_lws_wss_destroy_context();
engine_instance->lws_wsi = NULL;
aclk_lws_wss_clear_io_buffers(engine_instance);
@@ -267,7 +267,25 @@ int aclk_lws_wss_connect(char *host, int port)
int n;
if (!engine_instance) {
- return aclk_lws_wss_client_init(host, port);
+ if (aclk_lws_wss_client_init(host, port))
+ return 1; // Propagate failure
+ }
+
+ if (!engine_instance->lws_context)
+ {
+ // First time through (on this connection), create the context
+ struct lws_context_creation_info info;
+ memset(&info, 0, sizeof(struct lws_context_creation_info));
+ info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.port = CONTEXT_PORT_NO_LISTEN;
+ info.protocols = protocols;
+ engine_instance->lws_context = lws_create_context(&info);
+ if (!engine_instance->lws_context)
+ {
+ error("Failed to create lws_context, ACLK will not function");
+ return 1;
+ }
+ return 0;
// PROTOCOL_INIT callback will call again.
}
diff --git a/aclk/aclk_lws_wss_client.h b/aclk/aclk_lws_wss_client.h
index 26a7865393..584a3cf4f0 100644
--- a/aclk/aclk_lws_wss_client.h
+++ b/aclk/aclk_lws_wss_client.h
@@ -70,6 +70,7 @@ struct aclk_lws_wss_engine_instance {
};
void aclk_lws_wss_client_destroy();
+void aclk_lws_wss_destroy_context();
int aclk_lws_wss_connect(char *host, int port);
diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c
index d3bf881a9c..4750967987 100644
--- a/aclk/agent_cloud_link.c
+++ b/aclk/agent_cloud_link.c
@@ -23,6 +23,7 @@ static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
+int aclk_force_reconnect = 0; // Indication from lower layers
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
@@ -47,7 +48,7 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
-
+void aclk_lws_wss_destroy_context();
/*
* Maintain a list of collectors and chart count
* If all the charts of a collector are deleted
@@ -149,7 +150,7 @@ static RSA *aclk_private_key = NULL;
static int create_private_key()
{
char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir);
+ snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
long bytes_read;
char *private_key = read_by_filename(filename, &bytes_read);
@@ -1336,59 +1337,84 @@ void *aclk_main(void *ptr)
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct netdata_static_thread *query_thread;
- if (!netdata_cloud_setting) {
- info("Killing ACLK thread -> cloud functionality has been disabled");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
- }
+ // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
+ // as it must notify the far end that it shutdown gracefully and avoid the LWT.
+ netdata_thread_disable_cancelability();
+
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
+ info("Killing ACLK thread -> cloud functionality has been disabled");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+#endif
info("Waiting for netdata to be ready");
while (!netdata_ready) {
sleep_usec(USEC_PER_MS * 300);
}
+ info("Waiting for Cloud to be enabled");
+ while (!netdata_cloud_setting) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit) {
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+ }
+
last_init_sequence = now_realtime_sec();
query_thread = NULL;
char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
char *aclk_port = NULL;
uint32_t port_num = 0;
- char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", DEFAULT_CLOUD_BASE_URL);
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
- error("Configuration error - cannot use agent cloud link");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
- }
- port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
-
info("Waiting for netdata to be claimed");
while(1) {
while (likely(!is_agent_claimed())) {
- sleep_usec(USEC_PER_SEC * 5);
+ sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
goto exited;
}
- if (!create_private_key() && !_mqtt_lib_init())
- break;
-
- if (netdata_exit)
+ // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
+ // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error("Do not move the cloud base url out of post_conf_load!!");
goto exited;
+ }
+ if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ error("Agent is claimed but the configuration is invalid, please fix");
+ }
+ else
+ {
+ port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
+ if (!create_private_key() && !_mqtt_lib_init())
+ break;
+ }
- sleep_usec(USEC_PER_SEC * 60);
+ for (int i=0; i<60; i++) {
+ if (netdata_exit)
+ goto exited;
+
+ sleep_usec(USEC_PER_SEC * 1);
+ }
}
+
create_publish_base_topic();
usec_t reconnect_expiry = 0; // In usecs
- netdata_thread_disable_cancelability();
-
while (!netdata_exit) {
static int first_init = 0;
size_t write_q, write_q_bytes, read_q;
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+
+ if (aclk_force_reconnect) {
+ aclk_lws_wss_destroy_context();
+ aclk_force_reconnect = 0;
+ }
//info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
// first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
- if (unlikely(!netdata_exit && !aclk_connected)) {
+ if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
if (unlikely(!first_init)) {
aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
first_init = 1;
@@ -1414,7 +1440,7 @@ void *aclk_main(void *ptr)
}
_link_event_loop();
- if (unlikely(!aclk_connected))
+ if (unlikely(!aclk_connected || aclk_force_reconnect))
continue;
/*static int stress_counter = 0;
if (write_q_bytes==0 && stress_counter ++ >5)
@@ -1550,6 +1576,7 @@ void aclk_disconnect()
waiting_init = 1;
aclk_connected = 0;
aclk_connecting = 0;
+ aclk_force_reconnect = 1;
}
void aclk_shutdown()
@@ -1598,6 +1625,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
* alarm_log
* active alarms
*/
+void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
void aclk_send_alarm_metadata()
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1618,17 +1646,18 @@ void aclk_send_alarm_metadata()
aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
health_alarms2json(localhost, local_buffer, 1);
debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
+ // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
+ // health_alarm_log2json(localhost, local_buffer, 0);
+ // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
+ buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
+ health_active_log_alarms_2json(localhost, local_buffer);
+ //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
- buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
- health_alarm_log2json(localhost, local_buffer, 0);
- debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
- buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
- health_alarms_values2json(localhost, local_buffer, 0);
- debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len);
buffer_sprintf(local_buffer, "\n}\n}");
aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
@@ -1657,7 +1686,7 @@ int aclk_send_info_metadata()
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (aclk_metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "connect", msg_id, 0, 0);
+ aclk_create_header(local_buffer, "update", msg_id, 0, 0);
else
aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h
index a3722b82ae..29871cc89d 100644
--- a/aclk/agent_cloud_link.h
+++ b/aclk/agent_cloud_link.h
@@ -25,7 +25,7 @@
#define ACLK_MAX_TOPIC 255
#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now
-#define ACLK_STABLE_TIMEOUT 10 // Minimum delay to mark AGENT as stable
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
#define ACLK_DEFAULT_PORT 9002
#define ACLK_DEFAULT_HOST "localhost"
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index b070f7fb09..8beb4b6766 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -29,7 +29,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
UNUSED(mosq);
UNUSED(obj);
UNUSED(rc);
-
+ info("Publish_callback: mid=%d", rc);
// TODO: link this with a msg_id so it can be traced
return;
}
@@ -219,7 +219,8 @@ void aclk_lws_connection_data_received()
void aclk_lws_connection_closed()
{
- aclk_disconnect(NULL);
+ aclk_disconnect();
+
}