summaryrefslogtreecommitdiffstats
path: root/streaming/compression.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/compression.c')
-rw-r--r--streaming/compression.c71
1 files changed, 44 insertions, 27 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index d6178d6c34..302b0b1809 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,9 +5,6 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-#define LZ4_MAX_MSG_SIZE 0x4000
-#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
-
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
@@ -18,8 +15,9 @@
*/
struct compressor_data {
LZ4_stream_t *stream;
- char *stream_buffer;
- size_t stream_buffer_pos;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
};
@@ -33,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state)
LZ4_resetStream_fast(state->data->stream);
info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
}
- state->data->stream_buffer_pos = 0;
+ state->data->input_ring_buffer_pos = 0;
}
}
@@ -47,10 +45,10 @@ static void lz4_compressor_destroy(struct compressor_state **state)
if (s->data) {
if (s->data->stream)
LZ4_freeStream(s->data->stream);
- freez(s->data->stream_buffer);
+ freez(s->data->input_ring_buffer);
freez(s->data);
}
- freez(s->buffer);
+ freez(s->compression_result_buffer);
freez(s);
*state = NULL;
debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
@@ -65,37 +63,53 @@ static void lz4_compressor_destroy(struct compressor_state **state)
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
{
- if (!state || !size || !out)
+ if(unlikely(!state || !size || !out))
return 0;
- if (size > LZ4_MAX_MSG_SIZE) {
+
+ if(unlikely(size > LZ4_MAX_MSG_SIZE)) {
error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
return 0;
}
+
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
size_t data_size = max_dst_size + SIGNATURE_SIZE;
- if (!state->buffer) {
- state->buffer = mallocz(data_size);
- state->buffer_size = data_size;
- } else if (state->buffer_size < data_size) {
- state->buffer = reallocz(state->buffer, data_size);
- state->buffer_size = data_size;
+ if (!state->compression_result_buffer) {
+ state->compression_result_buffer = mallocz(data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+ else if(unlikely(state->compression_result_buffer_size < data_size)) {
+ state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
+ state->compression_result_buffer_size = data_size;
}
- memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
- long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
- state->data->stream_buffer + state->data->stream_buffer_pos,
- state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ // the ring buffer always has space for LZ4_MAX_MSG_SIZE
+ memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+
+ // this call needs the last 64K of our previous data
+ // they are available in the ring buffer
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->data->stream,
+ state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
+ state->compression_result_buffer + SIGNATURE_SIZE,
+ size,
+ max_dst_size,
+ 1);
+
if (compressed_data_size < 0) {
error("Data compression error: %ld", compressed_data_size);
return 0;
}
- state->data->stream_buffer_pos += size;
- if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
- state->data->stream_buffer_pos = 0;
+
+ // update the next writing position of the ring buffer
+ state->data->input_ring_buffer_pos += size;
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE))
+ state->data->input_ring_buffer_pos = 0;
+
+ // update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->buffer = len | SIGNATURE;
- *out = state->buffer;
+ *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *out = state->compression_result_buffer;
debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
@@ -114,8 +128,9 @@ struct compressor_state *create_compressor()
state->data = callocz(1, sizeof(struct compressor_data));
state->data->stream = LZ4_createStream();
- state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
- state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
state->reset(state);
debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
@@ -281,6 +296,8 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
size_t avg_size = state->total_uncompressed / state->packet_count;
+ (void)saving;
+
if (old_avg_saving != avg_saving || old_avg_size != avg_size){
debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
}