summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-17 19:35:14 +0200
committerGitHub <noreply@github.com>2023-01-17 19:35:14 +0200
commit7279dd092c23fbafcd7edb8ef7f3f79e1a0e5ecb (patch)
tree730e1cc845613b1751ad79f5d8ba6bdce1ad9491 /database/engine/journalfile.c
parent6be264d62788b1b50109dc1f2a0cb6f622cfb804 (diff)
DBENGINE v2 - improvements part 3 (#14269)
* reduce journal v2 shared memory using madvise() - not integrated yet * working attempt to minimize dbengine shared memory * never call willneed - let the kernel decide which parts of each file are really needed * journal files get MADV_RANDOM * dont call MADV_DONTNEED too frequently * madvise() is always called with the journal unlocked but referenced * call madvise() even less frequently * added chart for monitoring database events * turn batch mode on under critical conditions * max size to evict is 1/4 of the max * fix max size to evict calculation * use dbengine_page/extent_alloc/free to pages and extents allocations, tracking also the size of these allocations at free time * fix calculation for batch evictions * allow main and open cache to have as many evictors as needed * control inline evictors for each cache; report different levels of cache pressure on every cache evaluation * more inline evictors for extent cache * bypass max inline evictors above critical level * current cache usage has to be taken * re-arrange items in journafile * updated docs - work in progress * more docs work * more docs work * Map / unmap journal file * draw.io diagram for dbengine operations * updated dbengine diagram * updated docs * journal files v2 now get mapped and unmapped as needed * unmap journal v2 immediately when getting retention * mmap and munmap do not block queries evaluating journal files v2 * have only one unmap function Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c409
1 files changed, 313 insertions, 96 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 0ba0b9b440..5560bd21ea 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -52,7 +52,7 @@ static void update_metric_retention_and_granularity_by_uuid(
mrg_metric_release(main_mrg, metric);
}
-static void flush_transaction_buffer_cb(uv_fs_t* req)
+static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -99,7 +99,7 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
- journalfile->pos, flush_transaction_buffer_cb);
+ journalfile->pos, wal_flush_transaction_buffer_cb);
fatal_assert(-1 != ret);
journalfile->pos += wal->buf_size;
ctx->disk_space += wal->buf_size;
@@ -107,26 +107,256 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
++ctx->stats.io_write_requests;
}
-void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
-void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+void journalfile_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
-void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
+ struct journal_v2_header *j2_header = NULL;
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+
+ if(!journalfile->mmap.data) {
+ journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
+ if (journalfile->mmap.data == MAP_FAILED) {
+ internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2");
+ close(journalfile->mmap.fd);
+ journalfile->mmap.fd = -1;
+ journalfile->mmap.data = NULL;
+ journalfile->mmap.size = 0;
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ ++journalfile->datafile->ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ else {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);
+
+ madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_random(journalfile->mmap.data, journalfile->mmap.size);
+ madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ }
+ }
+
+ if(journalfile->mmap.data) {
+ j2_header = journalfile->mmap.data;
+
+ if (data_size)
+ *data_size = journalfile->mmap.size;
+ }
+
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ return j2_header;
+}
+
+static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks) {
+ bool unmounted = false;
+
+ if(!have_locks) {
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ }
+
+ if(!journalfile->v2.refcount && journalfile->mmap.data) {
+ if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
+ char path[RRDENG_PATH_MAX];
+ journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
+ error("DBENGINE: failed to unmap index file '%s'", path);
+ internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
+ ++journalfile->datafile->ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ else {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
+ journalfile->mmap.data = NULL;
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED;
+ }
+
+ unmounted = true;
+ }
+
+ if(!have_locks) {
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ }
+
+ return unmounted;
+}
+
+struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
+ bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
+ bool do_we_need_it = false;
+ bool unmount = false;
+
+ if(has_data) {
+ if (!wanted_first_time_s || !wanted_last_time_s ||
+ is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s,
+ wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) {
+
+ journalfile->v2.refcount++;
+
+ do_we_need_it = true;
+ journalfile->v2.not_needed_counter = 0;
+
+ if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted)
+ journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
+ else
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
+
+ }
+ else if (is_mounted) {
+ // this journal has data, but it does not match our query
+
+ if (!journalfile->v2.refcount) {
+ // this journal has no references
+
+ if (!journalfile->v2.not_needed_counter)
+ journalfile->v2.not_needed_since_s = now_monotonic_sec();
+
+ if ((++journalfile->v2.not_needed_counter) % 100 == 0) {
+ // at least 100 times it has been evaluated since last use
+
+ if (now_monotonic_sec() - journalfile->v2.not_needed_since_s >= 120)
+ // 2 minutes have passed since last use
+ unmount = true;
+ }
+ }
+ }
+ }
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(do_we_need_it)
+ return journalfile_v2_mounted_data_get(journalfile, data_size);
+
+ else if(unmount)
+ journalfile_v2_mounted_data_unmount(journalfile, false);
+
+ return NULL;
+}
+
+void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
+ internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
+
+ bool unmount = false;
+
+ journalfile->v2.refcount--;
+
+ if(journalfile->v2.refcount == 0) {
+ journalfile->v2.not_needed_counter = 0;
+
+ if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
+ unmount = true;
+ }
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(unmount)
+ journalfile_v2_mounted_data_unmount(journalfile, false);
+}
+
+bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
+
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+ bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+
+ return has_data;
+}
+
+size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ size_t data_size = journalfile->mmap.size;
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ return data_size;
+}
+
+void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
+ internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
+ internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile");
+
+ journalfile->mmap.fd = fd;
+ journalfile->mmap.data = journal_data;
+ journalfile->mmap.size = journal_data_size;
+ journalfile->v2.not_needed_since_s = now_monotonic_sec();
+ journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
+
+ struct journal_v2_header *j2_header = journalfile->mmap.data;
+ journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
+ journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
+
+ journalfile_v2_mounted_data_unmount(journalfile, true);
+
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+}
+
+static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ bool has_references = false;
+
+ do {
+ if (has_references)
+ sleep_usec(10 * USEC_PER_MS);
+
+ netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ netdata_spinlock_lock(&journalfile->v2.spinlock);
+
+ if(journalfile_v2_mounted_data_unmount(journalfile, true)) {
+ close(journalfile->mmap.fd);
+ journalfile->mmap.fd = -1;
+ journalfile->mmap.data = NULL;
+ journalfile->mmap.size = 0;
+ journalfile->v2.first_time_s = 0;
+ journalfile->v2.last_time_s = 0;
+ journalfile->v2.flags = 0;
+ }
+ else {
+ has_references = true;
+ internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
+ }
+
+ netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+
+ } while(has_references);
+}
+
+struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile)
{
- journalfile->file = (uv_file)0;
- journalfile->pos = 0;
+ struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
- SET_JOURNAL_DATA(journalfile, 0);
- SET_JOURNAL_DATA_SIZE(journalfile, 0);
- journalfile->data = NULL;
+ netdata_spinlock_init(&journalfile->mmap.spinlock);
+ netdata_spinlock_init(&journalfile->v2.spinlock);
+ journalfile->mmap.fd = -1;
+ datafile->journalfile = journalfile;
+ return journalfile;
}
static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
@@ -137,7 +367,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
uv_fs_t req;
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_generate_path(datafile, path, sizeof(path));
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
++datafile->ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
@@ -146,30 +376,17 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
return ret;
}
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
- struct rrdengine_instance *ctx = datafile->ctx;
- char path[RRDENG_PATH_MAX];
-
- void *journal_data = GET_JOURNAL_DATA(journalfile);
- size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile);
-
- if (likely(journal_data)) {
- if (munmap(journal_data, journal_data_size)) {
- generate_journalfilepath_v2(datafile, path, sizeof(path));
- error("DBENGINE: failed to unmap journal index file for %s", path);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
- }
- SET_JOURNAL_DATA(journalfile, 0);
- SET_JOURNAL_DATA_SIZE(journalfile, 0);
+ if(journalfile_v2_data_available(journalfile)) {
+ journalfile_v2_data_unmap_permanently(journalfile);
return 0;
}
return close_uv_file(datafile, journalfile->file);
}
-int unlink_journal_file(struct rrdengine_journalfile *journalfile)
+int journalfile_unlink(struct rrdengine_journalfile *journalfile)
{
struct rrdengine_datafile *datafile = journalfile->datafile;
struct rrdengine_instance *ctx = datafile->ctx;
@@ -177,7 +394,7 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
int ret;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_generate_path(datafile, path, sizeof(path));
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
@@ -192,7 +409,7 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -200,8 +417,8 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
char path[RRDENG_PATH_MAX];
char path_v2[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
- generate_journalfilepath_v2(datafile, path_v2, sizeof(path));
+ journalfile_generate_path(datafile, path, sizeof(path));
+ journalfile_v2_generate_path(datafile, path_v2, sizeof(path));
if (journalfile->file) {
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
@@ -234,19 +451,13 @@ int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struc
++ctx->stats.journalfile_deletions;
++ctx->stats.journalfile_deletions;
- void *journal_data = GET_JOURNAL_DATA(journalfile);
- size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile);
-
- if (journal_data) {
- if (munmap(journal_data, journal_data_size)) {
- error("DBENGINE: failed to unmap index file %s", path_v2);
- }
- }
+ if(journalfile_v2_data_available(journalfile))
+ journalfile_v2_data_unmap_permanently(journalfile);
return ret;
}
-int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -256,7 +467,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_buf_t iov;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_generate_path(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
++ctx->stats.fs_errors;
@@ -286,7 +497,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file_unsafe(journalfile, datafile);
+ journalfile_destroy_unsafe(journalfile, datafile);
return ret;
}
@@ -297,7 +508,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
return 0;
}
-static int check_journal_file_superblock(uv_file file)
+static int journalfile_check_superblock(uv_file file)
{
int ret;
struct rrdeng_jf_sb *superblock;
@@ -331,7 +542,7 @@ static int check_journal_file_superblock(uv_file file)
return ret;
}
-static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
+static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
static BITMAP256 page_error_map;
unsigned i, count, payload_length, descr_size;
@@ -407,8 +618,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
* Sets id to the current transaction id or to 0 if unknown.
* Returns size of transaction record or 0 for unknown size.
*/
-static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- void *buf, uint64_t *id, unsigned max_size)
+static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ void *buf, uint64_t *id, unsigned max_size)
{
unsigned payload_length, size_bytes;
int ret;
@@ -446,7 +657,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
switch (jf_header->type) {
case STORE_DATA:
debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
- restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
+ journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
break;
default:
error("DBENGINE: unknown transaction type, skipping record.");
@@ -463,7 +674,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
* Page cache must already be initialized.
* Returns the maximum transaction id it discovered.
*/
-static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
+static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
{
uv_file file;
uint64_t file_size;//, data_file_size;
@@ -507,7 +718,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
unsigned max_size;
max_size = pos + size_bytes - pos_i;
- ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
+ ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
if (!ret) /* TODO: support transactions bigger than 4K */
/* unknown transaction size, move on to the next block */
pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
@@ -525,7 +736,7 @@ skip_file:
}
// Checks that the extent list checksum is valid
-static int check_journal_v2_extent_list (void *data_start, size_t file_size)
+static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
{
UNUSED(file_size);
uLong crc;
@@ -545,7 +756,7 @@ static int check_journal_v2_extent_list (void *data_start, size_t file_size)
}
// Checks that the metric list (UUIDs) checksum is valid
-static int check_journal_v2_metric_list(void *data_start, size_t file_size)
+static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
{
UNUSED(file_size);
uLong crc;
@@ -570,7 +781,7 @@ static int check_journal_v2_metric_list(void *data_start, size_t file_size)
// 2 Force rebuild
// 3 skip
-static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t original_size)
+static int journalfile_v2_validate(void *data_start, size_t file_size, uint32_t original_size)
{
int rc;
uLong crc;
@@ -605,10 +816,10 @@ static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t or
return 1;
}
- rc = check_journal_v2_extent_list(data_start, file_size);
+ rc = journalfile_check_v2_extent_list(data_start, file_size);
if (rc) return 1;
- rc = check_journal_v2_metric_list(data_start, file_size);
+ rc = journalfile_check_v2_metric_list(data_start, file_size);
if (rc) return 1;
if (!db_engine_journal_check)
@@ -667,7 +878,7 @@ static int check_journal_v2_file(void *data_start, size_t file_size, uint32_t or
return 0;
}
-int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
int ret, fd;
uint64_t file_size;
@@ -675,12 +886,12 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
struct stat statbuf;
uint32_t original_file_size = 0;
- generate_journalfilepath(datafile, path, sizeof(path));
+ journalfile_generate_path(datafile, path, sizeof(path));
ret = stat(path, &statbuf);
if (!ret)
original_file_size = (uint32_t)statbuf.st_size;
- generate_journalfilepath_v2(datafile, path, sizeof(path));
+ journalfile_v2_generate_path(datafile, path, sizeof(path));
fd = open(path, O_RDONLY);
if (fd < 0) {
@@ -713,10 +924,9 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
close(fd);
return 1;
}
- close(fd);
info("DBENGINE: checking integrity of '%s'", path);
- int rc = check_journal_v2_file(data_start, file_size, original_file_size);
+ int rc = journalfile_v2_validate(data_start, file_size, original_file_size);
if (unlikely(rc)) {
if (rc == 2)
error_report("File %s needs to be rebuilt", path);
@@ -728,6 +938,7 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
if (unlikely(munmap(data_start, file_size)))
error("DBENGINE: failed to unmap '%s'", path);
+ close(fd);
return rc;
}
@@ -738,6 +949,7 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
if (unlikely(munmap(data_start, file_size)))
error("DBENGINE: failed to unmap '%s'", path);
+ close(fd);
return 1;
}
@@ -746,10 +958,6 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
- // Initialize the journal file to be able to access the data
- SET_JOURNAL_DATA(journalfile, data_start);
- SET_JOURNAL_DATA_SIZE(journalfile, file_size);
-
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
time_t now_s = now_realtime_sec();
@@ -771,6 +979,9 @@ int load_journal_file_v2(struct rrdengine_instance *ctx, struct rrdengine_journa
info("DBENGINE: journal file '%s' loaded (size:%"PRIu64") with %u metrics in %d ms", path, file_size, entries,
(int) ((now_realtime_usec() - start_loading) / USEC_PER_MS));
+ // Initialize the journal file to be able to access the data
+ journalfile_v2_data_set(journalfile, fd, data_start, file_size);
+
// File is OK load it
return 0;
}
@@ -779,7 +990,7 @@ struct journal_metric_list_to_sort {
struct jv2_metrics_info *metric_info;
};
-static int journal_metric_compare (const void *item1, const void *item2)
+static int journalfile_metric_compare (const void *item1, const void *item2)
{
const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
@@ -789,7 +1000,7 @@ static int journal_metric_compare (const void *item1, const void *item2)
// Write list of extents for the journalfile
-void *journal_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
+void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
{
Pvoid_t *PValue;
struct journal_extent_list *j2_extent_base = (void *) data;
@@ -810,7 +1021,7 @@ void *journal_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
return j2_extent_base + count;
}
-static int verify_journal_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
+static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
{
if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->total_file_size - sizeof(struct journal_v2_block_trailer)))
return 1;
@@ -818,11 +1029,11 @@ static int verify_journal_space(struct journal_v2_header *j2_header, void *data,
return 0;
}
-void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
+void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
{
struct journal_metric_list *metric = (void *) data;
- if (verify_journal_space(j2_header, data, sizeof(*metric)))
+ if (journalfile_verify_space(j2_header, data, sizeof(*metric)))
return NULL;
uuid_copy(metric->uuid, *metric_info->uuid);
@@ -834,7 +1045,7 @@ void *journal_v2_write_metric_page(struct journal_v2_header *j2_header, void *da
return ++metric;
}
-void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
+void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
{
struct journal_page_header *data_page_header = (void *) data;
uLong crc;
@@ -849,7 +1060,7 @@ void *journal_v2_write_data_page_header(struct journal_v2_header *j2_header __ma
return ++data_page_header;
}
-void *journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
+void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
{
struct journal_page_header *data_page_header = (void *) page_header;
struct journal_v2_block_trailer *journal_trailer = (void *) data;
@@ -861,11 +1072,11 @@ void *journal_v2_write_data_page_trailer(struct journal_v2_header *j2_header __m
return ++journal_trailer;
}
-void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
+void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
{
struct journal_page_list *data_page = data;
- if (verify_journal_space(j2_header, data, sizeof(*data_page)))
+ if (journalfile_verify_space(j2_header, data, sizeof(*data_page)))
return NULL;
struct extent_io_data *ei = page_info->custom_data;
@@ -882,7 +1093,7 @@ void *journal_v2_write_data_page(struct journal_v2_header *j2_header, void *data
}
// Must be recorded in metric_info->entries
-void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info)
+void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info)
{
Pvoid_t *PValue;
@@ -897,7 +1108,7 @@ void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *da
while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
page_info = *PValue;
// Write one descriptor and return the next data page location
- data_page = journal_v2_write_data_page(j2_header, (void *)data_page, page_info);
+ data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
if (NULL == data_page)
break;
}
@@ -910,9 +1121,9 @@ void *journal_v2_write_descriptors(struct journal_v2_header *j2_header, void *da
// startup : if the migration is done during agent startup
// this will allow us to optimize certain things
-void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
- Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
- size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
+void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
+ Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
+ size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
{
char path[RRDENG_PATH_MAX];
Pvoid_t *PValue;
@@ -923,7 +1134,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
time_t max_time_s = 0;
struct jv2_metrics_info *metric_info;
- generate_journalfilepath_v2(datafile, path, sizeof(path));
+ journalfile_v2_generate_path(datafile, path, sizeof(path));
info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
path,
@@ -961,7 +1172,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
uint32_t trailer_offset = total_file_size;
total_file_size += sizeof(struct journal_v2_block_trailer);
- uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false);
+ int fd_v2;
+ uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2);
uint8_t *data = data_start;
memset(data_start, 0, extent_offset);
@@ -987,7 +1199,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
struct journal_v2_block_trailer *journal_v2_trailer;
- data = journal_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
+ data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
internal_error(true, "DBENGINE: write extent list so far %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
fatal_assert(data == data_start + extent_offset_trailer);
@@ -1025,7 +1237,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
j2_header.start_time_ut = min_time_s * USEC_PER_SEC;
j2_header.end_time_ut = max_time_s * USEC_PER_SEC;
- qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journal_metric_compare);
+ qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
uint32_t resize_file_to = total_file_size;
@@ -1037,7 +1249,7 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
uint32_t uuid_offset = data - data_start;
// Write the UUID we are processing
- data = (void *) journal_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
+ data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
if (unlikely(!data))
break;
@@ -1049,15 +1261,17 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
// Keep the page_list_header, to be used for migration when where agent is running
metric_info->page_list_header = pages_offset;
// Write page header
- void *metric_page = journal_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info, uuid_offset);
+ void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info,
+ uuid_offset);
// Start writing descr @ time
- void *page_trailer = journal_v2_write_descriptors(&j2_header, metric_page, metric_info);
+ void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info);
if (unlikely(!page_trailer))
break;
// Trailer (checksum)
- uint8_t *next_page_address = journal_v2_write_data_page_trailer(&j2_header, page_trailer, data_start + pages_offset);
+ uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer,
+ data_start + pages_offset);
// Calculate start of the pages start for next descriptor
pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer));
@@ -1094,8 +1308,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
- SET_JOURNAL_DATA(journalfile, data_start);
- SET_JOURNAL_DATA_SIZE(journalfile, total_file_size);
+ // msync(data_start, total_file_size, MS_SYNC);
+ journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
ctx->disk_space += total_file_size;
@@ -1127,8 +1341,8 @@ void do_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_
ctx->disk_space += sizeof(struct journal_v2_header);
}
-int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- struct rrdengine_datafile *datafile)
+int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
+ struct rrdengine_datafile *datafile)
{
uv_fs_t req;
uv_file file;
@@ -1138,11 +1352,13 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
// Do not try to load the latest file (always rebuild and live migrate)
if (datafile->fileno != ctx->last_fileno) {
- if (!load_journal_file_v2(ctx, journalfile