summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2022-10-05 14:13:46 +0300
committer GitHub <noreply@github.com> 2022-10-05 14:13:46 +0300
commit 8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch)
tree bde41c66573ccaf8876c280e00742cc6096b587c /streaming/sender.c
parent 6850878e697d66dc90b9af1e750b22238c63c292 (diff)
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that only certain characters are allowed * stricter sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * don't use the same fp for both streaming receive and send * don't free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * 
rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart) * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new api end points * added plugins.d documentation about functions * never re-close a file * remove unnecessary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. 
* reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrdhost structure cleanup from sender flags and variables; * fix leftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * don't enable host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- streaming/sender.c 1018
1 files changed, 651 insertions, 367 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 11d8aefc8d..6759e9c983 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "parser/parser.h"
#define WORKER_SENDER_JOB_CONNECT 0
#define WORKER_SENDER_JOB_PIPE_READ 1
@@ -30,15 +31,31 @@ extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
+static __thread BUFFER *sender_thread_buffer = NULL;
+static __thread bool sender_thread_buffer_used = false;
+
+void sender_thread_buffer_free(void) {
+ if(sender_thread_buffer) {
+ buffer_free(sender_thread_buffer);
+ sender_thread_buffer = NULL;
+ }
+}
+
// Collector thread starting a transmission
-void sender_start(struct sender_state *s) {
- netdata_mutex_lock(&s->mutex);
- buffer_flush(s->build);
+BUFFER *sender_start(struct sender_state *s __maybe_unused) {
+ if(!sender_thread_buffer)
+ sender_thread_buffer = buffer_create(1024);
+
+ if(sender_thread_buffer_used)
+ fatal("STREAMING: thread buffer is used multiple times concurrently.");
+
+ sender_thread_buffer_used = true;
+ buffer_flush(sender_thread_buffer);
+ return sender_thread_buffer;
}
-void sender_cancel(struct sender_state *s) {
- buffer_flush(s->build);
- netdata_mutex_unlock(&s->mutex);
+void sender_cancel(struct sender_state *s __maybe_unused) {
+ sender_thread_buffer_used = false;
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
@@ -52,81 +69,101 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
error("STREAM_COMPRESSION: Compression returned error, disabling it.");
- default_compression_enabled = 0;
- s->rrdpush_compression = 0;
- s->version = STREAM_VERSION_CLABELS;
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
}
#endif
// Collector thread finishing a transmission
-void sender_commit(struct sender_state *s) {
- char *src = (char *)buffer_tostring(s->host->sender->build);
- size_t src_len = s->host->sender->build->len;
+void sender_commit(struct sender_state *s, BUFFER *wb) {
+
+ if(unlikely(wb != sender_thread_buffer))
+ fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
+
+ if(unlikely(!sender_thread_buffer_used))
+ fatal("STREAMING: sender is committing a buffer twice.");
+
+ sender_thread_buffer_used = false;
+
+ char *src = (char *)buffer_tostring(wb);
+ size_t src_len = buffer_strlen(wb);
+
+ if(unlikely(!src || !src_len))
+ return;
+
+ netdata_mutex_lock(&s->mutex);
+
#ifdef ENABLE_COMPRESSION
- if (src && src_len) {
- if (s->compressor && s->rrdpush_compression) {
- while(src_len) {
- size_t size_to_compress = src_len;
-
- if(size_to_compress > LZ4_MAX_MSG_SIZE) {
- // we need to find the last newline
- // so that the decompressor will have a whole line to work with
-
- const char *t = &src[LZ4_MAX_MSG_SIZE - 1];
- while(t-- > src)
- if(*t == '\n')
- break;
-
- if(t == src)
- size_to_compress = LZ4_MAX_MSG_SIZE;
- else
- size_to_compress = t - src + 1;
- }
+ if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
+ while(src_len) {
+ size_t size_to_compress = src_len;
+
+ if(size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
+ // we need to find the last newline
+ // so that the decompressor will have a whole line to work with
+
+ const char *t = &src[COMPRESSION_MAX_MSG_SIZE - 1];
+ while(t-- > src)
+ if(*t == '\n')
+ break;
+
+ if(t == src)
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ else
+ size_to_compress = t - src + 1;
+ }
+
+ char *dst;
+ size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if (!dst_len) {
+ error("STREAM %s [send to %s]: compression failed. Resetting compressor and re-trying",
+ rrdhost_hostname(s->host), s->connected_to);
+
+ s->compressor->reset(s->compressor);
+ dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if(!dst_len) {
+ error("STREAM %s [send to %s]: compression failed again. Deactivating compression",
+ rrdhost_hostname(s->host), s->connected_to);
- char *dst;
- size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
- if (!dst_len) {
deactivate_compression(s);
- buffer_flush(s->build);
netdata_mutex_unlock(&s->mutex);
return;
}
+ }
- if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
- s->overflow = 1;
+ if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
- src = src + size_to_compress;
- src_len -= size_to_compress;
- }
+ src = src + size_to_compress;
+ src_len -= size_to_compress;
}
- else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
}
+ else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
+ s->flags |= SENDER_FLAG_OVERFLOW;
#endif
- buffer_flush(s->build);
netdata_mutex_unlock(&s->mutex);
+ rrdpush_signal_sender_to_wake_up(s);
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
- __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- if(host->rrdpush_sender_socket != -1) {
- close(host->rrdpush_sender_socket);
- host->rrdpush_sender_socket = -1;
+ if(host->sender->rrdpush_sender_socket != -1) {
+ close(host->sender->rrdpush_sender_socket);
+ host->sender->rrdpush_sender_socket = -1;
}
}
-static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
+static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
buffer_sprintf(
- host->sender->build
+ wb
, "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
, rrdvar_name(rva)
, rrdvar2number(rva)
@@ -136,36 +173,41 @@ static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *ho
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
- if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) {
- sender_start(host->sender);
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rva);
- sender_commit(host->sender);
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer(wb, rva);
+ sender_commit(host->sender, wb);
}
}
+struct custom_host_variables_callback {
+ BUFFER *wb;
+};
-static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *host_ptr) {
+static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
- RRDHOST *host = (RRDHOST *)host_ptr;
+ struct custom_host_variables_callback *tmp = struct_ptr;
+ BUFFER *wb = tmp->wb;
if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
-
- // return 1, so that the traversal will return the number of variables sent
+ rrdpush_sender_add_host_variable_to_buffer(wb, rv);
return 1;
}
-
- // returning a negative number will break the traversal
return 0;
}
static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
- sender_start(host->sender);
- int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, host);
- (void)ret;
- sender_commit(host->sender);
-
- debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ struct custom_host_variables_callback tmp = {
+ .wb = wb
+ };
+ int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
+ (void)ret;
+ sender_commit(host->sender, wb);
+
+ debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ }
}
// resets all the chart, so that their definitions
@@ -187,23 +229,13 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
netdata_mutex_lock(&host->sender->mutex);
-
- size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL);
- if (len)
- error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", rrdhost_hostname(host), len);
-
- cbuffer_remove_unsafe(host->sender->buffer, len);
+ cbuffer_flush(host->sender->buffer);
netdata_mutex_unlock(&host->sender->mutex);
rrdpush_sender_thread_reset_all_charts(host);
rrdpush_sender_thread_send_custom_host_variables(host);
}
-static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
- rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
-}
-
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
{
se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):"";
@@ -231,52 +263,123 @@ void rrdpush_clean_encoded(stream_encoded_t *se)
freez(se->kernel_version);
}
-static inline long int parse_stream_version_for_errors(char *http)
-{
- if (!memcmp(http, START_STREAMING_ERROR_SAME_LOCALHOST, sizeof(START_STREAMING_ERROR_SAME_LOCALHOST)))
- return -2;
- else if (!memcmp(http, START_STREAMING_ERROR_ALREADY_STREAMING, sizeof(START_STREAMING_ERROR_ALREADY_STREAMING)))
- return -3;
- else if (!memcmp(http, START_STREAMING_ERROR_NOT_PERMITTED, sizeof(START_STREAMING_ERROR_NOT_PERMITTED)))
- return -4;
- else
- return -1;
-}
+struct {
+ const char *response;
+ size_t length;
+ int32_t version;
+ bool dynamic;
+ const char *error;
+ int worker_job_id;
+ time_t postpone_reconnect_seconds;
+} stream_responses[] = {
+ {
+ .response = START_STREAMING_PROMPT_VN,
+ .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .version = STREAM_HANDSHAKE_OK_V3, // and above
+ .dynamic = true, // dynamic = we will parse the version / capabilities
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V2,
+ .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .version = STREAM_HANDSHAKE_OK_V2,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V1,
+ .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .version = STREAM_HANDSHAKE_OK_V1,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_ERROR_SAME_LOCALHOST,
+ .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
+ },
+ {
+ .response = START_STREAMING_ERROR_ALREADY_STREAMING,
+ .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ },
+ {
+ .response = START_STREAMING_ERROR_NOT_PERMITTED,
+ .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_DENIED,
+ .dynamic = false,
+ .error = "remote server denied access, probably we don't have the right API key?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ },
+
+ // terminator
+ {
+ .response = NULL,
+ .length = 0,
+ .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
+ .dynamic = false,
+ .error = "remote node response is not understood, is it Netdata?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ }
+};
-static inline long int parse_stream_version(RRDHOST *host, char *http)
-{
- long int stream_version = -1;
- int answer = -1;
- char *stream_version_start = strchr(http, '=');
- if (stream_version_start) {
- stream_version_start++;
- stream_version = strtol(stream_version_start, NULL, 10);
- answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http));
- if (!answer) {
- rrdpush_set_flags_to_newest_stream(host);
+static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
+ int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
+
+ int i;
+ for(i = 0; stream_responses[i].response ; i++) {
+ if(stream_responses[i].dynamic &&
+ http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
+ strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
+
+ version = str2i(&http[stream_responses[i].length]);
+ break;
}
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
- if (!answer) {
- stream_version = 1;
- rrdpush_set_flags_to_newest_stream(host);
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
- if (!answer) {
- stream_version = 0;
- rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- }
- else {
- stream_version = parse_stream_version_for_errors(http);
- }
+ else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
+ version = stream_responses[i].version;
+
+ break;
}
}
- return stream_version;
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
+ if(version >= STREAM_HANDSHAKE_OK_V1) {
+ host->destination->last_error = NULL;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = 0;
+ s->capabilities = convert_stream_version_to_capabilities(version);
+ return true;
+ }
+
+ error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error);
+
+ worker_is_busy(worker_job_id);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = error;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
+ return false;
}
-static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
- struct sender_state *s) {
+static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
.tv_sec = timeout,
@@ -286,11 +389,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
// make sure the socket is closed
rrdpush_sender_thread_close_socket(host);
- debug(D_STREAM, "STREAM: Attempting to connect...");
- info("STREAM %s [send to %s]: connecting...", rrdhost_hostname(host), host->rrdpush_send_destination);
-
- host->rrdpush_sender_socket = connect_to_one_of_destinations(
- host->destinations
+ s->rrdpush_sender_socket = connect_to_one_of_destinations(
+ host
, default_port
, &tv
, &s->reconnects_counter
@@ -299,48 +399,50 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, &host->destination
);
- if(unlikely(host->rrdpush_sender_socket == -1)) {
- error("STREAM %s [send to %s]: failed to connect", rrdhost_hostname(host), host->rrdpush_send_destination);
- return 0;
+ if(unlikely(s->rrdpush_sender_socket == -1)) {
+ error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ return false;
}
info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
#ifdef ENABLE_HTTPS
- if( netdata_client_ctx ){
- host->ssl.flags = NETDATA_SSL_START;
- if (!host->ssl.conn){
- host->ssl.conn = SSL_new(netdata_client_ctx);
- if(!host->ssl.conn){
+ if(netdata_ssl_client_ctx){
+ host->sender->ssl.flags = NETDATA_SSL_START;
+ if (!host->sender->ssl.conn){
+ host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx);
+ if(!host->sender->ssl.conn){
error("Failed to allocate SSL structure.");
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
}
else{
- SSL_clear(host->ssl.conn);
+ SSL_clear(host->sender->ssl.conn);
}
- if (host->ssl.conn)
+ if (host->sender->ssl.conn)
{
- if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
- error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket);
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
} else{
- host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
}
}
}
else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
#endif
+ // reset our capabilities to default
+ s->capabilities = STREAM_OUR_CAPABILITIES;
+
#ifdef ENABLE_COMPRESSION
-// Negotiate stream VERSION_CLABELS if stream compression is not supported
-s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
-if(!s->rrdpush_compression)
- s->version = STREAM_VERSION_CLABELS;
-#endif //ENABLE_COMPRESSION
+ // If we don't want compression, remove it from our capabilities
+ if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ s->capabilities &= ~STREAM_CAP_COMPRESSION;
+#endif // ENABLE_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
@@ -365,7 +467,7 @@ if(!s->rrdpush_compression)
"&ml_enabled=%d"
"&mc_version=%d"
"&tags=%s"
- "&ver=%d"
+ "&ver=%u"
"&NETDATA_INSTANCE_CLOUD_TYPE=%s"
"&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
"&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
@@ -411,7 +513,7 @@ if(!s->rrdpush_compression)
, host->system_info->ml_enabled
, host->system_info->mc_version
, rrdhost_tags(host)
- , s->version
+ , s->capabilities
, (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
, (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
, (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
@@ -447,131 +549,123 @@ if(!s->rrdpush_compression)
rrdpush_clean_encoded(&se);
#ifdef ENABLE_HTTPS
- if (!host->ssl.flags) {
+ if (!host->sender->ssl.flags) {
ERR_clear_error();
- SSL_set_connect_state(host->ssl.conn);
- int err = SSL_connect(host->ssl.conn);
+ SSL_set_connect_state(host->sender->ssl.conn);
+ int err = SSL_connect(host->sender->ssl.conn);
if (err != 1){
- err = SSL_get_error(host->ssl.conn, err);
- error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
+ err = SSL_get_error(host->sender->ssl.conn, err);
+ error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL));
if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
- }else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->destination->last_error = "SSL error";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+ else {
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
}
else {
if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
- if ( security_test_certificate(host->ssl.conn)) {
+ if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
+ if ( security_test_certificate(host->sender->ssl.conn)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
error("Closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
+ host->destination->last_error = "invalid SSL certificate";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
}
}
}
}
}
- if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
-#else
- if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
#endif
+
+ ssize_t bytes;
+
+ bytes = send_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ strlen(http),
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
rrdpush_sender_thread_close_socket(host);
- return 0;
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while sending request";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
+ return false;
}
info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
- ssize_t received;
+ bytes = recv_timeout(
#ifdef ENABLE_HTTPS
- received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
- if(received == -1) {
-#else
- received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
- if(received == -1) {
+ &host->sender->ssl,
#endif
+ s->rrdpush_sender_socket,
+ http,
+ HTTP_HEADER_SIZE,
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
rrdpush_sender_thread_close_socket(host);
- return 0;
+ error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while expecting first response";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
+ return false;
}
- http[received] = '\0';
+ http[bytes] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
- int32_t version = (int32_t)parse_stream_version(host, http);
- if(version == -1) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE);
- error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", rrdhost_hostname(host), s->connected_to);
- rrdpush_sender_thread_close_socket(host);
- //catch other reject reasons and force to check other destinations
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
- }
- else if(version == -2) {
- error("STREAM %s [send to %s]: remote server is the localhost for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
- rrdpush_sender_thread_close_socket(host);
- host->destination->disabled_because_of_localhost = 1;
- return 0;
- }
- else if(version == -3) {
- error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
- rrdpush_sender_thread_close_socket(host);
- host->destination->disabled_already_streaming = now_realtime_sec();
- return 0;
- }
- else if(version == -4) {
- error("STREAM %s [send to %s]: remote server denied access for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
- rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_because_of_denied_access = 1;
- return 0;
- }
- s->version = version;
+ if(!rrdpush_sender_validate_response(host, s, http, bytes))
+ return false;
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION));
- if(s->rrdpush_compression)
- {
- // parent supports compression
+ // if the stream does not have compression capability,
+ // shut it down for us too.
+ // FIXME - this means that if there are multiple parents and one of them does not support compression
+ // we are going to shut it down for all of them eventually...
+ if(!stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
+
+ if(s->flags & SENDER_FLAG_COMPRESSION) {
if(s->compressor)
s->compressor->reset(s->compressor);
}
- else {
- //parent does not support compression or has compression disabled
- debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, rrdhost_hostname(s->host));
- infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, rrdhost_hostname(s->host));
- s->version = STREAM_VERSION_CLABELS;
- }
-#endif //ENABLE_COMPRESSION
+ else
+ info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to);
+#endif //ENABLE_COMPRESSION
- info("STREAM %s [send to %s]: established communication with a parent using protocol version %d"
- , rrdhost_hostname(host)
- , s->connected_to
- , s->version);
+ log_sender_capabilities(s);
- if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
+ if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
- if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
+ if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
- debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
+ debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
- ret