summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c21
1 files changed, 13 insertions, 8 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 4a41d6acc7..c6ea18bab9 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -418,15 +418,17 @@ static void attempt_to_connect(struct sender_state *state)
}
// TCP window is open and we have data to transmit.
-void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
+void attempt_to_send(struct sender_state *s) {
+
rrdpush_send_labels(s->host);
- struct circular_buffer *cb = s->host->sender->buffer;
- debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ struct circular_buffer *cb = s->buffer;
netdata_thread_disable_cancelability();
- netdata_mutex_lock(&s->host->sender->mutex);
-
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+ debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
ssize_t ret;
#ifdef ENABLE_HTTPS
SSL *conn = s->host->ssl.conn ;
@@ -439,7 +441,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#endif
if (likely(ret > 0)) {
- cbuffer_remove_unsafe(s->host->sender->buffer, ret);
+ cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
@@ -457,7 +459,7 @@ void attempt_to_send(struct sender_state *s, char *chunk, size_t outstanding) {
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
}
- netdata_mutex_unlock(&s->host->sender->mutex);
+ netdata_mutex_unlock(&s->mutex);
netdata_thread_enable_cancelability();
}
@@ -635,8 +637,11 @@ void *rrdpush_sender_thread(void *ptr) {
fds[Socket].revents = 0;
fds[Socket].fd = s->host->rrdpush_sender_socket;
+ netdata_mutex_lock(&s->mutex);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
+ chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ netdata_mutex_unlock(&s->mutex);
if(outstanding) {
s->send_attempts++;
fds[Socket].events = POLLIN | POLLOUT;
@@ -679,7 +684,7 @@ void *rrdpush_sender_thread(void *ptr) {
// If we have data and have seen the TCP window open then try to close it by a transmission.
if (outstanding && fds[Socket].revents & POLLOUT)
- attempt_to_send(s, chunk, outstanding);
+ attempt_to_send(s);
// TODO-GAPS - why do we only check this on the socket, not the pipe?
if (outstanding) {