summaryrefslogtreecommitdiffstats
path: root/backends/backends.c
diff options
context:
space:
mode:
Diffstat (limited to 'backends/backends.c')
-rw-r--r--backends/backends.c144
1 files changed, 123 insertions, 21 deletions
diff --git a/backends/backends.c b/backends/backends.c
index 0e79189162..7108a2a87d 100644
--- a/backends/backends.c
+++ b/backends/backends.c
@@ -260,6 +260,12 @@ void *backends_main(void *ptr) {
char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ int do_prometheus_remote_write = 0;
+ BUFFER *http_request_header = buffer_create(1);
+#endif
+
+
// ------------------------------------------------------------------------
// collect configuration options
@@ -285,6 +291,10 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive");
+#endif
+
// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -337,9 +347,8 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_stored_json_plaintext;
}
-#if HAVE_KINESIS
else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
-
+#if HAVE_KINESIS
do_kinesis = 1;
if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
@@ -354,15 +363,31 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_collected_json_plaintext;
else
backend_request_formatter = format_dimension_stored_json_plaintext;
+#else
+ error("AWS Kinesis support isn't compiled");
+#endif /* HAVE_KINESIS */
+ }
+ else if (!strcmp(type, "prometheus_remote_write")) {
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ do_prometheus_remote_write = 1;
+ backend_response_checker = process_prometheus_remote_write_response;
+
+ init_write_request();
+#else
+ error("Prometheus remote write support isn't compiled");
+#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
}
-#endif /* HAVE_KINESIS */
else {
error("BACKEND: Unknown backend type '%s'", type);
goto cleanup;
}
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) {
+#else
if(backend_request_formatter == NULL || backend_response_checker == NULL) {
+#endif
error("BACKEND: backend is misconfigured - disabling it.");
goto cleanup;
}
@@ -451,6 +476,9 @@ void *backends_main(void *ptr) {
size_t count_charts_total = 0;
size_t count_dims_total = 0;
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ clear_write_request();
+#endif
rrd_rdlock();
RRDHOST *host;
rrdhost_foreach_read(host) {
@@ -478,26 +506,45 @@ void *backends_main(void *ptr) {
const char *__hostname = (host == localhost)?hostname:host->hostname;
- RRDSET *st;
- rrdset_foreach_read(st, host) {
- if(likely(backends_can_send_rrdset(global_backend_options, st))) {
- rrdset_rdlock(st);
-
- count_charts++;
-
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if (likely(rd->last_collected_time.tv_sec >= after)) {
- chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
- count_dims++;
- }
- else {
- debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
- count_dims_skipped++;
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ rrd_stats_remote_write_allmetrics_prometheus(
+ host
+ , __hostname
+ , global_backend_prefix
+ , global_backend_options
+ , after
+ , before
+ , &count_charts
+ , &count_dims
+ , &count_dims_skipped
+ );
+ chart_buffered_metrics += count_dims;
+ }
+ else
+#endif
+ {
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ if(likely(backends_can_send_rrdset(global_backend_options, st))) {
+ rrdset_rdlock(st);
+
+ count_charts++;
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if (likely(rd->last_collected_time.tv_sec >= after)) {
+ chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
+ count_dims++;
+ }
+ else {
+ debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
+ count_dims_skipped++;
+ }
}
- }
- rrdset_unlock(st);
+ rrdset_unlock(st);
+ }
}
}
@@ -672,6 +719,43 @@ void *backends_main(void *ptr) {
flags += MSG_NOSIGNAL;
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ size_t data_size = get_write_request_size();
+
+ if(unlikely(!data_size)) {
+ error("BACKEND: write request size is out of range");
+ continue;
+ }
+
+ buffer_flush(b);
+ buffer_need_bytes(b, data_size);
+ if(unlikely(pack_write_request(b->buffer, &data_size))) {
+ error("BACKEND: cannot pack write request");
+ continue;
+ }
+ b->len = data_size;
+ chart_buffered_bytes = (collected_number)buffer_strlen(b);
+
+ buffer_flush(http_request_header);
+ buffer_sprintf(http_request_header,
+ "POST %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Accept: */*\r\n"
+ "Content-Length: %zu\r\n"
+ "Content-Type: application/x-www-form-urlencoded\r\n\r\n",
+ remote_write_path,
+ hostname,
+ data_size
+ );
+
+ len = buffer_strlen(http_request_header);
+ send(sock, buffer_tostring(http_request_header), len, flags);
+
+ len = data_size;
+ }
+#endif
+
ssize_t written = send(sock, buffer_tostring(b), len, flags);
// chart_backend_latency += now_monotonic_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
@@ -711,6 +795,16 @@ void *backends_main(void *ptr) {
}
}
+
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(failures) {
+ (void) buffer_on_failures;
+ failures = 0;
+ chart_lost_bytes = chart_buffered_bytes = get_write_request_size(); // estimated write request size
+ chart_data_lost_events++;
+ chart_lost_metrics = chart_buffered_metrics;
+ }
+#else
if(failures > buffer_on_failures) {
// too bad! we are going to lose data
chart_lost_bytes += buffer_strlen(b);
@@ -720,6 +814,7 @@ void *backends_main(void *ptr) {
chart_data_lost_events++;
chart_lost_metrics = chart_buffered_metrics;
}
+#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
if(unlikely(netdata_exit)) break;
@@ -775,6 +870,13 @@ cleanup:
}
#endif
+#if ENABLE_PROMETHEUS_REMOTE_WRITE
+ if(do_prometheus_remote_write) {
+ buffer_free(http_request_header);
+ protocol_buffers_shutdown();
+ }
+#endif
+
if(sock != -1)
close(sock);