summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2024-03-25 12:30:31 +0200
committerAustin S. Hemmelgarn <ahferroin7@gmail.com>2024-03-27 09:30:48 -0400
commitfed636cc8e3896644ecf2fa9e3e9e73a6985a01a (patch)
treecf96ed4bc84b2714d9a400719c32d2c3ae175196
parentf7e2c07e666903db40e8099dfd09501662f22b16 (diff)
DBENGINE: support ZSTD compression. Disabled by default (#17244)
* extract dbengine compression to separate files * added ZSTD support in dbengine * automatically select best compression * handle decompression errors * eliminate fatals from compression algorithms; fallback to uncompressed pages if compression fails or generates bigger data * have the unit test generate many data files (cherry picked from commit f1c26d0e2bf2f09480b48a5e9f9dc7abea88a5da)
-rw-r--r--CMakeLists.txt2
-rw-r--r--src/database/engine/dbengine-compression.c159
-rw-r--r--src/database/engine/dbengine-compression.h15
-rw-r--r--src/database/engine/pdc.c20
-rw-r--r--src/database/engine/rrddiskprotocol.h1
-rw-r--r--src/database/engine/rrdengine.c60
-rwxr-xr-xsrc/database/engine/rrdengineapi.c3
7 files changed, 219 insertions, 41 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 43b279c2d0..4bb75e0ea6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1044,6 +1044,8 @@ if(ENABLE_DBENGINE)
src/database/engine/pdc.h
src/database/engine/dbengine-unittest.c
src/database/engine/dbengine-stresstest.c
+ src/database/engine/dbengine-compression.c
+ src/database/engine/dbengine-compression.h
)
endif()
diff --git a/src/database/engine/dbengine-compression.c b/src/database/engine/dbengine-compression.c
new file mode 100644
index 0000000000..46ef2b075f
--- /dev/null
+++ b/src/database/engine/dbengine-compression.c
@@ -0,0 +1,159 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdengine.h"
+#include "dbengine-compression.h"
+
+#ifdef ENABLE_LZ4
+#include <lz4.h>
+#endif
+
+#ifdef ENABLE_ZSTD
+#include <zstd.h>
+#define DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL 3
+#endif
+
+// Returns the compression algorithm dbengine uses by default:
+// LZ4 when the build defines ENABLE_LZ4, otherwise no compression.
+// ZSTD is never selected here, even when ENABLE_ZSTD is defined.
+uint8_t dbengine_default_compression(void) {
+#ifdef ENABLE_LZ4
+    const uint8_t algorithm = RRDENG_COMPRESSION_LZ4;
+#else
+    const uint8_t algorithm = RRDENG_COMPRESSION_NONE;
+#endif
+
+    return algorithm;
+}
+
+// Tells whether `algorithm` is a compression id this build can handle.
+// RRDENG_COMPRESSION_NONE is always accepted; LZ4 and ZSTD are accepted
+// only when the build defines ENABLE_LZ4 / ENABLE_ZSTD respectively.
+bool dbengine_valid_compression_algorithm(uint8_t algorithm) {
+    if(algorithm == RRDENG_COMPRESSION_NONE)
+        return true;
+
+#ifdef ENABLE_LZ4
+    if(algorithm == RRDENG_COMPRESSION_LZ4)
+        return true;
+#endif
+
+#ifdef ENABLE_ZSTD
+    if(algorithm == RRDENG_COMPRESSION_ZSTD)
+        return true;
+#endif
+
+    // anything else is unknown to this build
+    return false;
+}
+
+// Returns the worst-case output size for compressing `uncompressed_size`
+// bytes with `algorithm`:
+//  - LZ4:  LZ4_compressBound(), after fatal_assert()-ing that the input is
+//          below LZ4_MAX_INPUT_SIZE
+//  - ZSTD: ZSTD_compressBound()
+//  - NONE: the uncompressed size itself
+// An algorithm not compiled in (or unknown) is reported through fatal().
+size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm) {
+#ifdef ENABLE_LZ4
+    if(algorithm == RRDENG_COMPRESSION_LZ4) {
+        fatal_assert(uncompressed_size < LZ4_MAX_INPUT_SIZE);
+        return LZ4_compressBound((int)uncompressed_size);
+    }
+#endif
+
+#ifdef ENABLE_ZSTD
+    if(algorithm == RRDENG_COMPRESSION_ZSTD)
+        return ZSTD_compressBound(uncompressed_size);
+#endif
+
+    if(algorithm == RRDENG_COMPRESSION_NONE)
+        return uncompressed_size;
+
+    fatal("DBENGINE: unknown compression algorithm %u", algorithm);
+}
+
+// Compresses `payload` (of `uncompressed_size` bytes) in place.
+//
+// The data is compressed into a temporary extent buffer sized with
+// dbengine_max_compressed_size() and copied back over `payload` only when
+// the result is non-empty and strictly smaller than `uncompressed_size`.
+//
+// The caller must have called dbengine_max_compressed_size() to make sure
+// the payload is big enough to fit the max size needed.
+//
+// Returns the compressed size, or 0 when `payload` was left untouched
+// (RRDENG_COMPRESSION_NONE, a compression error, or no size gain).
+// An algorithm not compiled in (or unknown) is reported through fatal().
+size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm) {
+    switch(algorithm) {
+#ifdef ENABLE_LZ4
+        case RRDENG_COMPRESSION_LZ4: {
+            size_t scratch_size = dbengine_max_compressed_size(uncompressed_size, algorithm);
+            struct extent_buffer *scratch = extent_buffer_get(scratch_size);
+            void *scratch_data = scratch->data;
+
+            size_t written =
+                LZ4_compress_default(payload, scratch_data, (int)uncompressed_size, (int)scratch_size);
+
+            // keep the compressed form only when it actually saves space
+            if(written == 0 || written >= uncompressed_size)
+                written = 0;
+            else
+                memcpy(payload, scratch_data, written);
+
+            extent_buffer_release(scratch);
+            return written;
+        }
+#endif
+
+#ifdef ENABLE_ZSTD
+        case RRDENG_COMPRESSION_ZSTD: {
+            size_t scratch_size = dbengine_max_compressed_size(uncompressed_size, algorithm);
+            struct extent_buffer *scratch = extent_buffer_get(scratch_size);
+            void *scratch_data = scratch->data;
+
+            size_t written = ZSTD_compress(scratch_data, scratch_size, payload, uncompressed_size,
+                                           DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL);
+
+            // ZSTD reports failures as special size_t values
+            if(ZSTD_isError(written)) {
+                internal_fatal(true, "DBENGINE: ZSTD compression error %s", ZSTD_getErrorName(written));
+                written = 0;
+            }
+
+            // keep the compressed form only when it actually saves space
+            if(written == 0 || written >= uncompressed_size)
+                written = 0;
+            else
+                memcpy(payload, scratch_data, written);
+
+            extent_buffer_release(scratch);
+            return written;
+        }
+#endif
+
+        case RRDENG_COMPRESSION_NONE:
+            return 0;
+
+        default:
+            fatal("DBENGINE: unknown compression algorithm %u", algorithm);
+    }
+}
+
+// Decompresses `src_size` bytes from `src` into `dst`, whose capacity is
+// `dst_size` bytes, using `algorithm`.
+//
+// Returns the number of bytes written to `dst`, or 0 on failure.
+// LZ4 / ZSTD decompression errors are logged and turned into a 0 return.
+// Calling this for RRDENG_COMPRESSION_NONE or for an algorithm not compiled
+// in is flagged with internal_fatal() and also returns 0.
+size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm) {
+    switch(algorithm) {
+
+#ifdef ENABLE_LZ4
+        case RRDENG_COMPRESSION_LZ4: {
+            int rc = LZ4_decompress_safe(src, dst, (int)src_size, (int)dst_size);
+            if(rc < 0) {
+                // a negative result is an LZ4 error: name the right codec in the log
+                nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: LZ4 decompression error %d", rc);
+                rc = 0;
+            }
+
+            return rc;
+        }
+#endif
+
+#ifdef ENABLE_ZSTD
+        case RRDENG_COMPRESSION_ZSTD: {
+            size_t decompressed_size = ZSTD_decompress(dst, dst_size, src, src_size);
+
+            // ZSTD reports failures as special size_t values
+            if (ZSTD_isError(decompressed_size)) {
+                nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %s",
+                       ZSTD_getErrorName(decompressed_size));
+
+                decompressed_size = 0;
+            }
+
+            return decompressed_size;
+        }
+#endif
+
+        case RRDENG_COMPRESSION_NONE:
+            internal_fatal(true, "DBENGINE: %s() should not be called for uncompressed pages", __FUNCTION__ );
+            return 0;
+
+        default:
+            internal_fatal(true, "DBENGINE: unknown compression algorithm %u", algorithm);
+            return 0;
+    }
+}
diff --git a/src/database/engine/dbengine-compression.h b/src/database/engine/dbengine-compression.h
new file mode 100644
index 0000000000..8dd97f5d76
--- /dev/null
+++ b/src/database/engine/dbengine-compression.h
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DBENGINE_COMPRESSION_H
+#define NETDATA_DBENGINE_COMPRESSION_H
+
+// keep the header self-contained: the prototypes use bool, size_t and uint8_t
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+// Returns the RRDENG_COMPRESSION_* id used by default
+// (LZ4 when built with ENABLE_LZ4, otherwise none).
+uint8_t dbengine_default_compression(void);
+
+// Returns true when `algorithm` is a RRDENG_COMPRESSION_* id supported by this build.
+bool dbengine_valid_compression_algorithm(uint8_t algorithm);
+
+// Returns the worst-case compressed size of `uncompressed_size` bytes for `algorithm`.
+size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm);
+
+// Compresses `payload` in place. Returns the compressed size, or 0 when the
+// payload was left uncompressed (no compression requested, compression
+// failed, or the result was not smaller than the input).
+size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm);
+
+// Decompresses `src` (`src_size` bytes) into `dst` (capacity `dst_size` bytes).
+// Returns the number of decompressed bytes, or 0 on error.
+size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm);
+
+#endif //NETDATA_DBENGINE_COMPRESSION_H
diff --git a/src/database/engine/pdc.c b/src/database/engine/pdc.c
index 042765606c..79a424b773 100644
--- a/src/database/engine/pdc.c
+++ b/src/database/engine/pdc.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "pdc.h"
+#include "dbengine-compression.h"
struct extent_page_details_list {
uv_file file;
@@ -940,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data(
PDC_PAGE_STATUS tags,
bool cached_extent)
{
- int ret;
unsigned i, count;
void *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
@@ -975,7 +975,7 @@ static bool epdl_populate_pages_from_extent_data(
if( !can_use_data ||
count < 1 ||
count > MAX_PAGES_PER_EXTENT ||
- (header->compression_algorithm != RRDENG_COMPRESSION_NONE && header->compression_algorithm != RRDENG_COMPRESSION_LZ4) ||
+ !dbengine_valid_compression_algorithm(header->compression_algorithm) ||
(payload_length != trailer_offset - payload_offset) ||
(data_length != payload_offset + payload_length + sizeof(*trailer))
) {
@@ -985,8 +985,7 @@ static bool epdl_populate_pages_from_extent_data(
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
- if (unlikely(ret)) {
+ if (unlikely(crc32cmp(trailer->checksum, crc))) {
ctx_io_error(ctx);
have_read_error = true;
epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
@@ -1018,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data(
eb = extent_buffer_get(uncompressed_payload_length);
uncompressed_buf = eb->data;
- ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
- (int) payload_length, (int) uncompressed_payload_length);
+ size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset,
+ uncompressed_payload_length, payload_length,
+ header->compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
+ if(!bytes)
+ have_read_error = true;
+ else {
+ __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED);
+ }
}
}
diff --git a/src/database/engine/rrddiskprotocol.h b/src/database/engine/rrddiskprotocol.h
index 1529e23298..dc1a4c9801 100644
--- a/src/database/engine/rrddiskprotocol.h
+++ b/src/database/engine/rrddiskprotocol.h
@@ -21,6 +21,7 @@
#define RRDENG_COMPRESSION_NONE (0)
#define RRDENG_COMPRESSION_LZ4 (1)
+#define RRDENG_COMPRESSION_ZSTD (2)
#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c
index e7bb1c56b7..7b21374366 100644
--- a/src/database/engine/rrdengine.c
+++ b/src/database/engine/rrdengine.c
@@ -3,6 +3,7 @@
#include "rrdengine.h"
#include "pdc.h"
+#include "dbengine-compression.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
@@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
*/
static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
int ret;
- int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
- uint32_t uncompressed_payload_length, payload_offset;
+ uint32_t uncompressed_payload_length, max_compressed_size, payload_offset;
struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *xt_io_descr;
- struct extent_buffer *eb = NULL;
- void *compressed_buf = NULL;
Word_t Index;
uint8_t compression_algorithm = ctx->config.global_compress_alg;
struct rrdengine_datafile *datafile;
@@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
xt_io_descr = extent_io_descriptor_get();
xt_io_descr->ctx = ctx;
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
- switch (compression_algorithm) {
- case RRDENG_COMPRESSION_NONE:
- size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
- break;
-
- default: /* Compress */
- fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
- max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
- eb = extent_buffer_get(max_compressed_size);
- compressed_buf = eb->data;
- size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
- break;
- }
-
+ max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer);
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
@@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos = 0;
header = xt_io_descr->buf;
- header->compression_algorithm = compression_algorithm;
header->number_of_pages = count;
pos += sizeof(*header);
@@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos += sizeof(header->descr[i]);
}
+
+ // build the extent payload
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
- if(likely(compression_algorithm == RRDENG_COMPRESSION_LZ4)) {
- compressed_size = LZ4_compress_default(
- xt_io_descr->buf + payload_offset,
- compressed_buf,
- (int)uncompressed_payload_length,
- max_compressed_size);
+ // compress the payload
+ size_t compressed_size =
+ (int)dbengine_compress(xt_io_descr->buf + payload_offset,
+ uncompressed_payload_length,
+ compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
+ internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed");
+ internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent");
- (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- extent_buffer_release(eb);
- size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ if(compressed_size) {
+ header->compression_algorithm = compression_algorithm;
header->payload_length = compressed_size;
}
- else { // RRD_NO_COMPRESSION
- header->payload_length = uncompressed_payload_length;
+ else {
+ // compression failed, or generated bigger pages
+ // so it didn't touch our uncompressed buffer
+ header->compression_algorithm = RRDENG_COMPRESSION_NONE;
+ header->payload_length = compressed_size = uncompressed_payload_length;
+ }
+
+ // set the correct size
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+
+ if(compression_algorithm != RRDENG_COMPRESSION_NONE) {
+ __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
}
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
diff --git a/src/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c
index 8dbd71b752..43fed492b5 100755
--- a/src/database/engine/rrdengineapi.c
+++ b/src/database/engine/rrdengineapi.c
@@ -2,6 +2,7 @@
#include "database/engine/rrddiskprotocol.h"
#include "rrdengine.h"
+#include "dbengine-compression.h"
/* Default global database instance */
struct rrdengine_instance multidb_ctx_storage_tier0;
@@ -1170,7 +1171,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
ctx->config.tier = (int)tier;
ctx->config.page_type = tier_page_type[tier];
- ctx->config.global_compress_alg = RRDENG_COMPRESSION_LZ4;
+ ctx->config.global_compress_alg = dbengine_default_compression();
if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
ctx->config.max_disk_space = disk_space_mb * 1048576LLU;