diff options
Diffstat (limited to 'streaming/compression.c')
-rw-r--r-- | streaming/compression.c | 71 |
1 files changed, 44 insertions, 27 deletions
diff --git a/streaming/compression.c b/streaming/compression.c index d6178d6c34..302b0b1809 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -5,9 +5,6 @@ #define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" -#define LZ4_MAX_MSG_SIZE 0x4000 -#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE) - #define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) #define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) #define SIGNATURE_SIZE 4 @@ -18,8 +15,9 @@ */ struct compressor_data { LZ4_stream_t *stream; - char *stream_buffer; - size_t stream_buffer_pos; + char *input_ring_buffer; + size_t input_ring_buffer_size; + size_t input_ring_buffer_pos; }; @@ -33,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state) LZ4_resetStream_fast(state->data->stream); info("%s: Compressor Reset", STREAM_COMPRESSION_MSG); } - state->data->stream_buffer_pos = 0; + state->data->input_ring_buffer_pos = 0; } } @@ -47,10 +45,10 @@ static void lz4_compressor_destroy(struct compressor_state **state) if (s->data) { if (s->data->stream) LZ4_freeStream(s->data->stream); - freez(s->data->stream_buffer); + freez(s->data->input_ring_buffer); freez(s->data); } - freez(s->buffer); + freez(s->compression_result_buffer); freez(s); *state = NULL; debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG); @@ -65,37 +63,53 @@ static void lz4_compressor_destroy(struct compressor_state **state) */ static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out) { - if (!state || !size || !out) + if(unlikely(!state || !size || !out)) return 0; - if (size > LZ4_MAX_MSG_SIZE) { + + if(unlikely(size > LZ4_MAX_MSG_SIZE)) { error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE); return 0; } + size_t max_dst_size = LZ4_COMPRESSBOUND(size); size_t data_size = max_dst_size + SIGNATURE_SIZE; - if (!state->buffer) { - state->buffer = mallocz(data_size); - state->buffer_size = data_size; - } else if (state->buffer_size < data_size) { - state->buffer = reallocz(state->buffer, data_size); - state->buffer_size = data_size; + if (!state->compression_result_buffer) { + state->compression_result_buffer = mallocz(data_size); + state->compression_result_buffer_size = data_size; + } + else if(unlikely(state->compression_result_buffer_size < data_size)) { + state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size); + state->compression_result_buffer_size = data_size; } - memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size); - long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream, - state->data->stream_buffer + state->data->stream_buffer_pos, - state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1); + // the ring buffer always has space for LZ4_MAX_MSG_SIZE + memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size); + + // this call needs the last 64K of our previous data + // they are available in the ring buffer + long int compressed_data_size = LZ4_compress_fast_continue( + state->data->stream, + state->data->input_ring_buffer + state->data->input_ring_buffer_pos, + state->compression_result_buffer + SIGNATURE_SIZE, + size, + max_dst_size, + 1); + if (compressed_data_size < 0) { error("Data compression error: %ld", compressed_data_size); return 0; } - state->data->stream_buffer_pos += size; - if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE) - state->data->stream_buffer_pos = 0; + + // update the next writing position of the ring buffer + state->data->input_ring_buffer_pos += size; + if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE)) + state->data->input_ring_buffer_pos = 0; + + // update the signature header uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->buffer = len | SIGNATURE; - *out = state->buffer; + *(uint32_t *)state->compression_result_buffer = len | SIGNATURE; + *out = state->compression_result_buffer; debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); return compressed_data_size + SIGNATURE_SIZE; } @@ -114,8 +128,9 @@ struct compressor_state *create_compressor() state->data = callocz(1, sizeof(struct compressor_data)); state->data->stream = LZ4_createStream(); - state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE)); - state->buffer_size = LZ4_STREAM_BUFFER_SIZE; + state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2); + state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size); + state->compression_result_buffer_size = 0; state->reset(state); debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG); return state; @@ -281,6 +296,8 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state) size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed); size_t avg_size = state->total_uncompressed / state->packet_count; + (void)saving; + if (old_avg_saving != avg_saving || old_avg_size != avg_size){ debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size); } |