author    Costa Tsaousis <costa@netdata.cloud> 2022-11-20 23:47:53 +0200
committer GitHub <noreply@github.com> 2022-11-20 23:47:53 +0200
commit    284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree      97a7d55627ef7477f431c53a20d0e6f1f738a419 /streaming/sender.c
parent    2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes
* remove journal v2 stats from global statistics
* disable sql for checking past sql UUIDs
* single threaded replication
* final replication thread using dictionaries and JudyL for sorting the pending requests
* do not timeout the sending socket when there are pending replication requests
* streaming receiver using read() instead of fread()
* remove FILE * from streaming - now using posix read() and write()
* increase timeouts to 10 minutes
* apply sender timeout only when there are metrics that are supposed to be streamed
* error handling in replication
* remove retries on socket read timeout; better error messages
* take into account inbound traffic too to detect that a connection is stale
* remove race conditions from replication thread
* make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed
* 2 minutes timeout to retry streaming to a parent that already has this node
* remove unnecessary condition check
* fix compilation warnings
* include judy in replication
* wrappers to handle retries for SSL_read and SSL_write
* compressed bytes read monitoring
* recursive locks on replication to make it faster during flush or cleanup
* replication completion chart at the receiver side
* simplified recursive mutex
* simplified recursive mutex again
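For context on the "simplified recursive mutex" items above (the mutex itself lives outside sender.c): a minimal sketch of the idea, assuming an owner tid plus a recursion counter layered over a plain pthread mutex. All names here are illustrative, not the netdata implementation.

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>    // gettid() (glibc >= 2.30)
#include <stddef.h>

typedef struct {
    pthread_mutex_t mutex;  // the underlying non-recursive lock
    pid_t owner;            // tid of the thread currently holding the lock, 0 if free
    size_t recursions;      // re-entry depth of the owning thread
} simple_recursive_mutex;

#define SIMPLE_RECURSIVE_MUTEX_INIT { PTHREAD_MUTEX_INITIALIZER, 0, 0 }

static void recursive_mutex_lock(simple_recursive_mutex *m) {
    pid_t me = gettid();

    // re-entry by the current owner: just count it, no contention on the real lock
    if (__atomic_load_n(&m->owner, __ATOMIC_RELAXED) == me) {
        m->recursions++;
        return;
    }

    pthread_mutex_lock(&m->mutex);
    __atomic_store_n(&m->owner, me, __ATOMIC_RELAXED);
    m->recursions = 1;
}

static void recursive_mutex_unlock(simple_recursive_mutex *m) {
    // only the outermost unlock releases the real mutex
    if (--m->recursions == 0) {
        __atomic_store_n(&m->owner, 0, __ATOMIC_RELAXED);
        pthread_mutex_unlock(&m->mutex);
    }
}
```

The "simplified" part is that ownership is tracked with a single tid and the inner lock is only taken or released on the outermost lock/unlock.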
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- streaming/sender.c | 257
1 file changed, 86 insertions(+), 171 deletions(-)
diff --git a/streaming/sender.c b/streaming/sender.c
index e0964f9e3a..a5219b14d7 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -21,9 +21,11 @@
#define WORKER_SENDER_JOB_BUFFER_RATIO 15
#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 20
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 20
#endif
extern struct config stream_config;
@@ -31,12 +33,6 @@ extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
-struct replication_request {
- bool start_streaming;
- time_t after;
- time_t before;
-};
-
static __thread BUFFER *sender_thread_buffer = NULL;
static __thread bool sender_thread_buffer_used = false;
@@ -81,6 +77,8 @@ static inline void deactivate_compression(struct sender_state *s) {
}
#endif
+#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
+
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s, BUFFER *wb) {
@@ -100,43 +98,49 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) {
- error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1);
+ if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
- s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2;
+ s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
#ifdef ENABLE_COMPRESSION
- if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
while(src_len) {
size_t size_to_compress = src_len;
- if(size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
- // we need to find the last newline
- // so that the decompressor will have a whole line to work with
-
- const char *t = &src[COMPRESSION_MAX_MSG_SIZE - 1];
- while(t-- > src)
- if(*t == '\n')
- break;
-
- if(t == src)
+ if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
+ if (stream_has_capability(s, STREAM_CAP_BINARY))
size_to_compress = COMPRESSION_MAX_MSG_SIZE;
- else
- size_to_compress = t - src + 1;
+ else {
+ if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
+ // we need to find the last newline
+ // so that the decompressor will have a whole line to work with
+
+ const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
+ while (--t >= src)
+ if (unlikely(*t == '\n'))
+ break;
+
+ if (t <= src) {
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ } else
+ size_to_compress = t - src + 1;
+ }
+ }
}
char *dst;
size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
- error("STREAM %s [send to %s]: compression failed. Resetting compressor and re-trying",
+ error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
s->compressor->reset(s->compressor);
dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
- error("STREAM %s [send to %s]: compression failed again. Deactivating compression",
+ error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
deactivate_compression(s);
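The newline-boundary splitting in the hunk above (taken when the peer lacks STREAM_CAP_BINARY) boils down to: compress at most COMPRESSION_MAX_MSG_SIZE bytes per message, and cut at the last newline inside that limit so the decompressor always receives whole lines. A standalone sketch of that decision, with illustrative names (pick_chunk_size, must_end_on_newline):

```c
#include <stddef.h>

// Illustrative helper: how many bytes of 'src' to compress into one message.
// Never exceeds 'max_msg_size'; when 'must_end_on_newline' is set, the chunk
// ends at the last '\n' within the limit so the receiver gets whole lines.
static size_t pick_chunk_size(const char *src, size_t src_len,
                              size_t max_msg_size, int must_end_on_newline) {
    if (src_len <= max_msg_size)
        return src_len;            // everything fits in one message

    if (!must_end_on_newline)
        return max_msg_size;       // binary-capable peers accept a hard cut

    // scan backwards from the size limit for the last newline
    size_t i = max_msg_size;
    while (i > 0 && src[i - 1] != '\n')
        i--;

    if (i == 0)
        return max_msg_size;       // no newline found - fall back to a hard cut

    return i;                      // chunk ends just after the last newline
}
```

The pointer loop in the hunk performs the same backward scan; the hard-cut fallback corresponds to its `t <= src` branch.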
@@ -255,13 +259,17 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
}
static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
+ __atomic_store_n(&host->sender->last_flush_time_ut, now_realtime_usec(), __ATOMIC_SEQ_CST);
+
netdata_mutex_lock(&host->sender->mutex);
cbuffer_flush(host->sender->buffer);
netdata_mutex_unlock(&host->sender->mutex);
rrdpush_sender_thread_reset_all_charts(host);
rrdpush_sender_thread_send_custom_host_variables(host);
- dictionary_flush(host->sender->replication_requests);
+ replication_flush_sender(host->sender);
+
+ __atomic_store_n(&host->sender->receiving_metrics, 0, __ATOMIC_SEQ_CST);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -343,7 +351,7 @@ struct {
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
- .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .postpone_reconnect_seconds = 2 * 60, // 2 minutes
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
@@ -468,7 +476,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
#endif // ENABLE_COMPRESSION
@@ -664,20 +672,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
#ifdef ENABLE_COMPRESSION
- // if the stream does not have compression capability,
- // shut it down for us too.
- // FIXME - this means that if there are multiple parents and one of them does not support compression
- // we are going to shut it down for all of them eventually...
- if(!stream_has_capability(s, STREAM_CAP_COMPRESSION))
- s->flags &= ~SENDER_FLAG_COMPRESSION;
-
- if(s->flags & SENDER_FLAG_COMPRESSION) {
- if(s->compressor)
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
+ if(!s->compressor)
+ s->compressor = create_compressor();
+ else
s->compressor->reset(s->compressor);
}
- else
- info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to);
-
#endif //ENABLE_COMPRESSION
log_sender_capabilities(s);
@@ -698,8 +698,6 @@ static bool attempt_to_connect(struct sender_state *state)
state->send_attempts = 0;
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
- state->last_sent_t = now_monotonic_sec();
-
// reset the buffer, to properly send charts and metrics
rrdpush_sender_thread_data_flush(state->host);
@@ -748,7 +746,7 @@ static ssize_t attempt_to_send(struct sender_state *s) {
#ifdef ENABLE_HTTPS
SSL *conn = s->host->sender->ssl.conn ;
if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
- ret = SSL_write(conn, chunk, outstanding);
+ ret = netdata_ssl_write(conn, chunk, outstanding);
else
ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#else
@@ -760,7 +758,6 @@ static ssize_t attempt_to_send(struct sender_state *s) {
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
- s->last_sent_t = now_monotonic_sec();
}
else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
@@ -783,24 +780,14 @@ static ssize_t attempt_read(struct sender_state *s) {
#ifdef ENABLE_HTTPS
if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- ERR_clear_error();
- int desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = SSL_read(s->host->sender->ssl.conn, s->read_buffer, desired);
+ size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
+ ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
if (ret > 0 ) {
- s->read_len += ret;
+ s->read_len += (int)ret;
return ret;
}
- int sslerrno = SSL_get_error(s->host->sender->ssl.conn, desired);
- if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE)
- return ret;
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- u_long err;
- char buf[256];
- while ((err = ERR_get_error()) != 0) {
- ERR_error_string_n(err, buf, sizeof(buf));
- error("STREAM %s [send to %s] SSL error: %s", rrdhost_hostname(s->host), s->connected_to, buf);
- }
rrdpush_sender_thread_close_socket(s->host);
return ret;
}
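netdata_ssl_read()/netdata_ssl_write() replace the inline SSL_read()/SSL_write() calls and absorb the SSL_ERROR_WANT_READ/WANT_WRITE handling and error logging that used to live here. Their actual implementation is elsewhere in this PR; a rough sketch of what such a read wrapper does, under that assumption:

```c
#include <sys/types.h>
#include <stdio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>

// Sketch only - not the actual netdata_ssl_read(). Retries transient
// WANT_READ / WANT_WRITE results a few times and logs hard failures.
static ssize_t ssl_read_with_retries(SSL *ssl, void *buf, size_t len, int max_retries) {
    for (int attempt = 0; ; attempt++) {
        ERR_clear_error();
        int ret = SSL_read(ssl, buf, (int)len);
        if (ret > 0)
            return ret;

        int err = SSL_get_error(ssl, ret);
        if ((err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) && attempt < max_retries)
            continue;   // transient: let the caller's poll() loop (or this retry) make progress

        // anything else is a hard error or a closed connection - log and give up
        unsigned long e;
        char msg[256];
        while ((e = ERR_get_error()) != 0) {
            ERR_error_string_n(e, msg, sizeof(msg));
            fprintf(stderr, "SSL read error: %s\n", msg);
        }
        return (ret < 0) ? -1 : 0;
    }
}
```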
@@ -865,6 +852,8 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
// This is just a placeholder until the gap filling state machine is inserted
void execute_commands(struct sender_state *s) {
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
@@ -881,6 +870,8 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -909,7 +900,10 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
- } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+
const char *chart_id = get_word(words, num_words, 1);
const char *start_streaming = get_word(words, num_words, 2);
const char *after = get_word(words, num_words, 3);
@@ -924,18 +918,20 @@ void execute_commands(struct sender_state *s) {
start_streaming ? start_streaming : "(unset)",
after ? after : "(unset)",
before ? before : "(unset)");
- } else {
- struct replication_request tmp = {
- .start_streaming = !strcmp(start_streaming, "true"),
- .after = strtoll(after, NULL, 0),
- .before = strtoll(before, NULL, 0),
- };
- dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
}
- } else {
+ else {
+ replication_add_request(s, chart_id,
+ strtoll(after, NULL, 0),
+ strtoll(before, NULL, 0),
+ !strcmp(start_streaming, "true")
+ );
+ }
+ }
+ else {
error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
start = newline + 1;
}
if (start < end) {
@@ -1037,42 +1033,6 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(data);
}
-static void replication_request_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
- ;
-}
-static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *sender_state) {
- struct sender_state *s = sender_state;
- struct replication_request *rr = old_value;
- struct replication_request *rr_new = new_value;
-
- error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
- (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
-
- bool updated = false;
-
- if(rr_new->after < rr->after) {
- rr->after = rr_new->after;
- updated = true;
- }
-
- if(rr_new->before > rr->before) {
- rr->before = rr_new->before;
- updated = true;
- }
-
- if(rr_new->start_streaming != rr->start_streaming) {
- rr->start_streaming = true;
- updated = true;
- }
-
- return updated;
-}
-static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
- ;
-}
-
void sender_init(RRDHOST *host)
{
if (host->sender)
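The conflict callback deleted above merged duplicate REPLAY_CHART requests for the same chart by widening the requested window; that responsibility now belongs to the replication code reached via replication_add_request(). A standalone restatement of the merge rule, with illustrative types and names:

```c
#include <stdbool.h>
#include <time.h>

struct replication_window {
    time_t after;            // start of the gap to replicate
    time_t before;           // end of the gap to replicate
    bool start_streaming;    // switch to normal streaming once the gap is filled
};

// Merge an incoming request into an existing one for the same chart:
// widen the window in both directions and keep start_streaming sticky.
static bool replication_window_merge(struct replication_window *existing,
                                     const struct replication_window *incoming) {
    bool updated = false;

    if (incoming->after < existing->after) {
        existing->after = incoming->after;
        updated = true;
    }
    if (incoming->before > existing->before) {
        existing->before = incoming->before;
        updated = true;
    }
    if (incoming->start_streaming && !existing->start_streaming) {
        existing->start_streaming = true;
        updated = true;
    }
    return updated;
}
```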
@@ -1092,63 +1052,12 @@ void sender_init(RRDHOST *host)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
host->sender->compressor = create_compressor();
}
+ else
+ host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
netdata_mutex_init(&host->sender->mutex);
-
- host->sender->replication_requests = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_insert_callback(host->sender->replication_requests, replication_request_insert_callback, host->sender);
- dictionary_register_conflict_callback(host->sender->replication_requests, replication_request_conflict_callback, host->sender);
- dictionary_register_delete_callback(host->sender->replication_requests, replication_request_delete_callback, host->sender);
-}
-
-static size_t sender_buffer_used_percent(struct sender_state *s) {
- netdata_mutex_lock(&s->mutex);
- size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
- netdata_mutex_unlock(&s->mutex);
-
- return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
-}
-
-static void process_replication_requests(struct sender_state *s) {
- if(dictionary_entries(s->replication_requests) == 0)
- return;
-
- struct replication_request *rr;
- dfe_start_write(s->replication_requests, rr) {
- size_t used_percent = sender_buffer_used_percent(s);
- if(used_percent > 50) break;
-
- // delete it from the dictionary
- // the current item is referenced - it will not go away until the next iteration of the dfe loop
- dictionary_del(s->replication_requests, rr_dfe.name);
-
- // find the chart
- RRDSET *st = rrdset_find(s->host, rr_dfe.name);
- if(unlikely(!st)) {
- internal_error(true,
- "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
- , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name);
- continue;
- }
-
- netdata_thread_disable_cancelability();
-
- // send the replication data
- bool start_streaming = replicate_chart_response(st->rrdhost, st,
- rr->start_streaming, rr->after, rr->before);
-
- netdata_thread_enable_cancelability();
-
- // enable normal streaming if we have to
- if (start_streaming) {
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(s->host), rrdset_id(st));
-
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- }
- }
- dfe_done(rr);
+ replication_init_sender(host->sender);
}
void *rrdpush_sender_thread(void *ptr) {
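The per-sender dictionary and the inline process_replication_requests() loop removed above give way to replication_init_sender()/replication_flush_sender() and a dedicated replication thread that, per the commit message, keeps pending requests sorted using dictionaries and JudyL. A toy example of that sorting idea, assuming requests are keyed by their 'after' timestamp (not the actual replication.c code; a real implementation must also handle key collisions and locking):

```c
#include <Judy.h>
#include <stdio.h>
#include <time.h>

struct pending_request {
    const char *chart_id;
    time_t after, before;
};

int main(void) {
    Pvoid_t sorted = NULL;    // JudyL: key = 'after' timestamp, value = request pointer
    struct pending_request reqs[] = {
        { "system.cpu", 1668900000, 1668903600 },
        { "system.ram", 1668890000, 1668893600 },
        { "system.io",  1668895000, 1668898600 },
    };

    for (size_t i = 0; i < sizeof(reqs) / sizeof(reqs[0]); i++) {
        PWord_t slot;
        JLI(slot, sorted, (Word_t)reqs[i].after);    // insert keyed by 'after'
        *slot = (Word_t)&reqs[i];
    }

    // walk in ascending key order: the oldest gap is served first
    Word_t key = 0;
    PWord_t slot;
    JLF(slot, sorted, key);
    while (slot != NULL) {
        struct pending_request *r = (struct pending_request *)*slot;
        printf("replicate %s from %ld to %ld\n", r->chart_id, (long)r->after, (long)r->before);
        JLN(slot, sorted, key);
    }

    Word_t bytes_freed;
    JLFA(bytes_freed, sorted);    // free the whole JudyL array
    return 0;
}
```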
@@ -1171,9 +1080,12 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
+ worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
+
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL);
- worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
struct sender_state *s = ptr;
s->tid = gettid();
@@ -1196,7 +1108,7 @@ void *rrdpush_sender_thread(void *ptr) {
info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid);
s->timeout = (int)appconfig_get_number(
- &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
+ &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
s->default_port = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
@@ -1255,6 +1167,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(!attempt_to_connect(s)))
continue;
+ s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
@@ -1265,7 +1178,9 @@ void *rrdpush_sender_thread(void *ptr) {
}
// If the TCP window never opened then something is wrong, restart connection
- if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) {
+ if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+ __atomic_load_n(&s->replication_pending_requests, __ATOMIC_SEQ_CST) == 0) &&
+ __atomic_load_n(&s->receiving_metrics, __ATOMIC_SEQ_CST) != 0) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);
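The rewritten timeout above no longer looks at last_sent_t alone: traffic in either direction refreshes last_traffic_seen_t, pending replication requests suppress the timeout, and a node with no metrics to stream is never disconnected for silence. Spelled out as a standalone predicate (illustrative, not the in-tree code):

```c
#include <stdbool.h>
#include <time.h>

// Illustrative restatement of the new stale-connection test: disconnect only
// when the link has been silent past the timeout, no replication requests are
// pending, and the node actually has metrics it is supposed to stream.
static bool sender_connection_is_stale(time_t now, time_t last_traffic_seen,
                                       time_t timeout_seconds,
                                       size_t pending_replication_requests,
                                       size_t receiving_metrics) {
    return (now - last_traffic_seen) > timeout_seconds
        && pending_replication_requests == 0
        && receiving_metrics != 0;
}
```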
@@ -1342,8 +1257,10 @@ void *rrdpush_sender_thread(void *ptr) {
if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
ssize_t bytes = attempt_to_send(s);
- if(bytes > 0)
- worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, bytes);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
+ }
}
// If the collector woke us up then empty the pipe to remove the signal
@@ -1359,16 +1276,14 @@ void *rrdpush_sender_thread(void *ptr) {
if (fds[Socket].revents & POLLIN) {
worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
ssize_t bytes = attempt_read(s);
- if(bytes > 0)
- worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, bytes);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
+ }
}
- if(unlikely(s->read_len)) {
- worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ if(unlikely(s->read_len))
execute_commands(s);
- }
-
- process_replication_requests(s);
if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
char *error = NULL;