diff options
author | Vladimir Kobal <vlad@prokk.net> | 2020-02-25 21:08:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 21:08:41 +0200 |
commit | d79bbbf943f72495e135eee4afc25723f886592f (patch) | |
tree | 1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting/read_config.c | |
parent | 84421fdf0b513e9e7dc1351550b96044e92c363d (diff) |
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector
* Update the documentation
* Rename functions in backends
* Include the connector to the Netdata buid
* Add initializers and a worker
* Add Kinesis specific configuration options
* Add a compile time configuration check
* Remove the connector data structure
* Restore unit tests
* Fix the compile-time configuration check
* Initialize AWS SDK only once
* Don't create an instance for an unknown exporting connector
* Separate client and request outcome data for every instance
* Fix memory cleanup, document functions
* Add unit tests
* Update the documentation
Diffstat (limited to 'exporting/read_config.c')
-rw-r--r-- | exporting/read_config.c | 184 |
1 files changed, 95 insertions, 89 deletions
diff --git a/exporting/read_config.c b/exporting/read_config.c index 5675187d21..d30a9e94b2 100644 --- a/exporting/read_config.c +++ b/exporting/read_config.c @@ -194,11 +194,12 @@ struct engine *read_exporting_config() static struct engine *engine = NULL; struct connector_instance_list { struct connector_instance local_ci; + BACKEND_TYPE backend_type; + struct connector_instance_list *next; }; struct connector_instance local_ci; - struct connector_instance_list *tmp_ci_list, *tmp_ci_list1; - struct connector_instance_list **ci_list; + struct connector_instance_list *tmp_ci_list, *tmp_ci_list1, *tmp_ci_list_prev = NULL; if (unlikely(engine)) return engine; @@ -218,17 +219,11 @@ struct engine *read_exporting_config() freez(filename); - // Will build a list of instances per connector // TODO: change BACKEND to EXPORTING - ci_list = callocz(BACKEND_TYPE_NUM, sizeof(struct connector_instance_list *)); - while (get_connector_instance(&local_ci)) { - BACKEND_TYPE backend_type; - info("Processing connector instance (%s)", local_ci.instance_name); - if (exporter_get_boolean(local_ci.instance_name, "enabled", 0)) { - backend_type = exporting_select_type(local_ci.connector_name); + if (exporter_get_boolean(local_ci.instance_name, "enabled", 0)) { info( "Instance (%s) on connector (%s) is enabled and scheduled for activation", local_ci.instance_name, @@ -236,8 +231,9 @@ struct engine *read_exporting_config() tmp_ci_list = (struct connector_instance_list *)callocz(1, sizeof(struct connector_instance_list)); memcpy(&tmp_ci_list->local_ci, &local_ci, sizeof(local_ci)); - tmp_ci_list->next = ci_list[backend_type]; - ci_list[backend_type] = tmp_ci_list; + tmp_ci_list->backend_type = exporting_select_type(local_ci.connector_name); + tmp_ci_list->next = tmp_ci_list_prev; + tmp_ci_list_prev = tmp_ci_list; instances_to_activate++; } else info("Instance (%s) on connector (%s) is not enabled", local_ci.instance_name, local_ci.connector_name); @@ -245,11 +241,10 @@ struct engine *read_exporting_config() if (unlikely(!instances_to_activate)) { info("No connector instances to activate"); - freez(ci_list); return NULL; } - engine = (struct engine *)calloc(1, sizeof(struct engine)); + engine = (struct engine *)callocz(1, sizeof(struct engine)); // TODO: Check and fill engine fields if actually needed if (exporting_config_exists) { @@ -260,107 +255,118 @@ struct engine *read_exporting_config() exporter_get_number(CONFIG_SECTION_EXPORTING, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT); } - for (size_t i = 0; i < BACKEND_TYPE_NUM; i++) { - // For each connector build list - tmp_ci_list = ci_list[i]; + while (tmp_ci_list) { + struct instance *tmp_instance; + char *instance_name; - // If we have a list of instances for this connector then build it - if (tmp_ci_list) { - struct connector *tmp_connector; + info("Instance %s on %s", tmp_ci_list->local_ci.instance_name, tmp_ci_list->local_ci.connector_name); - tmp_connector = (struct connector *)calloc(1, sizeof(struct connector)); - tmp_connector->next = engine->connector_root; - engine->connector_root = tmp_connector; + if (tmp_ci_list->backend_type == BACKEND_TYPE_UNKNOWN) { + error("Unknown exporting connector type"); + goto next_connector_instance; + } - tmp_connector->config.type = i; - tmp_connector->engine = engine; +#ifndef HAVE_KINESIS + if (tmp_ci_list->backend_type == BACKEND_TYPE_KINESIS) { + error("AWS Kinesis support isn't compiled"); + goto next_connector_instance; + } +#endif + + tmp_instance = (struct instance *)callocz(1, sizeof(struct instance)); + tmp_instance->next = engine->instance_root; + engine->instance_root = tmp_instance; + + tmp_instance->engine = engine; + tmp_instance->config.type = tmp_ci_list->backend_type; + + instance_name = tmp_ci_list->local_ci.instance_name; - while (tmp_ci_list) { - struct instance *tmp_instance; - char *instance_name; + tmp_instance->config.name = strdupz(tmp_ci_list->local_ci.instance_name); - info("Instance %s on %s", tmp_ci_list->local_ci.instance_name, tmp_ci_list->local_ci.connector_name); + tmp_instance->config.destination = + strdupz(exporter_get(instance_name, EXPORTER_DESTINATION, EXPORTER_DESTINATION_DEFAULT)); - tmp_instance = (struct instance *)calloc(1, sizeof(struct instance)); - tmp_instance->connector = engine->connector_root; - tmp_instance->next = engine->connector_root->instance_root; - engine->connector_root->instance_root = tmp_instance; - tmp_instance->connector = engine->connector_root; + tmp_instance->config.update_every = + exporter_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT); - instance_name = tmp_ci_list->local_ci.instance_name; + tmp_instance->config.buffer_on_failures = + exporter_get_number(instance_name, EXPORTER_BUF_ONFAIL, EXPORTER_BUF_ONFAIL_DEFAULT); - tmp_instance->config.name = strdupz(tmp_ci_list->local_ci.instance_name); + tmp_instance->config.timeoutms = + exporter_get_number(instance_name, EXPORTER_TIMEOUT_MS, EXPORTER_TIMEOUT_MS_DEFAULT); - tmp_instance->config.destination = - strdupz(exporter_get(instance_name, EXPORTER_DESTINATION, EXPORTER_DESTINATION_DEFAULT)); + tmp_instance->config.charts_pattern = simple_pattern_create( + exporter_get(instance_name, EXPORTER_SEND_CHART_MATCH, EXPORTER_SEND_CHART_MATCH_DEFAULT), + NULL, + SIMPLE_PATTERN_EXACT); - tmp_instance->config.update_every = - exporter_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT); + tmp_instance->config.hosts_pattern = simple_pattern_create( + exporter_get(instance_name, EXPORTER_SEND_HOST_MATCH, EXPORTER_SEND_HOST_MATCH_DEFAULT), + NULL, + SIMPLE_PATTERN_EXACT); - tmp_instance->config.buffer_on_failures = - exporter_get_number(instance_name, EXPORTER_BUF_ONFAIL, EXPORTER_BUF_ONFAIL_DEFAULT); + char *data_source = + exporter_get(instance_name, EXPORTER_DATA_SOURCE, EXPORTER_DATA_SOURCE_DEFAULT); - tmp_instance->config.timeoutms = - exporter_get_number(instance_name, EXPORTER_TIMEOUT_MS, EXPORTER_TIMEOUT_MS_DEFAULT); + tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options); - tmp_instance->config.charts_pattern = simple_pattern_create( - exporter_get(instance_name, EXPORTER_SEND_CHART_MATCH, EXPORTER_SEND_CHART_MATCH_DEFAULT), - NULL, - SIMPLE_PATTERN_EXACT); + if (exporter_get_boolean( + instance_name, EXPORTER_SEND_CONFIGURED_LABELS, EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT)) + tmp_instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; + else + tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_CONFIGURED_LABELS; - tmp_instance->config.hosts_pattern = simple_pattern_create( - exporter_get(instance_name, EXPORTER_SEND_HOST_MATCH, EXPORTER_SEND_HOST_MATCH_DEFAULT), - NULL, - SIMPLE_PATTERN_EXACT); + if (exporter_get_boolean( + instance_name, EXPORTER_SEND_AUTOMATIC_LABELS, EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT)) + tmp_instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; + else + tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; - char *data_source = - exporter_get(instance_name, EXPORTER_DATA_SOURCE, EXPORTER_DATA_SOURCE_DEFAULT); + if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT)) + tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES; + else + tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_NAMES; - tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options); + if (tmp_instance->config.type == BACKEND_TYPE_KINESIS) { + struct aws_kinesis_specific_config *connector_specific_config = + callocz(1, sizeof(struct aws_kinesis_specific_config)); - if (exporter_get_boolean( - instance_name, EXPORTER_SEND_CONFIGURED_LABELS, EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT)) - tmp_instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; - else - tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_CONFIGURED_LABELS; + tmp_instance->config.connector_specific_config = connector_specific_config; - if (exporter_get_boolean( - instance_name, EXPORTER_SEND_AUTOMATIC_LABELS, EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT)) - tmp_instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; - else - tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; + connector_specific_config->stream_name = strdupz(exporter_get( + instance_name, EXPORTER_KINESIS_STREAM_NAME, EXPORTER_KINESIS_STREAM_NAME_DEFAULT)); - if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT)) - tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES; - else - tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_NAMES; + connector_specific_config->auth_key_id = strdupz(exporter_get( + instance_name, EXPORTER_AWS_ACCESS_KEY_ID, "")); + + connector_specific_config->secure_key = strdupz(exporter_get( + instance_name, EXPORTER_AWS_SECRET_ACCESS_KEY, "")); + } #ifdef NETDATA_INTERNAL_CHECKS - info( - " Dest=[%s], upd=[%d], buffer=[%d] timeout=[%ld] options=[%u]", - tmp_instance->config.destination, - tmp_instance->config.update_every, - tmp_instance->config.buffer_on_failures, - tmp_instance->config.timeoutms, - tmp_instance->config.options); + info( + " Dest=[%s], upd=[%d], buffer=[%d] timeout=[%ld] options=[%u]", + tmp_instance->config.destination, + tmp_instance->config.update_every, + tmp_instance->config.buffer_on_failures, + tmp_instance->config.timeoutms, + tmp_instance->config.options); #endif - if (unlikely(!exporting_config_exists) && !engine->config.hostname) { - engine->config.hostname = - strdupz(config_get(instance_name, "hostname", netdata_configured_hostname)); - engine->config.prefix = strdupz(config_get(instance_name, "prefix", "netdata")); - engine->config.update_every = - config_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT); - } - - tmp_ci_list1 = tmp_ci_list->next; - freez(tmp_ci_list); - tmp_ci_list = tmp_ci_list1; - } + if (unlikely(!exporting_config_exists) && !engine->config.hostname) { + engine->config.hostname = + strdupz(config_get(instance_name, "hostname", netdata_configured_hostname)); + engine->config.prefix = strdupz(config_get(instance_name, "prefix", "netdata")); + engine->config.update_every = + config_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT); } - } - freez(ci_list); +next_connector_instance: + tmp_ci_list1 = tmp_ci_list->next; + freez(tmp_ci_list); + tmp_ci_list = tmp_ci_list1; + } return engine; } |