diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-08-11 16:16:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-11 16:16:21 +0200 |
commit | 1c846d568d8ad075fbc3444d35f9c93b8aaf028c (patch) | |
tree | 2b88a4f98768a873f6952ac567545ed19a166d5f /streaming | |
parent | cedb8707c8ce1728984da85a58d24c26ddc24ad9 (diff) |
Remove broken optimization in the sender thread (#9703)
The sender thread avoided locking the circular buffer to check if there was outstanding data on the connection. The
condition it needs (unsent data) grows monotonically w.r.t. other threads as the collectors can add data but only
this thread can remove it. However, it cached the pointer into the buffer as a side-effect and then reused it later
during the transmission. This fails if the buffer is resized by a collector thread. Peeking at the buffer sizes without
locking could fail in the same situation.
The optimization is removed and the sender thread now locks the mutex before checking the buffer, throws away the
data buffer pointer and releases the mutex over the poll() operation. It then reacquires the mutex and checks the buffer size
and data pointer again when it performs the send.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/sender.c | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 4a41d6acc7..c6ea18bab9 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -418,15 +418,17 @@ static void attempt_to_connect(struct sender_state *state) } // TCP window is open and we have data to transmit. -void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { +void attempt_to_send(struct sender_state *s) { + rrdpush_send_labels(s->host); - struct circular_buffer *cb = s->host->sender->buffer; - debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); + struct circular_buffer *cb = s->buffer; netdata_thread_disable_cancelability(); - netdata_mutex_lock(&s->host->sender->mutex); - + netdata_mutex_lock(&s->mutex); + char *chunk; + size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); + debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); ssize_t ret; #ifdef ENABLE_HTTPS SSL *conn = s->host->ssl.conn ; @@ -439,7 +441,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #endif if (likely(ret > 0)) { - cbuffer_remove_unsafe(s->host->sender->buffer, ret); + cbuffer_remove_unsafe(s->buffer, ret); s->sent_bytes_on_this_connection += ret; s->sent_bytes += ret; debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret); @@ -457,7 +459,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); } - netdata_mutex_unlock(&s->host->sender->mutex); + netdata_mutex_unlock(&s->mutex); netdata_thread_enable_cancelability(); } @@ -635,8 +637,11 @@ void *rrdpush_sender_thread(void *ptr) { fds[Socket].revents = 0; fds[Socket].fd = s->host->rrdpush_sender_socket; + netdata_mutex_lock(&s->mutex); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk); + chunk = NULL; // Do not cache pointer outside of region - could be invalidated + netdata_mutex_unlock(&s->mutex); if(outstanding) { s->send_attempts++; fds[Socket].events = POLLIN | POLLOUT; @@ -679,7 +684,7 @@ void *rrdpush_sender_thread(void *ptr) { // If we have data and have seen the TCP window open then try to close it by a transmission. if (outstanding && fds[Socket].revents & POLLOUT) - attempt_to_send(s, chunk, outstanding); + attempt_to_send(s); // TODO-GAPS - why do we only check this on the socket, not the pipe? if (outstanding) { |