From 1c846d568d8ad075fbc3444d35f9c93b8aaf028c Mon Sep 17 00:00:00 2001
From: Andrew Moss <1043609+amoss@users.noreply.github.com>
Date: Tue, 11 Aug 2020 16:16:21 +0200
Subject: Remove broken optimization in the sender thread (#9703)

The sender thread avoided locking the circular buffer to check if there was
outstanding data on the connection. The condition it needs (unsent data) grows
monotonically w.r.t. other threads as the collectors can add data but only this
thread can remove it. However, it cached the pointer into the buffer as a
side-effect and then reused it later during the transmission. This fails if the
buffer is resized by a collector thread. Peeking at the buffer sizes without
locking could fail in the same situation.

The optimization is removed and the sender thread now locks the mutex before
checking the buffer, throws away the data buffer pointer and releases the mutex
over the poll() operation. It then reacquires the mutex and checks the buffer
size and data pointer again when it performs the send.
---
 streaming/sender.c | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

(limited to 'streaming')

diff --git a/streaming/sender.c b/streaming/sender.c
index 4a41d6acc7..c6ea18bab9 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -418,15 +418,17 @@ static void attempt_to_connect(struct sender_state *state)
 }
 
 // TCP window is open and we have data to transmit.
-void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
+void attempt_to_send(struct sender_state *s) {
+
     rrdpush_send_labels(s->host);
 
-    struct circular_buffer *cb = s->host->sender->buffer;
-    debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+    struct circular_buffer *cb = s->buffer;
 
     netdata_thread_disable_cancelability();
-    netdata_mutex_lock(&s->host->sender->mutex);
-
+    netdata_mutex_lock(&s->mutex);
+    char *chunk;
+    size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+    debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
     ssize_t ret;
 #ifdef ENABLE_HTTPS
     SSL *conn = s->host->ssl.conn ;
@@ -439,7 +441,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
     ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
 #endif
     if (likely(ret > 0)) {
-        cbuffer_remove_unsafe(s->host->sender->buffer, ret);
+        cbuffer_remove_unsafe(s->buffer, ret);
         s->sent_bytes_on_this_connection += ret;
         s->sent_bytes += ret;
         debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
@@ -457,7 +459,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
         debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
     }
 
-    netdata_mutex_unlock(&s->host->sender->mutex);
+    netdata_mutex_unlock(&s->mutex);
     netdata_thread_enable_cancelability();
 }
 
@@ -635,8 +637,11 @@ void *rrdpush_sender_thread(void *ptr) {
         fds[Socket].revents = 0;
         fds[Socket].fd = s->host->rrdpush_sender_socket;
 
+        netdata_mutex_lock(&s->mutex);
         char *chunk;
         size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
+        chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+        netdata_mutex_unlock(&s->mutex);
         if(outstanding) {
             s->send_attempts++;
             fds[Socket].events = POLLIN | POLLOUT;
@@ -679,7 +684,7 @@ void *rrdpush_sender_thread(void *ptr) {
 
         // If we have data and have seen the TCP window open then try to close it by a transmission.
         if (outstanding && fds[Socket].revents & POLLOUT)
-            attempt_to_send(s, chunk, outstanding);
+            attempt_to_send(s);
 
         // TODO-GAPS - why do we only check this on the socket, not the pipe?
         if (outstanding) {
-- 
cgit v1.2.3