summaryrefslogtreecommitdiffstats
path: root/exporting/process_data.c
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-02-25 21:08:41 +0200
committerGitHub <noreply@github.com>2020-02-25 21:08:41 +0200
commitd79bbbf943f72495e135eee4afc25723f886592f (patch)
tree1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting/process_data.c
parent84421fdf0b513e9e7dc1351550b96044e92c363d (diff)
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector * Update the documentation * Rename functions in backends * Include the connector to the Netdata buid * Add initializers and a worker * Add Kinesis specific configuration options * Add a compile time configuration check * Remove the connector data structure * Restore unit tests * Fix the compile-time configuration check * Initialize AWS SDK only once * Don't create an instance for an unknown exporting connector * Separate client and request outcome data for every instance * Fix memory cleanup, document functions * Add unit tests * Update the documentation
Diffstat (limited to 'exporting/process_data.c')
-rw-r--r--exporting/process_data.c126
1 files changed, 55 insertions, 71 deletions
diff --git a/exporting/process_data.c b/exporting/process_data.c
index f38231f6ca..e284528426 100644
--- a/exporting/process_data.c
+++ b/exporting/process_data.c
@@ -42,13 +42,11 @@ int mark_scheduled_instances(struct engine *engine)
{
int instances_were_scheduled = 0;
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (engine->now % instance->config.update_every < localhost->rrd_update_every) {
- instance->scheduled = 1;
- instances_were_scheduled = 1;
- instance->before = engine->now;
- }
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (engine->now % instance->config.update_every < localhost->rrd_update_every) {
+ instance->scheduled = 1;
+ instances_were_scheduled = 1;
+ instance->before = engine->now;
}
}
@@ -166,14 +164,12 @@ calculated_number exporting_calculate_value_from_stored_data(
*/
int start_batch_formatting(struct engine *engine)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- uv_mutex_lock(&instance->mutex);
- if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
- error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
- return 1;
- }
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled) {
+ uv_mutex_lock(&instance->mutex);
+ if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
+ error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
+ return 1;
}
}
}
@@ -190,17 +186,15 @@ int start_batch_formatting(struct engine *engine)
*/
int start_host_formatting(struct engine *engine, RRDHOST *host)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- if (rrdhost_is_exportable(instance, host)) {
- if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
- error("EXPORTING: cannot start host formatting for %s", instance->config.name);
- return 1;
- }
- } else {
- instance->skip_host = 1;
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled) {
+ if (rrdhost_is_exportable(instance, host)) {
+ if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
+ error("EXPORTING: cannot start host formatting for %s", instance->config.name);
+ return 1;
}
+ } else {
+ instance->skip_host = 1;
}
}
}
@@ -217,17 +211,15 @@ int start_host_formatting(struct engine *engine, RRDHOST *host)
*/
int start_chart_formatting(struct engine *engine, RRDSET *st)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host) {
- if (rrdset_is_exportable(instance, st)) {
- if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
- error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
- return 1;
- }
- } else {
- instance->skip_chart = 1;
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled && !instance->skip_host) {
+ if (rrdset_is_exportable(instance, st)) {
+ if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
+ error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
+ return 1;
}
+ } else {
+ instance->skip_chart = 1;
}
}
}
@@ -244,15 +236,13 @@ int start_chart_formatting(struct engine *engine, RRDSET *st)
*/
int metric_formatting(struct engine *engine, RRDDIM *rd)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
- if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
- error("EXPORTING: cannot format metric for %s", instance->config.name);
- return 1;
- }
- instance->stats.chart_buffered_metrics++;
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
+ if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
+ error("EXPORTING: cannot format metric for %s", instance->config.name);
+ return 1;
}
+ instance->stats.chart_buffered_metrics++;
}
}
@@ -268,16 +258,14 @@ int metric_formatting(struct engine *engine, RRDDIM *rd)
*/
int end_chart_formatting(struct engine *engine, RRDSET *st)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
- if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
- error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
- return 1;
- }
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
+ if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
+ error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
+ return 1;
}
- instance->skip_chart = 0;
}
+ instance->skip_chart = 0;
}
return 0;
@@ -292,16 +280,14 @@ int end_chart_formatting(struct engine *engine, RRDSET *st)
*/
int end_host_formatting(struct engine *engine, RRDHOST *host)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host) {
- if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
- error("EXPORTING: cannot end host formatting for %s", instance->config.name);
- return 1;
- }
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled && !instance->skip_host) {
+ if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
+ error("EXPORTING: cannot end host formatting for %s", instance->config.name);
+ return 1;
}
- instance->skip_host = 0;
}
+ instance->skip_host = 0;
}
return 0;
@@ -315,19 +301,17 @@ int end_host_formatting(struct engine *engine, RRDHOST *host)
*/
int end_batch_formatting(struct engine *engine)
{
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
- error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
- return 1;
- }
- uv_mutex_unlock(&instance->mutex);
- uv_cond_signal(&instance->cond_var);
-
- instance->scheduled = 0;
- instance->after = instance->before;
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ if (instance->scheduled) {
+ if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
+ error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
+ return 1;
}
+ uv_mutex_unlock(&instance->mutex);
+ uv_cond_signal(&instance->cond_var);
+
+ instance->scheduled = 0;
+ instance->after = instance->before;
}
}