summaryrefslogtreecommitdiffstats
path: root/backends
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2019-05-13 15:45:34 +0300
committerChris Akritidis <43294513+cakrit@users.noreply.github.com>2019-05-13 14:45:34 +0200
commit198c7fa3e34c71bc075bcfb067da80242e74bddf (patch)
tree906a580bf1658a09f67836dbff5f93cced079382 /backends
parent51decad9896131ed478681d90886d63bbd953b14 (diff)
Add AWS Kinesis backend (#5914)
* Add Kinesis backend * Separate config file * Send data in chunks * Fix minor issues * Add error handling * Use existing JSON functions * Do not retry on send failure * Implement building with autotools * Implement building with CMake * Fix CMake variables * Fix build when C++ compiler is not available * Add checks for C++11 * Don't reinitialize API * Don't reinitialize client * Minor cleanup * Fix Codacy warning * Separate sending records and receiving results * Add documentation * Make connection timeout configurable * Fix operation metrics * Fix typo * Change parameter names for credentials * Allow using the default SDK credentials configuration
Diffstat (limited to 'backends')
-rw-r--r--backends/Makefile.am1
-rw-r--r--backends/README.md34
-rw-r--r--backends/aws_kinesis/Makefile.am12
-rw-r--r--backends/aws_kinesis/README.md34
-rw-r--r--backends/aws_kinesis/aws_kinesis.c101
-rw-r--r--backends/aws_kinesis/aws_kinesis.conf10
-rw-r--r--backends/aws_kinesis/aws_kinesis.h14
-rw-r--r--backends/aws_kinesis/aws_kinesis_put_record.cc87
-rw-r--r--backends/aws_kinesis/aws_kinesis_put_record.h25
-rw-r--r--backends/backends.c251
-rw-r--r--backends/backends.h4
11 files changed, 487 insertions, 86 deletions
diff --git a/backends/Makefile.am b/backends/Makefile.am
index b8daefc592..278f057a1d 100644
--- a/backends/Makefile.am
+++ b/backends/Makefile.am
@@ -8,6 +8,7 @@ SUBDIRS = \
json \
opentsdb \
prometheus \
+ aws_kinesis \
$(NULL)
dist_noinst_DATA = \
diff --git a/backends/README.md b/backends/README.md
index 22dc77597f..efaba0caac 100644
--- a/backends/README.md
+++ b/backends/README.md
@@ -32,24 +32,28 @@ X seconds (though, it can send them per second if you need it to).
- **prometheus** is described at [prometheus page](prometheus/) since it pulls data from netdata.
+ - **AWS Kinesis Data Streams**
+
+ metrics are sent to the service in `JSON` format.
+
2. Only one backend may be active at a time.
3. Netdata can filter metrics (at the chart level), to send only a subset of the collected metrics.
4. Netdata supports three modes of operation for all backends:
- - `as-collected` sends to backends the metrics as they are collected, in the units they are collected.
- So, counters are sent as counters and gauges are sent as gauges, much like all data collectors do.
+ - `as-collected` sends to backends the metrics as they are collected, in the units they are collected.
+ So, counters are sent as counters and gauges are sent as gauges, much like all data collectors do.
For example, to calculate CPU utilization in this format, you need to know how to convert kernel ticks to percentage.
- - `average` sends to backends normalized metrics from the netdata database.
- In this mode, all metrics are sent as gauges, in the units netdata uses. This abstracts data collection
- and simplifies visualization, but you will not be able to copy and paste queries from other sources to convert units.
- For example, CPU utilization percentage is calculated by netdata, so netdata will convert ticks to percentage and
+ - `average` sends to backends normalized metrics from the netdata database.
+ In this mode, all metrics are sent as gauges, in the units netdata uses. This abstracts data collection
+ and simplifies visualization, but you will not be able to copy and paste queries from other sources to convert units.
+ For example, CPU utilization percentage is calculated by netdata, so netdata will convert ticks to percentage and
send the average percentage to the backend.
- - `sum` or `volume`: the sum of the interpolated values shown on the netdata graphs is sent to the backend.
- So, if netdata is configured to send data to the backend every 10 seconds, the sum of the 10 values shown on the
+ - `sum` or `volume`: the sum of the interpolated values shown on the netdata graphs is sent to the backend.
+ So, if netdata is configured to send data to the backend every 10 seconds, the sum of the 10 values shown on the
netdata charts will be used.
Time-series databases suggest to collect the raw values (`as-collected`). If you plan to invest on building your monitoring around a time-series database and you already know (or you will invest in learning) how to convert units and normalize the metrics in Grafana or other visualization tools, we suggest to use `as-collected`.
@@ -66,9 +70,9 @@ of `netdata.conf` from your netdata):
```
[backend]
enabled = yes | no
- type = graphite | opentsdb | json
+ type = graphite | opentsdb | json | kinesis
host tags = list of TAG=VALUE
- destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working will be used
+ destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working will be used, or a region for kinesis
data source = average | sum | as collected
prefix = netdata
hostname = my-name
@@ -82,7 +86,7 @@ of `netdata.conf` from your netdata):
- `enabled = yes | no`, enables or disables sending data to a backend
-- `type = graphite | opentsdb | json`, selects the backend type
+- `type = graphite | opentsdb | json | kinesis`, selects the backend type
- `destination = host1 host2 host3 ...`, accepts **a space separated list** of hostnames,
IPs (IPv4 and IPv6) and ports to connect to.
@@ -105,7 +109,7 @@ of `netdata.conf` from your netdata):
```
Example IPv6 and IPv4 together:
-
+
```
destination = [ffff:...:0001]:2003 10.11.12.1:2003
```
@@ -118,6 +122,8 @@ of `netdata.conf` from your netdata):
time-series database when it becomes available again. It can also be used to monitor / trace / debug
the metrics netdata generates.
+ For kinesis backend `destination` should be set to an AWS region (for example, `us-east-1`).
+
- `data source = as collected`, or `data source = average`, or `data source = sum`, selects the kind of
data that will be sent to the backend.
@@ -170,7 +176,7 @@ netdata provides 5 charts:
1. **Buffered metrics**, the number of metrics netdata added to the buffer for dispatching them to the
backend server.
-
+
2. **Buffered data size**, the amount of data (in KB) netdata added the buffer.
3. ~~**Backend latency**, the time the backend server needed to process the data netdata sent.
@@ -178,7 +184,7 @@ netdata provides 5 charts:
(this chart has been removed, because it only measures the time netdata needs to give the data
to the O/S - since the backend servers do not ack the reception, netdata does not have any means
to measure this properly).
-
+
4. **Backend operations**, the number of operations performed by netdata.
5. **Backend thread CPU usage**, the CPU resources consumed by the netdata thread, that is responsible
diff --git a/backends/aws_kinesis/Makefile.am b/backends/aws_kinesis/Makefile.am
new file mode 100644
index 0000000000..7317b3821b
--- /dev/null
+++ b/backends/aws_kinesis/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
+dist_libconfig_DATA = \
+ aws_kinesis.conf \
+ $(NULL) \ No newline at end of file
diff --git a/backends/aws_kinesis/README.md b/backends/aws_kinesis/README.md
new file mode 100644
index 0000000000..a9cc77d6ec
--- /dev/null
+++ b/backends/aws_kinesis/README.md
@@ -0,0 +1,34 @@
+# Using netdata with AWS Kinesis Data Streams
+
+## Prerequisites
+
+To use AWS Kinesis as a backend AWS SDK for C++ should be [installed](https://docs.aws.amazon.com/en_us/sdk-for-cpp/v1/developer-guide/setup.html) first. `libcrypto`, `libssl`, and `libcurl` are also required to compile netdata with Kinesis support enabled. Next, netdata should be re-installed from the source. The installer will detect that the required libraries are now available.
+
+If AWS SDK for C++ is being installed from sources, it is useful to set `-DBUILD_ONLY="kinesis"`. Otherwise, the building process could take a very long time.
+
+## Configuration
+
+To enable data sending to the kinesis backend set the following options in `netdata.conf`:
+```
+[backend]
+ enabled = yes
+ type = kinesis
+ destination = us-east-1
+```
+set the `destination` option to an AWS region.
+
+In the netdata configuration directory run `./edit-config aws_kinesis.conf` and set AWS credentials and stream name:
+```
+# AWS credentials
+aws_access_key_id = your_access_key_id
+aws_secret_access_key = your_secret_access_key
+
+# destination stream
+stream name = your_stream_name
+```
+Alternatively, AWS credentials can be set for the *netdata* user using AWS SDK for C++ [standard methods](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
+
+A partition key for every record is computed automatically by the netdata with the purpose to distribute records across available shards evenly.
+
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fbackends%2Faws_kinesis%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/backends/aws_kinesis/aws_kinesis.c b/backends/aws_kinesis/aws_kinesis.c
new file mode 100644
index 0000000000..d8b79364cc
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.c
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define BACKENDS_INTERNALS
+#include "aws_kinesis.h"
+
+#define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
+
+// ----------------------------------------------------------------------------
+// kinesis backend
+
+// read the aws_kinesis.conf file
+int read_kinesis_conf(const char *path, char **access_key_id_p, char **secret_access_key_p, char **stream_name_p)
+{
+ char *access_key_id = *access_key_id_p;
+ char *secret_access_key = *secret_access_key_p;
+ char *stream_name = *stream_name_p;
+
+ if(unlikely(access_key_id)) freez(access_key_id);
+ if(unlikely(secret_access_key)) freez(secret_access_key);
+ if(unlikely(stream_name)) freez(stream_name);
+ access_key_id = NULL;
+ secret_access_key = NULL;
+ stream_name = NULL;
+
+ int line = 0;
+
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/aws_kinesis.conf", path);
+
+ char buffer[CONFIG_FILE_LINE_MAX + 1], *s;
+
+ debug(D_BACKEND, "BACKEND: opening config file '%s'", filename);
+
+ FILE *fp = fopen(filename, "r");
+ if(!fp) {
+ return 1;
+ }
+
+ while(fgets(buffer, CONFIG_FILE_LINE_MAX, fp) != NULL) {
+ buffer[CONFIG_FILE_LINE_MAX] = '\0';
+ line++;
+
+ s = trim(buffer);
+ if(!s || *s == '#') {
+ debug(D_BACKEND, "BACKEND: ignoring line %d of file '%s', it is empty.", line, filename);
+ continue;
+ }
+
+ char *name = s;
+ char *value = strchr(s, '=');
+ if(unlikely(!value)) {
+ error("BACKEND: ignoring line %d ('%s') of file '%s', there is no = in it.", line, s, filename);
+ continue;
+ }
+ *value = '\0';
+ value++;
+
+ name = trim(name);
+ value = trim(value);
+
+ if(unlikely(!name || *name == '#')) {
+ error("BACKEND: ignoring line %d of file '%s', name is empty.", line, filename);
+ continue;
+ }
+
+ if(!value) value = "";
+
+ // strip quotes
+ if(*value == '"' || *value == '\'') {
+ value++;
+
+ s = value;
+ while(*s) s++;
+ if(s != value) s--;
+
+ if(*s == '"' || *s == '\'') *s = '\0';
+ }
+ if(name[0] == 'a' && name[4] == 'a' && !strcmp(name, "aws_access_key_id")) {
+ access_key_id = strdupz(value);
+ }
+ else if(name[0] == 'a' && name[4] == 's' && !strcmp(name, "aws_secret_access_key")) {
+ secret_access_key = strdupz(value);
+ }
+ else if(name[0] == 's' && !strcmp(name, "stream name")) {
+ stream_name = strdupz(value);
+ }
+ }
+
+ fclose(fp);
+
+ if(unlikely(!stream_name || !*stream_name)) {
+ error("BACKEND: stream name is a mandatory Kinesis parameter but it is not configured");
+ return 1;
+ }
+
+ *access_key_id_p = access_key_id;
+ *secret_access_key_p = secret_access_key;
+ *stream_name_p = stream_name;
+
+ return 0;
+}
diff --git a/backends/aws_kinesis/aws_kinesis.conf b/backends/aws_kinesis/aws_kinesis.conf
new file mode 100644
index 0000000000..cc54b5fa22
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.conf
@@ -0,0 +1,10 @@
+# AWS Kinesis Data Streams backend configuration
+#
+# All options in this file are mandatory
+
+# AWS credentials
+aws_access_key_id =
+aws_secret_access_key =
+
+# destination stream
+stream name = \ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis.h b/backends/aws_kinesis/aws_kinesis.h
new file mode 100644
index 0000000000..50a4631c55
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_H
+#define NETDATA_BACKEND_KINESIS_H
+
+#include "backends/backends.h"
+#include "aws_kinesis_put_record.h"
+
+#define KINESIS_PARTITION_KEY_MAX 256
+#define KINESIS_RECORD_MAX 1024 * 1024
+
+extern int read_kinesis_conf(const char *path, char **auth_key_id_p, char **secure_key_p, char **stream_name_p);
+
+#endif //NETDATA_BACKEND_KINESIS_H
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.cc b/backends/aws_kinesis/aws_kinesis_put_record.cc
new file mode 100644
index 0000000000..0c8ece68bf
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.cc
@@ -0,0 +1,87 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <aws/core/Aws.h>
+#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/utils/Outcome.h>
+#include <aws/kinesis/KinesisClient.h>
+#include <aws/kinesis/model/PutRecordRequest.h>
+#include "aws_kinesis_put_record.h"
+
+using namespace Aws;
+
+SDKOptions options;
+
+Kinesis::KinesisClient *client;
+
+struct request_outcome {
+ Kinesis::Model::PutRecordOutcomeCallable future_outcome;
+ size_t data_len;
+};
+
+Vector<request_outcome> request_outcomes;
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout) {
+ InitAPI(options);
+
+ Client::ClientConfiguration config;
+
+ config.region = region;
+ config.requestTimeoutMs = timeout;
+ config.connectTimeoutMs = timeout;
+
+ if(access_key_id && *access_key_id && secret_key && *secret_key) {
+ client = New<Kinesis::KinesisClient>("client", Auth::AWSCredentials(access_key_id, secret_key), config);
+ } else {
+ client = New<Kinesis::KinesisClient>("client", config);
+ }
+}
+
+void kinesis_shutdown() {
+ Delete(client);
+
+ ShutdownAPI(options);
+}
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len) {
+ Kinesis::Model::PutRecordRequest request;
+
+ request.SetStreamName(stream_name);
+ request.SetPartitionKey(partition_key);
+ request.SetData(Utils::ByteBuffer((unsigned char*) data, data_len));
+
+ request_outcomes.push_back({client->PutRecordCallable(request), data_len});
+
+ return 0;
+}
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes) {
+ Kinesis::Model::PutRecordOutcome outcome;
+ *sent_bytes = 0;
+ *lost_bytes = 0;
+
+ for(auto request_outcome = request_outcomes.begin(); request_outcome != request_outcomes.end(); ) {
+ std::future_status status = request_outcome->future_outcome.wait_for(std::chrono::microseconds(100));
+
+ if(status == std::future_status::ready || status == std::future_status::deferred) {
+ outcome = request_outcome->future_outcome.get();
+ *sent_bytes += request_outcome->data_len;
+
+ if(!outcome.IsSuccess()) {
+ *lost_bytes += request_outcome->data_len;
+ outcome.GetError().GetMessage().copy(error_message, ERROR_LINE_MAX);
+ }
+
+ request_outcomes.erase(request_outcome);
+ } else {
+ ++request_outcome;
+ }
+ }
+
+ if(*lost_bytes) {
+ return 1;
+ }
+
+ return 0;
+} \ No newline at end of file
diff --git a/backends/aws_kinesis/aws_kinesis_put_record.h b/backends/aws_kinesis/aws_kinesis_put_record.h
new file mode 100644
index 0000000000..f48e420f34
--- /dev/null
+++ b/backends/aws_kinesis/aws_kinesis_put_record.h
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+#define NETDATA_BACKEND_KINESIS_PUT_RECORD_H
+
+#define ERROR_LINE_MAX 1023
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void kinesis_init(const char *region, const char *access_key_id, const char *secret_key, const long timeout);
+
+void kinesis_shutdown();
+
+int kinesis_put_record(const char *stream_name, const char *partition_key,
+ const char *data, size_t data_len);
+
+int kinesis_get_result(char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //NETDATA_BACKEND_KINESIS_PUT_RECORD_H
diff --git a/backends/backends.c b/backends/backends.c
index da818c50bf..deced30c46 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -238,6 +238,11 @@ void *backends_main(void *ptr) {
int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL;
int (*backend_response_checker)(BUFFER *) = NULL;
+#if HAVE_KINESIS
+ int do_kinesis = 0;
+ char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
+#endif
+
// ------------------------------------------------------------------------
// collect configuration options
@@ -263,7 +268,6 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
-
// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -316,6 +320,26 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_stored_json_plaintext;
}
+#if HAVE_KINESIS
+ else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
+
+ do_kinesis = 1;
+
+ if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
+ error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
+ goto cleanup;
+ }
+
+ kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
+
+ backend_response_checker = process_json_response;
+ if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
+ backend_request_formatter = format_dimension_collected_json_plaintext;
+ else
+ backend_request_formatter = format_dimension_stored_json_plaintext;
+
+ }
+#endif /* HAVE_KINESIS */
else {
error("BACKEND: Unknown backend type '%s'", type);
goto cleanup;
@@ -481,6 +505,7 @@ void *backends_main(void *ptr) {
chart_sent_bytes =
chart_sent_metrics =
chart_lost_metrics =
+ chart_receptions =
chart_transmission_successes =
chart_transmission_failures =
chart_data_lost_events =
@@ -497,104 +522,177 @@ void *backends_main(void *ptr) {
// to add incrementally data to buffer
after = before;
- // ------------------------------------------------------------------------
- // if we are connected, receive a response, without blocking
+#if HAVE_KINESIS
+ if(do_kinesis) {
+ unsigned long long partition_key_seq = 0;
- if(likely(sock != -1)) {
- errno = 0;
+ size_t buffer_len = buffer_strlen(b);
+ size_t sent = 0;
- // loop through to collect all data
- while(sock != -1 && errno != EWOULDBLOCK) {
- buffer_need_bytes(response, 4096);
+ while(sent < buffer_len) {
+ char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
+ snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
+ size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
- ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
- if(likely(r > 0)) {
- // we received some data
- response->len += r;
- chart_received_bytes += r;
- chart_receptions++;
+ const char *first_char = buffer_tostring(b) + sent;
+
+ size_t record_len = 0;
+
+ // split buffer into chunks of maximum allowed size
+ if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
+ record_len = buffer_len - sent;
}
- else if(r == 0) {
- error("BACKEND: '%s' closed the socket", destination);
- close(sock);
- sock = -1;
+ else {
+ record_len = KINESIS_RECORD_MAX - partition_key_len;
+ while(*(first_char + record_len) != '\n' && record_len) record_len--;
+ }
+
+ char error_message[ERROR_LINE_MAX + 1] = "";
+
+ debug(D_BACKEND, "BACKEND: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
+ buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
+ partition_key, buffer_len, record_len);
+
+ kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
+
+ sent += record_len;
+ chart_transmission_successes++;
+
+ size_t sent_bytes = 0, lost_bytes = 0;
+
+ if(unlikely(kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
+ // oops! we couldn't send (all or some of the) data
+ error("BACKEND: %s", error_message);
+ error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
+ destination, sent_bytes, sent_bytes - lost_bytes);
+
+ chart_transmission_failures++;
+ chart_data_lost_events++;
+ chart_lost_bytes += lost_bytes;
+
+ // estimate the number of lost metrics
+ chart_lost_metrics += (collected_number)(chart_buffered_metrics
+ * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
+
+ break;
}
else {
- // failed to receive data
- if(errno != EAGAIN && errno != EWOULDBLOCK) {
- error("BACKEND: cannot receive data from backend '%s'.", destination);
- }
+ chart_receptions++;
}
+
+ if(unlikely(netdata_exit)) break;
}
- // if we received data, process them
- if(buffer_strlen(response))
- backend_response_checker(response);
+ chart_sent_bytes += sent;
+ if(likely(sent == buffer_len))
+ chart_sent_metrics = chart_buffered_metrics;
+
+ buffer_flush(b);
}
+ else {
+#else
+ {
+#endif /* HAVE_KINESIS */
+
+ // ------------------------------------------------------------------------
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1)) {
+ errno = 0;
+
+ // loop through to collect all data
+ while(sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if(likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ chart_received_bytes += r;
+ chart_receptions++;
+ }
+ else if(r == 0) {
+ error("BACKEND: '%s' closed the socket", destination);
+ close(sock);
+ sock = -1;
+ }
+ else {
+ // failed to receive data
+ if(errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("BACKEND: cannot receive data from backend '%s'.", destination);
+ }
+ }
+ }
- // ------------------------------------------------------------------------
- // if we are not connected, connect to a backend server
+ // if we received data, process them
+ if(buffer_strlen(response))
+ backend_response_checker(response);
+ }
- if(unlikely(sock == -1)) {
- // usec_t start_ut = now_monotonic_usec();
- size_t reconnects = 0;
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a backend server
- sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
+ if(unlikely(sock == -1)) {
+ // usec_t start_ut = now_monotonic_usec();
+ size_t reconnects = 0;
- chart_backend_reconnects += reconnects;
- // chart_backend_latency += now_monotonic_usec() - start_ut;
- }
+ sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
- if(unlikely(netdata_exit)) break;
+ chart_backend_reconnects += reconnects;
+ // chart_backend_latency += now_monotonic_usec() - start_ut;
+ }
- // ------------------------------------------------------------------------
- // if we are connected, send our buffer to the backend server
-
- if(likely(sock != -1)) {
- size_t len = buffer_strlen(b);
- // usec_t start_ut = now_monotonic_usec();
- int flags = 0;
-#ifdef MSG_NOSIGNAL
- flags += MSG_NOSIGNAL;
-#endif
+ if(unlikely(netdata_exit)) break;
- ssize_t written = send(sock, buffer_tostring(b), len, flags);
- // chart_backend_latency += now_monotonic_usec() - start_ut;
- if(written != -1 && (size_t)written == len) {
- // we sent the data successfully
- chart_transmission_successes++;
- chart_sent_bytes += written;
- chart_sent_metrics = chart_buffered_metrics;
+ // ------------------------------------------------------------------------
+ // if we are connected, send our buffer to the backend server
- // reset the failures count
- failures = 0;
+ if(likely(sock != -1)) {
+ size_t len = buffer_strlen(b);
+ // usec_t start_ut = now_monotonic_usec();
+ int flags = 0;
+ #ifdef MSG_NOSIGNAL
+ flags += MSG_NOSIGNAL;
+ #endif
+
+ ssize_t written = send(sock, buffer_tostring(b), len, flags);
+ // chart_backend_latency += now_monotonic_usec() - start_ut;
+ if(written != -1 && (size_t)written == len) {
+ // we sent the data successfully
+ chart_transmission_successes++;
+ chart_sent_bytes += written;
+ chart_sent_metrics = chart_buffered_metrics;
- // empty the buffer
- buffer_flush(b);
+ // reset the failures count
+ failures = 0;
+
+ // empty the buffer
+ buffer_flush(b);
+ }
+ else {
+ // oops! we couldn't send (all or some of the) data
+ error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
+ chart_transmission_failures++;
+
+ if(written != -1)
+ chart_sent_bytes += written;
+
+ // increment the counter we check for data loss
+ failures++;
+
+ // close the socket - we will re-open it next time
+ close(sock);
+ sock = -1;
+ }
}
else {
- // oops! we couldn't send (all or some of the) data
- error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
+ error("BACKEND: failed to update database backend '%s'", destination);
chart_transmission_failures++;
- if(written != -1)
- chart_sent_bytes += written;
-
// increment the counter we check for data loss
failures++;
-
- // close the socket - we will re-open it next time
- close(sock);
- sock = -1;
}
}
- else {
- error("BACKEND: failed to update database backend '%s'", destination);
- chart_transmission_failures++;
-
- // increment the counter we check for data loss
- failures++;
- }
if(failures > buffer_on_failures) {
// too bad! we are going to lose data
@@ -651,6 +749,15 @@ void *backends_main(void *ptr) {
}
cleanup:
+#if HAVE_KINESIS
+ if(do_kinesis) {
+ kinesis_shutdown();
+ freez(kinesis_auth_key_id);
+ freez(kinesis_secure_key);
+ freez(kinesis_stream_name);
+ }
+#endif
+
if(sock != -1)
close(sock);
diff --git a/backends/backends.h b/backends/backends.h
index 468e4fed85..1186654960 100644
--- a/backends/backends.h
+++ b/backends/backends.h
@@ -49,4 +49,8 @@ extern int discard_response(BUFFER *b, const char *backend);
#include "backends/json/json.h"
#include "backends/opentsdb/opentsdb.h"
+#if HAVE_KINESIS
+#include "backends/aws_kinesis/aws_kinesis.h"
+#endif
+
#endif /* NETDATA_BACKENDS_H */