diff options
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 4a41d6acc7..c6ea18bab9 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -418,15 +418,17 @@ static void attempt_to_connect(struct sender_state *state) } // TCP window is open and we have data to transmit. -void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { +void attempt_to_send(struct sender_state *s) { + rrdpush_send_labels(s->host); - struct circular_buffer *cb = s->host->sender->buffer; - debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); + struct circular_buffer *cb = s->buffer; netdata_thread_disable_cancelability(); - netdata_mutex_lock(&s->host->sender->mutex); - + netdata_mutex_lock(&s->mutex); + char *chunk; + size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); + debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); ssize_t ret; #ifdef ENABLE_HTTPS SSL *conn = s->host->ssl.conn ; @@ -439,7 +441,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT); #endif if (likely(ret > 0)) { - cbuffer_remove_unsafe(s->host->sender->buffer, ret); + cbuffer_remove_unsafe(s->buffer, ret); s->sent_bytes_on_this_connection += ret; s->sent_bytes += ret; debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret); @@ -457,7 +459,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) { debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); } - netdata_mutex_unlock(&s->host->sender->mutex); + netdata_mutex_unlock(&s->mutex); netdata_thread_enable_cancelability(); } @@ -635,8 +637,11 @@ void *rrdpush_sender_thread(void *ptr) { fds[Socket].revents = 0; fds[Socket].fd = s->host->rrdpush_sender_socket; + netdata_mutex_lock(&s->mutex); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk); + chunk = NULL; // Do not cache pointer outside of region - could be invalidated + netdata_mutex_unlock(&s->mutex); if(outstanding) { s->send_attempts++; fds[Socket].events = POLLIN | POLLOUT; @@ -679,7 +684,7 @@ void *rrdpush_sender_thread(void *ptr) { // If we have data and have seen the TCP window open then try to close it by a transmission. if (outstanding && fds[Socket].revents & POLLOUT) - attempt_to_send(s, chunk, outstanding); + attempt_to_send(s); // TODO-GAPS - why do we only check this on the socket, not the pipe? if (outstanding) { |