 CMakeLists.txt                                   |  21
 Makefile.am                                      |  24
 backends/aws_kinesis/aws_kinesis_put_record.cc   |  14
 backends/aws_kinesis/aws_kinesis_put_record.h    |   8
 backends/backends.c                              |  10
 configure.ac                                     |   3
 exporting/Makefile.am                            |   1
 exporting/aws_kinesis/Makefile.am                |   8
 exporting/aws_kinesis/README.md                  |  48
 exporting/aws_kinesis/aws_kinesis.c              | 157
 exporting/aws_kinesis/aws_kinesis.h              |  16
 exporting/aws_kinesis/aws_kinesis_put_record.cc  | 151
 exporting/aws_kinesis/aws_kinesis_put_record.h   |  35
 exporting/check_filters.c                        |   4
 exporting/exporting_engine.h                     |  32
 exporting/graphite/graphite.c                    |  27
 exporting/graphite/graphite.h                    |   1
 exporting/init_connectors.c                      |  64
 exporting/json/json.c                            |  27
 exporting/json/json.h                            |   1
 exporting/opentsdb/opentsdb.c                    |  37
 exporting/opentsdb/opentsdb.h                    |   1
 exporting/process_data.c                         | 126
 exporting/read_config.c                          | 184
 exporting/send_data.c                            |   2
 exporting/tests/exporting_doubles.c              |  52
 exporting/tests/exporting_fixtures.c             |   8
 exporting/tests/test_exporting_engine.c          | 270
 exporting/tests/test_exporting_engine.h          |  10
 29 files changed, 952 insertions(+), 390 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dd9b5420da..3dc8245d2c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -636,6 +636,13 @@ set(EXPORTING_ENGINE_FILES
exporting/send_internal_metrics.c
)
+set(KINESIS_EXPORTING_FILES
+ exporting/aws_kinesis/aws_kinesis.c
+ exporting/aws_kinesis/aws_kinesis.h
+ exporting/aws_kinesis/aws_kinesis_put_record.cc
+ exporting/aws_kinesis/aws_kinesis_put_record.h
+ )
+
set(KINESIS_BACKEND_FILES
backends/aws_kinesis/aws_kinesis.c
backends/aws_kinesis/aws_kinesis.h
@@ -720,7 +727,7 @@ ENDIF()
IF(ENABLE_BACKEND_KINESIS)
message(STATUS "kinesis backend: enabled")
- list(APPEND NETDATA_FILES ${KINESIS_BACKEND_FILES})
+ list(APPEND NETDATA_FILES ${KINESIS_BACKEND_FILES} ${KINESIS_EXPORTING_FILES})
list(APPEND NETDATA_COMMON_LIBRARIES aws-cpp-sdk-kinesis aws-cpp-sdk-core ${CRYPTO_LIBRARIES} ${SSL_LIBRARIES} ${CURL_LIBRARIES})
list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${CRYPTO_INCLUDE_DIRS} ${SSL_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS})
list(APPEND NETDATA_COMMON_CFLAGS ${CRYPTO_CFLAGS_OTHER} ${SSL_CFLAGS_OTHER} ${CURL_CFLAGS_OTHER})
@@ -981,6 +988,17 @@ if(BUILD_TESTING)
exporting/tests/system_doubles.c
)
set(TEST_NAME exporting_engine)
+ set(KINESIS_LINK_OPTIONS)
+if(ENABLE_BACKEND_KINESIS)
+ list(APPEND EXPORTING_ENGINE_FILES ${KINESIS_EXPORTING_FILES})
+ list(
+ APPEND KINESIS_LINK_OPTIONS
+ -Wl,--wrap=aws_sdk_init
+ -Wl,--wrap=kinesis_init
+ -Wl,--wrap=kinesis_put_record
+ -Wl,--wrap=kinesis_get_result
+ )
+endif()
add_executable(${TEST_NAME}_testdriver ${EXPORTING_ENGINE_TEST_FILES} ${EXPORTING_ENGINE_FILES})
target_compile_options(
${TEST_NAME}_testdriver
@@ -1011,6 +1029,7 @@ if(BUILD_TESTING)
-Wl,--wrap=recv
-Wl,--wrap=send
-Wl,--wrap=connect_to_one_of
+ ${KINESIS_LINK_OPTIONS}
)
target_link_libraries(${TEST_NAME}_testdriver libnetdata ${NETDATA_COMMON_LIBRARIES} ${CMOCKA_LIBRARIES})
add_test(NAME test_${TEST_NAME} COMMAND ${TEST_NAME}_testdriver)
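
The `-Wl,--wrap` options above are what let the exporting engine tests mock the Kinesis connector at link time: every call to `kinesis_init()` is rerouted to `__wrap_kinesis_init()`, while the original symbol stays reachable as `__real_kinesis_init()`. A minimal sketch of the mechanism, using a hypothetical stand-alone double (the actual cmocka doubles are expected in `exporting/tests/exporting_doubles.c`):

```c
// Hypothetical test double; build with -Wl,--wrap=kinesis_init so the
// linker redirects every call to kinesis_init() into this function.
#include <stdio.h>

// The linker exposes the original definition under the __real_ prefix.
void __real_kinesis_init(
    void *kinesis_specific_data_p, const char *region, const char *access_key_id,
    const char *secret_key, const long timeout);

// Every call site that says kinesis_init(...) actually lands here.
void __wrap_kinesis_init(
    void *kinesis_specific_data_p, const char *region, const char *access_key_id,
    const char *secret_key, const long timeout)
{
    (void)kinesis_specific_data_p; (void)access_key_id; (void)secret_key;
    // Record the call instead of touching the AWS SDK; a cmocka double
    // would use check_expected()/function_called() here.
    fprintf(stderr, "mocked kinesis_init: region=%s, timeout=%ld ms\n", region, timeout);
}
```
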
diff --git a/Makefile.am b/Makefile.am
index df1ba3832c..69985d177b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -495,6 +495,13 @@ EXPORTING_ENGINE_FILES = \
exporting/send_internal_metrics.c \
$(NULL)
+KINESIS_EXPORTING_FILES = \
+ exporting/aws_kinesis/aws_kinesis.c \
+ exporting/aws_kinesis/aws_kinesis.h \
+ exporting/aws_kinesis/aws_kinesis_put_record.cc \
+ exporting/aws_kinesis/aws_kinesis_put_record.h \
+ $(NULL)
+
KINESIS_BACKEND_FILES = \
backends/aws_kinesis/aws_kinesis.c \
backends/aws_kinesis/aws_kinesis.h \
@@ -711,6 +718,13 @@ if ENABLE_PLUGIN_SLABINFO
$(NULL)
endif
+if ENABLE_EXPORTING
+if ENABLE_BACKEND_KINESIS
+ netdata_SOURCES += $(KINESIS_EXPORTING_FILES)
+ netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
+endif
+endif
+
if ENABLE_BACKEND_KINESIS
netdata_SOURCES += $(KINESIS_BACKEND_FILES)
netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
@@ -847,4 +861,14 @@ if ENABLE_UNITTESTS
$(TEST_LDFLAGS) \
$(NULL)
exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
+if ENABLE_BACKEND_KINESIS
+ exporting_tests_exporting_engine_testdriver_SOURCES += $(KINESIS_EXPORTING_FILES)
+ exporting_tests_exporting_engine_testdriver_LDADD += $(OPTIONAL_KINESIS_LIBS)
+ exporting_tests_exporting_engine_testdriver_LDFLAGS += \
+ -Wl,--wrap=aws_sdk_init \
+ -Wl,--wrap=kinesis_init \
+ -Wl,--wrap=kinesis_put_record \
+ -Wl,--wrap=kinesis_get_result \
+ $(NULL)
+endif
endif
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc
index 0c8ece68bf..a8ba4aaca6 100644
--- a/backends/aws_kinesis/aws_kinesis_put_record.cc
+++ b/backends/aws_kinesis/aws_kinesis_put_record.cc
@@ -10,18 +10,18 @@
using namespace Aws;
-SDKOptions options;
+static SDKOptions options;
-Kinesis::KinesisClient *client;
+static Kinesis::KinesisClient *client;
struct request_outcome {
Kinesis::Model::PutRecordOutcomeCallable future_outcome;
size_t data_len;
};
-Vector<request_outcome> request_outcomes;
+static Vector<request_outcome> request_outcomes;
-void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) {
+void backends_kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) {
InitAPI(options);
Client::ClientConfiguration config;
@@ -37,13 +37,13 @@ void kinesis_init(const char *region, const char *access_key_id, const char *sec
}
}
-void kinesis_shutdown() {
+void backends_kinesis_shutdown() {
Delete(client);
ShutdownAPI(options);
}
-int kinesis_put_record(const char *stream_name, const char *partition_key,
+int backends_kinesis_put_record(const char *stream_name, const char *partition_key,
const char *data, size_t data_len) {
Kinesis::Model::PutRecordRequest request;
@@ -56,7 +56,7 @@ int kinesis_put_record(const char *stream_name, const char *partition_key,
return 0;
}
-int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) {
+int backends_kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) {
Kinesis::Model::PutRecordOutcome outcome;
*sent_bytes = 0;
*lost_bytes = 0;
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.h b/backends/aws_kinesis/aws_kinesis_put_record.h
index f48e420f34..fa3d034591 100644
--- a/backends/aws_kinesis/aws_kinesis_put_record.h
+++ b/backends/aws_kinesis/aws_kinesis_put_record.h
@@ -9,14 +9,14 @@
extern "C" {
#endif
-void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout);
+void backends_kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout);
-void kinesis_shutdown();
+void backends_kinesis_shutdown();
-int kinesis_put_record(const char *stream_name, const char *partition_key,
+int backends_kinesis_put_record(const char *stream_name, const char *partition_key,
const char *data, size_t data_len);
-int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+int backends_kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes);
#ifdef __cplusplus
}
diff --git a/backends/backends.c b/backends/backends.c
index a436ca6836..b373b2d05e 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -578,7 +578,7 @@ void *backends_main(void *ptr) {
goto cleanup;
}
- kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
+ backends_kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
#else
error("BACKEND: AWS Kinesis support isn't compiled");
#endif // HAVE_KINESIS
@@ -860,18 +860,18 @@ void *backends_main(void *ptr) {
char error_message[ERROR_LINE_MAX + 1] = "";
- debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ debug(D_BACKEND, "BACKEND: backends_kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
partition_key, buffer_len, record_len);
- kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
+ backends_kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
sent += record_len;
chart_transmission_successes++;
size_t sent_bytes = 0, lost_bytes = 0;
- if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
+ if(unlikely(backends_kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
// oops! we couldn't send (all or some of the) data
error("BACKEND: %s", error_message);
error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
@@ -1199,7 +1199,7 @@ void *backends_main(void *ptr) {
cleanup:
#if HAVE_KINESIS
if(do_kinesis) {
- kinesis_shutdown();
+ backends_kinesis_shutdown();
freez(kinesis_auth_key_id);
freez(kinesis_secure_key);
freez(kinesis_stream_name);
diff --git a/configure.ac b/configure.ac
index 730d539d4c..7dcb221962 100644
--- a/configure.ac
+++ b/configure.ac
@@ -445,7 +445,7 @@ if test "${ACLK}" = "yes"; then
AC_MSG_CHECKING([if libmosquitto static lib is present])
if test -f "externaldeps/mosquitto/libmosquitto.a"; then
HAVE_libmosquitto_a="yes"
- else
+ else
HAVE_libmosquitto_a="no"
fi
AC_MSG_RESULT([${HAVE_libmosquitto_a}])
@@ -1311,6 +1311,7 @@ AC_CONFIG_FILES([
exporting/graphite/Makefile
exporting/json/Makefile
exporting/opentsdb/Makefile
+ exporting/aws_kinesis/Makefile
exporting/tests/Makefile
health/Makefile
health/notifications/Makefile
diff --git a/exporting/Makefile.am b/exporting/Makefile.am
index ce6282989a..82ae0f77e4 100644
--- a/exporting/Makefile.am
+++ b/exporting/Makefile.am
@@ -8,6 +8,7 @@ SUBDIRS = \
graphite \
json \
opentsdb \
+ aws_kinesis \
$(NULL)
dist_noinst_DATA = \
diff --git a/exporting/aws_kinesis/Makefile.am b/exporting/aws_kinesis/Makefile.am
new file mode 100644
index 0000000000..161784b8f6
--- /dev/null
+++ b/exporting/aws_kinesis/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/exporting/aws_kinesis/README.md b/exporting/aws_kinesis/README.md
new file mode 100644
index 0000000000..acf9d7d0fa
--- /dev/null
+++ b/exporting/aws_kinesis/README.md
@@ -0,0 +1,48 @@
+# Export metrics to AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis for metric collection and processing, first
+[install](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) the AWS SDK for C++. Netdata
+works with SDK version 1.7.121. Other versions might work as well, but they have not been tested with Netdata.
+`libcrypto`, `libssl`, and `libcurl` are also required to compile Netdata with Kinesis support enabled. Next,
+reinstall Netdata from source. The installer will detect that the required libraries are now available.
+
+If you build the AWS SDK for C++ from source, it is useful to set `-DBUILD_ONLY="kinesis"`; otherwise, the build can
+take a very long time. Note that the default installation path for the libraries is `/usr/local/lib64`. Many Linux
+distributions don't include this path in the default library search path, so it is advisable to pass the following
+options to `cmake` when building the AWS SDK:
+
+```sh
+cmake -DCMAKE_INSTALL_LIBDIR=/usr/lib -DCMAKE_INSTALL_INCLUDEDIR=/usr/include -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=kinesis <aws-sdk-cpp sources>
+```
+
+## Configuration
+
+To enable sending data to the Kinesis service, run `./edit-config exporting.conf` in the Netdata configuration
+directory and set the following options:
+
+```conf
+[kinesis:my_instance]
+ enabled = yes
+ destination = us-east-1
+```
+
+Set the `destination` option to an AWS region.
+
+Set AWS credentials and stream name:
+
+```conf
+ # AWS credentials
+ aws_access_key_id = your_access_key_id
+ aws_secret_access_key = your_secret_access_key
+ # destination stream
+ stream name = your_stream_name
+```
+
+Alternatively, you can set AWS credentials for the `netdata` user using the AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
+
+Netdata automatically computes a partition key for every record in order to distribute records evenly across the
+available shards.
+
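
Putting the configuration fragments above together, a complete instance section in `exporting.conf` would look like the following sketch (all values are placeholders):

```conf
[kinesis:my_instance]
    enabled = yes
    destination = us-east-1
    # AWS credentials
    aws_access_key_id = your_access_key_id
    aws_secret_access_key = your_secret_access_key
    # destination stream
    stream name = your_stream_name
```
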
diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c
new file mode 100644
index 0000000000..2e5da3fad9
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,157 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aws_kinesis.h"
+
+/**
+ * Initialize AWS Kinesis connector instance
+ *
+ * @param instance an instance data structure.
+ * @return Returns 0 on success, 1 on failure.
+ */
+int init_aws_kinesis_instance(struct instance *instance)
+{
+ instance->worker = aws_kinesis_connector_worker;
+
+ instance->start_batch_formatting = NULL;
+ instance->start_host_formatting = format_host_labels_json_plaintext;
+ instance->start_chart_formatting = NULL;
+
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
+ else
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
+
+ instance->end_chart_formatting = NULL;
+ instance->end_host_formatting = flush_host_labels;
+ instance->end_batch_formatting = NULL;
+
+ instance->buffer = (void *)buffer_create(0);
+ if (!instance->buffer) {
+ error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name);
+ return 1;
+ }
+ uv_mutex_init(&instance->mutex);
+ uv_cond_init(&instance->cond_var);
+
+ if (!instance->engine->aws_sdk_initialized) {
+ aws_sdk_init();
+ instance->engine->aws_sdk_initialized = 1;
+ }
+
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ kinesis_init(
+ (void *)connector_specific_data,
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ instance->config.timeoutms);
+
+ return 0;
+}
+
+/**
+ * AWS Kinesis connector worker
+ *
+ * Runs in a separate thread for every instance.
+ *
+ * @param instance_p an instance data structure.
+ */
+void aws_kinesis_connector_worker(void *instance_p)
+{
+ struct instance *instance = (struct instance *)instance_p;
+ struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
+ struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data;
+
+ while (!netdata_exit) {
+ unsigned long long partition_key_seq = 0;
+ struct stats *stats = &instance->stats;
+
+ uv_mutex_lock(&instance->mutex);
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
+
+ BUFFER *buffer = (BUFFER *)instance->buffer;
+ size_t buffer_len = buffer_strlen(buffer);
+
+ size_t sent = 0;
+
+ while (sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
+
+ const char *first_char = buffer_tostring(buffer) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
+ } else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
+ while (*(first_char + record_len) != '\n' && record_len)
+ record_len--;
+ }
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(
+ D_BACKEND,
+ "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu",
+ instance->config.destination,
+ connector_specific_config->auth_key_id,
+ connector_specific_config->secure_key,
+ connector_specific_config->stream_name,
+ partition_key,
+ buffer_len,
+ record_len);
+
+ kinesis_put_record(
+ connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ stats->chart_transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if (unlikely(kinesis_get_result(
+ connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("EXPORTING: %s", error_message);
+ error(
+ "EXPORTING: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ instance->config.destination, sent_bytes, sent_bytes - lost_bytes);
+
+ stats->chart_transmission_failures++;
+ stats->chart_data_lost_events++;
+ stats->chart_lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ stats->chart_lost_metrics += (collected_number)(
+ stats->chart_buffered_metrics *
+ (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
+ } else {
+ stats->chart_receptions++;
+ }
+
+ if (unlikely(netdata_exit))
+ break;
+ }
+
+ stats->chart_sent_bytes += sent;
+ if (likely(sent == buffer_len))
+ stats->chart_sent_metrics = stats->chart_buffered_metrics;
+
+ buffer_flush(buffer);
+
+ uv_mutex_unlock(&instance->mutex);
+
+#ifdef UNIT_TESTING
+ break;
+#endif
+ }
+}
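
The worker above chunks the metrics buffer into Kinesis-sized records, walking `record_len` back to the previous `'\n'` so no line is split across records, and stamps each record with a fresh `netdata_%llu` partition key so consecutive records spread across shards. A standalone sketch of just the splitting logic, with the limit shrunk for illustration (`RECORD_MAX` stands in for `KINESIS_RECORD_MAX - partition_key_len`; the zero-length guard is an addition of the sketch):

```c
#include <stdio.h>
#include <string.h>

#define RECORD_MAX 16 /* stands in for KINESIS_RECORD_MAX - partition_key_len */

int main(void)
{
    const char *buffer = "metric 1\nmetric 22\nmetric 333\n";
    size_t buffer_len = strlen(buffer);
    size_t sent = 0;

    while (sent < buffer_len) {
        const char *first_char = buffer + sent;
        size_t record_len;

        if (buffer_len - sent < RECORD_MAX) {
            record_len = buffer_len - sent; /* the tail fits in one record */
        } else {
            record_len = RECORD_MAX;
            /* back up to the previous newline so a line is never split */
            while (*(first_char + record_len) != '\n' && record_len)
                record_len--;
            if (!record_len)
                break; /* sketch-only guard: a single line exceeds RECORD_MAX */
        }

        printf("record (%2zu bytes): %.*s\n", record_len, (int)record_len, first_char);
        sent += record_len;
    }
    return 0;
}
```

As in the worker, the newline that terminates one record travels at the head of the next one.
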
diff --git a/exporting/aws_kinesis/aws_kinesis.h b/exporting/aws_kinesis/aws_kinesis.h
new file mode 100644
index 0000000000..d88a45861c
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_H
+#define NETDATA_EXPORTING_KINESIS_H
+
+#include "exporting/exporting_engine.h"
+#include "exporting/json/json.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX 1024 * 1024
+
+int init_aws_kinesis_instance(struct instance *instance);
+void aws_kinesis_connector_worker(void *instance_p);
+
+#endif //NETDATA_EXPORTING_KINESIS_H
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.cc b/exporting/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 0000000000..b20ec13736
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,151 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+static SDKOptions options;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+/**
+ * Initialize AWS SDK API
+ */
+void aws_sdk_init()
+{
+ InitAPI(options);
+}
+
+/**
+ * Shutdown AWS SDK API
+ */
+void aws_sdk_shutdown()
+{
+ ShutdownAPI(options);
+}
+
+/**
+ * Initialize a client and a data structure for request outcomes
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param region AWS region.
+ * @param access_key_id AWS account access key ID.
+ * @param secret_key AWS account secret access key.
+ * @param timeout communication timeout.
+ */
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ Kinesis::KinesisClient *client;
+
+ if (access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+ kinesis_specific_data->client = (void *)client;
+
+ Vector<request_outcome> *request_outcomes;
+
+ request_outcomes = new Vector<request_outcome>;
+ kinesis_specific_data->request_outcomes = (void *)request_outcomes;
+}
+
+/**
+ * Deallocate Kinesis specific data
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ */
+void kinesis_shutdown(void *kinesis_specific_data_p)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+
+ Delete((Kinesis::KinesisClient *)kinesis_specific_data->client);
+ delete (Vector<request_outcome> *)kinesis_specific_data->request_outcomes;
+}
+
+/**
+ * Send data to the Kinesis service
+ *
+ * @param kinesis_specific_data_p a pointer to a structure with client and request outcome information.
+ * @param stream_name the name of a stream to send to.
+ * @param partition_key a partition key used to map the record to a shard within the stream.
+ * @param data a data buffer to send to the stream.
+ * @param data_len the length of the data buffer.
+ */
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len)
+{
+ struct aws_kinesis_specific_data *kinesis_specific_data =
+ (struct aws_kinesis_specific_data *)kinesis_specific_data_p;
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char *)data, data_len));
+
+ ((Vector<request_outcome> *)(kinesis_specific_data->request_outcomes))->push_back(
+ { ((Kinesis::KinesisClient *)(kinesis_specific_data->client))->PutRecordCallable(request), data_len });
+}
+
+/**
+ * Get results from service responses
+ *
+ * @param request_outcomes_p request outcome information.
+ * @param error_message report an error message to the caller.
+ * @param sent_bytes report to the caller how many bytes were successfully sent.
+ * @param lost_bytes report to the caller how many bytes were lost during transmission.
+ * @return Returns 0 if all data was sent successfully, 1 if some data was lost in transmission
+ */
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes)
+{
+ Vector<request_outcome> *request_outcomes = (Vector<request_outcome> *)request_outcomes_p;
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for (auto request_outcome = request_outcomes->begin(); request_outcome != request_outcomes->end();) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if (status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if (!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+            request_outcome = request_outcomes->erase(request_outcome); // erase() returns the next valid iterator
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if (*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/exporting/aws_kinesis/aws_kinesis_put_record.h b/exporting/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 0000000000..321baf6699
--- /dev/null
+++ b/exporting/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+#define NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aws_kinesis_specific_data {
+ void *client;
+ void *request_outcomes;
+};
+
+void aws_sdk_init();
+void aws_sdk_shutdown();
+
+void kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout);
+void kinesis_shutdown(void *client);
+
+void kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len);
+
+int kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_EXPORTING_KINESIS_PUT_RECORD_H
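
The opaque `void *` members in `struct aws_kinesis_specific_data` keep this header includable from plain C; only the C++ translation unit above knows the real SDK types. A hedged sketch of how the C side drives the API end to end (`send_one_record()` is a hypothetical helper; the real worker polls `kinesis_get_result()` repeatedly, since a single pass can leave outcomes pending):

```c
#include <string.h>
#include "aws_kinesis_put_record.h"

/* Hypothetical helper: push one record and poll its outcome once. */
int send_one_record(const char *region, const char *stream, const char *payload)
{
    struct aws_kinesis_specific_data data;
    memset(&data, 0, sizeof data);

    aws_sdk_init();
    /* NULL credentials fall through to the SDK's default credential chain. */
    kinesis_init(&data, region, NULL, NULL, 5000);

    kinesis_put_record(&data, stream, "netdata_0", payload, strlen(payload));

    char error_message[ERROR_LINE_MAX + 1] = "";
    size_t sent_bytes = 0, lost_bytes = 0;
    int rc = kinesis_get_result(data.request_outcomes, error_message, &sent_bytes, &lost_bytes);

    kinesis_shutdown(&data);
    aws_sdk_shutdown();
    return rc;
}
```
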
diff --git a/exporting/check_filters.c b/exporting/check_filters.c
index f1ed9e828c..cfe0b4ce43 100644
--- a/exporting/check_filters.c
+++ b/exporting/check_filters.c
@@ -12,7 +12,7 @@
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host)
{
if (host->exporting_flags == NULL)
- host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+ host->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t));
RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index];
@@ -46,7 +46,7 @@ int rrdset_is_exportable(struct instance *instance, RRDSET *st)
RRDHOST *host = st->rrdhost;
if (st->exporting_flags == NULL)
- st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
+ st->exporting_flags = callocz(instance->engine->instance_num, sizeof(size_t));
RRDSET_FLAGS *flags = &st->exporting_flags[instance->index];
diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h
index d45397b448..c1e63a42eb 100644
--- a/exporting/exporting_engine.h
+++ b/exporting/exporting_engine.h
@@ -43,6 +43,12 @@ extern struct config exporting_config;
#define EXPORTER_SEND_NAMES "send