summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-06-03 08:38:25 +0200
committerGitHub <noreply@github.com>2020-06-03 08:38:25 +0200
commit49719a961d6c079004b65458ea8c5e08ada1c44c (patch)
tree258b25ac60c403696a72b1589d5fa8634dfc6764 /streaming/rrdpush.h
parent1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff)
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps on chart when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in the area. * Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds. * Fixed cases that fell through to polling the socket after closing. * Fixed locking of data related to sender and receiver in the host structure. * Added fine-grained locks to reduce contention. * Added circular buffer to sender to prevent starvation in high-latency conditions. * Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver. * Changed interface to new parser to put the buffering code in streaming. * Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention. * Uses fine-grained locking and a different approach to thread shutdown instead. * Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h87
1 files changed, 86 insertions, 1 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 214c7c6fde..bad63888b9 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -3,10 +3,91 @@
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1
+#include "../database/rrd.h"
+#include "../libnetdata/libnetdata.h"
#include "web/server/web_client.h"
#include "daemon/common.h"
+#define CONNECTED_TO_SIZE 100
+
+// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 Gap-filling
#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2
+#define VERSION_GAP_FILLING 3
+
+#define STREAMING_PROTOCOL_VERSION "1.1"
+#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
+
+#define HTTP_HEADER_SIZE 8192
+
+typedef enum {
+ RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
+ RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
+} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+
+typedef struct {
+ char *os_name;
+ char *os_id;
+ char *os_version;
+ char *kernel_name;
+ char *kernel_version;
+} stream_encoded_t;
+
+// Thread-local storage
+ // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+
+struct sender_state {
+ RRDHOST *host;
+ pid_t task_id;
+ unsigned int overflow:1;
+ int timeout, default_port;
+ size_t max_size;
+ usec_t reconnect_delay;
+ char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
+ size_t begin;
+ size_t reconnects_counter;
+ size_t sent_bytes;
+ size_t sent_bytes_on_this_connection;
+ size_t send_attempts;
+ time_t last_sent_t;
+ size_t not_connected_loops;
+ // metrics may be collected asynchronously
+ // these synchronize all the threads willing the write to our sending buffer
+ netdata_mutex_t mutex; // Guard access to buffer / build
+ struct circular_buffer *buffer;
+ BUFFER *build;
+ char read_buffer[512];
+ int read_len;
+ int32_t version;
+};
+
+struct receiver_state {
+ RRDHOST *host;
+ int fd;
+ char *key;
+ char *hostname;
+ char *registry_hostname;
+ char *machine_guid;
+ char *os;
+ char *timezone; // Unused?
+ char *tags;
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
+ char *program_version;
+ struct rrdhost_system_info *system_info;
+ int update_every;
+ uint32_t stream_version;
+ time_t last_msg_t;
+ char read_buffer[512];
+ int read_len;
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl;
+#endif
+ unsigned int shutdown:1;
+};
+
extern unsigned int default_rrdpush_enabled;
extern char *default_rrdpush_destination;
@@ -14,6 +95,9 @@ extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern unsigned int remote_clock_resync_iterations;
+extern void sender_init(struct sender_state *s, RRDHOST *parent);
+void sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s);
extern int rrdpush_init();
extern int configured_as_master();
extern void rrdset_done_push(RRDSET *st);
@@ -21,9 +105,10 @@ extern void rrdset_push_chart_definition_now(RRDSET *st);
extern void *rrdpush_sender_thread(void *ptr);
extern void rrdpush_send_labels(RRDHOST *host);
-extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url);
+extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
+extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
#endif //NETDATA_RRDPUSH_H