summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorodynik <odynik.ee@gmail.com>2022-03-24 12:14:19 +0200
committerGitHub <noreply@github.com>2022-03-24 12:14:19 +0200
commit57d6c179c82a21faba20ff0d2995eee31d071935 (patch)
treebc3511643bf920b98776960097a8b5af238d28bc /streaming
parent3b586dc08c54bc80e4f18196dfd6dd069de3a5d3 (diff)
Stream compression - Deactivate compression at runtime in case of a compressor buffer overflow (#12037)
* [Stream compression] Downgrade stream version if compressor buffer overflows * [Stream compression] More user friendly compression messages * [Stream compression] Fix mutex starvation * [Stream compression] enable compression by default * Update streaming/README.md Co-authored-by: Tina Luedtke <kickoke@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md26
-rw-r--r--streaming/compression.c30
-rw-r--r--streaming/rrdpush.c2
-rw-r--r--streaming/sender.c56
4 files changed, 79 insertions, 35 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 40e6260179..60589ddff5 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -357,12 +357,26 @@ Note: The `stream-compression` status can be `"enabled" | "disabled" | "N/A"`.
A compressed data packet is determined and decompressed on the fly.
#### Limitations
- This limitation will be withdrawn asap and is work-in-progress.
-
-The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case you experience stream connection problems or gaps in the charts please disable stream compression in the `stream.conf` file. This limitation can be seen in the error.log file with the sequence of the following messages:
-```
-Compression error - data discarded
-Message size above limit:
+This limitation will be withdrawn asap and is work-in-progress.
+
+The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case your instance hit this limitation, the agent will deactivate compression during runtime to avoid stream corruption. This limitation can be seen in the error.log file with the sequence of the following messages:
+```
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : STREAM_COMPRESSION: Compressor Reset
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 5 - ready to send metrics...
+...
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Compression Failed - Message size 27847 above compression buffer limit: 16384 (errno 9, Bad file descriptor)
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Deactivating compression to avoid stream corruption
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION child01 [send to my.parent.IP]: Restarting connection without compression
+...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : Stream is uncompressed! One of the agents (my.parent.IP <-> child01) does not support compression OR compression is disabled.
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 4 - ready to send metrics...
+netdata INFO : WEB_SERVER[static4] : STREAM child01 [send]: sending metrics...
```
#### How to enable stream compression
diff --git a/streaming/compression.c b/streaming/compression.c
index 65351ed08a..8856590c27 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -3,6 +3,8 @@
#ifdef ENABLE_COMPRESSION
#include "lz4.h"
+#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
+
#define LZ4_MAX_MSG_SIZE 0x4000
#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
@@ -29,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state)
if (state->data) {
if (state->data->stream) {
LZ4_resetStream_fast(state->data->stream);
- info("STREAM_COMPRESSION: Compressor resets stream fast!");
+ info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
}
state->data->stream_buffer_pos = 0;
}
@@ -50,14 +52,14 @@ static void lz4_compressor_destroy(struct compressor_state **state)
freez(s->buffer);
freez(s);
*state = NULL;
- debug(D_STREAM, "STREAM_COMPRESSION: Compressor destroyed!");
+ debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
}
}
/*
* Compress the given block of data
- * Comprecced data will remain in the internal buffer until the next invocation
- * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * Compressed data will remain in the internal buffer until the next invocation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
* or 0 in case of error
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
@@ -65,7 +67,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
if (!state || !size || !out)
return 0;
if (size > LZ4_MAX_MSG_SIZE) {
- error("Message size above limit: %lu", size);
+ error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
return 0;
}
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
@@ -84,7 +86,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
state->data->stream_buffer + state->data->stream_buffer_pos,
state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
if (compressed_data_size < 0) {
- error("Date compression error: %ld", compressed_data_size);
+ error("Data compression error: %ld", compressed_data_size);
return 0;
}
state->data->stream_buffer_pos += size;
@@ -93,7 +95,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
*(uint32_t *)state->buffer = len | SIGNATURE;
*out = state->buffer;
- debug(D_STREAM, "STREAM: Compressed data header: %ld", compressed_data_size);
+ debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
@@ -114,7 +116,7 @@ struct compressor_state *create_compressor()
state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
state->reset(state);
- debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming compression!");
+ debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
}
@@ -150,7 +152,7 @@ static void lz4_decompressor_destroy(struct decompressor_state **state)
if (state && *state) {
struct decompressor_state *s = *state;
if (s->data) {
- debug(D_STREAM, "STREAM_COMPRESSION: Destroying decompressor.");
+ debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG);
if (s->data->stream)
LZ4_freeStreamDecode(s->data->stream);
freez(s->data->stream_buffer);
@@ -246,7 +248,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
if (!state)
return 0;
if (!state->buffer) {
- error("STREAM: No decompressor buffer allocated");
+ error("%s: No decompressor buffer allocated", STREAM_COMPRESSION_MSG);
return 0;
}
@@ -254,7 +256,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
state->data->stream_buffer + state->data->stream_buffer_pos,
state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos);
if (decompressed_size < 0) {
- error("STREAM: Decompressor error %ld", decompressed_size);
+ error("%s: Decompressor error %ld", STREAM_COMPRESSION_MSG, decompressed_size);
return 0;
}
@@ -278,7 +280,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
size_t avg_size = state->total_uncompressed / state->packet_count;
if (old_avg_saving != avg_saving || old_avg_size != avg_size){
- debug(D_STREAM, "STREAM: Saving: %lu%% (avg. %lu%%), avg.size: %lu", saving, avg_saving, avg_size);
+ debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
}
return decompressed_size;
}
@@ -301,7 +303,7 @@ static size_t lz4_decompressor_get(struct decompressor_state *state, char *data,
if (!state || !size || !data)
return 0;
if (!state->out_buffer)
- fatal("STREAM: No decompressor output buffer allocated");
+ fatal("%s: No decompressor output buffer allocated", STREAM_COMPRESSION_MSG);
if (state->out_buffer_pos + size > state->out_buffer_len)
size = state->out_buffer_len - state->out_buffer_pos;
@@ -339,7 +341,7 @@ struct decompressor_state *create_decompressor()
state->data->stream_buffer = mallocz(state->data->stream_buffer_size);
fatal_assert(state->data->stream_buffer);
state->reset(state);
- debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming decompression!");
+ debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG);
return state;
}
#endif
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index b688e7e5cb..04ba1e28f1 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -41,7 +41,7 @@ struct config stream_config = {
unsigned int default_rrdpush_enabled = 0;
#ifdef ENABLE_COMPRESSION
-unsigned int default_compression_enabled = 0;
+unsigned int default_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
diff --git a/streaming/sender.c b/streaming/sender.c
index 916d809a91..6d81e88c88 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -13,24 +13,43 @@ void sender_start(struct sender_state *s) {
buffer_flush(s->build);
}
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
+
+#ifdef ENABLE_COMPRESSION
+/*
+* In case of stream compression buffer oveflow
+* Inform the user through the error log file and
+* deactivate compression by downgrading the stream protocol.
+*/
+static inline void deactivate_compression(struct sender_state *s)
+{
+ error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption");
+ default_compression_enabled = 0;
+ s->rrdpush_compression = 0;
+ s->version = STREAM_VERSION_CLABELS;
+ error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+#endif
+
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s) {
char *src = (char *)buffer_tostring(s->host->sender->build);
size_t src_len = s->host->sender->build->len;
#ifdef ENABLE_COMPRESSION
- do {
- if (src && src_len) {
- if (s->compressor && s->rrdpush_compression) {
- src_len = s->compressor->compress(s->compressor, src, src_len, &src);
- if (!src_len) {
- error("Compression error - data discarded");
- break;
- }
+ if (src && src_len) {
+ if (s->compressor && s->rrdpush_compression) {
+ src_len = s->compressor->compress(s->compressor, src, src_len, &src);
+ if (!src_len) {
+ deactivate_compression(s);
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+ return;
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
}
- } while (0);
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->overflow = 1;
+ }
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
@@ -252,6 +271,13 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
}
#endif
+#ifdef ENABLE_COMPRESSION
+// Negotiate stream VERSION_CLABELS if stream compression is not supported
+s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+if(!s->rrdpush_compression)
+ s->version = STREAM_VERSION_CLABELS;
+#endif //ENABLE_COMPRESSION
+
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@@ -274,7 +300,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
"&ml_capable=%d"
"&ml_enabled=%d"
"&tags=%s"
- "&ver=%u"
+ "&ver=%d"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -316,7 +342,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->system_info->ml_capable
, host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
- , STREAMING_PROTOCOL_CURRENT_VERSION
+ , s->version
, se.os_name
, se.os_id
, (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
@@ -410,7 +436,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
s->version = version;
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+ s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION));
if(s->rrdpush_compression)
{
// parent supports compression
@@ -420,6 +446,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
else {
//parent does not support compression or has compression disabled
debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
s->version = STREAM_VERSION_CLABELS;
}
#endif //ENABLE_COMPRESSION
@@ -664,6 +691,7 @@ void *rrdpush_sender_thread(void *ptr) {
error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
return NULL;
}
+ s->version = STREAMING_PROTOCOL_CURRENT_VERSION;
enum {
Collector,