summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authoravstrakhov <57530959+avstrakhov@users.noreply.github.com>2022-01-19 18:57:49 +0300
committerGitHub <noreply@github.com>2022-01-19 17:57:49 +0200
commitb003e5fd40c3b42dfacc87db5a7730b76eff0e92 (patch)
treeda105b1b74c98cb567ff18aa73d282566bbff454 /streaming/rrdpush.h
parent8a01dabf89add0ef473d37bc8f2310cabfc1db43 (diff)
Add code for LZ4 streaming data compression (#11821)
* Add code for LZ4 streaming data compression * Fix LGTM alert * Add lz4 library for link when compression enabled * Add LZ4_resetStream_fast presence detection * Disable compression for older LZ4 libraries * Correct LZ4 API check * [Testing Stream Compression] Debug msgs and report.md * Add LZ4 library version using LZ4_initStream * Fixed bug in SSL mode * [Testing compression] - Add compression info messages * Set compression enabled by default, update doc * Update streaming/README.md Co-authored-by: DShreve2 <david@netdata.cloud> * [Agent Negotiation] Compression as separate capability * [Agent Negotiation] Compression as separate capability - default compression variable always active * Add code to negotiate compression * [Agent Negotiation] Based on stream version * [Agent Negotiation] Version based - fix compilation error * [Agent Negotiation] Fix glob var default_compression_enbaled=0 affects all the connections - Handle compression - stream version based * [Agent Negotiation - Compression] - Add control flag in 1. sender/receiver state & 2. stream.conf per child * [Agent Negotiation - Compression] Fix stream.conf key, mguid control * [Agent Negotiate Compression] Fine control on stream.conf per key,mguid for each child * [Agent Negotiation Compression] Stop destroying compressor for runtime configuration + Update Readme.md * [Agent Negotiation Compression] Use stream_version 4 if compression is disabled * Correct child's compression check * [Agent Negotiation Compression] Create streaming compression section in docs. * [Agent Negotiation Compresion] Remove redundant debug msgs * [Stream Compression] - integrate compression build info & config info in api/v1/info endpoint. * [Agent Negotiation] Finalize README.md * [Agent Stream Compression] Fix buildinfo json, Finalize readme.md * [Agent Stream Compression] Negotiate compression based on stream version * [Agent Stream Compression] Stream compression control per child in stream.conf | per AP_KEY, MACHINE_GUID * [Agent Stream Compression] Avoid destroying compressor enabling runtime configuration + Update Readme.md * [Agent Stream Compression] - Provide stream compression build info & config info in api/v1/info endpoint + Update Readme.md * [Agent Stream Compression] Fix rebase conflicts * [Agent Stream Compression] Fix more rebase conflicts * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api * [Agent Stream Compression] Change unsuccessful buffer check to error * [Agent Stream Compression] Readme.md proof-read corrections, downgrade to stream_version_clabels, add shields for supported versions, EOF lint * [Agent Stream Compression] Fix missed lz4 library on Alpine Linux * Phrasal review Co-authored-by: odynik <odynik.ee@gmail.com> Co-authored-by: DShreve2 <david@netdata.cloud> Co-authored-by: Tina Lüdtke <tina@kickoke.com>
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h69
1 files changed, 62 insertions, 7 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 027ccd102a..937ead6faa 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,10 +10,16 @@
#define CONNECTED_TO_SIZE 100
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4
#define STREAM_VERSION_CLAIM 3
#define STREAM_VERSION_CLABELS 4
-#define VERSION_GAP_FILLING 5
+#define STREAM_VERSION_COMPRESSION 5
+#define VERSION_GAP_FILLING 6
+
+#ifdef ENABLE_COMPRESSION
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION)
+#else
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS)
+#endif //ENABLE_COMPRESSION
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
@@ -35,6 +41,38 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
+#ifdef ENABLE_COMPRESSION
+struct compressor_state {
+ char *buffer;
+ size_t buffer_size;
+ struct compressor_data *data; // Compression API specific data
+ void (*reset)(struct compressor_state *state);
+ size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
+ void (*destroy)(struct compressor_state **state);
+};
+
+struct decompressor_state {
+ char *buffer;
+ size_t buffer_size;
+ size_t buffer_len;
+ size_t buffer_pos;
+ char *out_buffer;
+ size_t out_buffer_len;
+ size_t out_buffer_pos;
+ size_t total_compressed;
+ size_t total_uncompressed;
+ size_t packet_count;
+ struct decompressor_data *data; // Deompression API specific data
+ void (*reset)(struct decompressor_state *state);
+ size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
+ size_t (*put)(struct decompressor_state *state, const char *data, size_t size);
+ size_t (*decompress)(struct decompressor_state *state);
+ size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
+ size_t (*get)(struct decompressor_state *state, char *data, size_t size);
+ void (*destroy)(struct decompressor_state **state);
+};
+#endif
+
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
@@ -60,6 +98,10 @@ struct sender_state {
char read_buffer[512];
int read_len;
int32_t version;
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression;
+ struct compressor_state *compressor;
+#endif
};
struct receiver_state {
@@ -75,9 +117,9 @@ struct receiver_state {
char *abbrev_timezone;
int32_t utc_offset;
char *tags;
- char *client_ip; // Duplicated in pluginsd
- char *client_port; // Duplicated in pluginsd
- char *program_name; // Duplicated in pluginsd
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
@@ -85,15 +127,22 @@ struct receiver_state {
time_t last_msg_t;
char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
int read_len;
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
- unsigned int shutdown:1; // Tell the thread to exit
- unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression;
+ struct decompressor_state *decompressor;
+#endif
};
extern unsigned int default_rrdpush_enabled;
+#ifdef ENABLE_COMPRESSION
+extern unsigned int default_compression_enabled;
+#endif
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -116,4 +165,10 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+#ifdef ENABLE_COMPRESSION
+struct compressor_state *create_compressor();
+struct decompressor_state *create_decompressor();
+size_t is_compressed_data(const char *data, size_t data_size);
+#endif
+
#endif //NETDATA_RRDPUSH_H