author Costa Tsaousis <costa@netdata.cloud> 2023-02-02 00:14:35 +0200
committer GitHub <noreply@github.com> 2023-02-02 00:14:35 +0200
commit 55d1f00bb7c2403b451947b2a225b5d1f6be9183 (patch)
tree 043e57edb64b319b1eb6a883d6980fa2d9dd2c8e /database/engine/journalfile.c
parent 2e56e2b87622a102aef876d297a3cd80d35028e5 (diff)
DBENGINE v2 - improvements part 12 (#14379)
* parallel initialization of tiers
* do not spawn multiple dbengine event loops
* user configurable dbengine parallel initialization
* size netdata based on the real cpu cores available on the system netdata runs on, not on the system monitored
* user configurable system cpus
* move cpuset parsing to os.c/.h
* fix replication of misaligned chart dimensions
* give a different path to each tier thread
* statically allocate the path into the initialization structure
* use aral for reusing dbengine pages
* dictionaries use ARAL for fixed sized values
* fix compilation without internal checks
* journal v2 index uses aral
* test to see judy allocations
* judy allocations using aral
* Add config option to select if dbengine will use direct I/O (default is yes)
* V1 journal files will use uv_fs_read instead of mmap (respect the direct I/O setting)
* Remove sqlite3IsMemdb as it is unused
* Fix compilation error when --disable-dbengine is used
* use aral for dbengine work_cmds
* changed aral API to support new features
* pgc and mrg aral overheads
* rrdeng opcodes using aral
* better structuring and naming
* dbengine query handles using aral
* page descriptors using aral
* remove obsolete linking
* extent io descriptors using aral
* aral keeps one last page alive
* add missing return value
* added judy aral overhead
* pdc now uses aral
* page_details now use aral
* epdl and deol using aral - make sure ARALs are initialized before spawning the event loop
* remove unused linking
* pgc now uses one aral per partition
* aral measure maximum allocation queue
* aral to allocate pages in parallel
* aral parallel pages allocation when needed
* aral cleanup
* track page allocation and page population separately

---------

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
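The first hunk below swaps open_file_direct_io() for open_file_for_io(), which takes an extra use_direct_io argument so direct I/O can be turned off from the new config option (default is yes). The helper's body is not part of this diff; a minimal sketch of such a switch, assuming the flag simply toggles O_DIRECT at open time (the function name, flag handling and error handling here are illustrative, not the netdata implementation):

#include <fcntl.h>
#include <stdbool.h>
#include <uv.h>

/* Hypothetical sketch only: open a file with or without O_DIRECT,
 * depending on a runtime "use direct I/O" setting. */
static int open_file_for_io_sketch(const char *path, int oflag, uv_file *file, bool use_direct_io) {
#ifdef O_DIRECT
    if (use_direct_io)
        oflag |= O_DIRECT;      /* bypass the page cache when requested */
#endif
    uv_fs_t req;
    int fd = uv_fs_open(NULL, &req, path, oflag, 0644, NULL);   /* synchronous: cb == NULL */
    uv_fs_req_cleanup(&req);
    if (fd >= 0)
        *file = fd;
    return fd;                  /* negative libuv error code on failure */
}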
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--  database/engine/journalfile.c  58
1 file changed, 21 insertions(+), 37 deletions(-)
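The main change in journalfile_iterate_transactions() drops the mmap path entirely: v1 journal files are now always read through a posix_memalign()-allocated buffer and uv_fs_read(), so the direct I/O setting is respected and the aligned buffer keeps the reads valid when the file was opened with direct I/O. A minimal, self-contained sketch of that read loop (the SKETCH_* constants and the parse placeholder stand in for netdata's READAHEAD_BYTES, RRDFILE_ALIGNMENT and transaction replay; they are assumptions for illustration):

#include <stdint.h>
#include <stdlib.h>
#include <uv.h>

#define SKETCH_ALIGNMENT 4096u          /* stands in for RRDFILE_ALIGNMENT */
#define SKETCH_READAHEAD (128u * 1024u) /* stands in for READAHEAD_BYTES   */

/* Sketch: walk a journal file in readahead-sized chunks, reusing one
 * aligned buffer and reading synchronously with uv_fs_read(). */
static int iterate_journal_sketch(uv_file file, uint64_t start, uint64_t file_size) {
    void *buf = NULL;
    if (posix_memalign(&buf, SKETCH_ALIGNMENT, SKETCH_READAHEAD))
        return -1;                              /* allocation failed */

    int rc = 0;
    for (uint64_t pos = start; pos < file_size; pos += SKETCH_READAHEAD) {
        uint64_t remaining = file_size - pos;
        unsigned size_bytes = remaining < SKETCH_READAHEAD ? (unsigned) remaining : SKETCH_READAHEAD;

        uv_fs_t req;
        uv_buf_t iov = uv_buf_init(buf, size_bytes);
        int ret = uv_fs_read(NULL, &req, file, &iov, 1, (int64_t) pos, NULL);
        uv_fs_req_cleanup(&req);
        if (ret < 0) {                          /* stop on read error */
            rc = ret;
            break;
        }
        /* ... replay the transactions contained in buf[0 .. size_bytes) ... */
    }

    free(buf);
    return rc;
}

In the actual code below, error()/fatal_assert() handling, ctx_io_read_op_bytes() accounting and journalfile_replay_transaction() take the place of the placeholders above.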
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 3f9c9e6fb2..9a9769321a 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -495,7 +495,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
char path[RRDENG_PATH_MAX];
journalfile_v1_generate_path(datafile, path, sizeof(path));
- fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
+ fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io);
if (fd < 0) {
ctx_fs_error(ctx);
return fd;
@@ -704,7 +704,7 @@ static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, s
static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
{
uv_file file;
- uint64_t file_size;//, data_file_size;
+ uint64_t file_size;
int ret;
uint64_t pos, pos_i, max_id, id;
unsigned size_bytes;
@@ -714,33 +714,26 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
file = journalfile->file;
file_size = journalfile->unsafe.pos;
- //data_file_size = journalfile->datafile->pos; TODO: utilize this?
max_id = 1;
- bool journal_is_mmapped = (journalfile->data != NULL);
- if (unlikely(!journal_is_mmapped)) {
- ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
- if (unlikely(ret))
- fatal("DBENGINE: posix_memalign:%s", strerror(ret));
- }
- else
- buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
- for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
+ ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
+ if (unlikely(ret))
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
+
+ for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) {
size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
- if (unlikely(!journal_is_mmapped)) {
- iov = uv_buf_init(buf, size_bytes);
- ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
- if (ret < 0) {
- error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
- uv_fs_req_cleanup(&req);
- goto skip_file;
- }
- fatal_assert(req.result >= 0);
+ iov = uv_buf_init(buf, size_bytes);
+ ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
+ if (ret < 0) {
+ error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
- ctx_io_read_op_bytes(ctx, size_bytes);
+ goto skip_file;
}
+ fatal_assert(req.result >= 0);
+ uv_fs_req_cleanup(&req);
+ ctx_io_read_op_bytes(ctx, size_bytes);
- for (pos_i = 0 ; pos_i < size_bytes ; ) {
+ for (pos_i = 0; pos_i < size_bytes;) {
unsigned max_size;
max_size = pos + size_bytes - pos_i;
@@ -752,12 +745,9 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
pos_i += ret;
max_id = MAX(max_id, id);
}
- if (likely(journal_is_mmapped))
- buf += size_bytes;
}
skip_file:
- if (unlikely(!journal_is_mmapped))
- posix_memfree(buf);
+ posix_memfree(buf);
return max_id;
}
@@ -1400,18 +1390,15 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
uint64_t file_size, max_id;
char path[RRDENG_PATH_MAX];
- // Do not try to load the latest file (always rebuild and live migrate)
+ // Do not try to load the latest file
if (datafile->fileno != ctx_last_fileno_get(ctx)) {
- if (!journalfile_v2_load(ctx, journalfile, datafile)) {
-// unmap_journal_file(journalfile);
+ if (likely(!journalfile_v2_load(ctx, journalfile, datafile)))
return 0;
- }
}
journalfile_v1_generate_path(datafile, path, sizeof(path));
- // If it is not the last file, open read only
- fd = open_file_direct_io(path, O_RDWR, &file);
+ fd = open_file_for_io(path, O_RDWR, &file, use_direct_io);
if (fd < 0) {
ctx_fs_error(ctx);
return fd;
@@ -1432,16 +1419,13 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
journalfile->file = file;
journalfile->unsafe.pos = file_size;
- journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0, !(datafile->fileno == ctx_last_fileno_get(ctx)), NULL);
- info("DBENGINE: loading journal file '%s' using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
+ info("DBENGINE: loading journal file '%s'", path);
max_id = journalfile_iterate_transactions(ctx, journalfile);
__atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
- if (likely(journalfile->data))
- netdata_munmap(journalfile->data, file_size);
bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {