summaryrefslogtreecommitdiffstats
path: root/libnetdata
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-06-03 08:38:25 +0200
committerGitHub <noreply@github.com>2020-06-03 08:38:25 +0200
commit49719a961d6c079004b65458ea8c5e08ada1c44c (patch)
tree258b25ac60c403696a72b1589d5fa8634dfc6764 /libnetdata
parent1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff)
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps on chart when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in the area. * Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds. * Fixed cases that fell through to polling the socket after closing. * Fixed locking of data related to sender and receiver in the host structure. * Added fine-grained locks to reduce contention. * Added circular buffer to sender to prevent starvation in high-latency conditions. * Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver. * Changed interface to new parser to put the buffering code in streaming. * Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention. * Uses fine-grained locking and a different approach to thread shutdown instead. * Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'libnetdata')
-rw-r--r--libnetdata/circular_buffer/Makefile.am8
-rw-r--r--libnetdata/circular_buffer/README.md13
-rw-r--r--libnetdata/circular_buffer/circular_buffer.c85
-rw-r--r--libnetdata/circular_buffer/circular_buffer.h16
-rw-r--r--libnetdata/clocks/clocks.h10
-rw-r--r--libnetdata/libnetdata.h1
6 files changed, 133 insertions, 0 deletions
diff --git a/libnetdata/circular_buffer/Makefile.am b/libnetdata/circular_buffer/Makefile.am
new file mode 100644
index 0000000000..161784b8f6
--- /dev/null
+++ b/libnetdata/circular_buffer/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/libnetdata/circular_buffer/README.md b/libnetdata/circular_buffer/README.md
new file mode 100644
index 0000000000..1917e22a65
--- /dev/null
+++ b/libnetdata/circular_buffer/README.md
@@ -0,0 +1,13 @@
+<!--
+--
+title: "circular_buffer"
+custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/circular_buffer/README.md
+---
+-->
+
+# Circular Buffer
+
+`struct circular_buffer` is an adaptive circular buffer. It will start at an initial size
+and grow up to a maximum size as it fills. Two indices within the structure track the current
+`read` and `write` position for data.
+
diff --git a/libnetdata/circular_buffer/circular_buffer.c b/libnetdata/circular_buffer/circular_buffer.c
new file mode 100644
index 0000000000..998008db27
--- /dev/null
+++ b/libnetdata/circular_buffer/circular_buffer.c
@@ -0,0 +1,85 @@
+#include "../libnetdata.h"
+
+struct circular_buffer *cbuffer_new(size_t initial, size_t max) {
+ struct circular_buffer *result = mallocz(sizeof(*result));
+ result->size = initial;
+ result->data = mallocz(initial);
+ result->write = 0;
+ result->read = 0;
+ result->max_size = max;
+ return result;
+}
+
+void cbuffer_free(struct circular_buffer *buf) {
+ freez(buf->data);
+ freez(buf);
+}
+
+static int cbuffer_realloc_unsafe(struct circular_buffer *buf) {
+ // Check that we can grow
+ if (buf->size >= buf->max_size)
+ return 1;
+ size_t new_size = buf->size * 2;
+ if (new_size > buf->max_size)
+ new_size = buf->max_size;
+
+ // We know that: size < new_size <= max_size
+ // For simplicity align the current data at the bottom of the new buffer
+ char *new_data = mallocz(new_size);
+ if (buf->read == buf->write)
+ buf->write = 0; // buffer is empty
+ else if (buf->read < buf->write) {
+ memcpy(new_data, buf->data + buf->read, buf->write - buf->read);
+ buf->write -= buf->read;
+ } else {
+ size_t top_part = buf->size - buf->read;
+ memcpy(new_data, buf->data + buf->read, top_part);
+ memcpy(new_data + top_part, buf->data, buf->write);
+ buf->write = top_part + buf->write;
+ }
+ buf->read = 0;
+
+ // Switch buffers
+ freez(buf->data);
+ buf->data = new_data;
+ buf->size = new_size;
+ return 0;
+}
+
+int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len) {
+ size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write);
+ while (d_len + len >= buf->size) {
+ if (cbuffer_realloc_unsafe(buf)) {
+ return 1;
+ }
+ }
+ // Guarantee: write + d_len cannot hit read
+ if (buf->write + d_len < buf->size) {
+ memcpy(buf->data + buf->write, d, d_len);
+ buf->write += d_len;
+ }
+ else {
+ size_t top_part = buf->size - buf->write;
+ memcpy(buf->data + buf->write, d, top_part);
+ memcpy(buf->data, d + top_part, d_len - top_part);
+ buf->write = d_len - top_part;
+ }
+ return 0;
+}
+
+// Assume caller does not remove too many bytes (i.e. read will jump over write)
+void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num) {
+ buf->read += num;
+ // Assume num < size (i.e. caller cannot remove more bytes than are in the buffer)
+ if (buf->read >= buf->size)
+ buf->read -= buf->size;
+}
+
+size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start) {
+ if (start != NULL)
+ *start = buf->data + buf->read;
+ if (buf->read <= buf->write) {
+ return buf->write - buf->read; // Includes empty case
+ }
+ return buf->size - buf->read;
+}
diff --git a/libnetdata/circular_buffer/circular_buffer.h b/libnetdata/circular_buffer/circular_buffer.h
new file mode 100644
index 0000000000..ba37e0ebf9
--- /dev/null
+++ b/libnetdata/circular_buffer/circular_buffer.h
@@ -0,0 +1,16 @@
+#ifndef CIRCULAR_BUFFER_H
+#define CIRCULAR_BUFFER_H 1
+
+#include <string.h>
+
+struct circular_buffer {
+ size_t size, write, read, max_size;
+ char *data;
+};
+
+extern struct circular_buffer *cbuffer_new(size_t initial, size_t max);
+extern void cbuffer_free(struct circular_buffer *buf);
+extern int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len);
+extern void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num);
+extern size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start);
+#endif
diff --git a/libnetdata/clocks/clocks.h b/libnetdata/clocks/clocks.h
index 83e7c48a2d..cfe99f5e7b 100644
--- a/libnetdata/clocks/clocks.h
+++ b/libnetdata/clocks/clocks.h
@@ -60,13 +60,23 @@ typedef struct heartbeat {
#endif // CLOCK_BOOTTIME
+#ifndef NSEC_PER_MSEC
#define NSEC_PER_MSEC 1000000ULL
+#endif
+#ifndef NSEC_PER_SEC
#define NSEC_PER_SEC 1000000000ULL
+#endif
+#ifndef NSEC_PER_USEC
#define NSEC_PER_USEC 1000ULL
+#endif
+#ifndef USEC_PER_SEC
#define USEC_PER_SEC 1000000ULL
+#endif
+#ifndef MSEC_PER_SEC
#define MSEC_PER_SEC 1000ULL
+#endif
#define USEC_PER_MS 1000ULL
diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h
index 45ac6e81bf..75a0de9575 100644
--- a/libnetdata/libnetdata.h
+++ b/libnetdata/libnetdata.h
@@ -300,6 +300,7 @@ extern char *netdata_configured_host_prefix;
#include "threads/threads.h"
#include "buffer/buffer.h"
#include "locks/locks.h"
+#include "circular_buffer/circular_buffer.h"
#include "avl/avl.h"
#include "inlined.h"
#include "clocks/clocks.h"