diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-17 19:35:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-17 19:35:14 +0200 |
commit | 7279dd092c23fbafcd7edb8ef7f3f79e1a0e5ecb (patch) | |
tree | 730e1cc845613b1751ad79f5d8ba6bdce1ad9491 /database/engine/journalfile.c | |
parent | 6be264d62788b1b50109dc1f2a0cb6f622cfb804 (diff) |
DBENGINE v2 - improvements part 3 (#14269)
* reduce journal v2 shared memory using madvise() - not integrated yet
* working attempt to minimize dbengine shared memory
* never call willneed - let the kernel decide which parts of each file are really needed
* journal files get MADV_RANDOM
* don't call MADV_DONTNEED too frequently
* madvise() is always called with the journal unlocked but referenced
* call madvise() even less frequently
* added chart for monitoring database events
* turn batch mode on under critical conditions
* max size to evict is 1/4 of the max
* fix max size to evict calculation
* use dbengine_page/extent_alloc/free to pages and extents allocations, tracking also the size of these allocations at free time
* fix calculation for batch evictions
* allow main and open cache to have as many evictors as needed
* control inline evictors for each cache; report different levels of cache pressure on every cache evaluation
* more inline evictors for extent cache
* bypass max inline evictors above critical level
* current cache usage has to be taken
* re-arrange items in journalfile
* updated docs - work in progress
* more docs work
* more docs work
* Map / unmap journal file
* draw.io diagram for dbengine operations
* updated dbengine diagram
* updated docs
* journal files v2 now get mapped and unmapped as needed
* unmap journal v2 immediately when getting retention
* mmap and munmap do not block queries evaluating journal files v2
* have only one unmap function
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r-- | database/engine/journalfile.c | 409 |
1 files changed, 313 insertions, 96 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 0ba0b9b440..5560bd21ea 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -52,7 +52,7 @@ static void update_metric_retention_and_granularity_by_uuid( mrg_metric_release(main_mrg, metric); } -static void flush_transaction_buffer_cb(uv_fs_t* req) +static void wal_flush_transaction_buffer_cb(uv_fs_t* req) { worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB); @@ -99,7 +99,7 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size); ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1, - journalfile->pos, flush_transaction_buffer_cb); + journalfile->pos, wal_flush_transaction_buffer_cb); fatal_assert(-1 != ret); journalfile->pos += wal->buf_size; ctx->disk_space += wal->buf_size; @@ -107,26 +107,256 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi ++ctx->stats.io_write_requests; } -void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } -void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +void journalfile_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); } -void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t 
*data_size) { + struct journal_v2_header *j2_header = NULL; + + netdata_spinlock_lock(&journalfile->mmap.spinlock); + + if(!journalfile->mmap.data) { + journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); + if (journalfile->mmap.data == MAP_FAILED) { + internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2"); + close(journalfile->mmap.fd); + journalfile->mmap.fd = -1; + journalfile->mmap.data = NULL; + journalfile->mmap.size = 0; + + netdata_spinlock_lock(&journalfile->v2.spinlock); + journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); + netdata_spinlock_unlock(&journalfile->v2.spinlock); + + ++journalfile->datafile->ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + else { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED); + + madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size); + madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size); + madvise_random(journalfile->mmap.data, journalfile->mmap.size); + madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); + + netdata_spinlock_lock(&journalfile->v2.spinlock); + journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; + netdata_spinlock_unlock(&journalfile->v2.spinlock); + } + } + + if(journalfile->mmap.data) { + j2_header = journalfile->mmap.data; + + if (data_size) + *data_size = journalfile->mmap.size; + } + + netdata_spinlock_unlock(&journalfile->mmap.spinlock); + + return j2_header; +} + +static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks) { + bool unmounted = false; + + if(!have_locks) { + netdata_spinlock_lock(&journalfile->mmap.spinlock); + netdata_spinlock_lock(&journalfile->v2.spinlock); + } + + if(!journalfile->v2.refcount && journalfile->mmap.data) { + if (munmap(journalfile->mmap.data, journalfile->mmap.size)) { + char 
path[RRDENG_PATH_MAX]; + journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path)); + error("DBENGINE: failed to unmap index file '%s'", path); + internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path); + ++journalfile->datafile->ctx->stats.fs_errors; + rrd_stat_atomic_add(&global_fs_errors, 1); + } + else { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED); + journalfile->mmap.data = NULL; + journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED; + } + + unmounted = true; + } + + if(!have_locks) { + netdata_spinlock_unlock(&journalfile->v2.spinlock); + netdata_spinlock_unlock(&journalfile->mmap.spinlock); + } + + return unmounted; +} + +struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { + netdata_spinlock_lock(&journalfile->v2.spinlock); + + bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); + bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); + bool do_we_need_it = false; + bool unmount = false; + + if(has_data) { + if (!wanted_first_time_s || !wanted_last_time_s || + is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s, + wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) { + + journalfile->v2.refcount++; + + do_we_need_it = true; + journalfile->v2.not_needed_counter = 0; + + if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted) + journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION; + else + journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION; + + } + else if (is_mounted) { + // this journal has data, but it does not match our query + + if (!journalfile->v2.refcount) { + // this journal has no references + + if (!journalfile->v2.not_needed_counter) + journalfile->v2.not_needed_since_s = now_monotonic_sec(); + + if ((++journalfile->v2.not_needed_counter) % 100 == 
0) { + // at least 100 times it has been evaluated since last use + + if (now_monotonic_sec() - journalfile->v2.not_needed_since_s >= 120) + // 2 minutes have passed since last use + unmount = true; + } + } + } + } + netdata_spinlock_unlock(&journalfile->v2.spinlock); + + if(do_we_need_it) + return journalfile_v2_mounted_data_get(journalfile, data_size); + + else if(unmount) + journalfile_v2_mounted_data_unmount(journalfile, false); + + return NULL; +} + +void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { + netdata_spinlock_lock(&journalfile->v2.spinlock); + + internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); + internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); + + bool unmount = false; + + journalfile->v2.refcount--; + + if(journalfile->v2.refcount == 0) { + journalfile->v2.not_needed_counter = 0; + + if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) + unmount = true; + } + netdata_spinlock_unlock(&journalfile->v2.spinlock); + + if(unmount) + journalfile_v2_mounted_data_unmount(journalfile, false); +} + +bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { + + netdata_spinlock_lock(&journalfile->v2.spinlock); + bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); + netdata_spinlock_unlock(&journalfile->v2.spinlock); + + return has_data; +} + +size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { + + netdata_spinlock_lock(&journalfile->mmap.spinlock); + size_t data_size = journalfile->mmap.size; + netdata_spinlock_unlock(&journalfile->mmap.spinlock); + + return data_size; +} + +void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { + netdata_spinlock_lock(&journalfile->mmap.spinlock); + netdata_spinlock_lock(&journalfile->v2.spinlock); + + internal_fatal(journalfile->mmap.fd != -1, "DBENGINE 
JOURNALFILE: trying to re-set journal fd"); + internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); + internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile"); + + journalfile->mmap.fd = fd; + journalfile->mmap.data = journal_data; + journalfile->mmap.size = journal_data_size; + journalfile->v2.not_needed_since_s = now_monotonic_sec(); + journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; + + struct journal_v2_header *j2_header = journalfile->mmap.data; + journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC); + journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC); + + journalfile_v2_mounted_data_unmount(journalfile, true); + + netdata_spinlock_unlock(&journalfile->v2.spinlock); + netdata_spinlock_unlock(&journalfile->mmap.spinlock); +} + +static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + bool has_references = false; + + do { + if (has_references) + sleep_usec(10 * USEC_PER_MS); + + netdata_spinlock_lock(&journalfile->mmap.spinlock); + netdata_spinlock_lock(&journalfile->v2.spinlock); + + if(journalfile_v2_mounted_data_unmount(journalfile, true)) { + close(journalfile->mmap.fd); + journalfile->mmap.fd = -1; + journalfile->mmap.data = NULL; + journalfile->mmap.size = 0; + journalfile->v2.first_time_s = 0; + journalfile->v2.last_time_s = 0; + journalfile->v2.flags = 0; + } + else { + has_references = true; + internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); + } + + netdata_spinlock_unlock(&journalfile->v2.spinlock); + netdata_spinlock_unlock(&journalfile->mmap.spinlock); + + } while(has_references); +} + +struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile) { - journalfile->file = (uv_file)0; - journalfile->pos = 0; + struct 
rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); journalfile->datafile = datafile; - SET_JOURNAL_DATA(journalfile, 0); - SET_JOURNAL_DATA_SIZE(journalfile, 0); - journalfile->data = NULL; + netdata_spinlock_init(&journalfile->mmap.spinlock); + netdata_spinlock_init(&journalfile->v2.spinlock); + journalfile->mmap.fd = -1; + datafile->journalfile = journalfile; + return journalfile; } static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) @@ -137,7 +367,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) uv_fs_t req; ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - generate_journalfilepath(datafile, path, sizeof(path)); + journalfile_generate_path(datafile, path, sizeof(path)); error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ++datafile->ctx->stats.fs_errors; rrd_stat_atomic_add(&global_fs_errors, 1); @@ -146,30 +376,17 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) return ret; } -int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { - struct rrdengine_instance *ctx = datafile->ctx; - char path[RRDENG_PATH_MAX]; - - void *journal_data = GET_JOURNAL_DATA(journalfile); - size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile); - - if (likely(journal_data)) { - if (munmap(journal_data, journal_data_size)) { - generate_journalfilepath_v2(datafile, path, sizeof(path)); - error("DBENGINE: failed to unmap journal index file for %s", path); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); - } - SET_JOURNAL_DATA(journalfile, 0); - SET_JOURNAL_DATA_SIZE(journalfile, 0); + if(journalfile_v2_data_available(journalfile)) { + journalfile_v2_data_unmap_permanently(journalfile); return 0; } return close_uv_file(datafile, journalfile->file); } -int 
unlink_journal_file(struct rrdengine_journalfile *journalfile) +int journalfile_unlink(struct rrdengine_journalfile *journalfile) { struct rrdengine_datafile *datafile = journalfile->datafile; struct rrdengine_instance *ctx = datafile->ctx; @@ -177,7 +394,7 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile) int ret; char path[RRDENG_PATH_MAX]; - generate_journalfilepath(datafile, path, sizeof(path)); + journalfile_generate_path(datafile, path, sizeof(path)); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { @@ -192,7 +409,7 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile) return ret; } -int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; @@ -200,8 +417,8 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc char path[RRDENG_PATH_MAX]; char path_v2[RRDENG_PATH_MAX]; - generate_journalfilepath(datafile, path, sizeof(path)); - generate_journalfilepath_v2(datafile, path_v2, sizeof(path)); + journalfile_generate_path(datafile, path, sizeof(path)); + journalfile_v2_generate_path(datafile, path_v2, sizeof(path)); if (journalfile->file) { ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); @@ -234,19 +451,13 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc ++ctx->stats.journalfile_deletions; ++ctx->stats.journalfile_deletions; - void *journal_data = GET_JOURNAL_DATA(journalfile); - size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile); - - if (journal_data) { - if (munmap(journal_data, journal_data_size)) { - error("DBENGINE: failed to unmap index file %s", path_v2); - } - } + if(journalfile_v2_data_available(journalfile)) + journalfile_v2_data_unmap_permanently(journalfile); return ret; } -int 
create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; @@ -256,7 +467,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng uv_buf_t iov; char path[RRDENG_PATH_MAX]; - generate_journalfilepath(datafile, path, sizeof(path)); + journalfile_generate_path(datafile, path, sizeof(path)); fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); if (fd < 0) { ++ctx->stats.fs_errors; @@ -286,7 +497,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_journal_file_unsafe(journalfile, datafile); + journalfile_destroy_unsafe(journalfile, datafile); return ret; } @@ -297,7 +508,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng return 0; } -static int check_journal_file_superblock(uv_file file) +static int journalfile_check_superblock(uv_file file) { int ret; struct rrdeng_jf_sb *superblock; @@ -331,7 +542,7 @@ static int check_journal_file_superblock(uv_file file) return ret; } -static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) +static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) { static BITMAP256 page_error_map; unsigned i, count, payload_length, descr_size; @@ -407,8 +618,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden * Sets id to the current transaction id or to 0 if unknown. * Returns size of transaction record or 0 for unknown size. 
*/ -static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, - void *buf, uint64_t *id, unsigned max_size) +static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + void *buf, uint64_t *id, unsigned max_size) { unsigned payload_length, size_bytes; int ret; @@ -446,7 +657,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng switch (jf_header->type) { case STORE_DATA: debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); - restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); + journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); break; default: error("DBENGINE: unknown transaction type, skipping record."); @@ -463,7 +674,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng * Page cache must already be initialized. * Returns the maximum transaction id it discovered. 
*/ -static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) +static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) { uv_file file; uint64_t file_size;//, data_file_size; @@ -507,7 +718,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde unsigned max_size; max_size = pos + size_bytes - pos_i; - ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); + ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size); if (!ret) /* TODO: support transactions bigger than 4K */ /* unknown transaction size, move on to the next block */ pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE); @@ -525,7 +736,7 @@ skip_file: } // Checks that the extent list checksum is valid -static int check_journal_v2_extent_list (void *data_start, size_t file_size) +static int journalfile_check_v2_extent_list (void *data_start, size_t file_size) { UNUSED(file_size); uLong crc; @@ -545,7 +756,7 @@ static int check_journal_v2_extent_list (void *data_start, size_t file_size) } // Checks that the metric list (UUIDs) checksum is valid -static int check_journal_v2_metric_list(void *data_start, size_t file_size) +static int journalfile_check_v2_metric_list(void *data_start, size_t file_size) { UNUSED(file_size); uLong crc; @@ -570,7 +781,7 @@ static int check_journal_v2_metric_list(void *data_start, size_t file_size) // 2 Force rebuild // 3 skip -static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size) +static int journalfile_v2_validate(void *data_start, size_t file_size, uint32_t original_size) { int rc; uLong crc; @@ -605,10 +816,10 @@ static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t or return 1; } - rc = check_journal_v2_extent_list(data_start, file_size); + rc = journalfile_check_v2_extent_list(data_start, file_size); if (rc) return 1; - 
rc = check_journal_v2_metric_list(data_start, file_size); + rc = journalfile_check_v2_metric_list(data_start, file_size); if (rc) return 1; if (!db_engine_journal_check) @@ -667,7 +878,7 @@ static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t or return 0; } -int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) +int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) { int ret, fd; uint64_t file_size; @@ -675,12 +886,12 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa struct stat statbuf; uint32_t original_file_size = 0; - generate_journalfilepath(datafile, path, sizeof(path)); + journalfile_generate_path(datafile, path, sizeof(path)); ret = stat(path, &statbuf); if (!ret) original_file_size = (uint32_t)statbuf.st_size; - generate_journalfilepath_v2(datafile, path, sizeof(path)); + journalfile_v2_generate_path(datafile, path, sizeof(path)); fd = open(path, O_RDONLY); if (fd < 0) { @@ -713,10 +924,9 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa close(fd); return 1; } - close(fd); info("DBENGINE: checking integrity of '%s'", path); - int rc = check_journal_v2_file(data_start, file_size, original_file_size); + int rc = journalfile_v2_validate(data_start, file_size, original_file_size); if (unlikely(rc)) { if (rc == 2) error_report("File %s needs to be rebuilt", path); @@ -728,6 +938,7 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa if (unlikely(munmap(data_start, file_size))) error("DBENGINE: failed to unmap '%s'", path); + close(fd); return rc; } @@ -738,6 +949,7 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa if (unlikely(munmap(data_start, file_size))) error("DBENGINE: failed to unmap '%s'", path); + close(fd); return 1; } @@ -746,10 
+958,6 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset); - // Initialize the journal file to be able to access the data - SET_JOURNAL_DATA(journalfile, data_start); - SET_JOURNAL_DATA_SIZE(journalfile, file_size); - time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); time_t now_s = now_realtime_sec(); @@ -771,6 +979,9 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa info("DBENGINE: journal file '%s' loaded (size:%"PRIu64") with %u metrics in %d ms", path, file_size, entries, (int) ((now_realtime_usec() - start_loading) / USEC_PER_MS)); + // Initialize the journal file to be able to access the data + journalfile_v2_data_set(journalfile, fd, data_start, file_size); + // File is OK load it return 0; } @@ -779,7 +990,7 @@ struct journal_metric_list_to_sort { struct jv2_metrics_info *metric_info; }; -static int journal_metric_compare (const void *item1, const void *item2) +static int journalfile_metric_compare (const void *item1, const void *item2) { const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info; const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info; @@ -789,7 +1000,7 @@ static int journal_metric_compare (const void *item1, const void *item2) // Write list of extents for the journalfile -void *journal_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data) +void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data) { Pvoid_t *PValue; struct journal_extent_list *j2_extent_base = (void *) data; @@ -810,7 +1021,7 @@ void *journal_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data) return j2_extent_base + count; } -static int verify_journal_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes) +static int 
journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes) { if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->total_file_size - sizeof(struct journal_v2_block_trailer))) return 1; @@ -818,11 +1029,11 @@ static int verify_journal_space(struct journal_v2_header *j2_header, void *data, return 0; } -void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset) +void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset) { struct journal_metric_list *metric = (void *) data; - if (verify_journal_space(j2_header, data, sizeof(*metric))) + if (journalfile_verify_space(j2_header, data, sizeof(*metric))) return NULL; uuid_copy(metric->uuid, *metric_info->uuid); @@ -834,7 +1045,7 @@ void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *da return ++metric; } -void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset) +void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset) { struct journal_page_header *data_page_header = (void *) data; uLong crc; @@ -849,7 +1060,7 @@ void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __ma return ++data_page_header; } -void *journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header) +void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header) { struct journal_page_header *data_page_header = (void *) page_header; struct journal_v2_block_trailer *journal_trailer = (void *) data; @@ -861,11 +1072,11 @@ void 
*journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __m return ++journal_trailer; } -void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info) +void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info) { struct journal_page_list *data_page = data; - if (verify_journal_space(j2_header, data, sizeof(*data_page))) + if (journalfile_verify_space(j2_header, data, sizeof(*data_page))) return NULL; struct extent_io_data *ei = page_info->custom_data; @@ -882,7 +1093,7 @@ void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data } // Must be recorded in metric_info->entries -void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info) +void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info) { Pvoid_t *PValue; @@ -897,7 +1108,7 @@ void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *da while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) { page_info = *PValue; // Write one descriptor and return the next data page location - data_page = journal_v2_write_data_page(j2_header, (void *)data_page, page_info); + data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info); if (NULL == data_page) break; } @@ -910,9 +1121,9 @@ void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *da // startup : if the migration is done during agent startup // this will allow us to optimize certain things -void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused, - Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos, - size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data) +void journalfile_migrate_to_v2_callback(Word_t section, 
unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused, + Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos, + size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data) { char path[RRDENG_PATH_MAX]; Pvoid_t *PValue; @@ -923,7 +1134,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ time_t max_time_s = 0; struct jv2_metrics_info *metric_info; - generate_journalfilepath_v2(datafile, path, sizeof(path)); + journalfile_v2_generate_path(datafile, path, sizeof(path)); info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", path, @@ -961,7 +1172,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ uint32_t trailer_offset = total_file_size; total_file_size += sizeof(struct journal_v2_block_trailer); - uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false); + int fd_v2; + uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2); uint8_t *data = data_start; memset(data_start, 0, extent_offset); @@ -987,7 +1199,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ struct journal_v2_block_trailer *journal_v2_trailer; - data = journal_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset); + data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset); internal_error(true, "DBENGINE: write extent list so far %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS); fatal_assert(data == data_start + extent_offset_trailer); @@ -1025,7 +1237,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ j2_header.start_time_ut = min_time_s * USEC_PER_SEC; j2_header.end_time_ut = max_time_s * USEC_PER_SEC; - qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journal_metric_compare); + qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), 
journalfile_metric_compare); internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS); uint32_t resize_file_to = total_file_size; @@ -1037,7 +1249,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ uint32_t uuid_offset = data - data_start; // Write the UUID we are processing - data = (void *) journal_v2_write_metric_page(&j2_header, data, metric_info, pages_offset); + data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset); if (unlikely(!data)) break; @@ -1049,15 +1261,17 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ // Keep the page_list_header, to be used for migration when where agent is running metric_info->page_list_header = pages_offset; // Write page header - void *metric_page = journal_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info, uuid_offset); + void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info, + uuid_offset); // Start writing descr @ time - void *page_trailer = journal_v2_write_descriptors(&j2_header, metric_page, metric_info); + void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info); if (unlikely(!page_trailer)) break; // Trailer (checksum) - uint8_t *next_page_address = journal_v2_write_data_page_trailer(&j2_header, page_trailer, data_start + pages_offset); + uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer, + data_start + pages_offset); // Calculate start of the pages start for next descriptor pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)); @@ -1094,8 +1308,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ info("DBENGINE: migrated journal file '%s', file size %zu", 
path, total_file_size); - SET_JOURNAL_DATA(journalfile, data_start); - SET_JOURNAL_DATA_SIZE(journalfile, total_file_size); + // msync(data_start, total_file_size, MS_SYNC); + journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size); internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS); ctx->disk_space += total_file_size; @@ -1127,8 +1341,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_ ctx->disk_space += sizeof(struct journal_v2_header); } -int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, - struct rrdengine_datafile *datafile) +int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, + struct rrdengine_datafile *datafile) { uv_fs_t req; uv_file file; @@ -1138,11 +1352,13 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi // Do not try to load the latest file (always rebuild and live migrate) if (datafile->fileno != ctx->last_fileno) { - if (!load_journal_file_v2(ctx, journalfile |