summaryrefslogtreecommitdiffstats
path: root/streaming/compression.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-05 19:31:06 +0300
committerGitHub <noreply@github.com>2022-09-05 19:31:06 +0300
commit5e1b95cf92168c4df74586fb4430dc284806da82 (patch)
treef42077d8b02eaf316683453a7474bd1f599a833d /streaming/compression.c
parent544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff)
Deduplicate all netdata strings (#13570)
* rrdfamily * rrddim * rrdset plugin and module names * rrdset units * rrdset type * rrdset family * rrdset title * rrdset title more * rrdset context * rrdcalctemplate context and removal of context hash from rrdset * strings statistics * rrdset name * rearranged members of rrdset * eliminate rrdset name hash; rrdcalc chart converted to STRING * rrdset id, eliminated rrdset hash * rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate * rrdcalctemplate * rrdvar * eval_variable * rrddimvar and rrdsetvar * rrdhost hostname, os and tags * fix master commits * added thread cache; implemented string_dup without locks * faster thread cache * rrdset and rrddim now use dictionaries for indexing * rrdhost now uses dictionary * rrdfamily now uses DICTIONARY * rrdvar using dictionary instead of AVL * allocate the right size to rrdvar flag members * rrdhost remaining char * members to STRING * * better error handling on indexing * strings now use a read/write lock to allow parallel searches to the index * removed AVL support from dictionaries; implemented STRING with native Judy calls * string releases should be negative * only 31 bits are allowed for enum flags * proper locking on strings * string threading unittest and fixes * fix lgtm finding * fixed naming * stream chart/dimension definitions at the beginning of a streaming session * thread stack variable is undefined on thread cancel * rrdcontext garbage collect per host on startup * worker control in garbage collection * relaxed deletion of rrdmetrics * type checking on dictfe * netdata chart to monitor rrdcontext triggers * Group chart label updates * rrdcontext better handling of collected rrdsets * rrdpush incremental transmition of definitions should use as much buffer as possible * require 1MB per chart * empty the sender buffer before enabling metrics streaming * fill up to 50% of buffer * reset signaling metrics sending * use the shared variable for status * use separate host flag for enabling streaming of metrics * make sure the flag is clear * add logging for streaming * add logging for streaming on buffer overflow * circular_buffer proper sizing * removed obsolete logs * do not execute worker jobs if not necessary * better messages about compression disabling * proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped * monitor stream sender used buffer ratio * Update exporting unit tests * no need to compare label value with strcmp * streaming send workers now monitor bandwidth * workers now use strings * streaming receiver monitors incoming bandwidth * parser shift of worker ids * minor fixes * Group chart label updates * Populate context with dimensions that have data * Fix chart id * better shift of parser worker ids * fix for streaming compression * properly count received bytes * ensure LZ4 compression ring buffer does not wrap prematurely * do not stream empty charts; do not process empty instances in rrdcontext * need_to_send_chart_definition() does not need an rrdset lock any more * rrdcontext objects are collected, after data have been written to the db * better logging of RRDCONTEXT transitions * always set all variables needed by the worker utilization charts * implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes * lockless strings design - string_dup() and string_freez() are totally lockless when they dont need to touch Judy - only Judy is protected with a read/write lock * STRING code re-organization for clarity * thread_cache improvements; double numbers precision on worker threads * STRING_ENTRY now shadown STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS * rrdhost index by hostname now cleans up; aclk queries of archieved hosts do not index hosts * Add index to speed up database context searches * Removed last_updated optimization (was also buggy after latest merge with master) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming/compression.c')
-rw-r--r--streaming/compression.c71
1 files changed, 44 insertions, 27 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index d6178d6c34..302b0b1809 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,9 +5,6 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-#define LZ4_MAX_MSG_SIZE 0x4000
-#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
-
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
@@ -18,8 +15,9 @@
*/
struct compressor_data {
LZ4_stream_t *stream;
- char *stream_buffer;
- size_t stream_buffer_pos;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
};
@@ -33,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state)
LZ4_resetStream_fast(state->data->stream);
info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
}
- state->data->stream_buffer_pos = 0;
+ state->data->input_ring_buffer_pos = 0;
}
}
@@ -47,10 +45,10 @@ static void lz4_compressor_destroy(struct compressor_state **state)
if (s->data) {
if (s->data->stream)
LZ4_freeStream(s->data->stream);
- freez(s->data->stream_buffer);
+ freez(s->data->input_ring_buffer);
freez(s->data);
}
- freez(s->buffer);
+ freez(s->compression_result_buffer);
freez(s);
*state = NULL;
debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
@@ -65,37 +63,53 @@ static void lz4_compressor_destroy(struct compressor_state **state)
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
{
- if (!state || !size || !out)
+ if(unlikely(!state || !size || !out))
return 0;
- if (size > LZ4_MAX_MSG_SIZE) {
+
+ if(unlikely(size > LZ4_MAX_MSG_SIZE)) {
error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
return 0;
}
+
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
size_t data_size = max_dst_size + SIGNATURE_SIZE;
- if (!state->buffer) {
- state->buffer = mallocz(data_size);
- state->buffer_size = data_size;
- } else if (state->buffer_size < data_size) {
- state->buffer = reallocz(state->buffer, data_size);
- state->buffer_size = data_size;
+ if (!state->compression_result_buffer) {
+ state->compression_result_buffer = mallocz(data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+ else if(unlikely(state->compression_result_buffer_size < data_size)) {
+ state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
+ state->compression_result_buffer_size = data_size;
}
- memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
- long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
- state->data->stream_buffer + state->data->stream_buffer_pos,
- state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ // the ring buffer always has space for LZ4_MAX_MSG_SIZE
+ memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+
+ // this call needs the last 64K of our previous data
+ // they are available in the ring buffer
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->data->stream,
+ state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
+ state->compression_result_buffer + SIGNATURE_SIZE,
+ size,
+ max_dst_size,
+ 1);
+
if (compressed_data_size < 0) {
error("Data compression error: %ld", compressed_data_size);
return 0;
}
- state->data->stream_buffer_pos += size;
- if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
- state->data->stream_buffer_pos = 0;
+
+ // update the next writing position of the ring buffer
+ state->data->input_ring_buffer_pos += size;
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE))
+ state->data->input_ring_buffer_pos = 0;
+
+ // update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->buffer = len | SIGNATURE;
- *out = state->buffer;
+ *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *out = state->compression_result_buffer;
debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
@@ -114,8 +128,9 @@ struct compressor_state *create_compressor()
state->data = callocz(1, sizeof(struct compressor_data));
state->data->stream = LZ4_createStream();
- state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
- state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
state->reset(state);
debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
@@ -281,6 +296,8 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
size_t avg_size = state->total_uncompressed / state->packet_count;
+ (void)saving;
+
if (old_avg_saving != avg_saving || old_avg_size != avg_size){
debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
}