diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2024-03-25 12:30:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-25 12:30:31 +0200 |
commit | f1c26d0e2bf2f09480b48a5e9f9dc7abea88a5da (patch) | |
tree | 544c8371c3e5ac9a18d440f88916f7842cde7060 /src | |
parent | f1b10158556d3b449807f0929b1421f63e18d391 (diff) |
DBENGINE: support ZSTD compression (#17244)
* extract dbengine compression to separate files
* added ZSTD support in dbengine
* automatically select best compression
* handle decompression errors
* eliminate fatals from compression algorithms; fallback to uncompressed pages if compression fails or generates bigger data
* have the unit test generate many data files
Diffstat (limited to 'src')
-rw-r--r-- | src/database/engine/dbengine-compression.c | 163 | ||||
-rw-r--r-- | src/database/engine/dbengine-compression.h | 15 | ||||
-rw-r--r-- | src/database/engine/pdc.c | 20 | ||||
-rw-r--r-- | src/database/engine/rrddiskprotocol.h | 1 | ||||
-rw-r--r-- | src/database/engine/rrdengine.c | 60 | ||||
-rwxr-xr-x | src/database/engine/rrdengineapi.c | 3 |
6 files changed, 221 insertions, 41 deletions
diff --git a/src/database/engine/dbengine-compression.c b/src/database/engine/dbengine-compression.c new file mode 100644 index 0000000000..7a29c0cc75 --- /dev/null +++ b/src/database/engine/dbengine-compression.c @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdengine.h" +#include "dbengine-compression.h" + +#ifdef ENABLE_LZ4 +#include <lz4.h> +#endif + +#ifdef ENABLE_ZSTD +#include <zstd.h> +#define DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL 3 +#endif + +uint8_t dbengine_default_compression(void) { + +#ifdef ENABLE_ZSTD + return RRDENG_COMPRESSION_ZSTD; +#endif + +#ifdef ENABLE_LZ4 + return RRDENG_COMPRESSION_LZ4; +#endif + + return RRDENG_COMPRESSION_NONE; +} + +bool dbengine_valid_compression_algorithm(uint8_t algorithm) { + switch(algorithm) { + case RRDENG_COMPRESSION_NONE: + +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: +#endif + + return true; + + default: + return false; + } +} + +size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm) { + switch(algorithm) { +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: + fatal_assert(uncompressed_size < LZ4_MAX_INPUT_SIZE); + return LZ4_compressBound((int)uncompressed_size); +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: + return ZSTD_compressBound(uncompressed_size); +#endif + + case RRDENG_COMPRESSION_NONE: + return uncompressed_size; + + default: + fatal("DBENGINE: unknown compression algorithm %u", algorithm); + } +} + +size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm) { + // the result should be stored in the payload + // the caller must have called dbengine_max_compressed_size() to make sure the + // payload is big enough to fit the max size needed. + + switch(algorithm) { +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: { + size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm); + struct extent_buffer *eb = extent_buffer_get(max_compressed_size); + void *compressed_buf = eb->data; + + size_t compressed_size = + LZ4_compress_default(payload, compressed_buf, (int)uncompressed_size, (int)max_compressed_size); + + if(compressed_size > 0 && compressed_size < uncompressed_size) + memcpy(payload, compressed_buf, compressed_size); + else + compressed_size = 0; + + extent_buffer_release(eb); + return compressed_size; + } +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: { + size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm); + struct extent_buffer *eb = extent_buffer_get(max_compressed_size); + void *compressed_buf = eb->data; + + size_t compressed_size = ZSTD_compress(compressed_buf, max_compressed_size, payload, uncompressed_size, + DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL); + + if (ZSTD_isError(compressed_size)) { + internal_fatal(true, "DBENGINE: ZSTD compression error %s", ZSTD_getErrorName(compressed_size)); + compressed_size = 0; + } + + if(compressed_size > 0 && compressed_size < uncompressed_size) + memcpy(payload, compressed_buf, compressed_size); + else + compressed_size = 0; + + extent_buffer_release(eb); + return compressed_size; + } +#endif + + case RRDENG_COMPRESSION_NONE: + return 0; + + default: + fatal("DBENGINE: unknown compression algorithm %u", algorithm); + } +} + +size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm) { + switch(algorithm) { + +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: { + int rc = LZ4_decompress_safe(src, dst, (int)src_size, (int)dst_size); + if(rc < 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %d", rc); + rc = 0; + } + + return rc; + } +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: { + size_t decompressed_size = ZSTD_decompress(dst, dst_size, src, src_size); + + if (ZSTD_isError(decompressed_size)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %s", + ZSTD_getErrorName(decompressed_size)); + + decompressed_size = 0; + } + + return decompressed_size; + } +#endif + + case RRDENG_COMPRESSION_NONE: + internal_fatal(true, "DBENGINE: %s() should not be called for uncompressed pages", __FUNCTION__ ); + return 0; + + default: + internal_fatal(true, "DBENGINE: unknown compression algorithm %u", algorithm); + return 0; + } +} diff --git a/src/database/engine/dbengine-compression.h b/src/database/engine/dbengine-compression.h new file mode 100644 index 0000000000..8dd97f5d76 --- /dev/null +++ b/src/database/engine/dbengine-compression.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DBENGINE_COMPRESSION_H +#define NETDATA_DBENGINE_COMPRESSION_H + +uint8_t dbengine_default_compression(void); + +bool dbengine_valid_compression_algorithm(uint8_t algorithm); + +size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm); +size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm); + +size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm); + +#endif //NETDATA_DBENGINE_COMPRESSION_H diff --git a/src/database/engine/pdc.c b/src/database/engine/pdc.c index 042765606c..79a424b773 100644 --- a/src/database/engine/pdc.c +++ b/src/database/engine/pdc.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "pdc.h" +#include "dbengine-compression.h" struct extent_page_details_list { uv_file file; @@ -940,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data( PDC_PAGE_STATUS tags, bool cached_extent) { - int ret; unsigned i, count; void *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0; @@ -975,7 +975,7 @@ static bool epdl_populate_pages_from_extent_data( if( !can_use_data || count < 1 || count > MAX_PAGES_PER_EXTENT || - (header->compression_algorithm != RRDENG_COMPRESSION_NONE && header->compression_algorithm != RRDENG_COMPRESSION_LZ4) || + !dbengine_valid_compression_algorithm(header->compression_algorithm) || (payload_length != trailer_offset - payload_offset) || (data_length != payload_offset + payload_length + sizeof(*trailer)) ) { @@ -985,8 +985,7 @@ static bool epdl_populate_pages_from_extent_data( crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer)); - ret = crc32cmp(trailer->checksum, crc); - if (unlikely(ret)) { + if (unlikely(crc32cmp(trailer->checksum, crc))) { ctx_io_error(ctx); have_read_error = true; epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED"); @@ -1018,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data( eb = extent_buffer_get(uncompressed_payload_length); uncompressed_buf = eb->data; - ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf, - (int) payload_length, (int) uncompressed_payload_length); + size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset, + uncompressed_payload_length, payload_length, + header->compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED); + if(!bytes) + have_read_error = true; + else { + __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED); + } } } diff --git a/src/database/engine/rrddiskprotocol.h b/src/database/engine/rrddiskprotocol.h index 1529e23298..dc1a4c9801 100644 --- a/src/database/engine/rrddiskprotocol.h +++ b/src/database/engine/rrddiskprotocol.h @@ -21,6 +21,7 @@ #define RRDENG_COMPRESSION_NONE (0) #define RRDENG_COMPRESSION_LZ4 (1) +#define RRDENG_COMPRESSION_ZSTD (2) #define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t))) diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index e7bb1c56b7..7b21374366 100644 --- a/src/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -3,6 +3,7 @@ #include "rrdengine.h" #include "pdc.h" +#include "dbengine-compression.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; @@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ */ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { int ret; - int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; - uint32_t uncompressed_payload_length, payload_offset; + uint32_t uncompressed_payload_length, max_compressed_size, payload_offset; struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *xt_io_descr; - struct extent_buffer *eb = NULL; - void *compressed_buf = NULL; Word_t Index; uint8_t compression_algorithm = ctx->config.global_compress_alg; struct rrdengine_datafile *datafile; @@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta xt_io_descr = extent_io_descriptor_get(); xt_io_descr->ctx = ctx; payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); - switch (compression_algorithm) { - case RRDENG_COMPRESSION_NONE: - size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); - break; - - default: /* Compress */ - fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); - max_compressed_size = LZ4_compressBound(uncompressed_payload_length); - eb = extent_buffer_get(max_compressed_size); - compressed_buf = eb->data; - size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); - break; - } - + max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm); + size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer); ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("DBENGINE: posix_memalign:%s", strerror(ret)); @@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos = 0; header = xt_io_descr->buf; - header->compression_algorithm = compression_algorithm; header->number_of_pages = count; pos += sizeof(*header); @@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos += sizeof(header->descr[i]); } + + // build the extent payload for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } - if(likely(compression_algorithm == RRDENG_COMPRESSION_LZ4)) { - compressed_size = LZ4_compress_default( - xt_io_descr->buf + payload_offset, - compressed_buf, - (int)uncompressed_payload_length, - max_compressed_size); + // compress the payload + size_t compressed_size = + (int)dbengine_compress(xt_io_descr->buf + payload_offset, + uncompressed_payload_length, + compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed"); + internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent"); - (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - extent_buffer_release(eb); - size_bytes = payload_offset + compressed_size + sizeof(*trailer); + if(compressed_size) { + header->compression_algorithm = compression_algorithm; header->payload_length = compressed_size; } - else { // RRD_NO_COMPRESSION - header->payload_length = uncompressed_payload_length; + else { + // compression failed, or generated bigger pages + // so it didn't touch our uncompressed buffer + header->compression_algorithm = RRDENG_COMPRESSION_NONE; + header->payload_length = compressed_size = uncompressed_payload_length; + } + + // set the correct size + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + + if(compression_algorithm != RRDENG_COMPRESSION_NONE) { + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); } real_io_size = ALIGN_BYTES_CEILING(size_bytes); diff --git a/src/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c index 8dbd71b752..43fed492b5 100755 --- a/src/database/engine/rrdengineapi.c +++ b/src/database/engine/rrdengineapi.c @@ -2,6 +2,7 @@ #include "database/engine/rrddiskprotocol.h" #include "rrdengine.h" +#include "dbengine-compression.h" /* Default global database instance */ struct rrdengine_instance multidb_ctx_storage_tier0; @@ -1170,7 +1171,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path, ctx->config.tier = (int)tier; ctx->config.page_type = tier_page_type[tier]; - ctx->config.global_compress_alg = RRDENG_COMPRESSION_LZ4; + ctx->config.global_compress_alg = dbengine_default_compression(); if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB) disk_space_mb = RRDENG_MIN_DISK_SPACE_MB; ctx->config.max_disk_space = disk_space_mb * 1048576LLU; |