summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-23 22:18:44 +0200
committerGitHub <noreply@github.com>2023-01-23 22:18:44 +0200
commitdd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch)
treefecf5514eda33c0a96f4d359f30fd07229d12cf7 /database/engine
parentc2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff)
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers * when there is a discrepancy between update every, fix it * fix the other occurrences of metric update every mismatch * allow resetting the same timestamp * validate flushed pages before committing them to disk * initialize collection with the latest time in mrg * these should be static functions * acquire metrics for writing to detect multiple data collections of the same metric * print the uuid of the metric that is collected twice * log the discrepancies of completed pages * 1 second tolerance * unify validation of pages and related logging across dbengine * make do_flush_pages() thread safe * flush pages runs on libuv workers * added uv events to tp workers * don't cross datafile spinlock and rwlock * should be unlock * prevent the creation of multiple datafiles * break an infinite replication loop * do not log the expansion of the replication window due to start streaming * log all invalid pages with internal checks * do not shutdown event loop threads * add information about collected page events, to find the root cause of invalid collected pages * rewrite of the gap filling to fix the invalid collected pages problem * handle multiple collections of the same metric gracefully * added log about main cache page conflicts; fix gap filling once again... 
* keep track of the first metric writer * it should be an internal fatal - it does not harm users * do not check for future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks * prevent negative replication completion percentage * internal error for the discrepancy of mrg * better logging of dbengine new metrics collection * without internal checks it is unused * prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread * renames and atomics everywhere * if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it * Debug for context load * rrdcontext uuid debug * rrddim uuid debug * rrdeng uuid debug * Revert "rrdeng uuid debug" This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b. * Revert "rrddim uuid debug" This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8. * Revert "rrdcontext uuid debug" This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0. * Revert "Debug for context load" This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b. * do not use legacy uuids on multihost dbs * thread safety for journalfile size * handle other cases of inconsistent collected pages * make health thread check if it should be running in key loops * do not log uuids Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/cache.c2
-rw-r--r--database/engine/datafile.c50
-rw-r--r--database/engine/journalfile.c108
-rw-r--r--database/engine/journalfile.h14
-rw-r--r--database/engine/metric.c85
-rw-r--r--database/engine/metric.h4
-rw-r--r--database/engine/pagecache.c4
-rw-r--r--database/engine/pdc.c195
-rw-r--r--database/engine/rrdengine.c328
-rw-r--r--database/engine/rrdengine.h93
-rwxr-xr-xdatabase/engine/rrdengineapi.c371
-rw-r--r--database/engine/rrdenginelib.c81
-rw-r--r--database/engine/rrdenginelib.h1
13 files changed, 858 insertions, 478 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c
index 7993535b3f..c942d17120 100644
--- a/database/engine/cache.c
+++ b/database/engine/cache.c
@@ -1975,7 +1975,7 @@ void pgc_page_hot_set_end_time_s(PGC *cache __maybe_unused, PGC_PAGE *page, time
internal_fatal(!is_page_hot(page),
"DBENGINE CACHE: end_time_s update on non-hot page");
- internal_fatal(end_time_s <= __atomic_load_n(&page->end_time_s, __ATOMIC_RELAXED),
+ internal_fatal(end_time_s < __atomic_load_n(&page->end_time_s, __ATOMIC_RELAXED),
"DBENGINE CACHE: end_time_s is not bigger than existing");
__atomic_store_n(&page->end_time_s, end_time_s, __ATOMIC_RELAXED);
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 7cd2081962..c17827f94f 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -183,8 +183,7 @@ int close_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -203,12 +202,11 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.datafile_deletions;
+ __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED);
return ret;
}
@@ -225,28 +223,25 @@ int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.datafile_deletions;
+ __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED);
return ret;
}
@@ -264,12 +259,11 @@ int create_data_file(struct rrdengine_datafile *datafile)
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
datafile->file = file;
- ++ctx->stats.datafile_creations;
+ __atomic_add_fetch(&ctx->stats.datafile_creations, 1, __ATOMIC_RELAXED);
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -286,8 +280,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
if (ret < 0) {
fatal_assert(req.result < 0);
error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
@@ -297,8 +290,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
}
datafile->pos = sizeof(*superblock);
- ctx->stats.io_write_bytes += sizeof(*superblock);
- ++ctx->stats.io_write_requests;
+ ctx_io_write_op_bytes(ctx, sizeof(*superblock));
return 0;
}
@@ -350,8 +342,7 @@ static int load_data_file(struct rrdengine_datafile *datafile)
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
info("DBENGINE: initializing data file \"%s\".", path);
@@ -364,8 +355,8 @@ static int load_data_file(struct rrdengine_datafile *datafile)
ret = check_data_file_superblock(file);
if (ret)
goto error;
- ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb);
- ++ctx->stats.io_read_requests;
+
+ ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_df_sb));
datafile->file = file;
datafile->pos = file_size;
@@ -378,8 +369,7 @@ static int load_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
return error;
@@ -412,8 +402,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return ret;
}
info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
@@ -474,8 +463,8 @@ static int scan_data_files(struct rrdengine_instance *ctx)
continue;
}
+ ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
datafile_list_insert(ctx, datafile);
- ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->pos);
}
matched_files -= failed_to_load;
freez(datafiles);
@@ -511,8 +500,8 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
journalfile_v1_generate_path(datafile, path, sizeof(path));
info("DBENGINE: created journal file \"%s\".", path);
+ ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
datafile_list_insert(ctx, datafile);
- ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->pos);
ctx_last_fileno_increment(ctx);
return 0;
@@ -576,7 +565,8 @@ void finalize_data_files(struct rrdengine_instance *ctx)
}
logged = false;
- while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev) {
+ size_t iterations = 100;
+ while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) {
if(!logged) {
info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
logged = true;
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 49521f40a7..8b6472a775 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -52,7 +52,7 @@ static void update_metric_retention_and_granularity_by_uuid(
mrg_metric_release(main_mrg, metric);
}
-static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
+static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -62,8 +62,7 @@ static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ ctx_io_error(ctx);
error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
@@ -78,7 +77,7 @@ static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
}
/* Careful to always call this before creating a new journal file */
-void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
+void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
int ret;
struct generic_io_descriptor *io_descr;
@@ -92,19 +91,23 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
}
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- io_descr->pos = journalfile->pos;
+
+ netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ io_descr->pos = journalfile->unsafe.pos;
+ journalfile->unsafe.pos += wal->buf_size;
+ netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+
io_descr->req.data = wal;
io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
- journalfile->pos, wal_flush_transaction_buffer_cb);
+ (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
fatal_assert(-1 != ret);
- journalfile->pos += wal->buf_size;
+
ctx_current_disk_space_increase(ctx, wal->buf_size);
- ctx->stats.io_write_bytes += wal->buf_size;
- ++ctx->stats.io_write_requests;
+ ctx_io_write_op_bytes(ctx, wal->buf_size);
}
void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
@@ -137,8 +140,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
netdata_spinlock_unlock(&journalfile->v2.spinlock);
- ++journalfile->datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(journalfile->datafile->ctx);
}
else {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);
@@ -194,8 +196,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
error("DBENGINE: failed to unmap index file '%s'", path);
internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
- ++journalfile->datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(journalfile->datafile->ctx);
}
else {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
@@ -385,6 +386,7 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
journalfile->datafile = datafile;
netdata_spinlock_init(&journalfile->mmap.spinlock);
netdata_spinlock_init(&journalfile->v2.spinlock);
+ netdata_spinlock_init(&journalfile->unsafe.spinlock);
journalfile->mmap.fd = -1;
datafile->journalfile = journalfile;
return journalfile;
@@ -400,8 +402,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
if (ret < 0) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++datafile->ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(datafile->ctx);
}
uv_fs_req_cleanup(&req);
return ret;
@@ -430,12 +431,11 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
return ret;
}
@@ -455,8 +455,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
(void) close_uv_file(datafile, journalfile->file);
@@ -466,21 +465,18 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
- ++ctx->stats.journalfile_deletions;
- ++ctx->stats.journalfile_deletions;
+ __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
if(journalfile_v2_data_available(journalfile))
journalfile_v2_data_unmap_permanently(journalfile);
@@ -501,12 +497,11 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
journalfile_v1_generate_path(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
journalfile->file = file;
- ++ctx->stats.journalfile_creations;
+ __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@@ -522,8 +517,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
if (ret < 0) {
fatal_assert(req.result < 0);
error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
- ++ctx->stats.io_errors;
- rrd_stat_atomic_add(&global_io_errors, 1);
+ ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
@@ -532,9 +526,9 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
return ret;
}
- journalfile->pos = sizeof(*superblock);
- ctx->stats.io_write_bytes += sizeof(*superblock);
- ++ctx->stats.io_write_requests;
+ journalfile->unsafe.pos = sizeof(*superblock);
+
+ ctx_io_write_op_bytes(ctx, sizeof(*superblock));
return 0;
}
@@ -588,7 +582,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
return;
}
- time_t now_s = now_realtime_sec();
+ time_t now_s = max_acceptable_collected_time();
for (i = 0; i < count ; ++i) {
uuid_t *temp_id;
uint8_t page_type = jf_metric_data->descr[i].type;
@@ -610,7 +604,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
(metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
false);
- if(!vd.data_on_disk_valid) {
+ if(!vd.is_valid) {
mrg_metric_release(main_mrg, metric);
continue;
}
@@ -717,7 +711,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
uv_fs_t req;
file = journalfile->file;
- file_size = journalfile->pos;
+ file_size = journalfile->unsafe.pos;
//data_file_size = journalfile->datafile->pos; TODO: utilize this?
max_id = 1;
@@ -741,8 +735,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
}
fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
- ++ctx->stats.io_read_requests;
- ctx->stats.io_read_bytes += size_bytes;
+ ctx_io_read_op_bytes(ctx, size_bytes);
}
for (pos_i = 0 ; pos_i < size_bytes ; ) {
@@ -922,7 +915,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
- time_t now_s = now_realtime_sec();
+ time_t now_s = max_acceptable_collected_time();
for (size_t i=0; i < entries; i++) {
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
@@ -968,8 +961,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (fd < 0) {
if (errno == ENOENT)
return 1;
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
error("DBENGINE: failed to open '%s'", path_v2);
return 1;
}
@@ -1200,7 +1192,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
number_of_pages);
#ifdef NETDATA_INTERNAL_CHECKS
- usec_t start_loading = now_realtime_usec();
+ usec_t start_loading = now_monotonic_usec();
#endif
size_t total_file_size = 0;
@@ -1251,13 +1243,13 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
j2_header.extent_trailer_offset = extent_offset_trailer;
j2_header.metric_trailer_offset = metric_offset_trailer;
j2_header.journal_v2_file_size = total_file_size;
- j2_header.journal_v1_file_size = (uint32_t) journalfile->pos;
+ j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
j2_header.data = data_start; // Used during migration
struct journal_v2_block_trailer *journal_v2_trailer;
data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
- internal_error(true, "DBENGINE: write extent list so far %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
fatal_assert(data == data_start + extent_offset_trailer);
@@ -1268,7 +1260,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
crc32set(journal_v2_trailer->checksum, crc);
- internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Skip the trailer, point to the metrics off
data += sizeof(struct journal_v2_block_trailer);
@@ -1295,7 +1287,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
j2_header.end_time_ut = max_time_s * USEC_PER_SEC;
qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
- internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
uint32_t resize_file_to = total_file_size;
@@ -1341,7 +1333,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
}
if (data == data_start + metric_offset_trailer) {
- internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Calculate CRC for metrics
journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
@@ -1349,7 +1341,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
crc =
crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
crc32set(journal_v2_trailer->checksum, crc);
- internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Prepare to write checksum for the file
j2_header.data = NULL;
@@ -1361,14 +1353,14 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
// Write header to the file
memcpy(data_start, &j2_header, sizeof(j2_header));
- internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
// msync(data_start, total_file_size, MS_SYNC);
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
- internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
+ internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
ctx_current_disk_space_increase(ctx, total_file_size);
freez(uuid_list);
return;
@@ -1390,8 +1382,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
int ret = truncate(path, (long) resize_file_to);
if (ret < 0) {
ctx_current_disk_space_increase(ctx, total_file_size);
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
error("DBENGINE: failed to resize file '%s'", path);
}
else
@@ -1420,8 +1411,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
// If it is not the last file, open read only
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
return fd;
}
@@ -1435,11 +1425,10 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
goto error;
}
- ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
- ++ctx->stats.io_read_requests;
+ ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
journalfile->file = file;
- journalfile->pos = file_size;
+ journalfile->unsafe.pos = file_size;
journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0, !(datafile->fileno == ctx_last_fileno_get(ctx)), NULL);
info("DBENGINE: loading journal file '%s' using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
@@ -1471,8 +1460,7 @@ error:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
- rrd_stat_atomic_add(&global_fs_errors, 1);
+ ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
return error;
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 3ef6de7f3d..fc63ad2994 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -41,12 +41,22 @@ struct rrdengine_journalfile {
time_t not_needed_since_s;
} v2;
+ struct {
+ SPINLOCK spinlock;
+ uint64_t pos;
+ } unsafe;
+
uv_file file;
- uint64_t pos;
void *data;
struct rrdengine_datafile *datafile;
};
+static inline uint64_t journalfile_current_size(struct rrdengine_journalfile *journalfile) {
+ netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ uint64_t size = journalfile->unsafe.pos;
+ netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ return size;
+}
// Journal v2 structures
@@ -126,7 +136,7 @@ struct wal;
void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile);
-void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, struct wal *wal, uv_loop_t *loop);
+void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, struct wal *wal, uv_loop_t *loop);
int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
int journalfile_unlink(struct rrdengine_journalfile *journalfile);
int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 0208e21fb6..71672598b7 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -6,11 +6,13 @@ typedef int32_t REFCOUNT;
struct metric {
uuid_t uuid; // never changes
Word_t section; // never changes
+
time_t first_time_s; //
time_t latest_time_s_clean; // archived pages latest time
time_t latest_time_s_hot; // latest time of the currently collected page
uint32_t latest_update_every_s; //
- SPINLOCK timestamps_lock; // protects the 3 timestamps
+ pid_t writer;
+ SPINLOCK spinlock; // protects all variable members
// THIS IS allocated with malloc()
// YOU HAVE TO INITIALIZE IT YOURSELF !
@@ -133,7 +135,8 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
metric->latest_time_s_clean = entry->last_time_s;
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
- netdata_spinlock_init(&metric->timestamps_lock);
+ metric->writer = 0;
+ netdata_spinlock_init(&metric->spinlock);
*PValue = metric;
mrg_index_write_unlock(mrg, partition);
@@ -241,7 +244,7 @@ void mrg_destroy(MRG *mrg __maybe_unused) {
METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
// FIXME - support refcount
-// internal_fatal(entry.latest_time_s > now_realtime_sec(),
+// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
return metric_add(mrg, &entry, ret);
@@ -280,21 +283,21 @@ Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
}
bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
- netdata_spinlock_lock(&metric->timestamps_lock);
+ netdata_spinlock_lock(&metric->spinlock);
metric->first_time_s = first_time_s;
- netdata_spinlock_unlock(&metric->timestamps_lock);
+ netdata_spinlock_unlock(&metric->spinlock);
return true;
}
void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
- internal_fatal(first_time_s > now_realtime_sec() + 1,
+ internal_fatal(first_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric first time is in the future");
- internal_fatal(last_time_s > now_realtime_sec() + 1,
+ internal_fatal(last_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric last time is in the future");
- netdata_spinlock_lock(&metric->timestamps_lock);
+ netdata_spinlock_lock(&metric->spinlock);
if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
metric->first_time_s = first_time_s;
@@ -308,13 +311,13 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
else if(unlikely(!metric->latest_update_every_s && update_every_s))
metric->latest_update_every_s = update_every_s;
- netdata_spinlock_unlock(&metric->timestamps_lock);
+ netdata_spinlock_unlock(&metric->spinlock);
}
bool mrg_metric_set_first_time_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
bool ret = false;
- netdata_spinlock_lock(&metric->timestamps_lock);
+ netdata_spinlock_lock(&metric->spinlock);
if(!metric->first_time_s) {
metric->first_time_s = first_time_s;
@@ -326,14 +329,14 @@ bool mrg_metric_set_first_time_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric
ret = true;
}
- netdata_spinlock_unlock(&metric->timestamps_lock);
+ netdata_spinlock_unlock(&metric->spinlock);
return ret;
}
time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t first_time_s;
- netdata_spinlock_lock(&metric->timestamps_lock);
+ netdata_spinlock_lock(&metric->spinlock);
first_time_s = metric->first_time_s;
if(!first_time_s) {
if(metric->latest_time_s_clean)
@@ -342,15 +345,15 @@ time_t mrg_metric_get_first_time_s(MR