summaryrefslogtreecommitdiffstats
path: root/exporting
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-04-10 12:26:36 +0300
committerGitHub <noreply@github.com>2020-04-10 12:26:36 +0300
commit231d19351d0ea20aa66fd204215cbb24f34d7575 (patch)
treee3ed26be7d8ea4ba3b07a29c77bf05bc67b73a43 /exporting
parent764a0676e82c89b4a4516a31a7782ea606071fa5 (diff)
Show internal stats for the exporting engine (#8635)
* Add a print function for internal exporting statistics * Send statistics for simple connectors * Flush sending buffers on failures * Send statistics for the Kinesis connector * Send statistics for the MongoDB connector * Add unit tests
Diffstat (limited to 'exporting')
-rw-r--r--exporting/aws_kinesis/aws_kinesis.c36
-rw-r--r--exporting/exporting_engine.c10
-rw-r--r--exporting/exporting_engine.h53
-rw-r--r--exporting/graphite/graphite.c2
-rw-r--r--exporting/json/json.c2
-rw-r--r--exporting/mongodb/mongodb.c60
-rw-r--r--exporting/mongodb/mongodb.h3
-rw-r--r--exporting/opentsdb/opentsdb.c4
-rw-r--r--exporting/process_data.c15
-rw-r--r--exporting/prometheus/remote_write/remote_write.c2
-rw-r--r--exporting/send_data.c50
-rw-r--r--exporting/send_internal_metrics.c166
-rw-r--r--exporting/tests/exporting_doubles.c20
-rw-r--r--exporting/tests/netdata_doubles.c86
-rw-r--r--exporting/tests/test_exporting_engine.c322
-rw-r--r--exporting/tests/test_exporting_engine.h9
16 files changed, 761 insertions, 79 deletions
diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c
index 4b0d5f74a4..938569a9eb 100644
--- a/exporting/aws_kinesis/aws_kinesis.c
+++ b/exporting/aws_kinesis/aws_kinesis.c
@@ -75,9 +75,23 @@ void aws_kinesis_connector_worker(void *instance_p)
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
BUFFER *buffer = (BUFFER *)instance->buffer;
size_t buffer_len = buffer_strlen(buffer);
+ stats->buffered_bytes = buffer_len;
+
size_t sent = 0;
while (sent < buffer_len) {
@@ -115,7 +129,7 @@ void aws_kinesis_connector_worker(void *instance_p)
connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len);
sent += record_len;
- stats->chart_transmission_successes++;
+ stats->transmission_successes++;
size_t sent_bytes = 0, lost_bytes = 0;
@@ -127,30 +141,34 @@ void aws_kinesis_connector_worker(void *instance_p)
"EXPORTING: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
instance->config.destination, sent_bytes, sent_bytes - lost_bytes);
- stats->chart_transmission_failures++;
- stats->chart_data_lost_events++;
- stats->chart_lost_bytes += lost_bytes;
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += lost_bytes;
// estimate the number of lost metrics
- stats->chart_lost_metrics += (collected_number)(
- stats->chart_buffered_metrics *
+ stats->lost_metrics += (collected_number)(
+ stats->buffered_metrics *
(buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
break;
} else {
- stats->chart_receptions++;
+ stats->receptions++;
}
if (unlikely(netdata_exit))
break;
}
- stats->chart_sent_bytes += sent;
+ stats->sent_bytes += sent;
if (likely(sent == buffer_len))
- stats->chart_sent_metrics = stats->chart_buffered_metrics;
+ stats->sent_metrics = stats->buffered_metrics;
buffer_flush(buffer);
+ send_internal_metrics(instance);
+
+ stats->buffered_metrics = 0;
+
uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c
index 0a38d66bbe..93347328cd 100644
--- a/exporting/exporting_engine.c
+++ b/exporting/exporting_engine.c
@@ -35,6 +35,11 @@ void *exporting_main(void *ptr)
goto cleanup;
}
+ RRDSET *st_main_rusage = NULL;
+ RRDDIM *rd_main_user = NULL;
+ RRDDIM *rd_main_system = NULL;
+ create_main_rusage_chart(&st_main_rusage, &rd_main_user, &rd_main_system);
+
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
@@ -55,10 +60,7 @@ void *exporting_main(void *ptr)
break;
}
- if (send_internal_metrics(engine) != 0) {
- error("EXPORTING: cannot send metrics for the operation of exporting engine");
- break;
- }
+ send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
#ifdef UNIT_TESTING
break;
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
index 8a46dd5d3d..94daf98e05 100644
--- a/exporting/exporting_engine.h
+++ b/exporting/exporting_engine.h
@@ -99,18 +99,42 @@ struct engine_config {
};
struct stats {
- collected_number chart_buffered_metrics;
- collected_number chart_lost_metrics;
- collected_number chart_sent_metrics;
- collected_number chart_buffered_bytes;
- collected_number chart_received_bytes;
- collected_number chart_sent_bytes;
- collected_number chart_receptions;
- collected_number chart_transmission_successes;
- collected_number chart_transmission_failures;
- collected_number chart_data_lost_events;
- collected_number chart_lost_bytes;
- collected_number chart_reconnects;
+ collected_number buffered_metrics;
+ collected_number lost_metrics;
+ collected_number sent_metrics;
+ collected_number buffered_bytes;
+ collected_number lost_bytes;
+ collected_number sent_bytes;
+ collected_number received_bytes;
+ collected_number transmission_successes;
+ collected_number data_lost_events;
+ collected_number reconnects;
+ collected_number transmission_failures;
+ collected_number receptions;
+
+ int initialized;
+
+ RRDSET *st_metrics;
+ RRDDIM *rd_buffered_metrics;
+ RRDDIM *rd_lost_metrics;
+ RRDDIM *rd_sent_metrics;
+
+ RRDSET *st_bytes;
+ RRDDIM *rd_buffered_bytes;
+ RRDDIM *rd_lost_bytes;
+ RRDDIM *rd_sent_bytes;
+ RRDDIM *rd_received_bytes;
+
+ RRDSET *st_ops;
+ RRDDIM *rd_transmission_successes;
+ RRDDIM *rd_data_lost_events;
+ RRDDIM *rd_reconnects;
+ RRDDIM *rd_transmission_failures;
+ RRDDIM *rd_receptions;
+
+ RRDSET *st_rusage;
+ RRDDIM *rd_user;
+ RRDDIM *rd_system;
};
struct instance {
@@ -193,13 +217,16 @@ int end_chart_formatting(struct engine *engine, RRDSET *st);
int end_host_formatting(struct engine *engine, RRDHOST *host);
int end_batch_formatting(struct engine *engine);
int flush_host_labels(struct instance *instance, RRDHOST *host);
+int simple_connector_update_buffered_bytes(struct instance *instance);
int exporting_discard_response(BUFFER *buffer, struct instance *instance);
void simple_connector_receive_response(int *sock, struct instance *instance);
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance);
void simple_connector_worker(void *instance_p);
-int send_internal_metrics(struct engine *engine);
+void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
+void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
+void send_internal_metrics(struct instance *instance);
#include "exporting/prometheus/prometheus.h"
diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c
index f815bff89d..d3f928583c 100644
--- a/exporting/graphite/graphite.c
+++ b/exporting/graphite/graphite.c
@@ -27,7 +27,7 @@ int init_graphite_instance(struct instance *instance)
instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = NULL;
+ instance->end_batch_formatting = simple_connector_update_buffered_bytes;
instance->send_header = NULL;
instance->check_response = exporting_discard_response;
diff --git a/exporting/json/json.c b/exporting/json/json.c
index 9886b55567..b334804cff 100644
--- a/exporting/json/json.c
+++ b/exporting/json/json.c
@@ -27,7 +27,7 @@ int init_json_instance(struct instance *instance)
instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = NULL;
+ instance->end_batch_formatting = simple_connector_update_buffered_bytes;
instance->send_header = NULL;
instance->check_response = exporting_discard_response;
diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c
index bd2d338894..f20c4f1c80 100644
--- a/exporting/mongodb/mongodb.c
+++ b/exporting/mongodb/mongodb.c
@@ -183,8 +183,10 @@ int format_batch_mongodb(struct instance *instance)
// ring buffer is full, reuse the oldest element
connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
+ connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
+ stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
}
- insert = callocz((size_t)stats->chart_buffered_metrics, sizeof(bson_t *));
+ insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
connector_specific_data->last_buffer->insert = insert;
BUFFER *buffer = (BUFFER *)instance->buffer;
@@ -193,7 +195,7 @@ int format_batch_mongodb(struct instance *instance)
size_t documents_inserted = 0;
- while (*end && documents_inserted <= (size_t)stats->chart_buffered_metrics) {
+ while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
while (*end && *end != '\n')
end++;
@@ -208,7 +210,8 @@ int format_batch_mongodb(struct instance *instance)
insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
if (unlikely(!insert[documents_inserted])) {
- error("EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
+ error(
+ "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
free_bson(insert, documents_inserted);
return 1;
}
@@ -218,8 +221,16 @@ int format_batch_mongodb(struct instance *instance)
documents_inserted++;
}
+ stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
+
buffer_flush(buffer);
+ // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
+ // of metrics, added in the current iteration, so we are clearing it here. We will use the
+ // connector_specific_data->total_documents_inserted in the worker to show the statistics.
+ stats->buffered_metrics = 0;
+ connector_specific_data->total_documents_inserted += documents_inserted;
+
connector_specific_data->last_buffer->documents_inserted = documents_inserted;
connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
@@ -246,11 +257,25 @@ void mongodb_connector_worker(void *instance_p)
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
bson_t **insert = connector_specific_data->first_buffer->insert;
size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
+ size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
connector_specific_data->first_buffer->insert = NULL;
connector_specific_data->first_buffer->documents_inserted = 0;
+ connector_specific_data->first_buffer->buffered_bytes = 0;
connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
uv_mutex_unlock(&instance->mutex);
@@ -279,9 +304,10 @@ void mongodb_connector_worker(void *instance_p)
NULL,
NULL,
&bson_error))) {
- stats->chart_sent_bytes += data_size;
- stats->chart_transmission_successes++;
- stats->chart_receptions++;
+ stats->sent_metrics = documents_inserted;
+ stats->sent_bytes += data_size;
+ stats->transmission_successes++;
+ stats->receptions++;
} else {
// oops! we couldn't send (all or some of the) data
error("EXPORTING: %s", bson_error.message);
@@ -290,10 +316,10 @@ void mongodb_connector_worker(void *instance_p)
"Willing to write %zu bytes, wrote %zu bytes.",
instance->config.destination, data_size, 0UL);
- stats->chart_transmission_failures++;
- stats->chart_data_lost_events++;
- stats->chart_lost_bytes += data_size;
- stats->chart_lost_metrics += stats->chart_buffered_metrics;
+ stats->transmission_failures++;
+ stats->data_lost_events++;
+ stats->lost_bytes += buffered_bytes;
+ stats->lost_metrics += documents_inserted;
}
free_bson(insert, documents_inserted);
@@ -301,8 +327,18 @@ void mongodb_connector_worker(void *instance_p)
if (unlikely(netdata_exit))
break;
- stats->chart_sent_bytes += data_size;
- stats->chart_sent_metrics = stats->chart_buffered_metrics;
+ uv_mutex_lock(&instance->mutex);
+
+ stats->buffered_metrics = connector_specific_data->total_documents_inserted;
+
+ send_internal_metrics(instance);
+
+ connector_specific_data->total_documents_inserted -= documents_inserted;
+
+ stats->buffered_metrics = 0;
+ stats->buffered_bytes -= buffered_bytes;
+
+ uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
break;
diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h
index 0f23705f55..5116e66fab 100644
--- a/exporting/mongodb/mongodb.h
+++ b/exporting/mongodb/mongodb.h
@@ -10,6 +10,7 @@
struct bson_buffer {
bson_t **insert;
size_t documents_inserted;
+ size_t buffered_bytes;
struct bson_buffer *next;
};
@@ -18,6 +19,8 @@ struct mongodb_specific_data {
mongoc_client_t *client;
mongoc_collection_t *collection;
+ size_t total_documents_inserted;
+
bson_t **current_insert;
struct bson_buffer *first_buffer;
struct bson_buffer *last_buffer;
diff --git a/exporting/opentsdb/opentsdb.c b/exporting/opentsdb/opentsdb.c
index 54f3c3c04d..2d5b2db698 100644
--- a/exporting/opentsdb/opentsdb.c
+++ b/exporting/opentsdb/opentsdb.c
@@ -27,7 +27,7 @@ int init_opentsdb_telnet_instance(struct instance *instance)
instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = NULL;
+ instance->end_batch_formatting = simple_connector_update_buffered_bytes;
instance->send_header = NULL;
instance->check_response = exporting_discard_response;
@@ -68,7 +68,7 @@ int init_opentsdb_http_instance(struct instance *instance)
instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = NULL;
+ instance->end_batch_formatting = simple_connector_update_buffered_bytes;
instance->send_header = NULL;
instance->check_response = exporting_discard_response;
diff --git a/exporting/process_data.c b/exporting/process_data.c
index 0645982c8a..f2442e701c 100644
--- a/exporting/process_data.c
+++ b/exporting/process_data.c
@@ -242,7 +242,7 @@ int metric_formatting(struct engine *engine, RRDDIM *rd)
error("EXPORTING: cannot format metric for %s", instance->config.name);
return 1;
}
- instance->stats.chart_buffered_metrics++;
+ instance->stats.buffered_metrics++;
}
}
@@ -390,6 +390,19 @@ int flush_host_labels(struct instance *instance, RRDHOST *host)
}
/**
+ * Update stats for buffered bytes
+ *
+ * @param instance an instance data structure.
+ * @return Always returns 0.
+ */
+int simple_connector_update_buffered_bytes(struct instance *instance)
+{
+ instance->stats.buffered_bytes = (collected_number)buffer_strlen((BUFFER *)(instance->buffer));
+
+ return 0;
+}
+
+/**
* Notify workers
*
* Notify exporting connector instance working threads that data is ready to send.
diff --git a/exporting/prometheus/remote_write/remote_write.c b/exporting/prometheus/remote_write/remote_write.c
index 248a053c21..12019e2286 100644
--- a/exporting/prometheus/remote_write/remote_write.c
+++ b/exporting/prometheus/remote_write/remote_write.c
@@ -314,7 +314,7 @@ int format_batch_prometheus_remote_write(struct instance *instance)
return 1;
}
buffer->len = data_size;
- instance->stats.chart_buffered_bytes = (collected_number)buffer_strlen(buffer);
+ instance->stats.buffered_bytes = (collected_number)buffer_strlen(buffer);
return 0;
}
diff --git a/exporting/send_data.c b/exporting/send_data.c
index 6315a7d1bf..8875065f2b 100644
--- a/exporting/send_data.c
+++ b/exporting/send_data.c
@@ -57,8 +57,8 @@ void simple_connector_receive_response(int *sock, struct instance *instance)
if (likely(r > 0)) {
// we received some data
response->len += r;
- stats->chart_received_bytes += r;
- stats->chart_receptions++;
+ stats->received_bytes += r;
+ stats->receptions++;
} else if (r == 0) {
error("EXPORTING: '%s' closed the socket", instance->config.destination);
close(*sock);
@@ -109,9 +109,9 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
if(written != -1 && (size_t)written == len) {
// we sent the data successfully
- stats->chart_transmission_successes++;
- stats->chart_sent_bytes += written;
- stats->chart_sent_metrics = stats->chart_buffered_metrics;
+ stats->transmission_successes++;
+ stats->sent_bytes += written;
+ stats->sent_metrics = stats->buffered_metrics;
// reset the failures count
*failures = 0;
@@ -126,10 +126,10 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
instance->config.destination,
len,
written);
- stats->chart_transmission_failures++;
+ stats->transmission_failures++;
if(written != -1)
- stats->chart_sent_bytes += written;
+ stats->sent_bytes += written;
// increment the counter we check for data loss
(*failures)++;
@@ -160,6 +160,19 @@ void simple_connector_worker(void *instance_p)
int failures = 0;
while(!netdata_exit) {
+
+ // reset the monitoring chart counters
+ stats->received_bytes =
+ stats->sent_bytes =
+ stats->sent_metrics =
+ stats->lost_metrics =
+ stats->receptions =
+ stats->transmission_successes =
+ stats->transmission_failures =
+ stats->data_lost_events =
+ stats->lost_bytes =
+ stats->reconnects = 0;
+
// ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking
@@ -179,7 +192,7 @@ void simple_connector_worker(void *instance_p)
&reconnects,
NULL,
0);
- stats->chart_reconnects += reconnects;
+ stats->reconnects += reconnects;
}
if(unlikely(netdata_exit)) break;
@@ -194,12 +207,31 @@ void simple_connector_worker(void *instance_p)
simple_connector_send_buffer(&sock, &failures, instance);
} else {
error("EXPORTING: failed to update '%s'", instance->config.destination);
- stats->chart_transmission_failures++;
+ stats->transmission_failures++;
// increment the counter we check for data loss
failures++;
}
+ BUFFER *buffer = instance->buffer;
+
+ if (failures > instance->config.buffer_on_failures) {
+ stats->lost_bytes += buffer_strlen(buffer);
+ error(
+ "EXPORTING: connector instance %s reached %d exporting failures. "
+ "Flushing buffers to protect this host - this results in data loss on server '%s'",
+ instance->config.name, failures, instance->config.destination);
+ buffer_flush(buffer);
+ failures = 0;
+ stats->data_lost_events++;
+ stats->lost_metrics = stats->buffered_metrics;
+ }
+
+ send_internal_metrics(instance);
+
+ if(likely(buffer_strlen(buffer) == 0))
+ stats->buffered_metrics = 0;
+
uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
diff --git a/exporting/send_internal_metrics.c b/exporting/send_internal_metrics.c
index b93918695b..e4111a587b 100644
--- a/exporting/send_internal_metrics.c
+++ b/exporting/send_internal_metrics.c
@@ -3,16 +3,170 @@
#include "exporting_engine.h"
/**
- * Send internal metrics
+ * Create a chart for the main exporting thread CPU usage
+ *
+ * @param st_rusage the thead CPU usage chart
+ * @param rd_user a dimension for user CPU usage
+ * @param rd_system a dimension for system CPU usage
+ */
+void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system)
+{
+ if (*st_rusage && *rd_user && *rd_system)
+ return;
+
+ *st_rusage = rrdset_create_localhost(
+ "netdata", "exporting_main_thread_cpu", NULL, "exporting", NULL, "Netdata Main Exporting Thread CPU Usage",
+ "milliseconds/s", "exporting", NULL, 130600, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ *rd_user = rrddim_add(*st_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ *rd_system = rrddim_add(*st_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+}
+
+/**
+ * Send the main exporting thread CPU usage
+ *
+ * @param st_rusage a thead CPU usage chart
+ * @param rd_user a dimension for user CPU usage
+ * @param rd_system a dimension for system CPU usage
+ */
+void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system)
+{
+ struct rusage thread;
+ getrusage(RUSAGE_THREAD, &thread);
+
+ if (likely(st_rusage->counter_done))
+ rrdset_next(st_rusage);
+
+ rrddim_set_by_pointer(st_rusage, rd_user, thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
+ rrddim_set_by_pointer(st_rusage, rd_system, thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
+
+ rrdset_done(st_rusage);
+}
+
+/**
+ * Send internal metrics for an instance
*
* Send performance metrics for the operation of exporting engine itself to the Netdata database.
*
- * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
+ * @param instance an instance data structure.
*/
-int send_internal_metrics(struct engine *engine)
+void send_internal_metrics(struct instance *instance)
{
- (void)engine;
+ struct stats *stats = &instance->stats;
+
+ // ------------------------------------------------------------------------
+ // create charts for monitoring the exporting operations
+
+ if (!stats->initialized) {
+ char id[RRD_ID_LENGTH_MAX + 1];
+ BUFFER *family = buffer_create(0);
+
+ buffer_sprintf(family, "exporting_%s", instance->config.name);
+
+ snprintf(id, RRD_ID_LENGTH_MAX, "exporting_%s_metrics", instance->config.name);
+ netdata_fix_chart_id(id);
+
+ stats->st_metrics = rrdset_create_localhost(
+ "netdata", id, NULL, buffer_tostring(family), NULL, "Netdata Buffered Metrics", "metrics", "exporting", NULL,
+ 130610, instance->config.update_every, RRDSET_TYPE_LINE);
+
+ stats->rd_buffered_metrics = rrddim_add(stats->st_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_lost_metrics = rrddim_add(stats->st_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_sent_metrics = rrddim_add(stats->st_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+
+ // ------------------------------------------------------------------------
+
+ snprintf(id, RRD_ID_LENGTH_MAX, "exporting_%s_bytes", instance->config.name);
+ netdata_fix_chart_id(id);
+
+ stats->st_bytes = rrdset_create_localhost(
+ "netdata", id, NULL, buffer_tostring(family), NULL, "Netdata Exporting Data Size", "KiB", "exporting", NULL,
+ 130620, instance->config.update_every, RRDSET_TYPE_AREA);
+
+ stats->rd_buffered_bytes = rrddim_add(stats->st_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_lost_bytes = rrddim_add(stats->st_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_sent_bytes = rrddim_add(stats->st_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_received_bytes = rrddim_add(stats->st_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
+
+ // ------------------------------------------------------------------------
+
+ snprintf(id, RRD_ID_LENGTH_MAX, "exporting_%s_ops", instance->config.name);
+ netdata_fix_chart_id(id);
+
+ stats->st_ops = rrdset_create_localhost(
+ "netdata", id, NULL, buffer_tostring(family), NULL, "Netdata Exporting Operations", "operations", "exporting",
+ NULL, 130630, instance->config.update_every, RRDSET_TYPE_LINE);
+
+ stats->rd_transmission_successes = rrddim_add(stats->st_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_data_lost_events = rrddim_add(stats->st_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_reconnects = rrddim_add(stats->st_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_transmission_failures = rrddim_add(stats->st_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ stats->rd_receptions = rrddim_add(stats->st_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+
+ // ------------------------------------------------------------------------
+
+ snprintf(id, RRD_ID_LENGTH_MAX, "exporting_%s_thread_cpu", instance->config.name);
+ netdata_fix_chart_id(id);
+
+ stats->st_rusage = rrdset_create_localhost(
+ "netdata", id, NULL, buffer_tostring(family), NULL, "Netdata Exporting Instance Thread CPU Usage",
+ "milliseconds/s", "exporting", NULL, 130640, instance->config.update_every, RRDSET_TYPE_STACKED);
+
+ stats->rd_user = rrddim_add(stats->st_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ stats->rd_system = rrddim_add(stats->st_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+
+ buffer_free(family);
+
+ stats->initialized = 1;
+ }
+
+ // ------------------------------------------------------------------------
+ // update the monitoring charts
+
+ if (likely(stats->st_metrics->counter_done))
+ rrdset_next(stats->st_metrics);
+
+ rrddim_set_by_pointer(stats->st_metrics, stats->rd_buffered_metrics, stats->buffered_metrics);
+ rrddim_set_by_pointer(stats->st_metrics, stats->rd_lost_metrics, stats->lost_metrics);
+ rrddim_set_by_pointer(stats->st_metrics, stats->rd_sent_metrics, stats->sent_metrics);
+
+ rrdset_done(stats->st_metrics);
+
+ // ------------------------------------------------------------------------
+
+ if (likely(stats->st_bytes->counter_done))
+ rrdset_next(stats->st_bytes);
+
+ rrddim_set_by_pointer(stats->st_bytes, stats->rd_buffered_bytes, stats->buffered_bytes);
+ rrddim_set_by_pointer(stats->st_bytes, stats->rd_lost_bytes, stats->lost_bytes);
+ rrddim_set_by_pointer(stats->st_bytes, stats->rd_sent_bytes, stats->sent_bytes);
+ rrddim_set_by_pointer(stats->st_bytes, stats->rd_received_bytes, stats->received_bytes);
+
+ rrdset_done(stats->st_bytes);
+
+ // ------------------------------------------------------------------------
+
+ if (likely(stats->st_ops->counter_done))
+ rrdset_next(stats->st_ops);
+
+ rrddim_set_by_pointer(stats->st_ops, stats->rd_transmission_successes, stats->transmission_successes);
+ rrddim_set_by_pointer(stats->st_ops, stats->rd_data_lost_events, stats->data_lost_events);
+ rrddim_set_by_pointer(stats->st_ops, stats->rd_reconnects, stats->reconnects);
+ rrddim_set_by_pointer(stats->st_ops, stats->rd_transmission_failures, stats->transmission_failures);
+ rrddim_set_by_pointer(stats->st_ops, stats->rd_receptions, stats->receptions);
+
+ rrdset_done(stats->st_ops);
+
+ // ------------------------------------------------------------------------
+
+ struct rusage thread;
+ getrusage(RUSAGE_THREAD, &thread);
+
+ if (likely(stats->st_rusage->counter_done))
+ rrdset_next(stats->st_rusage);
+
+ rrddim_set_by_pointer(stats->st_rusage, stats->rd_user, thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
+ rrddim_set_by_pointer(stats->st_rusage, stats->rd_system, thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
- return 0;
+ rrdset_done(stats->st_rusage);
}
diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c
index c43face978..29935a7d19 100644
--- a/exporting/tests/exporting_doubles.c
+++ b/exporting/tests/exporting_doubles.c
@@ -82,10 +82,26 @@ int __wrap_notify_workers(struct engine *engine)
return mock_type(int);
}
-int __wrap_send_internal_metrics(struct engine *engine)
+void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system)
{
function_called();
- check_expected_ptr(engine);</