diff options
author | Vladimir Kobal <vlad@prokk.net> | 2020-02-25 21:08:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 21:08:41 +0200 |
commit | d79bbbf943f72495e135eee4afc25723f886592f (patch) | |
tree | 1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting | |
parent | 84421fdf0b513e9e7dc1351550b96044e92c363d (diff) |
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector
* Update the documentation
* Rename functions in backends
* Include the connector to the Netdata buid
* Add initializers and a worker
* Add Kinesis specific configuration options
* Add a compile time configuration check
* Remove the connector data structure
* Restore unit tests
* Fix the compile-time configuration check
* Initialize AWS SDK only once
* Don't create an instance for an unknown exporting connector
* Separate client and request outcome data for every instance
* Fix memory cleanup, document functions
* Add unit tests
* Update the documentation
Diffstat (limited to 'exporting')
-rw-r--r-- | exporting/Makefile.am | 1 | ||||
-rw-r--r-- | exporting/aws_kinesis/Makefile.am | 8 | ||||
-rw-r--r-- | exporting/aws_kinesis/README.md | 48 | ||||
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis.c | 157 | ||||
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis.h | 16 | ||||
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis_put_record.cc | 151 | ||||
-rw-r--r-- | exporting/aws_kinesis/aws_kinesis_put_record.h | 35 | ||||
-rw-r--r-- | exporting/check_filters.c | 4 | ||||
-rw-r--r-- | exporting/exporting_engine.h | 32 | ||||
-rw-r--r-- | exporting/graphite/graphite.c | 27 | ||||
-rw-r--r-- | exporting/graphite/graphite.h | 1 | ||||
-rw-r--r-- | exporting/init_connectors.c | 64 | ||||
-rw-r--r-- | exporting/json/json.c | 27 | ||||
-rw-r--r-- | exporting/json/json.h | 1 | ||||
-rw-r--r-- | exporting/opentsdb/opentsdb.c | 37 | ||||
-rw-r--r-- | exporting/opentsdb/opentsdb.h | 1 | ||||
-rw-r--r-- | exporting/process_data.c | 126 | ||||
-rw-r--r-- | exporting/read_config.c | 184 | ||||
-rw-r--r-- | exporting/send_data.c | 2 | ||||
-rw-r--r-- | exporting/tests/exporting_doubles.c | 52 | ||||
-rw-r--r-- | exporting/tests/exporting_fixtures.c | 8 | ||||
-rw-r--r-- | exporting/tests/test_exporting_engine.c | 270 | ||||
-rw-r--r-- | exporting/tests/test_exporting_engine.h | 10 |
23 files changed, 890 insertions, 372 deletions
diff --git a/exporting/Makefile.am b/exporting/Makefile.am index ce6282989a..82ae0f77e4 100644 --- a/exporting/Makefile.am +++ b/exporting/Makefile.am @@ -8,6 +8,7 @@ SUBDIRS = \ graphite \ json \ opentsdb \ + aws_kinesis \ $(NULL) dist_noinst_DATA = \ diff --git a/exporting/aws_kinesis/Makefile.am b/exporting/aws_kinesis/Makefile.am new file mode 100644 index 0000000000..161784b8f6 --- /dev/null +++ b/exporting/aws_kinesis/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/exporting/aws_kinesis/README.md b/exporting/aws_kinesis/README.md new file mode 100644 index 0000000000..acf9d7d0fa --- /dev/null +++ b/exporting/aws_kinesis/README.md @@ -0,0 +1,48 @@ +# Export metrics to AWS Kinesis Data Streams + +## Prerequisites + +To use AWS Kinesis for metric collecting and processing, you should first +[install](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) AWS SDK for C++. Netdata +works with the SDK version 1.7.121. Other versions might work correctly as well, but they were not tested with Netdata. +`libcrypto`, `libssl`, and `libcurl` are also required to compile Netdata with Kinesis support enabled. Next, Netdata +should be re-installed from the source. The installer will detect that the required libraries are now available. + +If the AWS SDK for C++ is being installed from source, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the +building process could take a very long time. Note that the default installation path for the libraries is +`/usr/local/lib64`. Many Linux distributions don't include this path as the default one for a library search, so it is +advisable to use the following options to `cmake` while building the AWS SDK: + +```sh +cmake -DCMAKE_INSTALL_LIBDIR=/usr/lib -DCMAKE_INSTALL_INCLUDEDIR=/usr/include -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=kinesis <aws-sdk-cpp sources> +``` + +## Configuration + +To enable data sending to the Kinesis service, run `./edit-config exporting.conf` in the Netdata configuration directory +and set the following options: + +```conf +[kinesis:my_instance] + enabled = yes + destination = us-east-1 +``` + +Set the `destination` option to an AWS region. + +Set AWS credentials and stream name: + +```conf + # AWS credentials + aws_access_key_id = your_access_key_id + aws_secret_access_key = your_secret_access_key + # destination stream + stream name = your_stream_name +``` + +Alternatively, you can set AWS credentials for the `netdata` user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html). + +Netdata automatically computes a partition key for every record with the purpose to distribute records across +available shards evenly. + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c new file mode 100644 index 0000000000..2e5da3fad9 --- /dev/null +++ b/exporting/aws_kinesis/aws_kinesis.c @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aws_kinesis.h" + +/** + * Initialize AWS Kinesis connector instance + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_aws_kinesis_instance(struct instance *instance) +{ + instance->worker = aws_kinesis_connector_worker; + + instance->start_batch_formatting = NULL; + instance->start_host_formatting = format_host_labels_json_plaintext; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->end_host_formatting = flush_host_labels; + instance->end_batch_formatting = NULL; + + instance->buffer = (void *)buffer_create(0); + if (!instance->buffer) { + error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name); + return 1; + } + uv_mutex_init(&instance->mutex); + uv_cond_init(&instance->cond_var); + + if (!instance->engine->aws_sdk_initialized) { + aws_sdk_init(); + instance->engine->aws_sdk_initialized = 1; + } + + struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config; + struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data)); + instance->connector_specific_data = (void *)connector_specific_data; + + kinesis_init( + (void *)connector_specific_data, + instance->config.destination, + connector_specific_config->auth_key_id, + connector_specific_config->secure_key, + instance->config.timeoutms); + + return 0; +} + +/** + * AWS Kinesis connector worker + * + * Runs in a separate thread for every instance. + * + * @param instance_p an instance data structure. + */ +void aws_kinesis_connector_worker(void *instance_p) +{ + struct instance *instance = (struct instance *)instance_p; + struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config; + struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data; + + while (!netdata_exit) { + unsigned long long partition_key_seq = 0; + struct stats *stats = &instance->stats; + + uv_mutex_lock(&instance->mutex); + uv_cond_wait(&instance->cond_var, &instance->mutex); + + BUFFER *buffer = (BUFFER *)instance->buffer; + size_t buffer_len = buffer_strlen(buffer); + + size_t sent = 0; + + while (sent < buffer_len) { + char partition_key[KINESIS_PARTITION_KEY_MAX + 1]; + snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++); + size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX); + + const char *first_char = buffer_tostring(buffer) + sent; + + size_t record_len = 0; + + // split buffer into chunks of maximum allowed size + if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) { + record_len = buffer_len - sent; + } else { + record_len = KINESIS_RECORD_MAX - partition_key_len; + while (*(first_char + record_len) != '\n' && record_len) + record_len--; + } + char error_message[ERROR_LINE_MAX + 1] = ""; + + debug( + D_BACKEND, + "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \ + buffer = %zu, record = %zu", + instance->config.destination, + connector_specific_config->auth_key_id, + connector_specific_config->secure_key, + connector_specific_config->stream_name, + partition_key, + buffer_len, + record_len); + + kinesis_put_record( + connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len); + + sent += record_len; + stats->chart_transmission_successes++; + + size_t sent_bytes = 0, lost_bytes = 0; + + if (unlikely(kinesis_get_result( + connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) { + // oops! we couldn't send (all or some of the) data + error("EXPORTING: %s", error_message); + error( + "EXPORTING: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.", + instance->config.destination, sent_bytes, sent_bytes - lost_bytes); + + stats->chart_transmission_failures++; + stats->chart_data_lost_events++; + stats->chart_lost_bytes += lost_bytes; + + // estimate the number of lost metrics + stats->chart_lost_metrics += (collected_number)( + stats->chart_buffered_metrics * + (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1)); + + break; + } else { + stats->chart_receptions++; + } + + if (unlikely(netdata_exit)) + break; + } + + stats->chart_sent_bytes += sent; + if (likely(sent == buffer_len)) + stats->chart_sent_metrics = stats->chart_buffered_metrics; + + buffer_flush(buffer); + + uv_mutex_unlock(&instance->mutex); + +#ifdef UNIT_TESTING + break; +#endif + } +} diff --git a/exporting/aws_kinesis/aws_kinesis.h b/exporting/aws_kinesis/aws_kinesis.h new file mode 100644 index 0000000000..d88a45861c --- /dev/null +++ b/exporting/aws_kinesis/aws_kinesis.h @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_KINESIS_H +#define NETDATA_EXPORTING_KINESIS_H + +#include "exporting/exporting_engine.h" +#include "exporting/json/json.h" +#include "aws_kinesis_put_record.h" + +#define KINESIS_PARTITION_KEY_MAX 256 +#define KINESIS_RECORD_MAX 1024 * 1024 + +int init_aws_kinesis_instance(struct instance *instance); +void aws_kinesis_connector_worker(void *instance_p); + +#endif //NETDATA_EXPORTING_KINESIS_H diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.cc b/exporting/aws_kinesis/aws_kinesis_put_record.cc new file mode 100644 index 0000000000..b20ec13736 --- /dev/null +++ b/exporting/aws_kinesis/aws_kinesis_put_record.cc @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include <aws/core/Aws.h> +#include <aws/core/client/ClientConfiguration.h> +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/utils/Outcome.h> +#include <aws/kinesis/KinesisClient.h> +#include <aws/kinesis/model/PutRecordRequest.h> +#include "aws_kinesis_put_record.h" + +using namespace Aws; + +static SDKOptions options; + +struct request_outcome { + Kinesis::Model::PutRecordOutcomeCallable future_outcome; + size_t data_len; +}; + +/** + * Initialize AWS SDK API + */ +void aws_sdk_init() +{ + InitAPI(options); +} + +/** + * Shutdown AWS SDK API + */ +void aws_sdk_shutdown() +{ + ShutdownAPI(options); +} + +/** + * Initialize a client and a data structure for request outcomes + * + * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. + * @param region AWS region. + * @param access_key_id AWS account access key ID. + * @param secret_key AWS account secret access key. + * @param timeout communication timeout. + */ +void kinesis_init( + void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key, + const long timeout) +{ + struct aws_kinesis_specific_data *kinesis_specific_data = + (struct aws_kinesis_specific_data *)kinesis_specific_data_p; + + Client::ClientConfiguration config; + + config.region = region; + config.requestTimeoutMs = timeout; + config.connectTimeoutMs = timeout; + + Kinesis::KinesisClient *client; + + if (access_key_id && *access_key_id && secret_key && *secret_key) { + client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config); + } else { + client = New<Kinesis::KinesisClient>("client", config); + } + kinesis_specific_data->client = (void *)client; + + Vector<request_outcome> *request_outcomes; + + request_outcomes = new Vector<request_outcome>; + kinesis_specific_data->request_outcomes = (void *)request_outcomes; +} + +/** + * Deallocate Kinesis specific data + * + * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. + */ +void kinesis_shutdown(void *kinesis_specific_data_p) +{ + struct aws_kinesis_specific_data *kinesis_specific_data = + (struct aws_kinesis_specific_data *)kinesis_specific_data_p; + + Delete((Kinesis::KinesisClient *)kinesis_specific_data->client); + delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes; +} + +/** + * Send data to the Kinesis service + * + * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information. + * @param stream_name the name of a stream to send to. + * @param partition_key a partition key which automatically maps data to a specific stream. + * @param data a data buffer to send to the stream. + * @param data_len the length of the data buffer. + */ +void kinesis_put_record( + void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data, + size_t data_len) +{ + struct aws_kinesis_specific_data *kinesis_specific_data = + (struct aws_kinesis_specific_data *)kinesis_specific_data_p; + Kinesis::Model::PutRecordRequest request; + + request.SetStreamName(stream_name); + request.SetPartitionKey(partition_key); + request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len)); + + ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back( + { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len }); +} + +/** + * Get results from service responces + * + * @param request_outcomes_p request outcome information. + * @param error_message report error message to a caller. + * @param sent_bytes report to a caller how many bytes was successfuly sent. + * @param lost_bytes report to a caller how many bytes was lost during transmission. + * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission + */ +int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes) +{ + Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p; + Kinesis::Model::PutRecordOutcome outcome; + *sent_bytes = 0; + *lost_bytes = 0; + + for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) { + std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100)); + + if (status == std::future_status::ready || status == std::future_status::deferred) { + outcome = request_outcome->future_outcome.get(); + *sent_bytes += request_outcome->data_len; + + if (!outcome.IsSuccess()) { + *lost_bytes += request_outcome->data_len; + outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX); + } + + request_outcomes->erase(request_outcome); + } else { + ++request_outcome; + } + } + + if (*lost_bytes) { + return 1; + } + + return 0; +} diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.h b/exporting/aws_kinesis/aws_kinesis_put_record.h new file mode 100644 index 0000000000..321baf6699 --- /dev/null +++ b/exporting/aws_kinesis/aws_kinesis_put_record.h @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_EXPORTING_KINESIS_PUT_RECORD_H +#define NETDATA_EXPORTING_KINESIS_PUT_RECORD_H + +#define ERROR_LINE_MAX 1023 + +#ifdef __cplusplus +extern "C" { +#endif + +struct aws_kinesis_specific_data { + void *client; + void *request_outcomes; +}; + +void aws_sdk_init(); +void aws_sdk_shutdown(); + +void kinesis_init( + void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key, + const long timeout); +void kinesis_shutdown(void *client); + +void kinesis_put_record( + void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data, + size_t data_len); + +int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes); + +#ifdef __cplusplus +} +#endif + +#endif //NETDATA_EXPORTING_KINESIS_PUT_RECORD_H diff --git a/exporting/check_filters.c b/exporting/check_filters.c index f1ed9e828c..cfe0b4ce43 100644 --- a/exporting/check_filters.c +++ b/exporting/check_filters.c @@ -12,7 +12,7 @@ int rrdhost_is_exportable(struct instance *instance, RRDHOST *host) { if (host->exporting_flags == NULL) - host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t)); + host->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t)); RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index]; @@ -46,7 +46,7 @@ int rrdset_is_exportable(struct instance *instance, RRDSET *st) RRDHOST *host = st->rrdhost; if (st->exporting_flags == NULL) - st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t)); + st->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t)); RRDSET_FLAGS *flags = &st->exporting_flags[instance->index]; diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h index d45397b448..c1e63a42eb 100644 --- a/exporting/exporting_engine.h +++ b/exporting/exporting_engine.h @@ -43,6 +43,12 @@ extern struct config exporting_config; #define EXPORTER_SEND_NAMES "send names instead of ids" #define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES +#define EXPORTER_KINESIS_STREAM_NAME "stream name" +#define EXPORTER_KINESIS_STREAM_NAME_DEFAULT "netdata" + +#define EXPORTER_AWS_ACCESS_KEY_ID "aws_access_key_id" +#define EXPORTER_AWS_SECRET_ACCESS_KEY "aws_secret_access_key" + typedef enum exporting_options { EXPORTING_OPTION_NONE = 0, @@ -72,6 +78,8 @@ typedef enum exporting_options { struct engine; struct instance_config { + BACKEND_TYPE type; + const char *name; const char *destination; @@ -90,9 +98,10 @@ struct simple_connector_config { int default_port; }; -struct connector_config { - BACKEND_TYPE type; - void *connector_specific_config; +struct aws_kinesis_specific_config { + char *stream_name; + char *auth_key_id; + char *secure_key; }; struct engine_config { @@ -119,6 +128,7 @@ struct stats { struct instance { struct instance_config config; void *buffer; + void (*worker)(void *instance_p); struct stats stats; int scheduled; @@ -142,18 +152,10 @@ struct instance { int (*end_host_formatting)(struct instance *instance, RRDHOST *host); int (*end_batch_formatting)(struct instance *instance); + void *connector_specific_data; + size_t index; struct instance *next; - struct connector *connector; -}; - -struct connector { - struct connector_config config; - - void (*worker)(void *instance_p); - - struct instance *instance_root; - struct connector *next; struct engine *engine; }; @@ -163,7 +165,9 @@ struct engine { size_t instance_num; time_t now; - struct connector *connector_root; + int aws_sdk_initialized; + + struct instance *instance_root; }; void *exporting_main(void *ptr); diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c index d0589910ef..fe748cc561 100644 --- a/exporting/graphite/graphite.c +++ b/exporting/graphite/graphite.c @@ -3,23 +3,6 @@ #include "graphite.h" /** - * Initialize Graphite connector - * - * @param instance a connector data structure. - * @return Always returns 0. - */ -int init_graphite_connector(struct connector *connector) -{ - connector->worker = simple_connector_worker; - - struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config)); - connector->config.connector_specific_config = (void *)connector_specific_config; - connector_specific_config->default_port = 2003; - - return 0; -} - -/** * Initialize Graphite connector instance * * @param instance an instance data structure. @@ -27,6 +10,12 @@ int init_graphite_connector(struct connector *connector) */ int init_graphite_instance(struct instance *instance) { + instance->worker = simple_connector_worker; + + struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config)); + instance->config.connector_specific_config = (void *)connector_specific_config; + connector_specific_config->default_port = 2003; + instance->start_batch_formatting = NULL; instance->start_host_formatting = format_host_labels_graphite_plaintext; instance->start_chart_formatting = NULL; @@ -115,7 +104,7 @@ int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *ho */ int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd) { - struct engine *engine = instance->connector->engine; + struct engine *engine = instance->engine; RRDSET *st = rd->rrdset; RRDHOST *host = st->rrdhost; @@ -156,7 +145,7 @@ int format_dimension_collected_graphite_plaintext(struct instance *instance, RRD */ int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd) { - struct engine *engine = instance->connector->engine; + struct engine *engine = instance->engine; RRDSET *st = rd->rrdset; RRDHOST *host = st->rrdhost; diff --git a/exporting/graphite/graphite.h b/exporting/graphite/graphite.h index cc3767003a..edda498e85 100644 --- a/exporting/graphite/graphite.h +++ b/exporting/graphite/graphite.h @@ -5,7 +5,6 @@ #include "exporting/exporting_engine.h" -int init_graphite_connector(struct connector *connector); int init_graphite_instance(struct instance *instance); void sanitize_graphite_label_value(char *dst, char *src, size_t len); diff --git a/exporting/init_connectors.c b/exporting/init_connectors.c index fd8b03c6ca..dae5576d3b 100644 --- a/exporting/init_connectors.c +++ b/exporting/init_connectors.c @@ -4,6 +4,7 @@ #include "graphite/graphite.h" #include "json/json.h" #include "opentsdb/opentsdb.h" +#include "aws_kinesis/aws_kinesis.h" /** * Initialize connectors @@ -15,64 +16,47 @@ int init_connectors(struct engine *engine) { engine->now = now_realtime_sec(); - for (struct connector *connector = engine->connector_root; connector; connector = connector->next) { - switch (connector->config.type) { + for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { + instance->index = engine->instance_num++; + instance->after = engine->now; + + switch (instance->config.type) { case BACKEND_TYPE_GRAPHITE: - if (init_graphite_connector(connector) != 0) + if (init_graphite_instance(instance) != 0) return 1; break; case BACKEND_TYPE_JSON: - if (init_json_connector(connector) != 0) + if (init_json_instance(instance) != 0) return 1; break; case BACKEND_TYPE_OPENTSDB_USING_TELNET: - if (init_opentsdb_connector(connector) != 0) + if (init_opentsdb_telnet_instance(instance) != 0) return 1; break; case BACKEND_TYPE_OPENTSDB_USING_HTTP: - if (init_opentsdb_connector(connector) != 0) + if (init_opentsdb_http_instance(instance) != 0) + return 1; + break; + case BACKEND_TYPE_KINESIS: +#if HAVE_KINESIS + if (init_aws_kinesis_instance(instance) != 0) return 1; +#endif break; default: error("EXPORTING: unknown exporting connector type"); return 1; } - for (struct instance *instance = connector->instance_root; instance; instance = instance->next) { - instance->index = engine->instance_num++; - instance->after = engine->now; - switch (connector->config.type) { - case BACKEND_TYPE_GRAPHITE: - if (init_graphite_instance(instance) != 0) - return 1; - break; - case BACKEND_TYPE_JSON: - if (init_json_instance(instance) != 0) - return 1; - break; - case BACKEND_TYPE_OPENTSDB_USING_TELNET: - if (init_opentsdb_telnet_instance(instance) != 0) - return 1; - break; - case BACKEND_TYPE_OPENTSDB_USING_HTTP: - if (init_opentsdb_http_instance(instance) != 0) - return 1; - break; - default: - error("EXPORTING: unknown exporting connector type"); - return 1; - } - - // dispatch the instance worker thread - int error = uv_thread_create(&instance->thread, connector->worker, instance); - if (error) { - error("EXPORTING: cannot create tread worker. uv_thread_create(): %s", uv_strerror(error)); - return 1; - } - char threadname[NETDATA_THREAD_NAME_MAX+1]; - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "EXPORTING-%zu", instance->index); - uv_thread_set_name_np(instance->thread, threadname); + // dispatch the instance worker thread + int error = uv_thread_create(&instance->thread, instance->worker, instance); + if (error) { + error("EXPORTING: cannot create tread worker. uv_thread_create(): %s", uv_strerror(error)); + return 1; } + char threadname[NETDATA_THREAD_NAME_MAX+1]; + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "EXPORTING-%zu", instance->index); + uv_thread_set_name_np(instance->thread, threadname); } return 0; diff --git a/exporting/json/json.c b/exporting/json/json.c index 9a2937b02e..0e09e7e99d 100644 --- a/exporting/json/json.c +++ b/exporting/json/json.c @@ -3,23 +3,6 @@ #include "json.h" /** - * Initialize JSON connector - * - * @param instance a connector data structure. - * @return Always returns 0. - */ -int init_json_connector(struct connector *connector) -{ - connector->worker = simple_connector_worker; - - struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config)); - connector->config.connector_specific_config = (void *)connector_specific_config; - connector_specific_config->default_port = 5448; - - return 0; -} - -/** * Initialize JSON connector instance * * @param instance an instance data structure. @@ -27,6 +10,12 @@ int init_json_connector(struct connector *connector) */ int init_json_instance(struct instance *ins |