summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-26 14:00:59 +0300
committerGitHub <noreply@github.com>2023-06-26 14:00:59 +0300
commit0d61c11b5f4772a4762ede1d8204290b94bb08e7 (patch)
tree49c97d67e0d2a4846a4b379345f53ef8d93a6aec /streaming/rrdpush.h
parentf90d56f18d29c2835bc278f6a22e840230b9ca86 (diff)
use gperf for the pluginsd/streaming parser hashtable (#15251)
* use gperf for the pluginsd parser * simplify pluginsd_parser by removing void pointers to user * pluginsd_split_words() with inlined pluginsd_space() * quoted_string_splitter() now uses a map instead of a function for determining spaces * add stress test for pluginsd parser * optimized BITMAP256 * optimized rrdpush receiver reception * optimized rrdpush sender compression * renames and cleanup * remove wrong negation * unify handshake and disconnection reasons * use parser_find_keyword * register job names only for the current repertoire
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h131
1 files changed, 105 insertions, 26 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 8b535a4521..35347aaa0a 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -94,6 +94,17 @@ typedef enum {
STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
STREAM_HANDSHAKE_INITIALIZATION = -12,
+ STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
+ STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
+ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
+ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
+ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
+ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
+ STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
+
} STREAM_HANDSHAKE;
@@ -108,28 +119,98 @@ typedef struct {
} stream_encoded_t;
#ifdef ENABLE_COMPRESSION
+// signature MUST end with a newline
+#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
+
struct compressor_state {
+ bool initialized;
char *compression_result_buffer;
size_t compression_result_buffer_size;
- struct compressor_data *data; // Compression API specific data
- void (*reset)(struct compressor_state *state);
+ struct {
+ void *lz4_stream;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
+ } stream;
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
void (*destroy)(struct compressor_state **state);
};
+void rrdpush_compressor_reset(struct compressor_state *state);
+void rrdpush_compressor_destroy(struct compressor_state *state);
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
+
struct decompressor_state {
+ bool initialized;
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
- struct decompressor_stream *stream; // Decompression API specific data
- void (*reset)(struct decompressor_state *state);
- size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
- size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
- size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
- size_t (*get)(struct decompressor_state *state, char *data, size_t size);
- void (*destroy)(struct decompressor_state **state);
+ struct {
+ void *lz4_stream;
+ char *buffer;
+ size_t size;
+ size_t write_at;
+ size_t read_at;
+ } stream;
};
+
+void rrdpush_decompressor_destroy(struct decompressor_state *state);
+void rrdpush_decompressor_reset(struct decompressor_state *state);
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+
+static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
+ return 0;
+
+ if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
+ return 0;
+
+ uint32_t sign = *(uint32_t *)data;
+ if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
+ return 0;
+
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
+ if(unlikely(state->stream.read_at != state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+
+ return rrdpush_decompress_decode_header(header, header_size);
+}
+
+static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return state->stream.write_at - state->stream.read_at;
+}
+
+static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
+ return 0;
+
+ size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
+
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
+ state->stream.read_at += bytes_to_return;
+
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return bytes_to_return;
+}
#endif
// Thread-local storage
@@ -171,7 +252,7 @@ struct sender_state {
netdata_mutex_t mutex;
struct circular_buffer *buffer;
char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+ ssize_t read_len;
STREAM_CAPABILITIES capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@@ -182,15 +263,16 @@ struct sender_state {
uint16_t hops;
#ifdef ENABLE_COMPRESSION
- struct compressor_state *compressor;
+ struct compressor_state compressor;
#endif
+
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
#endif
struct {
bool shutdown;
- const char *reason;
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -293,7 +375,7 @@ struct receiver_state {
struct {
bool shutdown; // signal the streaming parser to exit
- const char *reason; // the reason of disconnection to log
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -315,13 +397,12 @@ struct receiver_state {
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl;
#endif
-#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression;
- struct decompressor_state *decompressor;
-#endif
time_t replication_first_time_t;
+#ifdef ENABLE_COMPRESSION
+ struct decompressor_state decompressor;
+#endif
/*
struct {
uint32_t count;
@@ -334,11 +415,9 @@ struct rrdpush_destinations {
STRING *destination;
bool ssl;
uint32_t attempts;
-
- const char *last_error;
- time_t last_attempt;
+ time_t since;
time_t postpone_reconnection_until;
- STREAM_HANDSHAKE last_handshake;
+ STREAM_HANDSHAKE reason;
struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
@@ -390,7 +469,7 @@ void rrdpush_send_global_functions(RRDHOST *host);
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
-void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
@@ -407,8 +486,8 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
#ifdef ENABLE_COMPRESSION
struct compressor_state *create_compressor();
-struct decompressor_state *create_decompressor();
#endif
+
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
@@ -419,7 +498,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
-bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
void sender_thread_buffer_free(void);
@@ -623,7 +702,7 @@ typedef struct rrdhost_status {
STREAM_CAPABILITIES capabilities;
uint32_t id;
time_t since;
- const char *reason;
+ STREAM_HANDSHAKE reason;
struct {
bool in_progress;
@@ -641,7 +720,7 @@ typedef struct rrdhost_status {
STREAM_CAPABILITIES capabilities;
uint32_t id;
time_t since;
- const char *reason;
+ STREAM_HANDSHAKE reason;
struct {
bool in_progress;