summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-05-14 11:42:40 +0300
committerGitHub <noreply@github.com>2020-05-14 11:42:40 +0300
commita606a27f164b1c704d850c838a7b89d6c6e0c17c (patch)
tree1d653e680fa8b6f9c1a6653d87ac6faef60c71f9
parentbb61e46e71777b467513a2b102fccdc5f5a8cd0b (diff)
Fix error handling in exporting connector (#8910)
-rw-r--r--CMakeLists.txt1
-rw-r--r--Makefile.am1
-rw-r--r--exporting/exporting_engine.c13
-rw-r--r--exporting/exporting_engine.h26
-rw-r--r--exporting/process_data.c100
-rw-r--r--exporting/tests/exporting_doubles.c7
-rw-r--r--exporting/tests/test_exporting_engine.c4
-rw-r--r--exporting/tests/test_exporting_engine.h2
8 files changed, 47 insertions, 107 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d97ae1d11f..56f3a2416e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1152,7 +1152,6 @@ endif()
-Wl,--wrap=rrdset_is_exportable
-Wl,--wrap=exporting_calculate_value_from_stored_data
-Wl,--wrap=prepare_buffers
- -Wl,--wrap=notify_workers
-Wl,--wrap=send_internal_metrics
-Wl,--wrap=now_realtime_sec
-Wl,--wrap=uv_thread_set_name_np
diff --git a/Makefile.am b/Makefile.am
index c26b93ae86..8a6de4e0e8 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -878,7 +878,6 @@ if ENABLE_UNITTESTS
-Wl,--wrap=rrdset_is_exportable \
-Wl,--wrap=exporting_calculate_value_from_stored_data \
-Wl,--wrap=prepare_buffers \
- -Wl,--wrap=notify_workers \
-Wl,--wrap=send_internal_metrics \
-Wl,--wrap=now_realtime_sec \
-Wl,--wrap=uv_thread_set_name_np \
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c
index 93347328cd..06dc8156fe 100644
--- a/exporting/exporting_engine.c
+++ b/exporting/exporting_engine.c
@@ -48,17 +48,8 @@ void *exporting_main(void *ptr)
heartbeat_next(&hb, step_ut);
engine->now = now_realtime_sec();
- if (mark_scheduled_instances(engine)) {
- if (prepare_buffers(engine) != 0) {
- error("EXPORTING: cannot prepare data to send");
- break;
- }
- }
-
- if (notify_workers(engine) != 0) {
- error("EXPORTING: cannot communicate with exporting connector instance working threads");
- break;
- }
+ if (mark_scheduled_instances(engine))
+ prepare_buffers(engine);
send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
index 9673e823d6..7dc79e3956 100644
--- a/exporting/exporting_engine.h
+++ b/exporting/exporting_engine.h
@@ -151,6 +151,7 @@ struct instance {
struct stats stats;
int scheduled;
+ int disabled;
int skip_host;
int skip_chart;
@@ -203,8 +204,7 @@ EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type);
int init_connectors(struct engine *engine);
int mark_scheduled_instances(struct engine *engine);
-int prepare_buffers(struct engine *engine);
-int notify_workers(struct engine *engine);
+void prepare_buffers(struct engine *engine);
size_t exporting_name_copy(char *dst, const char *src, size_t max_len);
@@ -216,13 +216,13 @@ calculated_number exporting_calculate_value_from_stored_data(
RRDDIM *rd,
time_t *last_timestamp);
-int start_batch_formatting(struct engine *engine);
-int start_host_formatting(struct engine *engine, RRDHOST *host);
-int start_chart_formatting(struct engine *engine, RRDSET *st);
-int metric_formatting(struct engine *engine, RRDDIM *rd);
-int end_chart_formatting(struct engine *engine, RRDSET *st);
-int end_host_formatting(struct engine *engine, RRDHOST *host);
-int end_batch_formatting(struct engine *engine);
+void start_batch_formatting(struct engine *engine);
+void start_host_formatting(struct engine *engine, RRDHOST *host);
+void start_chart_formatting(struct engine *engine, RRDSET *st);
+void metric_formatting(struct engine *engine, RRDDIM *rd);
+void end_chart_formatting(struct engine *engine, RRDSET *st);
+void end_host_formatting(struct engine *engine, RRDHOST *host);
+void end_batch_formatting(struct engine *engine);
int flush_host_labels(struct instance *instance, RRDHOST *host);
int simple_connector_update_buffered_bytes(struct instance *instance);
@@ -235,6 +235,14 @@ void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_
void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
void send_internal_metrics(struct instance *instance);
+static inline void disable_instance(struct instance *instance)
+{
+ instance->disabled = 1;
+ instance->scheduled = 0;
+ uv_mutex_unlock(&instance->mutex);
+ error("EXPORTING: Instance %s disabled", instance->config.name);
+}
+
#include "exporting/prometheus/prometheus.h"
#endif /* NETDATA_EXPORTING_ENGINE_H */
diff --git a/exporting/process_data.c b/exporting/process_data.c
index f2442e701c..b3d0fdef2d 100644
--- a/exporting/process_data.c
+++ b/exporting/process_data.c
@@ -43,7 +43,7 @@ int mark_scheduled_instances(struct engine *engine)
int instances_were_scheduled = 0;
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (engine->now % instance->config.update_every < localhost->rrd_update_every) {
+ if (!instance->disabled && (engine->now % instance->config.update_every < localhost->rrd_update_every)) {
instance->scheduled = 1;
instances_were_scheduled = 1;
instance->before = engine->now;
@@ -160,21 +160,18 @@ calculated_number exporting_calculate_value_from_stored_data(
* Start batch formatting for every connector instance's buffer
*
* @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
*/
-int start_batch_formatting(struct engine *engine)
+void start_batch_formatting(struct engine *engine)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
uv_mutex_lock(&instance->mutex);
if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
}
}
}
-
- return 0;
}
/**
@@ -182,24 +179,21 @@ int start_batch_formatting(struct engine *engine)
*
* @param engine an engine data structure.
* @param host a data collecting host.
- * @return Returns 0 on success, 1 on failure.
*/
-int start_host_formatting(struct engine *engine, RRDHOST *host)
+void start_host_formatting(struct engine *engine, RRDHOST *host)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
if (rrdhost_is_exportable(instance, host)) {
if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
error("EXPORTING: cannot start host formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
}
} else {
instance->skip_host = 1;
}
}
}
-
- return 0;
}
/**
@@ -207,24 +201,21 @@ int start_host_formatting(struct engine *engine, RRDHOST *host)
*
* @param engine an engine data structure.
* @param st a chart.
- * @return Returns 0 on success, 1 on failure.
*/
-int start_chart_formatting(struct engine *engine, RRDSET *st)
+void start_chart_formatting(struct engine *engine, RRDSET *st)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host) {
if (rrdset_is_exportable(instance, st)) {
if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
}
} else {
instance->skip_chart = 1;
}
}
}
-
- return 0;
}
/**
@@ -232,21 +223,19 @@ int start_chart_formatting(struct engine *engine, RRDSET *st)
*
* @param engine an engine data structure.
* @param rd a dimension(metric) in the Netdata database.
- * @return Returns 0 on success, 1 on failure.
*/
-int metric_formatting(struct engine *engine, RRDDIM *rd)
+void metric_formatting(struct engine *engine, RRDDIM *rd)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
error("EXPORTING: cannot format metric for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
+ continue;
}
instance->stats.buffered_metrics++;
}
}
-
- return 0;
}
/**
@@ -254,21 +243,19 @@ int metric_formatting(struct engine *engine, RRDDIM *rd)
*
* @param engine an engine data structure.
* @param a chart.
- * @return Returns 0 on success, 1 on failure.
*/
-int end_chart_formatting(struct engine *engine, RRDSET *st)
+void end_chart_formatting(struct engine *engine, RRDSET *st)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
+ continue;
}
}
instance->skip_chart = 0;
}
-
- return 0;
}
/**
@@ -276,36 +263,34 @@ int end_chart_formatting(struct engine *engine, RRDSET *st)
*
* @param engine an engine data structure.
* @param host a data collecting host.
- * @return Returns 0 on success, 1 on failure.
*/
-int end_host_formatting(struct engine *engine, RRDHOST *host)
+void end_host_formatting(struct engine *engine, RRDHOST *host)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host) {
if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
error("EXPORTING: cannot end host formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
+ continue;
}
}
instance->skip_host = 0;
}
-
- return 0;
}
/**
* End batch formatting for every connector instance's buffer
*
* @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
*/
-int end_batch_formatting(struct engine *engine)
+void end_batch_formatting(struct engine *engine)
{
for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
- return 1;
+ disable_instance(instance);
+ continue;
}
uv_mutex_unlock(&instance->mutex);
uv_cond_signal(&instance->cond_var);
@@ -314,8 +299,6 @@ int end_batch_formatting(struct engine *engine)
instance->after = instance->before;
}
}
-
- return 0;
}
/**
@@ -325,51 +308,39 @@ int end_batch_formatting(struct engine *engine)
* configured rules.
*
* @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
*/
-int prepare_buffers(struct engine *engine)
+void prepare_buffers(struct engine *engine)
{
- if (start_batch_formatting(engine) != 0)
- return 1;
-
netdata_thread_disable_cancelability();
+ start_batch_formatting(engine);
+
rrd_rdlock();
RRDHOST *host;
rrdhost_foreach_read(host)
{
rrdhost_rdlock(host);
- if (start_host_formatting(engine, host) != 0)
- return 1;
+ start_host_formatting(engine, host);
RRDSET *st;
rrdset_foreach_read(st, host)
{
rrdset_rdlock(st);
- if (start_chart_formatting(engine, st) != 0)
- return 1;
+ start_chart_formatting(engine, st);
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- {
- if (metric_formatting(engine, rd) != 0)
- return 1;
- }
+ metric_formatting(engine, rd);
- if (end_chart_formatting(engine, st) != 0)
- return 1;
+ end_chart_formatting(engine, st);
rrdset_unlock(st);
}
- if (end_host_formatting(engine, host) != 0)
- return 1;
+ end_host_formatting(engine, host);
rrdhost_unlock(host);
}
rrd_unlock();
netdata_thread_enable_cancelability();
- if (end_batch_formatting(engine) != 0)
- return 1;
-
- return 0;
+ end_batch_formatting(engine);
}
/**
@@ -401,18 +372,3 @@ int simple_connector_update_buffered_bytes(struct instance *instance)
return 0;
}
-
-/**
- * Notify workers
- *
- * Notify exporting connector instance working threads that data is ready to send.
- *
- * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
- */
-int notify_workers(struct engine *engine)
-{
- (void)engine;
-
- return 0;
-}
diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c
index 8fafada929..2d966e51ac 100644
--- a/exporting/tests/exporting_doubles.c
+++ b/exporting/tests/exporting_doubles.c
@@ -75,13 +75,6 @@ int __wrap_prepare_buffers(struct engine *engine)
return mock_type(int);
}
-int __wrap_notify_workers(struct engine *engine)
-{
- function_called();
- check_expected_ptr(engine);
- return mock_type(int);
-}
-
void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system)
{
function_called();
diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c
index 3835369356..6cd29132fc 100644
--- a/exporting/tests/test_exporting_engine.c
+++ b/exporting/tests/test_exporting_engine.c
@@ -60,10 +60,6 @@ static void test_exporting_engine(void **state)
expect_memory(__wrap_prepare_buffers, engine, engine, sizeof(struct engine));
will_return(__wrap_prepare_buffers, 0);
- expect_function_call(__wrap_notify_workers);
- expect_memory(__wrap_notify_workers, engine, engine, sizeof(struct engine));
- will_return(__wrap_notify_workers, 0);
-
expect_function_call(__wrap_send_main_rusage);
expect_value(__wrap_send_main_rusage, st_rusage, NULL);
expect_value(__wrap_send_main_rusage, rd_user, NULL);
diff --git a/exporting/tests/test_exporting_engine.h b/exporting/tests/test_exporting_engine.h
index 7df3c39c12..237bc5f9b9 100644
--- a/exporting/tests/test_exporting_engine.h
+++ b/exporting/tests/test_exporting_engine.h
@@ -93,8 +93,6 @@ calculated_number __wrap_exporting_calculate_value_from_stored_data(
int __real_prepare_buffers(struct engine *engine);
int __wrap_prepare_buffers(struct engine *engine);
-int __wrap_notify_workers(struct engine *engine);
-
void __real_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);