summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-23 22:18:44 +0200
committerGitHub <noreply@github.com>2023-01-23 22:18:44 +0200
commitdd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch)
treefecf5514eda33c0a96f4d359f30fd07229d12cf7 /database/engine/journalfile.c
parentc2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff)
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers * when there is a discrepancy between update every, fix it * fix the other occurences of metric update every mismatch * allow resetting the same timestamp * validate flushed pages before committing them to disk * initialize collection with the latest time in mrg * these should be static functions * acquire metrics for writing to detect multiple data collections of the same metric * print the uuid of the metric that is collected twice * log the discrepancies of completed pages * 1 second tolerance * unify validation of pages and related logging across dbengine * make do_flush_pages() thread safe * flush pages runs on libuv workers * added uv events to tp workers * dont cross datafile spinlock and rwlock * should be unlock * prevent the creation of multiple datafiles * break an infinite replication loop * do not log the epxansion of the replication window due to start streaming * log all invalid pages with internal checks * do not shutdown event loop threads * add information about collected page events, to find the root cause of invalid collected pages * rewrite of the gap filling to fix the invalid collected pages problem * handle multiple collections of the same metric gracefully * added log about main cache page conflicts; fix gap filling once again... 
* keep track of the first metric writer * it should be an internal fatal - it does not harm users * do not check of future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks * prevent negative replication completion percentage * internal error for the discrepancy of mrg * better logging of dbengine new metrics collection * without internal checks it is unused * prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread * renames and atomics everywhere * if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it * Debug for context load * rrdcontext uuid debug * rrddim uuid debug * rrdeng uuid debug * Revert "rrdeng uuid debug" This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b. * Revert "rrddim uuid debug" This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8. * Revert "rrdcontext uuid debug" This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0. * Revert "Debug for context load" This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b. * do not use legacy uuids on multihost dbs * thread safety for journafile size * handle other cases of inconsistent collected pages * make health thread check if it should be running in key loops * do not log uuids Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c108
1 file changed, 48 insertions, 60 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 49521f40a7..8b6472a775 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -52,7 +52,7 @@ static void update_metric_retention_and_granularity_by_uuid(
mrg_metric_release(main_mrg, metric);
}
-static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
+static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -62,8 +62,7 @@ static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ ctx_io_error(ctx);
error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
@@ -78,7 +77,7 @@ static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
}
/* Careful to always call this before creating a new journal file */
-void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
+void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
int ret;
struct generic_io_descriptor *io_descr;
@@ -92,19 +91,23 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
}
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- io_descr->pos = journalfile->pos;
+
+ netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ io_descr->pos = journalfile->unsafe.pos;
+ journalfile->unsafe.pos += wal->buf_size;
+ netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+
io_descr->req.data = wal;
io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
- journalfile->pos, wal_flush_transaction_buffer_cb);
+ (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
fatal_assert(-1 != ret);
- journalfile->pos += wal->buf_size;
+
ctx_current_disk_space_increase(ctx, wal->buf_size);
- ctx->stats.io_write_bytes += wal->buf_size;
- ++ctx->stats.io_write_requests;
+ ctx_io_write_op_bytes(ctx, wal->buf_size);
}
void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
@@ -137,8 +140,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
netdata_spinlock_unlock(&journalfile->v2.spinlock);
- ++journalfile->datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(journalfile->datafile->ctx);
}
else {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);
@@ -194,8 +196,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
error("DBENGINE: failed to unmap index file '%s'", path);
internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
- ++journalfile->datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(journalfile->datafile->ctx);
}
else {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
@@ -385,6 +386,7 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
journalfile->datafile = datafile;
netdata_spinlock_init(&journalfile->mmap.spinlock);
netdata_spinlock_init(&journalfile->v2.spinlock);
+ netdata_spinlock_init(&journalfile->unsafe.spinlock);
journalfile->mmap.fd = -1;
datafile->journalfile = journalfile;
return journalfile;
@@ -400,8 +402,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
if (ret < 0) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(datafile->ctx);
}
uv_fs_req_cleanup(&req);
return ret;
@@ -430,12 +431,11 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
return ret;
}
@@ -455,8 +455,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
(void) close_uv_file(datafile, journalfile->file);
@@ -466,21 +465,18 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
if(journalfile_v2_data_available(journalfile))
journalfile_v2_data_unmap_permanently(journalfile);
@@ -501,12 +497,11 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
journalfile_v1_generate_path(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
journalfile->file = file;
- ++ctx->stats.journalfile_creations;
+ __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -522,8 +517,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
if (ret < 0) {
fatal_assert(req.result < 0);
error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
@@ -532,9 +526,9 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
return ret;
}
- journalfile->pos = sizeof(*superblock);
- ctx->stats.io_write_bytes += sizeof(*superblock);
- ++ctx->stats.io_write_requests;
+ journalfile->unsafe.pos = sizeof(*superblock);
+
+ ctx_io_write_op_bytes(ctx, sizeof(*superblock));
return 0;
}
@@ -588,7 +582,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
return;
}
- time_t now_s = now_realtime_sec();
+ time_t now_s = max_acceptable_collected_time();
for (i = 0; i < count ; ++i) {
uuid_t *temp_id;
uint8_t page_type = jf_metric_data->descr[i].type;
@@ -610,7 +604,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
(metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
false);
- if(!vd.data_on_disk_valid) {
+ if(!vd.is_valid) {
mrg_metric_release(main_mrg, metric);
continue;
}
@@ -717,7 +711,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
uv_fs_t req;
file = journalfile->file;
- file_size = journalfile->pos;
+ file_size = journalfile->unsafe.pos;
//data_file_size = journalfile->datafile->pos; TODO: utilize this?
max_id = 1;
@@ -741,8 +735,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
}
fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
- ++ctx->stats.io_read_requests;
- ctx->stats.io_read_bytes += size_bytes;
+ ctx_io_read_op_bytes(ctx, size_bytes);
}
for (pos_i = 0 ; pos_i < size_bytes ; ) {
@@ -922,7 +915,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
- time_t now_s = now_realtime_sec();
+ time_t now_s = max_acceptable_collected_time();
for (size_t i=0; i < entries; i++) {
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
@@ -968,8 +961,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (fd < 0) {
if (errno == ENOENT)
return 1;
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
error("DBENGINE: failed to open '%s'", path_v2);
return 1;
}
@@ -1200,7 +1192,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
number_of_pages);
#ifdef NETDATA_INTERNAL_CHECKS
- usec_t start_loading = now_realtime_usec();
+ usec_t start_loading = now_monotonic_usec();
#endif
size_t total_file_size = 0;
@@ -1251,13 +1243,13 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
j2_header.extent_trailer_offset = extent_offset_trailer;
j2_header.metric_trailer_offset = metric_offset_trailer;
j2_header.journal_v2_file_size = total_file_size;
- j2_header.journal_v1_file_size = (uint32_t) journalfile->pos;
+ j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
j2_header.data = data_start; // Used during migration
struct journal_v2_block_trailer *journal_v2_trailer;
data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
- internal_error(true, "DBENGINE: write extent list so far %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
fatal_assert(data == data_start + extent_offset_trailer);
@@ -1268,7 +1260,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
crc32set(journal_v2_trailer->checksum, crc);
- internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Skip the trailer, point to the metrics off
data += sizeof(struct journal_v2_block_trailer);
@@ -1295,7 +1287,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
j2_header.end_time_ut = max_time_s * USEC_PER_SEC;
qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
- internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
uint32_t resize_file_to = total_file_size;
@@ -1341,7 +1333,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
}
if (data == data_start + metric_offset_trailer) {
- internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Calculate CRC for metrics
journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
@@ -1349,7 +1341,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
crc =
crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
crc32set(journal_v2_trailer->checksum, crc);
- internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Prepare to write checksum for the file
j2_header.data = NULL;
@@ -1361,14 +1353,14 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
// Write header to the file
memcpy(data_start, &j2_header, sizeof(j2_header));
- internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
// msync(data_start, total_file_size, MS_SYNC);
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
- internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
ctx_current_disk_space_increase(ctx, total_file_size);
freez(uuid_list);
return;
@@ -1390,8 +1382,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
int ret = truncate(path, (long) resize_file_to);
if (ret < 0) {
ctx_current_disk_space_increase(ctx, total_file_size);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
error("DBENGINE: failed to resize file '%s'", path);
}
else
@@ -1420,8 +1411,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
// If it is not the last file, open read only
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
@@ -1435,11 +1425,10 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
goto error;
}
- ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
- ++ctx->stats.io_read_requests;
+ ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
journalfile->file = file;
- journalfile->pos = file_size;
+ journalfile->unsafe.pos = file_size;
journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0, !(datafile->fileno == ctx_last_fileno_get(ctx)), NULL);
info("DBENGINE: loading journal file '%s' using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
@@ -1471,8 +1460,7 @@ error:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
return error;