summaryrefslogtreecommitdiffstats
path: root/exporting
diff options
context:
space:
mode:
Diffstat (limited to 'exporting')
-rw-r--r--exporting/clean_connectors.c37
-rw-r--r--exporting/exporting_engine.c57
-rw-r--r--exporting/exporting_engine.h4
-rw-r--r--exporting/send_data.c23
-rw-r--r--exporting/tests/test_exporting_engine.c5
5 files changed, 101 insertions, 25 deletions
diff --git a/exporting/clean_connectors.c b/exporting/clean_connectors.c
index 7983f0ab19..7cbdf8ac25 100644
--- a/exporting/clean_connectors.c
+++ b/exporting/clean_connectors.c
@@ -3,35 +3,36 @@
#include "exporting_engine.h"
/**
- * Clean the instance config.
- * @param ptr
+ * Clean the instance config
+ *
+ * @param config an instance config structure.
*/
-static void clean_instance_config(struct instance_config *ptr)
+static void clean_instance_config(struct instance_config *config)
{
- if (ptr->name)
- freez((void *)ptr->name);
+ if (config->name)
+ freez((void *)config->name);
- if (ptr->destination)
- freez((void *)ptr->destination);
+ if (config->destination)
+ freez((void *)config->destination);
- if (ptr->charts_pattern)
- simple_pattern_free(ptr->charts_pattern);
+ if (config->charts_pattern)
+ simple_pattern_free(config->charts_pattern);
- if (ptr->hosts_pattern)
- simple_pattern_free(ptr->hosts_pattern);
+ if (config->hosts_pattern)
+ simple_pattern_free(config->hosts_pattern);
}
/**
* Clean the allocated variables
*
- * @param ptr a pointer to the structure with variables to clean.
+ * @param instance an instance data structure.
*/
-void clean_instance(struct instance *ptr)
+void clean_instance(struct instance *instance)
{
- clean_instance_config(&ptr->config);
- if (ptr->labels)
- buffer_free(ptr->labels);
+ clean_instance_config(&instance->config);
+ if (instance->labels)
+ buffer_free(instance->labels);
- uv_mutex_destroy(&ptr->mutex);
- uv_cond_destroy(&ptr->cond_var);
+ uv_cond_destroy(&instance->cond_var);
+ // uv_mutex_destroy(&instance->mutex);
}
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c
index 06dc8156fe..656e5ada78 100644
--- a/exporting/exporting_engine.c
+++ b/exporting/exporting_engine.c
@@ -2,12 +2,63 @@
#include "exporting_engine.h"
-static void exporting_main_cleanup(void *ptr) {
+static struct engine *engine = NULL;
+
+/**
+ * Clean up the main exporting thread and all connector workers on Netdata exit
+ *
+ * @param ptr thread data.
+ */
+static void exporting_main_cleanup(void *ptr)
+{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
info("cleaning up...");
+ if (!engine) {
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return;
+ }
+
+ engine->exit = 1;
+
+ int found = 0;
+ usec_t max = 2 * USEC_PER_SEC, step = 50000;
+
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (!instance->exited) {
+ found++;
+ info("stopping worker for instance %s", instance->config.name);
+ uv_cond_signal(&instance->cond_var);
+ } else
+ info("found stopped worker for instance %s", instance->config.name);
+ }
+
+ while (found && max > 0) {
+ max -= step;
+ info("Waiting %d exporting connectors to finish...", found);
+ sleep_usec(step);
+ found = 0;
+
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (!instance->exited)
+ found++;
+ }
+ }
+
+ for (struct instance *instance = engine->instance_root; instance;) {
+ struct instance *current_instance = instance;
+ instance = instance->next;
+ clean_instance(current_instance);
+ }
+
+ if (engine->config.prefix)
+ freez((void *)engine->config.prefix);
+ if (engine->config.hostname)
+ freez((void *)engine->config.hostname);
+ freez(engine);
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
@@ -24,7 +75,7 @@ void *exporting_main(void *ptr)
{
netdata_thread_cleanup_push(exporting_main_cleanup, ptr);
- struct engine *engine = read_exporting_config();
+ engine = read_exporting_config();
if (!engine) {
info("EXPORTING: no exporting connectors configured");
goto cleanup;
@@ -54,7 +105,7 @@ void *exporting_main(void *ptr)
send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
#ifdef UNIT_TESTING
- break;
+ return NULL;
#endif
}
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
index bdf58ce757..e567676fec 100644
--- a/exporting/exporting_engine.h
+++ b/exporting/exporting_engine.h
@@ -180,6 +180,8 @@ struct instance {
size_t index;
struct instance *next;
struct engine *engine;
+
+ volatile sig_atomic_t exited;
};
struct engine {
@@ -192,6 +194,8 @@ struct engine {
int mongoc_initialized;
struct instance *instance_root;
+
+ volatile sig_atomic_t exit;
};
extern struct instance *prometheus_exporter_instance;
diff --git a/exporting/send_data.c b/exporting/send_data.c
index 8875065f2b..3a749cdbb8 100644
--- a/exporting/send_data.c
+++ b/exporting/send_data.c
@@ -141,6 +141,21 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
}
/**
+ * Clean up a simple connector instance on Netdata exit
+ *
+ * @param instance an instance data structure.
+ */
+void simple_connector_cleanup(struct instance *instance)
+{
+ info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+ // TODO free allocated resources
+
+ info("EXPORTING: instance %s exited", instance->config.name);
+ instance->exited = 1;
+}
+
+/**
* Simple connector worker
*
* Runs in a separate thread for every instance.
@@ -159,7 +174,7 @@ void simple_connector_worker(void *instance_p)
.tv_usec = (instance->config.timeoutms * 1000) % 1000000};
int failures = 0;
- while(!netdata_exit) {
+ while(!instance->engine->exit) {
// reset the monitoring chart counters
stats->received_bytes =
@@ -195,7 +210,7 @@ void simple_connector_worker(void *instance_p)
stats->reconnects += reconnects;
}
- if(unlikely(netdata_exit)) break;
+ if(unlikely(instance->engine->exit)) break;
// ------------------------------------------------------------------------
// if we are connected, send our buffer to the data collecting server
@@ -203,6 +218,8 @@ void simple_connector_worker(void *instance_p)
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ if(unlikely(instance->engine->exit)) break;
+
if (likely(sock != -1)) {
simple_connector_send_buffer(&sock, &failures, instance);
} else {
@@ -238,4 +255,6 @@ void simple_connector_worker(void *instance_p)
break;
#endif
}
+
+ simple_connector_cleanup(instance);
}
diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c
index 6cd29132fc..09a0b1c749 100644
--- a/exporting/tests/test_exporting_engine.c
+++ b/exporting/tests/test_exporting_engine.c
@@ -65,8 +65,6 @@ static void test_exporting_engine(void **state)
expect_value(__wrap_send_main_rusage, rd_user, NULL);
expect_value(__wrap_send_main_rusage, rd_system, NULL);
- expect_function_call(__wrap_info_int);
-
void *ptr = malloc(sizeof(struct netdata_static_thread));
assert_ptr_equal(exporting_main(ptr), NULL);
assert_int_equal(engine->now, 2);
@@ -668,6 +666,9 @@ static void test_simple_connector_worker(void **state)
expect_value(__wrap_send_internal_metrics, instance, instance);
will_return(__wrap_send_internal_metrics, 0);
+ expect_function_call(__wrap_info_int);
+ expect_function_call(__wrap_info_int);
+
simple_connector_worker(instance);
assert_int_equal(stats->buffered_metrics, 0);