diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-06-03 08:38:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-03 08:38:25 +0200 |
commit | 49719a961d6c079004b65458ea8c5e08ada1c44c (patch) | |
tree | 258b25ac60c403696a72b1589d5fa8634dfc6764 /libnetdata/circular_buffer | |
parent | 1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff) |
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps on chart when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in the area.
* Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds.
* Fixed cases that fell through to polling the socket after closing.
* Fixed locking of data related to sender and receiver in the host structure.
* Added fine-grained locks to reduce contention.
* Added circular buffer to sender to prevent starvation in high-latency conditions.
* Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver.
* Changed interface to new parser to put the buffering code in streaming.
* Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention.
* Uses fine-grained locking and a different approach to thread shutdown instead.
* Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'libnetdata/circular_buffer')
-rw-r--r-- | libnetdata/circular_buffer/Makefile.am | 8 | ||||
-rw-r--r-- | libnetdata/circular_buffer/README.md | 13 | ||||
-rw-r--r-- | libnetdata/circular_buffer/circular_buffer.c | 85 | ||||
-rw-r--r-- | libnetdata/circular_buffer/circular_buffer.h | 16 |
4 files changed, 122 insertions, 0 deletions
diff --git a/libnetdata/circular_buffer/Makefile.am b/libnetdata/circular_buffer/Makefile.am new file mode 100644 index 0000000000..161784b8f6 --- /dev/null +++ b/libnetdata/circular_buffer/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/libnetdata/circular_buffer/README.md b/libnetdata/circular_buffer/README.md new file mode 100644 index 0000000000..1917e22a65 --- /dev/null +++ b/libnetdata/circular_buffer/README.md @@ -0,0 +1,13 @@ +<!-- +-- +title: "circular_buffer" +custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/circular_buffer/README.md +--- +--> + +# Circular Buffer + +`struct circular_buffer` is an adaptive circular buffer. It will start at an initial size +and grow up to a maximum size as it fills. Two indices within the structure track the current +`read` and `write` position for data. + diff --git a/libnetdata/circular_buffer/circular_buffer.c b/libnetdata/circular_buffer/circular_buffer.c new file mode 100644 index 0000000000..998008db27 --- /dev/null +++ b/libnetdata/circular_buffer/circular_buffer.c @@ -0,0 +1,85 @@ +#include "../libnetdata.h" + +struct circular_buffer *cbuffer_new(size_t initial, size_t max) { + struct circular_buffer *result = mallocz(sizeof(*result)); + result->size = initial; + result->data = mallocz(initial); + result->write = 0; + result->read = 0; + result->max_size = max; + return result; +} + +void cbuffer_free(struct circular_buffer *buf) { + freez(buf->data); + freez(buf); +} + +static int cbuffer_realloc_unsafe(struct circular_buffer *buf) { + // Check that we can grow + if (buf->size >= buf->max_size) + return 1; + size_t new_size = buf->size * 2; + if (new_size > buf->max_size) + new_size = buf->max_size; + + // We know that: size < new_size <= max_size + // For simplicity align the current data at the bottom of the new buffer + char *new_data = mallocz(new_size); + if (buf->read == buf->write) + buf->write = 0; // buffer is empty + else if (buf->read < buf->write) { + memcpy(new_data, buf->data + buf->read, buf->write - buf->read); + buf->write -= buf->read; + } else { + size_t top_part = buf->size - buf->read; + memcpy(new_data, buf->data + buf->read, top_part); + memcpy(new_data + top_part, buf->data, buf->write); + buf->write = top_part + buf->write; + } + buf->read = 0; + + // Switch buffers + freez(buf->data); + buf->data = new_data; + buf->size = new_size; + return 0; +} + +int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len) { + size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write); + while (d_len + len >= buf->size) { + if (cbuffer_realloc_unsafe(buf)) { + return 1; + } + } + // Guarantee: write + d_len cannot hit read + if (buf->write + d_len < buf->size) { + memcpy(buf->data + buf->write, d, d_len); + buf->write += d_len; + } + else { + size_t top_part = buf->size - buf->write; + memcpy(buf->data + buf->write, d, top_part); + memcpy(buf->data, d + top_part, d_len - top_part); + buf->write = d_len - top_part; + } + return 0; +} + +// Assume caller does not remove too many bytes (i.e. read will jump over write) +void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num) { + buf->read += num; + // Assume num < size (i.e. caller cannot remove more bytes than are in the buffer) + if (buf->read >= buf->size) + buf->read -= buf->size; +} + +size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start) { + if (start != NULL) + *start = buf->data + buf->read; + if (buf->read <= buf->write) { + return buf->write - buf->read; // Includes empty case + } + return buf->size - buf->read; +} diff --git a/libnetdata/circular_buffer/circular_buffer.h b/libnetdata/circular_buffer/circular_buffer.h new file mode 100644 index 0000000000..ba37e0ebf9 --- /dev/null +++ b/libnetdata/circular_buffer/circular_buffer.h @@ -0,0 +1,16 @@ +#ifndef CIRCULAR_BUFFER_H +#define CIRCULAR_BUFFER_H 1 + +#include <string.h> + +struct circular_buffer { + size_t size, write, read, max_size; + char *data; +}; + +extern struct circular_buffer *cbuffer_new(size_t initial, size_t max); +extern void cbuffer_free(struct circular_buffer *buf); +extern int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len); +extern void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num); +extern size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start); +#endif |