author	Vladimir Kobal <vlad@prokk.net>	2019-05-13 15:45:34 +0300
committer	Chris Akritidis <43294513+cakrit@users.noreply.github.com>	2019-05-13 14:45:34 +0200
commit	198c7fa3e34c71bc075bcfb067da80242e74bddf (patch)
tree	906a580bf1658a09f67836dbff5f93cced079382
parent	51decad9896131ed478681d90886d63bbd953b14 (diff)
Add AWS Kinesis backend (#5914)
* Add Kinesis backend
* Separate config file
* Send data in chunks
* Fix minor issues
* Add error handling
* Use existing JSON functions
* Do not retry on send failure
* Implement building with autotools
* Implement building with CMake
* Fix CMake variables
* Fix build when C++ compiler is not available
* Add checks for C++11
* Don't reinitialize API
* Don't reinitialize client
* Minor cleanup
* Fix Codacy warning
* Separate sending records and receiving results
* Add documentation
* Make connection timeout configurable
* Fix operation metrics
* Fix typo
* Change parameter names for credentials
* Allow using the default SDK credentials configuration
-rw-r--r--	CMakeLists.txt	79
-rw-r--r--	Makefile.am	19
-rw-r--r--	backends/Makefile.am	1
-rw-r--r--	backends/README.md	34
-rw-r--r--	backends/aws_kinesis/Makefile.am	12
-rw-r--r--	backends/aws_kinesis/README.md	34
-rw-r--r--	backends/aws_kinesis/aws_kinesis.c	101
-rw-r--r--	backends/aws_kinesis/aws_kinesis.conf	10
-rw-r--r--	backends/aws_kinesis/aws_kinesis.h	14
-rw-r--r--	backends/aws_kinesis/aws_kinesis_put_record.cc	87
-rw-r--r--	backends/aws_kinesis/aws_kinesis_put_record.h	25
-rw-r--r--	backends/backends.c	251
-rw-r--r--	backends/backends.h	4
-rw-r--r--	configure.ac	147
-rwxr-xr-x	netdata-installer.sh	5
15 files changed, 722 insertions, 101 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 46a3fed32f..72a31ec4ef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,8 +3,8 @@
# This file is only used for development (netdata in CLion)
# It can build netdata, but you are on your own...
-cmake_minimum_required(VERSION 3.0.2)
-project(netdata C)
+cmake_minimum_required(VERSION 3.1.0)
+project(netdata C CXX)
find_package(Threads REQUIRED)
find_package(PkgConfig REQUIRED)
@@ -158,6 +158,50 @@ ENDIF(MACOS)
# -----------------------------------------------------------------------------
+# Detect libcrypto
+
+pkg_check_modules(CRYPTO libcrypto)
+# later we use:
+# ${CRYPTO_LIBRARIES}
+# ${CRYPTO_CFLAGS_OTHER}
+# ${CRYPTO_INCLUDE_DIRS}
+
+
+# -----------------------------------------------------------------------------
+# Detect libssl
+
+pkg_check_modules(SSL libssl)
+# later we use:
+# ${SSL_LIBRARIES}
+# ${SSL_CFLAGS_OTHER}
+# ${SSL_INCLUDE_DIRS}
+
+
+# -----------------------------------------------------------------------------
+# Detect libcurl
+
+pkg_check_modules(CURL libcurl)
+# later we use:
+# ${CURL_LIBRARIES}
+# ${CURL_CFLAGS_OTHER}
+# ${CURL_INCLUDE_DIRS}
+
+
+# -----------------------------------------------------------------------------
+# Detect libaws-cpp-sdk-core
+
+find_library(HAVE_AWS aws-cpp-sdk-core)
+# later we use:
+# ${HAVE_AWS}
+
+# -----------------------------------------------------------------------------
+# Detect libaws-cpp-sdk-kinesis
+
+find_library(HAVE_KINESIS aws-cpp-sdk-kinesis)
+# later we use:
+# ${HAVE_KINESIS}
+
+# -----------------------------------------------------------------------------
# netdata files
set(LIBNETDATA_FILES
@@ -440,6 +484,13 @@ set(BACKENDS_PLUGIN_FILES
backends/prometheus/backend_prometheus.h
)
+set(KINESIS_BACKEND_FILES
+ backends/aws_kinesis/aws_kinesis.c
+ backends/aws_kinesis/aws_kinesis.h
+ backends/aws_kinesis/aws_kinesis_put_record.cc
+ backends/aws_kinesis/aws_kinesis_put_record.h
+ )
+
set(DAEMON_FILES
daemon/common.c
daemon/common.h
@@ -486,6 +537,25 @@ add_definitions(
)
# -----------------------------------------------------------------------------
+# kinesis backend
+
+IF(HAVE_KINESIS AND HAVE_AWS AND CRYPTO_LIBRARIES AND SSL_LIBRARIES AND CURL_LIBRARIES)
+ SET(ENABLE_BACKEND_KINESIS True)
+ELSE()
+ SET(ENABLE_BACKEND_KINESIS False)
+ENDIF()
+
+IF(ENABLE_BACKEND_KINESIS)
+ message(STATUS "kinesis backend: enabled")
+ list(APPEND NETDATA_FILES ${KINESIS_BACKEND_FILES})
+ list(APPEND NETDATA_COMMON_LIBRARIES aws-cpp-sdk-kinesis aws-cpp-sdk-core ${CRYPTO_LIBRARIES} ${SSL_LIBRARIES} ${CURL_LIBRARIES})
+ list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${CRYPTO_INCLUDE_DIRS} ${SSL_INCLUDE_DIRS} ${CURL_INCLUDE_DIRS})
+ list(APPEND NETDATA_COMMON_CFLAGS ${CRYPTO_CFLAGS_OTHER} ${SSL_CFLAGS_OTHER} ${CURL_CFLAGS_OTHER})
+ELSE()
+ message(STATUS "kinesis backend: disabled (requires AWS SDK for C++)")
+ENDIF()
+
+# -----------------------------------------------------------------------------
# netdata
set(NETDATA_COMMON_LIBRARIES ${NETDATA_COMMON_LIBRARIES} m ${CMAKE_THREAD_LIBS_INIT})
@@ -522,6 +592,11 @@ ELSEIF(MACOS)
ENDIF()
+IF(ENABLE_BACKEND_KINESIS)
+ set_property(TARGET netdata PROPERTY CXX_STANDARD 11)
+ set_property(TARGET netdata PROPERTY CMAKE_CXX_STANDARD_REQUIRED ON)
+ENDIF()
+
IF(IPMI_LIBRARIES)
SET(ENABLE_PLUGIN_FREEIPMI True)
ELSE()
diff --git a/Makefile.am b/Makefile.am
index ee5ba9f172..7f3604ef83 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -412,6 +412,13 @@ BACKENDS_PLUGIN_FILES = \
backends/prometheus/backend_prometheus.h \
$(NULL)
+KINESIS_BACKEND_FILES = \
+ backends/aws_kinesis/aws_kinesis.c \
+ backends/aws_kinesis/aws_kinesis.h \
+ backends/aws_kinesis/aws_kinesis_put_record.cc \
+ backends/aws_kinesis/aws_kinesis_put_record.h \
+ $(NULL)
+
DAEMON_FILES = \
daemon/common.c \
daemon/common.h \
@@ -471,13 +478,18 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_ZLIB_LIBS) \
$(OPTIONAL_UUID_LIBS) \
$(NULL)
-
+# TODO: Find a more graceful way to add libs for AWS Kinesis
sbin_PROGRAMS += netdata
netdata_SOURCES = $(NETDATA_FILES)
netdata_LDADD = \
$(NETDATA_COMMON_LIBS) \
$(NULL)
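+
+# netdata contains C++ objects when the kinesis backend is enabled,
+# so the program has to be linked with the C++ linker driver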
+if ENABLE_BACKEND_KINESIS
+ netdata_LINK = $(CXXLD) $(CXXFLAGS) -o $@
+else
+ netdata_LINK = $(CCLD) $(CFLAGS) -o $@
+endif
if ENABLE_PLUGIN_APPS
plugins_PROGRAMS += apps.plugin
@@ -531,3 +543,8 @@ if ENABLE_PLUGIN_XENSTAT
$(OPTIONAL_XENSTAT_LIBS) \
$(NULL)
endif
+
+if ENABLE_BACKEND_KINESIS
+ netdata_SOURCES += $(KINESIS_BACKEND_FILES)
+ netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
+endif
diff --git a/backends/Makefile.am b/backends/Makefile.am
index b8daefc592..278f057a1d 100644
--- a/backends/Makefile.am
+++ b/backends/Makefile.am
@@ -8,6 +8,7 @@ SUBDIRS = \
json \
opentsdb \
prometheus \
+ aws_kinesis \
$(NULL)
dist_noinst_DATA = \
diff --git a/backends/README.md b/backends/README.md
index 22dc77597f..efaba0caac 100644
--- a/backends/README.md
+++ b/backends/README.md
@@ -32,24 +32,28 @@ X seconds (though, it can send them per second if you need it to).
- **prometheus** is described at [prometheus page](prometheus/) since it pulls data from netdata.
+ - **AWS Kinesis Data Streams**
+
+ metrics are sent to the service in `JSON` format.
+
2. Only one backend may be active at a time.
3. Netdata can filter metrics (at the chart level), to send only a subset of the collected metrics.
4. Netdata supports three modes of operation for all backends:
- - `as-collected` sends to backends the metrics as they are collected, in the units they are collected.
- So, counters are sent as counters and gauges are sent as gauges, much like all data collectors do.
+ - `as-collected` sends to backends the metrics as they are collected, in the units they are collected.
+ So, counters are sent as counters and gauges are sent as gauges, much like all data collectors do.
For example, to calculate CPU utilization in this format, you need to know how to convert kernel ticks to percentage.
- - `average` sends to backends normalized metrics from the netdata database.
- In this mode, all metrics are sent as gauges, in the units netdata uses. This abstracts data collection
- and simplifies visualization, but you will not be able to copy and paste queries from other sources to convert units.
- For example, CPU utilization percentage is calculated by netdata, so netdata will convert ticks to percentage and
+ - `average` sends to backends normalized metrics from the netdata database.
+ In this mode, all metrics are sent as gauges, in the units netdata uses. This abstracts data collection
+ and simplifies visualization, but you will not be able to copy and paste queries from other sources to convert units.
+ For example, CPU utilization percentage is calculated by netdata, so netdata will convert ticks to percentage and
send the average percentage to the backend.
- - `sum` or `volume`: the sum of the interpolated values shown on the netdata graphs is sent to the backend.
- So, if netdata is configured to send data to the backend every 10 seconds, the sum of the 10 values shown on the
+ - `sum` or `volume`: the sum of the interpolated values shown on the netdata graphs is sent to the backend.
+ So, if netdata is configured to send data to the backend every 10 seconds, the sum of the 10 values shown on the
netdata charts will be used.
Time-series databases suggest collecting the raw values (`as-collected`). If you plan to invest in building your monitoring around a time-series database and you already know (or will invest in learning) how to convert units and normalize the metrics in Grafana or other visualization tools, we suggest using `as-collected`.
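+
+For illustration (made-up numbers): if a dimension stored the values `2, 4, 6` during one 10-second send interval, `average` sends `4`, `sum`/`volume` sends `12`, and `as-collected` would have sent each raw value as it was collected, in its original units.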
@@ -66,9 +70,9 @@ of `netdata.conf` from your netdata):
```
[backend]
enabled = yes | no
- type = graphite | opentsdb | json
+ type = graphite | opentsdb | json | kinesis
host tags = list of TAG=VALUE
- destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working will be used
+ destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working one will be used, or an AWS region for kinesis
data source = average | sum | as collected
prefix = netdata
hostname = my-name
@@ -82,7 +86,7 @@ of `netdata.conf` from your netdata):
- `enabled = yes | no`, enables or disables sending data to a backend
-- `type = graphite | opentsdb | json`, selects the backend type
+- `type = graphite | opentsdb | json | kinesis`, selects the backend type
- `destination = host1 host2 host3 ...`, accepts **a space separated list** of hostnames,
IPs (IPv4 and IPv6) and ports to connect to.
@@ -105,7 +109,7 @@ of `netdata.conf` from your netdata):
```
Example IPv6 and IPv4 together:
-
+
```
destination = [ffff:...:0001]:2003 10.11.12.1:2003
```
@@ -118,6 +122,8 @@ of `netdata.conf` from your netdata):
time-series database when it becomes available again. It can also be used to monitor / trace / debug
the metrics netdata generates.
+ For the kinesis backend, `destination` should be set to an AWS region (for example, `us-east-1`).
+
- `data source = as collected`, or `data source = average`, or `data source = sum`, selects the kind of
data that will be sent to the backend.
@@ -170,7 +176,7 @@ netdata provides 5 charts:
1. **Buffered metrics**, the number of metrics netdata added to the buffer for dispatching them to the
backend server.
-
+
2. **Buffered data size**, the amount of data (in KB) netdata added to the buffer.
3. ~~**Backend latency**, the time the backend server needed to process the data netdata sent.
@@ -178,7 +184,7 @@ netdata provides 5 charts:
(this chart has been removed, because it only measures the time netdata needs to give the data
to the O/S - since the backend servers do not ack the reception, netdata does not have any means
to measure this properly).
-
+
4. **Backend operations**, the number of operations performed by netdata.
5. **Backend thread CPU usage**, the CPU resources consumed by the netdata thread that is responsible
diff --git a/backends/aws_kinesis/Makefile.am b/backends/aws_kinesis/Makefile.am
new file mode 100644
index 0000000000..7317b3821b
--- /dev/null
+++ b/backends/aws_kinesis/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
+dist_libconfig_DATA = \
+ aws_kinesis.conf \
+ $(NULL)
\ No newline at end of file
diff --git a/backends/aws_kinesis/README.md b/backends/aws_kinesis/README.md
new file mode 100644
index 0000000000..a9cc77d6ec
--- /dev/null
+++ b/backends/aws_kinesis/README.md
@@ -0,0 +1,34 @@
+# Using netdata with AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis as a backend, the AWS SDK for C++ should be [installed](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) first. `libcrypto`, `libssl`, and `libcurl` are also required to compile netdata with Kinesis support enabled. Next, netdata should be reinstalled from source; the installer will detect that the required libraries are now available.
+
+If the AWS SDK for C++ is built from source, it is useful to set `-DBUILD_ONLY="kinesis"`; otherwise, the build can take a very long time.
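+
+For example, a minimal out-of-source SDK build might look like this (a sketch; paths are illustrative):
+```
+git clone https://github.com/aws/aws-sdk-cpp.git
+cd aws-sdk-cpp && mkdir build && cd build
+cmake -DBUILD_ONLY="kinesis" ..
+make && sudo make install
+```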
+
+## Configuration
+
+To enable sending data to the kinesis backend, set the following options in `netdata.conf`:
+```
+[backend]
+ enabled = yes
+ type = kinesis
+ destination = us-east-1
+```
+Set the `destination` option to an AWS region.
+
+In the netdata configuration directory run `./edit-config aws_kinesis.conf` and set AWS credentials and stream name:
+```
+# AWS credentials
+aws_access_key_id = your_access_key_id
+aws_secret_access_key = your_secret_access_key
+
+# destination stream
+stream name = your_stream_name
+```
+Alternatively, AWS credentials can be set for the *netdata* user using the AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
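+
+For example, the SDK's shared credentials file (typically `~/.aws/credentials` for the user netdata runs as) uses the standard AWS format:
+```
+[default]
+aws_access_key_id = your_access_key_id
+aws_secret_access_key = your_secret_access_key
+```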
+
+A partition key for every record is computed automatically by netdata to distribute records evenly across the available shards.
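+
+A minimal, self-contained C sketch of this sequential-key scheme (the constant and key format mirror `aws_kinesis.h` and `backends/backends.c`):
+```
+// keys like "netdata_0", "netdata_1", ... let Kinesis hash successive
+// records to different shards
+#include <stdio.h>
+
+#define KINESIS_PARTITION_KEY_MAX 256
+
+int main(void) {
+    unsigned long long partition_key_seq = 0;
+    char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+
+    for(int record = 0; record < 3; record++) {
+        snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+        printf("%s\n", partition_key);
+    }
+    return 0;
+}
+```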
+
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fbackends%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/backends/aws_kinesis/aws_kinesis.c b/backends/aws_kinesis/aws_kinesis.c
new file mode 100644
index 0000000000..d8b79364cc
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define BACKENDS_INTERNALS
+#include "aws_kinesis.h"
+
+#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
+
+// ----------------------------------------------------------------------------
+// kinesis backend
+
+// read the aws_kinesis.conf file
+int read_kinesis_conf(const char *path, char **access_key_id_p, char **secret_access_key_p, char **stream_name_p)
+{
+ char *access_key_id = *access_key_id_p;
+ char *secret_access_key = *secret_access_key_p;
+ char *stream_name = *stream_name_p;
+
+ if(unlikely(access_key_id)) freez(access_key_id);
+ if(unlikely(secret_access_key)) freez(secret_access_key);
+ if(unlikely(stream_name)) freez(stream_name);
+ access_key_id = NULL;
+ secret_access_key = NULL;
+ stream_name = NULL;
+
+ int line = 0;
+
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/aws_kinesis.conf", path);
+
+ char buffer[CONFIG_FILE_LINE_MAX + 1], *s;
+
+ debug(D_BACKEND, "BACKEND: opening config file '%s'", filename);
+
+ FILE *fp = fopen(filename, "r");
+ if(!fp) {
+ return 1;
+ }
+
+ while(fgets(buffer, CONFIG_FILE_LINE_MAX, fp) != NULL) {
+ buffer[CONFIG_FILE_LINE_MAX] = '\0';
+ line++;
+
+ s = trim(buffer);
+ if(!s || *s == '#') {
+ debug(D_BACKEND, "BACKEND: ignoring line %d of file '%s', it is empty.", line, filename);
+ continue;
+ }
+
+ char *name = s;
+ char *value = strchr(s, '=');
+ if(unlikely(!value)) {
+ error("BACKEND: ignoring line %d ('%s') of file '%s', there is no = in it.", line, s, filename);
+ continue;
+ }
+ *value = '\0';
+ value++;
+
+ name = trim(name);
+ value = trim(value);
+
+ if(unlikely(!name || *name == '#')) {
+ error("BACKEND: ignoring line %d of file '%s', name is empty.", line, filename);
+ continue;
+ }
+
+ if(!value) value = "";
+
+ // strip quotes
+ if(*value == '"' || *value == '\'') {
+ value++;
+
+ s = value;
+ while(*s) s++;
+ if(s != value) s--;
+
+ if(*s == '"' || *s == '\'') *s = '\0';
+ }
+ if(name[0] == 'a' && name[4] == 'a' && !strcmp(name, "aws_access_key_id")) {
+ access_key_id = strdupz(value);
+ }
+ else if(name[0] == 'a' && name[4] == 's' && !strcmp(name, "aws_secret_access_key")) {
+ secret_access_key = strdupz(value);
+ }
+ else if(name[0] == 's' && !strcmp(name, "stream name")) {
+ stream_name = strdupz(value);
+ }
+ }
+
+ fclose(fp);
+
+ if(unlikely(!stream_name || !*stream_name)) {
+ error("BACKEND: stream name is a mandatory Kinesis parameter but it is not configured");
+ return 1;
+ }
+
+ *access_key_id_p = access_key_id;
+ *secret_access_key_p = secret_access_key;
+ *stream_name_p = stream_name;
+
+ return 0;
+}
diff --git a/backends/aws_kinesis/aws_kinesis.conf b/backends/aws_kinesis/aws_kinesis.conf
new file mode 100644
index 0000000000..cc54b5fa22
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.conf
@@ -0,0 +1,10 @@
+# AWS Kinesis Data Streams backend configuration
+#
+# All options in this file are mandatory
+
+# AWS credentials
+aws_access_key_id =
+aws_secret_access_key =
+
+# destination stream
+stream name =
\ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis.h b/backends/aws_kinesis/aws_kinesis.h
new file mode 100644
index 0000000000..50a4631c55
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_H
+#define NETDATA_BACKEND_KINESIS_H
+
+#include "backends/backends.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX (1024 * 1024)
+
+extern int read_kinesis_conf(const char *path, char **access_key_id_p, char **secret_access_key_p, char **stream_name_p);
+
+#endif //NETDATA_BACKEND_KINESIS_H
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 0000000000..0c8ece68bf
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,87 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+SDKOptions options;
+
+Kinesis::KinesisClient *client;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+Vector<request_outcome> request_outcomes;
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) {
+ InitAPI(options);
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ if(access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+}
+
+void kinesis_shutdown() {
+ Delete(client);
+
+ ShutdownAPI(options);
+}
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len) {
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char*) data, data_len));
+
+ request_outcomes.push_back({client->PutRecordCallable(request), data_len});
+
+ return 0;
+}
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) {
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for(auto request_outcome = request_outcomes.begin(); request_outcome != request_outcomes.end(); ) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if(status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if(!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+ // erase() invalidates the iterator; use its return value to continue safely
+ request_outcome = request_outcomes.erase(request_outcome);
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if(*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+}
\ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.h b/backends/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 0000000000..f48e420f34
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+#define NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout);
+
+void kinesis_shutdown();
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len);
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes);
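+
+// Typical call sequence (a sketch based on how backends/backends.c uses this API):
+//
+//   kinesis_init(region, access_key_id, secret_key, timeout_ms);  // once, at backend startup
+//   kinesis_put_record(stream, partition_key, data, data_len);    // queue one record (asynchronous)
+//   kinesis_get_result(err_msg, &sent_bytes, &lost_bytes);        // reap completed requests
+//   kinesis_shutdown();                                           // once, at exit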
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_BACKEND_KINESIS_PUT_RECORD_H
diff --git a/backends/backends.c b/backends/backends.c
index da818c50bf..deced30c46 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -238,6 +238,11 @@ void *backends_main(void *ptr) {
int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL;
int (*backend_response_checker)(BUFFER *) = NULL;
+#if HAVE_KINESIS
+ int do_kinesis = 0;
+ char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
+#endif
+
// ------------------------------------------------------------------------
// collect configuration options
@@ -263,7 +268,6 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
-
// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -316,6 +320,26 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_stored_json_plaintext;
}
+#if HAVE_KINESIS
+ else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
+
+ do_kinesis = 1;
+
+ if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
+ error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
+ goto cleanup;
+ }
+
+ kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
+
+ backend_response_checker = process_json_response;
+ if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ backend_request_formatter = format_dimension_collected_json_plaintext;
+ else
+ backend_request_formatter = format_dimension_stored_json_plaintext;
+
+ }
+#endif /* HAVE_KINESIS */
else {
error("BACKEND: Unknown backend type '%s'", type);
goto cleanup;
@@ -481,6 +505,7 @@ void *backends_main(void *ptr) {
chart_sent_bytes =
chart_sent_metrics =
chart_lost_metrics =
+ chart_receptions =
chart_transmission_successes =
chart_transmission_failures =
chart_data_lost_events =
@@ -497,104 +522,177 @@ void *backends_main(void *ptr) {
// to add incrementally data to buffer
after = before;
- // ------------------------------------------------------------------------
- // if we are connected, receive a response, without blocking
+#if HAVE_KINESIS
+ if(do_kinesis) {
+ unsigned long long partition_key_seq = 0;
- if(likely(sock != -1)) {
- errno = 0;
+ size_t buffer_len = buffer_strlen(b);
+ size_t sent = 0;
- // loop through to collect all data
- while(sock != -1 && errno != EWOULDBLOCK) {
- buffer_need_bytes(response, 4096);
+ while(sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
- ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
- if(likely(r > 0)) {
- // we received some data
- response->len += r;
- chart_received_bytes += r;
- chart_receptions++;
+ const char *first_char = buffer_tostring(b) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
}
- else if(r == 0) {
- error("BACKEND: '%s' closed the socket", destination);
- close(sock);
- sock = -1;
+ else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
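+ // back off to the previous newline so a record carries only whole metric lines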
+ while(*(first_char + record_len) != '\n' && record_len) record_len--;
+ }
+
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
+ partition_key, buffer_len, record_len);
+
+ kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ chart_transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("BACKEND: %s", error_message);
+ error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ destination, sent_bytes, sent_bytes - lost_bytes);
+
+ chart_transmission_failures++;
+ chart_data_lost_events++;
+ chart_lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ chart_lost_metrics += (collected_number)(chart_buffered_metrics
+ * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
}
else {
- // failed to receive data
- if(errno != EAGAIN && errno != EWOULDBLOCK) {
- error("BACKEND: cannot receive data from backend '%s'.", destination);
- }
+ chart_receptions++;
}
+
+ if(unlikely(netdata_exit)) break;
}
- // if we received data, process them
- if(buffer_strlen(response))
- backend_response_checker(response);
+ chart_sent_bytes += sent;
+ if(likely(sent == buffer_len))
+ chart_sent_metrics = chart_buffered_metrics;
+
+ buffer_flush(b);
}
+ else {
+#else
+ {
+#endif /* HAVE_KINESIS */
+
+ // ------------------------------------------------------------------------
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1)) {
+ errno = 0;
+
+ // loop through to collect all data
+ while(sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if(likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ chart_received_bytes += r;
+ chart_receptions++;
+ }
+ else if(r == 0) {
+ error("BACKEND: '%s' closed the socket", destination);
+ close(sock);
+ sock = -1;
+ }
+ else {
+ // failed to receive data
+ if(errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("BACKEND: cannot receive data from backend '%s'.", destination);
+ }
+ }
+ }
- // ------------------------------------------------------------------------
- // if we are not connected, connect to a backend server
+ // if we received data, process them
+ if(buffer_strlen(response))
+ backend_response_checker(response);
+ }
- if(unlikely(sock == -1)) {
- // usec_t start_ut = now_monotonic_usec();
- size_t reconnects = 0;
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a backend server
- sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
+ if(unlikely(sock == -1)) {
+ // usec_t start_ut = now_monotonic_usec();
+ size_t reconnects = 0;
- chart_backend_reconnects += reconnects;
- // chart_backend_latency += now_monotonic_usec() - start_ut;
- }
+ sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
- if(unlikely(netdata_exit)) break;
+ chart_backend_reconnects += reconnects;