path: root/exporting
author    Vladimir Kobal <vlad@prokk.net>  2020-02-25 21:08:41 +0200
committer GitHub <noreply@github.com>      2020-02-25 21:08:41 +0200
commit    d79bbbf943f72495e135eee4afc25723f886592f (patch)
tree      1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting
parent    84421fdf0b513e9e7dc1351550b96044e92c363d (diff)
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector
* Update the documentation
* Rename functions in backends
* Include the connector in the Netdata build
* Add initializers and a worker
* Add Kinesis-specific configuration options
* Add a compile-time configuration check
* Remove the connector data structure
* Restore unit tests
* Fix the compile-time configuration check
* Initialize AWS SDK only once
* Don't create an instance for an unknown exporting connector
* Separate client and request outcome data for every instance
* Fix memory cleanup, document functions
* Add unit tests
* Update the documentation
Diffstat (limited to 'exporting')
-rw-r--r--  exporting/Makefile.am                           |   1
-rw-r--r--  exporting/aws_kinesis/Makefile.am               |   8
-rw-r--r--  exporting/aws_kinesis/README.md                 |  48
-rw-r--r--  exporting/aws_kinesis/aws_kinesis.c             | 157
-rw-r--r--  exporting/aws_kinesis/aws_kinesis.h             |  16
-rw-r--r--  exporting/aws_kinesis/aws_kinesis_put_record.cc | 151
-rw-r--r--  exporting/aws_kinesis/aws_kinesis_put_record.h  |  35
-rw-r--r--  exporting/check_filters.c                       |   4
-rw-r--r--  exporting/exporting_engine.h                    |  32
-rw-r--r--  exporting/graphite/graphite.c                   |  27
-rw-r--r--  exporting/graphite/graphite.h                   |   1
-rw-r--r--  exporting/init_connectors.c                     |  64
-rw-r--r--  exporting/json/json.c                           |  27
-rw-r--r--  exporting/json/json.h                           |   1
-rw-r--r--  exporting/opentsdb/opentsdb.c                   |  37
-rw-r--r--  exporting/opentsdb/opentsdb.h                   |   1
-rw-r--r--  exporting/process_data.c                        | 126
-rw-r--r--  exporting/read_config.c                         | 184
-rw-r--r--  exporting/send_data.c                           |   2
-rw-r--r--  exporting/tests/exporting_doubles.c             |  52
-rw-r--r--  exporting/tests/exporting_fixtures.c            |   8
-rw-r--r--  exporting/tests/test_exporting_engine.c         | 270
-rw-r--r--  exporting/tests/test_exporting_engine.h         |  10
23 files changed, 890 insertions, 372 deletions
diff --git a/exporting/Makefile.am b/exporting/Makefile.am
index ce6282989a..82ae0f77e4 100644
--- a/exporting/Makefile.am
+++ b/exporting/Makefile.am
@@ -8,6 +8,7 @@ SUBDIRS = \
graphite \
json \
opentsdb \
+ aws_kinesis \
$(NULL)
dist_noinst_DATA = \
diff --git a/exporting/aws_kinesis/Makefile.am b/exporting/aws_kinesis/Makefile.am
new file mode 100644
index 0000000000..161784b8f6
--- /dev/null
+++ b/exporting/aws_kinesis/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/aws_kinesis/README.md b/exporting/aws_kinesis/README.md
new file mode 100644
index 0000000000..acf9d7d0fa
--- /dev/null
+++ b/exporting/aws_kinesis/README.md
@@ -0,0 +1,48 @@
+# Export metrics to AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis for metric collection and processing, you should first
+[install](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) the AWS SDK for C++. Netdata
+works with SDK version 1.7.121. Other versions might work as well, but they have not been tested with Netdata.
+`libcrypto`, `libssl`, and `libcurl` are also required to compile Netdata with Kinesis support enabled. Next,
+re-install Netdata from source; the installer will detect that the required libraries are now available.
+
+If you install the AWS SDK for C++ from source, it is useful to set `-DBUILD_ONLY="kinesis"`; otherwise, the
+build could take a very long time. Note that the default installation path for the libraries is
+`/usr/local/lib64`. Many Linux distributions don't include this path in the default library search path, so it is
+advisable to pass the following options to `cmake` when building the AWS SDK:
+
+```sh
+cmake -DCMAKE_INSTALL_LIBDIR=/usr/lib -DCMAKE_INSTALL_INCLUDEDIR=/usr/include -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=kinesis <aws-sdk-cpp sources>
+```
+
+## Configuration
+
+To enable sending data to the Kinesis service, run `./edit-config exporting.conf` in the Netdata configuration
+directory and set the following options:
+
+```conf
+[kinesis:my_instance]
+ enabled = yes
+ destination = us-east-1
+```
+
+Set the `destination` option to an AWS region.
+
+Set AWS credentials and stream name:
+
+```conf
+ # AWS credentials
+ aws_access_key_id = your_access_key_id
+ aws_secret_access_key = your_secret_access_key
+ # destination stream
+ stream name = your_stream_name
+```
+
+Alternatively, you can set AWS credentials for the `netdata` user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html); a sketch of a credentials file follows this diff.
+
+Netdata automatically computes a partition key for every record in order to distribute records evenly across the
+available shards.
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
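The "standard methods" referenced in the README above usually mean the AWS shared credentials file. A minimal sketch, assuming it lives in the `netdata` user's home directory; the key values are placeholders:

```conf
# ~/.aws/credentials (keep readable only by the netdata user)
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key
```

With this file in place, `aws_access_key_id` and `aws_secret_access_key` can stay unset in `exporting.conf`: the empty-credentials branch of `kinesis_init()` below then constructs the client without explicit credentials, and the SDK falls back to its default credential provider chain.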
diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c
new file mode 100644
index 0000000000..2e5da3fad9
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,157 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aws_kinesis.h"
+
+/**
+ * Initialize AWS Kinesis connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_aws_kinesis_instance(struct instance *instance)
+{
+ instance->worker = aws_kinesis_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ if (!instance->engine->aws_sdk_initialized) {
+ aws_sdk_init();
+ instance->engine->aws_sdk_initialized = 1;
+ }
+
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ kinesis_init(
+ (void *)connector_specific_data,
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ instance->config.timeoutms);
+
+ return 0;
+}
+
+/**
+ * AWS Kinesis connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void aws_kinesis_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data;
+
+ while (!netdata_exit) {
+ unsigned long long partition_key_seq = 0;
+ struct stats *stats = &instance->stats;
+
+ uv_mutex_lock(&instance->mutex);
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t buffer_len = buffer_strlen(buffer);
+
+ size_t sent = 0;
+
+ while (sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
+
+ const char *first_char = buffer_tostring(buffer) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
+ } else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
+ while (*(first_char + record_len) != '\n' && record_len)
+ record_len--;
+ }
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(
+ D_BACKEND,
+ "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu",
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ connector_specific_config->stream_name,
+ partition_key,
+ buffer_len,
+ record_len);
+
+ kinesis_put_record(
+ connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ stats->chart_transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if (unlikely(kinesis_get_result(
+ connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", error_message);
+ error(
+ "EXPORTING: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, sent_bytes, sent_bytes - lost_bytes);
+
+ stats->chart_transmission_failures++;
+ stats->chart_data_lost_events++;
+ stats->chart_lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ stats->chart_lost_metrics += (collected_number)(
+ stats->chart_buffered_metrics *
+ (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
+ } else {
+ stats->chart_receptions++;
+ }
+
+ if (unlikely(netdata_exit))
+ break;
+ }
+
+ stats->chart_sent_bytes += sent;
+ if (likely(sent == buffer_len))
+ stats->chart_sent_metrics = stats->chart_buffered_metrics;
+
+ buffer_flush(buffer);
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+}
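The worker above splits the formatted buffer so that each record fits within `KINESIS_RECORD_MAX` minus the partition key length, backing off to the last newline so no metric line is cut in half. A standalone sketch of that rule; `next_record_len` is a hypothetical helper for illustration, not part of this commit:

```c
#include <stddef.h>

#define KINESIS_RECORD_MAX (1024 * 1024)

/* Mirrors the chunking logic in aws_kinesis_connector_worker():
 * buf/buf_len describe the formatted metrics buffer, sent is the
 * number of bytes already shipped to Kinesis. */
static size_t next_record_len(const char *buf, size_t buf_len, size_t sent, size_t partition_key_len)
{
    size_t remaining = buf_len - sent;
    size_t max_payload = KINESIS_RECORD_MAX - partition_key_len;

    if (remaining < max_payload)
        return remaining; /* the rest fits into a single record */

    /* trim back to the last newline so a metric line is never split */
    size_t record_len = max_payload;
    while (record_len && buf[sent + record_len] != '\n')
        record_len--;

    return record_len;
}
```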
diff --git a/exporting/aws_kinesis/aws_kinesis.h b/exporting/aws_kinesis/aws_kinesis.h
new file mode 100644
index 0000000000..d88a45861c
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_H
+#define NETDATA_EXPORTING_KINESIS_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX (1024 * 1024)
+
+int init_aws_kinesis_instance(struct instance *instance);
+void aws_kinesis_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_KINESIS_H
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.cc b/exporting/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 0000000000..b20ec13736
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,151 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+static SDKOptions options;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+/**
+ * Initialize AWS SDK API
+ */
+void aws_sdk_init()
+{
+ InitAPI(options);
+}
+
+/**
+ * Shutdown AWS SDK API
+ */
+void aws_sdk_shutdown()
+{
+ ShutdownAPI(options);
+}
+
+/**
+ * Initialize a client and a data structure for request outcomes
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param region AWS region.
+ * @param access_key_id AWS account access key ID.
+ * @param secret_key AWS account secret access key.
+ * @param timeout communication timeout.
+ */
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ Kinesis::KinesisClient *client;
+
+ if (access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+ kinesis_specific_data->client = (void *)client;
+
+ Vector<request_outcome> *request_outcomes;
+
+ request_outcomes = new Vector<request_outcome>;
+ kinesis_specific_data->request_outcomes = (void *)request_outcomes;
+}
+
+/**
+ * Deallocate Kinesis specific data
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ */
+void kinesis_shutdown(void *kinesis_specific_data_p)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Delete((Kinesis::KinesisClient *)kinesis_specific_data->client);
+ delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes;
+}
+
+/**
+ * Send data to the Kinesis service
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param stream_name the name of a stream to send to.
+ * @param partition_key a partition key which automatically maps data to a specific stream.
+ * @param data a data buffer to send to the stream.
+ * @param data_len the length of the data buffer.
+ */
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len));
+
+ ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back(
+ { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len });
+}
+
+/**
+ * Get results from service responses
+ *
+ * @param request_outcomes_p request outcome information.
+ * @param error_message report error message to a caller.
+ * @param sent_bytes report to a caller how many bytes were successfully sent.
+ * @param lost_bytes report to a caller how many bytes were lost in transmission.
+ * @return Returns 0 if all data was sent successfully, 1 if some data was lost in transmission.
+ */
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes)
+{
+ Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p;
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if (status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if (!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+            request_outcome = request_outcomes->erase(request_outcome); // erase() returns the next valid iterator
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if (*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+}
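Taken together, the functions in this file expose an init/put/poll/shutdown lifecycle to the C side. A hypothetical caller, not part of the commit (stream name, region, and timeout are made up, and it must be linked against this C++ translation unit and the AWS SDK):

```c
#include <stdio.h>
#include <string.h>
#include "aws_kinesis_put_record.h"

int main(void)
{
    struct aws_kinesis_specific_data kinesis_data = { NULL, NULL };
    char error_message[ERROR_LINE_MAX + 1] = "";
    size_t sent_bytes = 0, lost_bytes = 0;

    aws_sdk_init();
    /* NULL credentials select the SDK's default credential chain */
    kinesis_init(&kinesis_data, "us-east-1", NULL, NULL, 1000);

    const char *record = "{\"metric\":\"example\",\"value\":1}\n";
    kinesis_put_record(&kinesis_data, "netdata", "netdata_0", record, strlen(record));

    /* poll the pending futures; ready outcomes are drained from the list */
    if (kinesis_get_result(kinesis_data.request_outcomes, error_message, &sent_bytes, &lost_bytes))
        fprintf(stderr, "lost %zu of %zu bytes: %s\n", lost_bytes, sent_bytes, error_message);

    kinesis_shutdown(&kinesis_data);
    aws_sdk_shutdown();
    return 0;
}
```

In the real worker this polling happens inside the send loop, so slow futures are retried on later iterations; a one-shot call like the one above may return before the outcome is ready.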
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.h b/exporting/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 0000000000..321baf6699
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+#define NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aws_kinesis_specific_data {
+ void *client;
+ void *request_outcomes;
+};
+
+void aws_sdk_init();
+void aws_sdk_shutdown();
+
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout);
+void kinesis_shutdown(void *kinesis_specific_data_p);
+
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len);
+
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
diff --git a/exporting/check_filters.c b/exporting/check_filters.c
index f1ed9e828c..cfe0b4ce43 100644
--- a/exporting/check_filters.c
+++ b/exporting/check_filters.c
@@ -12,7 +12,7 @@
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host)
{
if (host->exporting_flags == NULL)
- host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+ host->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t));
RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index];
@@ -46,7 +46,7 @@ int rrdset_is_exportable(struct instance *instance, RRDSET *st)
RRDHOST *host = st->rrdhost;
if (st->exporting_flags == NULL)
- st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+ st->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t));
RRDSET_FLAGS *flags = &st->exporting_flags[instance->index];
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
index d45397b448..c1e63a42eb 100644
--- a/exporting/exporting_engine.h
+++ b/exporting/exporting_engine.h
@@ -43,6 +43,12 @@ extern struct config exporting_config;
#define EXPORTER_SEND_NAMES "send names instead of ids"
#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES
+#define EXPORTER_KINESIS_STREAM_NAME "stream name"
+#define EXPORTER_KINESIS_STREAM_NAME_DEFAULT "netdata"
+
+#define EXPORTER_AWS_ACCESS_KEY_ID "aws_access_key_id"
+#define EXPORTER_AWS_SECRET_ACCESS_KEY "aws_secret_access_key"
+
typedef enum exporting_options {
EXPORTING_OPTION_NONE = 0,
@@ -72,6 +78,8 @@ typedef enum exporting_options {
struct engine;
struct instance_config {
+ BACKEND_TYPE type;
+
const char *name;
const char *destination;
@@ -90,9 +98,10 @@ struct simple_connector_config {
int default_port;
};
-struct connector_config {
- BACKEND_TYPE type;
- void *connector_specific_config;
+struct aws_kinesis_specific_config {
+ char *stream_name;
+ char *auth_key_id;
+ char *secure_key;
};
struct engine_config {
@@ -119,6 +128,7 @@ struct stats {
struct instance {
struct instance_config config;
void *buffer;
+ void (*worker)(void *instance_p);
struct stats stats;
int scheduled;
@@ -142,18 +152,10 @@ struct instance {
int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
int (*end_batch_formatting)(struct instance *instance);
+ void *connector_specific_data;
+
size_t index;
struct instance *next;
- struct connector *connector;
-};
-
-struct connector {
- struct connector_config config;
-
- void (*worker)(void *instance_p);
-
- struct instance *instance_root;
- struct connector *next;
struct engine *engine;
};
@@ -163,7 +165,9 @@ struct engine {
size_t instance_num;
time_t now;
- struct connector *connector_root;
+ int aws_sdk_initialized;
+
+ struct instance *instance_root;
};
void *exporting_main(void *ptr);
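The net effect of the header changes above is that the intermediate `struct connector` disappears: each `struct instance` now carries its own `worker` callback and `connector_specific_data`, and points directly at the engine. A condensed sketch of the resulting layout (a field subset for illustration, not the full definitions):

```c
/* Condensed view of the restructured data model. */
struct engine {
    struct engine_config config;
    size_t instance_num;
    int aws_sdk_initialized;        /* guards the one-time InitAPI() call */
    struct instance *instance_root; /* flat instance list, no connector layer */
};

struct instance {
    struct instance_config config;  /* now holds the BACKEND_TYPE */
    void (*worker)(void *instance_p);
    void *connector_specific_data;  /* e.g. struct aws_kinesis_specific_data */
    struct instance *next;
    struct engine *engine;
};
```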
diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c
index d0589910ef..fe748cc561 100644
--- a/exporting/graphite/graphite.c
+++ b/exporting/graphite/graphite.c
@@ -3,23 +3,6 @@
#include "graphite.h"
/**
- * Initialize Graphite connector
- *
- * @param instance a connector data structure.
- * @return Always returns 0.
- */
-int init_graphite_connector(struct connector *connector)
-{
- connector->worker = simple_connector_worker;
-
- struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
- connector->config.connector_specific_config = (void *)connector_specific_config;
- connector_specific_config->default_port = 2003;
-
- return 0;
-}
-
-/**
* Initialize Graphite connector instance
*
* @param instance an instance data structure.
@@ -27,6 +10,12 @@ int init_graphite_connector(struct connector *connector)
*/
int init_graphite_instance(struct instance *instance)
{
+ instance->worker = simple_connector_worker;
+
+ struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
+ instance->config.connector_specific_config = (void *)connector_specific_config;
+ connector_specific_config->default_port = 2003;
+
instance->start_batch_formatting = NULL;
instance->start_host_formatting = format_host_labels_graphite_plaintext;
instance->start_chart_formatting = NULL;
@@ -115,7 +104,7 @@ int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *ho
*/
int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd)
{
- struct engine *engine = instance->connector->engine;
+ struct engine *engine = instance->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
@@ -156,7 +145,7 @@ int format_dimension_collected_graphite_plaintext(struct instance *instance, RRD
*/
int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd)
{
- struct engine *engine = instance->connector->engine;
+ struct engine *engine = instance->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
diff --git a/exporting/graphite/graphite.h b/exporting/graphite/graphite.h
index cc3767003a..edda498e85 100644
--- a/exporting/graphite/graphite.h
+++ b/exporting/graphite/graphite.h
@@ -5,7 +5,6 @@
#include "exporting/exporting_engine.h"
-int init_graphite_connector(struct connector *connector);
int init_graphite_instance(struct instance *instance);
void sanitize_graphite_label_value(char *dst, char *src, size_t len);
diff --git a/exporting/init_connectors.c b/exporting/init_connectors.c
index fd8b03c6ca..dae5576d3b 100644
--- a/exporting/init_connectors.c
+++ b/exporting/init_connectors.c
@@ -4,6 +4,7 @@
#include "graphite/graphite.h"
#include "json/json.h"
#include "opentsdb/opentsdb.h"
+#include "aws_kinesis/aws_kinesis.h"
/**
* Initialize connectors
@@ -15,64 +16,47 @@ int init_connectors(struct engine *engine)
{
engine->now = now_realtime_sec();
- for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
- switch (connector->config.type) {
+ for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+ instance->index = engine->instance_num++;
+ instance->after = engine->now;
+
+ switch (instance->config.type) {
case BACKEND_TYPE_GRAPHITE:
- if (init_graphite_connector(connector) != 0)
+ if (init_graphite_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_JSON:
- if (init_json_connector(connector) != 0)
+ if (init_json_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_TELNET:
- if (init_opentsdb_connector(connector) != 0)
+ if (init_opentsdb_telnet_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_HTTP:
- if (init_opentsdb_connector(connector) != 0)
+ if (init_opentsdb_http_instance(instance) != 0)
+ return 1;
+ break;
+ case BACKEND_TYPE_KINESIS:
+#if HAVE_KINESIS
+ if (init_aws_kinesis_instance(instance) != 0)
return 1;
+#endif
break;
default:
error("EXPORTING: unknown exporting connector type");
return 1;
}
- for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
- instance->index = engine->instance_num++;
- instance->after = engine->now;
- switch (connector->config.type) {
- case BACKEND_TYPE_GRAPHITE:
- if (init_graphite_instance(instance) != 0)
- return 1;
- break;
- case BACKEND_TYPE_JSON:
- if (init_json_instance(instance) != 0)
- return 1;
- break;
- case BACKEND_TYPE_OPENTSDB_USING_TELNET:
- if (init_opentsdb_telnet_instance(instance) != 0)
- return 1;
- break;
- case BACKEND_TYPE_OPENTSDB_USING_HTTP:
- if (init_opentsdb_http_instance(instance) != 0)
- return 1;
- break;
- default:
- error("EXPORTING: unknown exporting connector type");
- return 1;
- }
-
- // dispatch the instance worker thread
- int error = uv_thread_create(&instance->thread, connector->worker, instance);
- if (error) {
- error("EXPORTING: cannot create tread worker. uv_thread_create(): %s", uv_strerror(error));
- return 1;
- }
- char threadname[NETDATA_THREAD_NAME_MAX+1];
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "EXPORTING-%zu", instance->index);
- uv_thread_set_name_np(instance->thread, threadname);
+ // dispatch the instance worker thread
+ int error = uv_thread_create(&instance->thread, instance->worker, instance);
+ if (error) {
+        error("EXPORTING: cannot create thread worker. uv_thread_create(): %s", uv_strerror(error));
+ return 1;
}
+ char threadname[NETDATA_THREAD_NAME_MAX+1];
+ snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "EXPORTING-%zu", instance->index);
+ uv_thread_set_name_np(instance->thread, threadname);
}
return 0;
diff --git a/exporting/json/json.c b/exporting/json/json.c
index 9a2937b02e..0e09e7e99d 100644
--- a/exporting/json/json.c
+++ b/exporting/json/json.c
@@ -3,23 +3,6 @@
#include "json.h"