summary | refs | log | tree | commit | diff | stats
path: root/streaming
diff options
context:
space:
mode:
author: Andrew Moss <1043609+amoss@users.noreply.github.com> 2020-08-11 16:16:21 +0200
committer: GitHub <noreply@github.com> 2020-08-11 16:16:21 +0200
commit: 1c846d568d8ad075fbc3444d35f9c93b8aaf028c (patch)
tree: 2b88a4f98768a873f6952ac567545ed19a166d5f /streaming
parent: cedb8707c8ce1728984da85a58d24c26ddc24ad9 (diff)
Remove broken optimization in the sender thread (#9703)
The sender thread avoided locking the circular buffer when checking whether there was outstanding data on the connection. The condition it needs (unsent data) grows monotonically with respect to other threads: the collectors can add data, but only this thread can remove it. However, as a side effect the check cached a pointer into the buffer, and that pointer was reused later during the transmission. This fails if the buffer is resized by a collector thread in the meantime, because the resize can invalidate the cached pointer. Peeking at the buffer sizes without locking could fail in the same situation. The optimization is removed: the sender thread now locks the mutex before checking the buffer, throws away the data buffer pointer, and releases the mutex over the poll() operation. It then reacquires the mutex and checks the buffer size and data pointer again when it performs the send.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/sender.c 21
1 files changed, 13 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 4a41d6acc7..c6ea18bab9 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -418,15 +418,17 @@ static void attempt_to_connect(struct sender_state *state)
}
// TCP window is open and we have data to transmit.
-void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
+void attempt_to_send(struct sender_state *s) {
+
rrdpush_send_labels(s->host);
- struct circular_buffer *cb = s->host->sender->buffer;
- debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ struct circular_buffer *cb = s->buffer;
netdata_thread_disable_cancelability();
- netdata_mutex_lock(&s->host->sender->mutex);
-
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+ debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
ssize_t ret;
#ifdef ENABLE_HTTPS
SSL *conn = s->host->ssl.conn ;
@@ -439,7 +441,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#endif
if (likely(ret > 0)) {
- cbuffer_remove_unsafe(s->host->sender->buffer, ret);
+ cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
@@ -457,7 +459,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
}
- netdata_mutex_unlock(&s->host->sender->mutex);
+ netdata_mutex_unlock(&s->mutex);
netdata_thread_enable_cancelability();
}
@@ -635,8 +637,11 @@ void *rrdpush_sender_thread(void *ptr) {
fds[Socket].revents = 0;
fds[Socket].fd = s->host->rrdpush_sender_socket;
+ netdata_mutex_lock(&s->mutex);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
+ chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ netdata_mutex_unlock(&s->mutex);
if(outstanding) {
s->send_attempts++;
fds[Socket].events = POLLIN | POLLOUT;
@@ -679,7 +684,7 @@ void *rrdpush_sender_thread(void *ptr) {
// If we have data and have seen the TCP window open then try to close it by a transmission.
if (outstanding && fds[Socket].revents & POLLOUT)
- attempt_to_send(s, chunk, outstanding);
+ attempt_to_send(s);
// TODO-GAPS - why do we only check this on the socket, not the pipe?
if (outstanding) {