diff options
author | Andrew Moss <1043609+amoss@users.noreply.github.com> | 2020-06-03 08:38:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-03 08:38:25 +0200 |
commit | 49719a961d6c079004b65458ea8c5e08ada1c44c (patch) | |
tree | 258b25ac60c403696a72b1589d5fa8634dfc6764 /streaming/rrdpush.h | |
parent | 1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff) |
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps in charts when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in this area.
* Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds.
* Fixed cases that fell through to polling the socket after closing.
* Fixed locking of data related to sender and receiver in the host structure.
* Added fine-grained locks to reduce contention.
* Added circular buffer to sender to prevent starvation in high-latency conditions.
* Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver.
* Changed interface to new parser to put the buffering code in streaming.
* Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention.
* Uses fine-grained locking and a different approach to thread shutdown instead.
* Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 87 |
1 files changed, 86 insertions, 1 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 214c7c6fde..bad63888b9 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -3,10 +3,91 @@ #ifndef NETDATA_RRDPUSH_H #define NETDATA_RRDPUSH_H 1 +#include "../database/rrd.h" +#include "../libnetdata/libnetdata.h" #include "web/server/web_client.h" #include "daemon/common.h" +#define CONNECTED_TO_SIZE 100 + +// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 Gap-filling #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2 +#define VERSION_GAP_FILLING 3 + +#define STREAMING_PROTOCOL_VERSION "1.1" +#define START_STREAMING_PROMPT "Hit me baby, push them over..." +#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." +#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" + +#define HTTP_HEADER_SIZE 8192 + +typedef enum { + RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, + RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW +} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; + +typedef struct { + char *os_name; + char *os_id; + char *os_version; + char *kernel_name; + char *kernel_version; +} stream_encoded_t; + +// Thread-local storage + // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. 
+ +struct sender_state { + RRDHOST *host; + pid_t task_id; + unsigned int overflow:1; + int timeout, default_port; + size_t max_size; + usec_t reconnect_delay; + char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c + size_t begin; + size_t reconnects_counter; + size_t sent_bytes; + size_t sent_bytes_on_this_connection; + size_t send_attempts; + time_t last_sent_t; + size_t not_connected_loops; + // metrics may be collected asynchronously + // these synchronize all the threads willing the write to our sending buffer + netdata_mutex_t mutex; // Guard access to buffer / build + struct circular_buffer *buffer; + BUFFER *build; + char read_buffer[512]; + int read_len; + int32_t version; +}; + +struct receiver_state { + RRDHOST *host; + int fd; + char *key; + char *hostname; + char *registry_hostname; + char *machine_guid; + char *os; + char *timezone; // Unused? + char *tags; + char *client_ip; // Duplicated in pluginsd + char *client_port; // Duplicated in pluginsd + char *program_name; // Duplicated in pluginsd + char *program_version; + struct rrdhost_system_info *system_info; + int update_every; + uint32_t stream_version; + time_t last_msg_t; + char read_buffer[512]; + int read_len; +#ifdef ENABLE_HTTPS + struct netdata_ssl ssl; +#endif + unsigned int shutdown:1; +}; + extern unsigned int default_rrdpush_enabled; extern char *default_rrdpush_destination; @@ -14,6 +95,9 @@ extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; extern unsigned int remote_clock_resync_iterations; +extern void sender_init(struct sender_state *s, RRDHOST *parent); +void sender_start(struct sender_state *s); +void sender_commit(struct sender_state *s); extern int rrdpush_init(); extern int configured_as_master(); extern void rrdset_done_push(RRDSET *st); @@ -21,9 +105,10 @@ extern void rrdset_push_chart_definition_now(RRDSET *st); extern void *rrdpush_sender_thread(void *ptr); extern void 
rrdpush_send_labels(RRDHOST *host); -extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url); +extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url); extern void rrdpush_sender_thread_stop(RRDHOST *host); extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv); +extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); #endif //NETDATA_RRDPUSH_H |