summaryrefslogtreecommitdiffstats
path: root/exporting/read_config.c
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-02-25 21:08:41 +0200
committerGitHub <noreply@github.com>2020-02-25 21:08:41 +0200
commitd79bbbf943f72495e135eee4afc25723f886592f (patch)
tree1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting/read_config.c
parent84421fdf0b513e9e7dc1351550b96044e92c363d (diff)
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector * Update the documentation * Rename functions in backends * Include the connector to the Netdata buid * Add initializers and a worker * Add Kinesis specific configuration options * Add a compile time configuration check * Remove the connector data structure * Restore unit tests * Fix the compile-time configuration check * Initialize AWS SDK only once * Don't create an instance for an unknown exporting connector * Separate client and request outcome data for every instance * Fix memory cleanup, document functions * Add unit tests * Update the documentation
Diffstat (limited to 'exporting/read_config.c')
-rw-r--r--exporting/read_config.c184
1 files changed, 95 insertions, 89 deletions
diff --git a/exporting/read_config.c b/exporting/read_config.c
index 5675187d21..d30a9e94b2 100644
--- a/exporting/read_config.c
+++ b/exporting/read_config.c
@@ -194,11 +194,12 @@ struct engine *read_exporting_config()
static struct engine *engine = NULL;
struct connector_instance_list {
struct connector_instance local_ci;
+ BACKEND_TYPE backend_type;
+
struct connector_instance_list *next;
};
struct connector_instance local_ci;
- struct connector_instance_list *tmp_ci_list, *tmp_ci_list1;
- struct connector_instance_list **ci_list;
+ struct connector_instance_list *tmp_ci_list, *tmp_ci_list1, *tmp_ci_list_prev = NULL;
if (unlikely(engine))
return engine;
@@ -218,17 +219,11 @@ struct engine *read_exporting_config()
freez(filename);
- // Will build a list of instances per connector
// TODO: change BACKEND to EXPORTING
- ci_list = callocz(BACKEND_TYPE_NUM, sizeof(struct connector_instance_list *));
-
while (get_connector_instance(&local_ci)) {
- BACKEND_TYPE backend_type;
-
info("Processing connector instance (%s)", local_ci.instance_name);
- if (exporter_get_boolean(local_ci.instance_name, "enabled", 0)) {
- backend_type = exporting_select_type(local_ci.connector_name);
+ if (exporter_get_boolean(local_ci.instance_name, "enabled", 0)) {
info(
"Instance (%s) on connector (%s) is enabled and scheduled for activation",
local_ci.instance_name,
@@ -236,8 +231,9 @@ struct engine *read_exporting_config()
tmp_ci_list = (struct connector_instance_list *)callocz(1, sizeof(struct connector_instance_list));
memcpy(&tmp_ci_list->local_ci, &local_ci, sizeof(local_ci));
- tmp_ci_list->next = ci_list[backend_type];
- ci_list[backend_type] = tmp_ci_list;
+ tmp_ci_list->backend_type = exporting_select_type(local_ci.connector_name);
+ tmp_ci_list->next = tmp_ci_list_prev;
+ tmp_ci_list_prev = tmp_ci_list;
instances_to_activate++;
} else
info("Instance (%s) on connector (%s) is not enabled", local_ci.instance_name, local_ci.connector_name);
@@ -245,11 +241,10 @@ struct engine *read_exporting_config()
if (unlikely(!instances_to_activate)) {
info("No connector instances to activate");
- freez(ci_list);
return NULL;
}
- engine = (struct engine *)calloc(1, sizeof(struct engine));
+ engine = (struct engine *)callocz(1, sizeof(struct engine));
// TODO: Check and fill engine fields if actually needed
if (exporting_config_exists) {
@@ -260,107 +255,118 @@ struct engine *read_exporting_config()
exporter_get_number(CONFIG_SECTION_EXPORTING, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
}
- for (size_t i = 0; i < BACKEND_TYPE_NUM; i++) {
- // For each connector build list
- tmp_ci_list = ci_list[i];
+ while (tmp_ci_list) {
+ struct instance *tmp_instance;
+ char *instance_name;
- // If we have a list of instances for this connector then build it
- if (tmp_ci_list) {
- struct connector *tmp_connector;
+ info("Instance %s on %s", tmp_ci_list->local_ci.instance_name, tmp_ci_list->local_ci.connector_name);
- tmp_connector = (struct connector *)calloc(1, sizeof(struct connector));
- tmp_connector->next = engine->connector_root;
- engine->connector_root = tmp_connector;
+ if (tmp_ci_list->backend_type == BACKEND_TYPE_UNKNOWN) {
+ error("Unknown exporting connector type");
+ goto next_connector_instance;
+ }
- tmp_connector->config.type = i;
- tmp_connector->engine = engine;
+#ifndef HAVE_KINESIS
+ if (tmp_ci_list->backend_type == BACKEND_TYPE_KINESIS) {
+ error("AWS Kinesis support isn't compiled");
+ goto next_connector_instance;
+ }
+#endif
+
+ tmp_instance = (struct instance *)callocz(1, sizeof(struct instance));
+ tmp_instance->next = engine->instance_root;
+ engine->instance_root = tmp_instance;
+
+ tmp_instance->engine = engine;
+ tmp_instance->config.type = tmp_ci_list->backend_type;
+
+ instance_name = tmp_ci_list->local_ci.instance_name;
- while (tmp_ci_list) {
- struct instance *tmp_instance;
- char *instance_name;
+ tmp_instance->config.name = strdupz(tmp_ci_list->local_ci.instance_name);
- info("Instance %s on %s", tmp_ci_list->local_ci.instance_name, tmp_ci_list->local_ci.connector_name);
+ tmp_instance->config.destination =
+ strdupz(exporter_get(instance_name, EXPORTER_DESTINATION, EXPORTER_DESTINATION_DEFAULT));
- tmp_instance = (struct instance *)calloc(1, sizeof(struct instance));
- tmp_instance->connector = engine->connector_root;
- tmp_instance->next = engine->connector_root->instance_root;
- engine->connector_root->instance_root = tmp_instance;
- tmp_instance->connector = engine->connector_root;
+ tmp_instance->config.update_every =
+ exporter_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
- instance_name = tmp_ci_list->local_ci.instance_name;
+ tmp_instance->config.buffer_on_failures =
+ exporter_get_number(instance_name, EXPORTER_BUF_ONFAIL, EXPORTER_BUF_ONFAIL_DEFAULT);
- tmp_instance->config.name = strdupz(tmp_ci_list->local_ci.instance_name);
+ tmp_instance->config.timeoutms =
+ exporter_get_number(instance_name, EXPORTER_TIMEOUT_MS, EXPORTER_TIMEOUT_MS_DEFAULT);
- tmp_instance->config.destination =
- strdupz(exporter_get(instance_name, EXPORTER_DESTINATION, EXPORTER_DESTINATION_DEFAULT));
+ tmp_instance->config.charts_pattern = simple_pattern_create(
+ exporter_get(instance_name, EXPORTER_SEND_CHART_MATCH, EXPORTER_SEND_CHART_MATCH_DEFAULT),
+ NULL,
+ SIMPLE_PATTERN_EXACT);
- tmp_instance->config.update_every =
- exporter_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
+ tmp_instance->config.hosts_pattern = simple_pattern_create(
+ exporter_get(instance_name, EXPORTER_SEND_HOST_MATCH, EXPORTER_SEND_HOST_MATCH_DEFAULT),
+ NULL,
+ SIMPLE_PATTERN_EXACT);
- tmp_instance->config.buffer_on_failures =
- exporter_get_number(instance_name, EXPORTER_BUF_ONFAIL, EXPORTER_BUF_ONFAIL_DEFAULT);
+ char *data_source =
+ exporter_get(instance_name, EXPORTER_DATA_SOURCE, EXPORTER_DATA_SOURCE_DEFAULT);
- tmp_instance->config.timeoutms =
- exporter_get_number(instance_name, EXPORTER_TIMEOUT_MS, EXPORTER_TIMEOUT_MS_DEFAULT);
+ tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options);
- tmp_instance->config.charts_pattern = simple_pattern_create(
- exporter_get(instance_name, EXPORTER_SEND_CHART_MATCH, EXPORTER_SEND_CHART_MATCH_DEFAULT),
- NULL,
- SIMPLE_PATTERN_EXACT);
+ if (exporter_get_boolean(
+ instance_name, EXPORTER_SEND_CONFIGURED_LABELS, EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT))
+ tmp_instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
+ else
+ tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
- tmp_instance->config.hosts_pattern = simple_pattern_create(
- exporter_get(instance_name, EXPORTER_SEND_HOST_MATCH, EXPORTER_SEND_HOST_MATCH_DEFAULT),
- NULL,
- SIMPLE_PATTERN_EXACT);
+ if (exporter_get_boolean(
+ instance_name, EXPORTER_SEND_AUTOMATIC_LABELS, EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT))
+ tmp_instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
+ else
+ tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
- char *data_source =
- exporter_get(instance_name, EXPORTER_DATA_SOURCE, EXPORTER_DATA_SOURCE_DEFAULT);
+ if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT))
+ tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES;
+ else
+ tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_NAMES;
- tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options);
+ if (tmp_instance->config.type == BACKEND_TYPE_KINESIS) {
+ struct aws_kinesis_specific_config *connector_specific_config =
+ callocz(1, sizeof(struct aws_kinesis_specific_config));
- if (exporter_get_boolean(
- instance_name, EXPORTER_SEND_CONFIGURED_LABELS, EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT))
- tmp_instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
- else
- tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
+ tmp_instance->config.connector_specific_config = connector_specific_config;
- if (exporter_get_boolean(
- instance_name, EXPORTER_SEND_AUTOMATIC_LABELS, EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT))
- tmp_instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
- else
- tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
+ connector_specific_config->stream_name = strdupz(exporter_get(
+ instance_name, EXPORTER_KINESIS_STREAM_NAME, EXPORTER_KINESIS_STREAM_NAME_DEFAULT));
- if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT))
- tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES;
- else
- tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_NAMES;
+ connector_specific_config->auth_key_id = strdupz(exporter_get(
+ instance_name, EXPORTER_AWS_ACCESS_KEY_ID, ""));
+
+ connector_specific_config->secure_key = strdupz(exporter_get(
+ instance_name, EXPORTER_AWS_SECRET_ACCESS_KEY, ""));
+ }
#ifdef NETDATA_INTERNAL_CHECKS
- info(
- " Dest=[%s], upd=[%d], buffer=[%d] timeout=[%ld] options=[%u]",
- tmp_instance->config.destination,
- tmp_instance->config.update_every,
- tmp_instance->config.buffer_on_failures,
- tmp_instance->config.timeoutms,
- tmp_instance->config.options);
+ info(
+ " Dest=[%s], upd=[%d], buffer=[%d] timeout=[%ld] options=[%u]",
+ tmp_instance->config.destination,
+ tmp_instance->config.update_every,
+ tmp_instance->config.buffer_on_failures,
+ tmp_instance->config.timeoutms,
+ tmp_instance->config.options);
#endif
- if (unlikely(!exporting_config_exists) && !engine->config.hostname) {
- engine->config.hostname =
- strdupz(config_get(instance_name, "hostname", netdata_configured_hostname));
- engine->config.prefix = strdupz(config_get(instance_name, "prefix", "netdata"));
- engine->config.update_every =
- config_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
- }
-
- tmp_ci_list1 = tmp_ci_list->next;
- freez(tmp_ci_list);
- tmp_ci_list = tmp_ci_list1;
- }
+ if (unlikely(!exporting_config_exists) && !engine->config.hostname) {
+ engine->config.hostname =
+ strdupz(config_get(instance_name, "hostname", netdata_configured_hostname));
+ engine->config.prefix = strdupz(config_get(instance_name, "prefix", "netdata"));
+ engine->config.update_every =
+ config_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
}
- }
- freez(ci_list);
+next_connector_instance:
+ tmp_ci_list1 = tmp_ci_list->next;
+ freez(tmp_ci_list);
+ tmp_ci_list = tmp_ci_list1;
+ }
return engine;
}