// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
/* Version strings must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
/* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
/* page count must fit in 8 bits */
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
}
void read_extent_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
int ret;
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
uLong crc;
xt_io_descr = req->data;
if (req->result < 0) {
error("%s: uv_fs_read: %s", __func__, uv_strerror((int)req->result));
goto cleanup;
}
header = xt_io_descr->buf;
payload_length = header->payload_length;
count = header->number_of_pages;
payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer);
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
#ifdef NETDATA_INTERNAL_CHECKS
{
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
}
#endif
if (unlikely(ret)) {
/* TODO: handle errors */
exit(UV_EIO);
goto cleanup;
}
if (RRD_NO_COMPRESSION != header->compression_algorithm) {
uncompressed_payload_length = 0;
for (i = 0 ; i < count ; ++i) {
uncompressed_payload_length += header->descr[i].page_length;
}
uncompressed_buf = mallocz(uncompressed_payload_length);
ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf,
payload_length, uncompressed_payload_length);
ctx->stats.before_decompress_bytes += payload_length;
ctx->stats.after_decompress_bytes += ret;
debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret);
/* care, we don't hold the descriptor mutex */
}
for (i = 0 ; i < xt_io_descr->descr_count; ++i) {
page = mallocz(RRDENG_BLOCK_SIZE);
descr = xt_io_descr->descr_array[i];
for (j = 0, page_offset = 0; j < count; ++j) {
/* care, we don't hold the descriptor mutex */
if (!uuid_compare(*(uuid_t *) header->descr[j].uuid, *descr->id) &&
header->descr[j].page_length == descr->page_length &&
header->descr[j].start_time == descr->start_time &&
header->descr[j].end_time == descr->end_time) {
break;
}
page_offset += header->descr[j].page_length;
}
/* care, we don't hold the descriptor mutex */
if (RRD_NO_COMPRESSION == header->compression_algorithm) {
(void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
} else {
(void) memcpy