summary | refs | log | tree | commit | diff | stats
path: root/database
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2023-01-20 23:56:33 +0200
committer GitHub <noreply@github.com> 2023-01-20 23:56:33 +0200
commit dc9f81ccfe611410f5a710dafcc14c6c9f030aa2 (patch)
tree fe922ca2647abb9def5b903c6e1e2e83c2d59066 /database
parent c036541019018e363f0aa7bd705534baeb015b09 (diff)
DBENGINE v2 - improvements part 6 (#14299)
* query preparation runs before extent reads
* populate mrg in parallel
* fix formatting warning
* first search for a metric then add it if it does not exist
* Revert "first search for a metric then add it if it does not exist"
  This reverts commit 4afa6461fcce859d03f1c9cf56dd3b5933ee5ebc.
* Revert "fix formatting warning"
  This reverts commit 49473493f7f1c3399b5635a573d3c6ed2b6e46f3.
* Revert "populate mrg in parallel"
  This reverts commit a40166708d4222f6329904f109114c47c44ca666.
* merge journalfiles metrics before committing them to MRG
* Revert "merge journalfiles metrics before committing them to MRG"
  This reverts commit 50c8934e23a0a09ea4da80e3f88290e46496ad92.
* Revert "Revert "populate mrg in parallel""
  This reverts commit f4c149d2ab7a8c9af24a10f95438a0d662a5cf8a.
* Revert "Revert "fix formatting warning""
  This reverts commit 78298ff9efc49806ded029f5f1e868cc42e8f6eb.
* Revert "Revert "first search for a metric then add it if it does not exist""
  This reverts commit 997b9c813b290882ba18a8c44bf73f9ee5480adf.
* preload first and last journal files v2
* fix formatting warning
* parallel loading of tiers; cleanup of ctx structures
* use half the cores
* add partitions to metrics registry
* revert accidental change
* parallel processing according to MRG partitions; dont recalculate retention on exit
Diffstat (limited to 'database')
-rw-r--r-- database/engine/datafile.c 49
-rw-r--r-- database/engine/datafile.h 10
-rw-r--r-- database/engine/journalfile.c 130
-rw-r--r-- database/engine/journalfile.h 11
-rw-r--r-- database/engine/metric.c 83
-rw-r--r-- database/engine/metric.h 2
-rw-r--r-- database/engine/pagecache.c 12
-rw-r--r-- database/engine/pdc.c 2
-rw-r--r-- database/engine/rrdengine.c 170
-rw-r--r-- database/engine/rrdengine.h 101
-rwxr-xr-x database/engine/rrdengineapi.c 149
-rw-r--r-- database/engine/rrdengineapi.h 8
-rw-r--r-- database/rrd.h 7
-rw-r--r-- database/rrdhost.c 11
14 files changed, 461 insertions, 284 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index d14bdc897e..7cd2081962 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -120,7 +120,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
"%zu clean and %zu hot open cache pages "
"- will be deleted shortly "
"(scanned open cache in %llu usecs)",
- df->fileno, df->ctx->tier,
+ df->fileno, df->ctx->config.tier,
df->users.lockers,
df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE],
df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS],
@@ -137,7 +137,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
"%zu clean and %zu hot open cache pages "
"- will be deleted now "
"(scanned open cache in %llu usecs)",
- df->fileno, df->ctx->tier,
+ df->fileno, df->ctx->config.tier,
df->users.lockers,
df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE],
df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS],
@@ -151,7 +151,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
"has %u lockers (oc:%u, pd:%u), "
"%zu clean and %zu hot open cache pages "
"(scanned open cache in %llu usecs)",
- df->fileno, df->ctx->tier,
+ df->fileno, df->ctx->config.tier,
df->users.lockers,
df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE],
df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS],
@@ -168,7 +168,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+ datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
int close_data_file(struct rrdengine_datafile *datafile)
@@ -407,16 +407,16 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_datafile **datafiles, *datafile;
struct rrdengine_journalfile *journalfile;
- ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
+ ret = uv_fs_scandir(NULL, &req, ctx->config.dbfiles_path, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
- error("DBENGINE: uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
+ error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return ret;
}
- info("DBENGINE: found %d files in path %s", ret, ctx->dbfiles_path);
+ info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
@@ -437,7 +437,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
/* TODO: change this when tiering is implemented */
- ctx->last_fileno = datafiles[matched_files - 1]->fileno;
+ ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
uint8_t must_delete_pair = 0;
@@ -475,7 +475,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
datafile_list_insert(ctx, datafile);
- ctx->disk_space += datafile->pos + journalfile->pos;
+ ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->pos);
}
matched_files -= failed_to_load;
freez(datafiles);
@@ -490,11 +490,11 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
- unsigned fileno = __atomic_load_n(&ctx->last_fileno, __ATOMIC_RELAXED) + 1;
+ unsigned fileno = ctx_last_fileno_get(ctx) + 1;
int ret;
char path[RRDENG_PATH_MAX];
- info("DBENGINE: creating new data and journal files in path %s", ctx->dbfiles_path);
+ info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
datafile = datafile_alloc_and_init(ctx, 1, fileno);
ret = create_data_file(datafile);
if(ret)
@@ -512,9 +512,8 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
info("DBENGINE: created journal file \"%s\".", path);
datafile_list_insert(ctx, datafile);
- ctx->disk_space += datafile->pos + journalfile->pos;
-
- __atomic_add_fetch(&ctx->last_fileno, 1, __ATOMIC_RELAXED);
+ ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->pos);
+ ctx_last_fileno_increment(ctx);
return 0;
@@ -535,26 +534,24 @@ int init_data_files(struct rrdengine_instance *ctx)
int ret;
fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
- __atomic_store_n(&ctx->journal_initialization, true, __ATOMIC_RELAXED);
ret = scan_data_files(ctx);
if (ret < 0) {
- error("DBENGINE: failed to scan path \"%s\".", ctx->dbfiles_path);
+ error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path);
return ret;
} else if (0 == ret) {
- info("DBENGINE: data files not found, creating in path \"%s\".", ctx->dbfiles_path);
- ctx->last_fileno = 0;
+ info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
+ ctx->atomic.last_fileno = 0;
ret = create_new_datafile_pair(ctx);
if (ret) {
- error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
+ error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
return ret;
}
}
- else if(ctx->create_new_datafile_pair)
+ else if(ctx->loading.create_new_datafile_pair)
create_new_datafile_pair(ctx);
pgc_reset_hot_max(open_cache);
- ctx->create_new_datafile_pair = false;
- __atomic_store_n(&ctx->journal_initialization, false, __ATOMIC_RELAXED);
+ ctx->loading.create_new_datafile_pair = false;
return 0;
}
@@ -569,9 +566,9 @@ void finalize_data_files(struct rrdengine_instance *ctx)
logged = false;
if(datafile == ctx->datafiles.first->prev) {
// this is the last file
- while(__atomic_load_n(&ctx->worker_config.atomics.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
+ while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
if(!logged) {
- info("Waiting for inflight flush to finish on tier %d to close last datafile %u...", ctx->tier, datafile->fileno);
+ info("Waiting for inflight flush to finish on tier %d to close last datafile %u...", ctx->config.tier, datafile->fileno);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -581,7 +578,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
logged = false;
while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev) {
if(!logged) {
- info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->tier);
+ info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -598,7 +595,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
netdata_spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
if(!logged) {
- info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->tier);
+ info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 62b7754fb7..274add91ed 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -47,6 +47,11 @@ struct rrdengine_datafile {
struct {
SPINLOCK spinlock;
+ bool populated;
+ } populate_mrg;
+
+ struct {
+ SPINLOCK spinlock;
size_t running;
size_t flushed_to_open_running;
} writers;
@@ -70,11 +75,6 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason);
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df);
-struct rrdengine_datafile_list {
- uv_rwlock_t rwlock;
- struct rrdengine_datafile *first; /* oldest - the newest with ->first->prev */
-};
-
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index f188fe7e35..49521f40a7 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -17,7 +17,7 @@ static void update_metric_retention_and_granularity_by_uuid(
last_time_s = now_s;
}
- if(unlikely(first_time_s > last_time_s)) {
+ if (unlikely(first_time_s > last_time_s)) {
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
"fixing first time to last time",
@@ -26,23 +26,25 @@ static void update_metric_retention_and_granularity_by_uuid(
first_time_s = last_time_s;
}
- if(unlikely(first_time_s == 0 || last_time_s == 0)) {
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
"using them as-is",
first_time_s, last_time_s, now_s);
}
- MRG_ENTRY entry = {
- .section = (Word_t)ctx,
- .first_time_s = first_time_s,
- .last_time_s = last_time_s,
- .latest_update_every_s = update_every_s
- };
- uuid_copy(entry.uuid, *uuid);
-
- bool added;
- METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = (Word_t) ctx,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = update_every_s
+ };
+ uuid_copy(entry.uuid, *uuid);
+ metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ }
if (likely(!added))
mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
@@ -70,7 +72,7 @@ static void wal_flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
wal_release(wal);
- __atomic_sub_fetch(&ctx->worker_config.atomics.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
worker_is_idle();
}
@@ -100,7 +102,7 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
journalfile->pos, wal_flush_transaction_buffer_cb);
fatal_assert(-1 != ret);
journalfile->pos += wal->buf_size;
- ctx->disk_space += wal->buf_size;
+ ctx_current_disk_space_increase(ctx, wal->buf_size);
ctx->stats.io_write_bytes += wal->buf_size;
++ctx->stats.io_write_requests;
}
@@ -108,13 +110,13 @@ void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengi
void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+ datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
- datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+ datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
@@ -213,10 +215,10 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
return unmounted;
}
-void journalfile_v2_data_unmount_cleanup(time_t now_s, int storage_tiers) {
+void journalfile_v2_data_unmount_cleanup(time_t now_s) {
// DO NOT WAIT ON ANY LOCK!!!
- for(size_t tier = 0; tier < storage_tiers ;tier++) {
+ for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) {
struct rrdengine_instance *ctx = multidb_ctx[tier];
if(!ctx) continue;
@@ -907,6 +909,46 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
return 0;
}
+void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) {
+ usec_t started_ut = now_monotonic_usec();
+
+ size_t data_size = 0;
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0);
+ if(!j2_header)
+ return;
+
+ uint8_t *data_start = (uint8_t *)j2_header;
+ uint32_t entries = j2_header->metric_count;
+
+ struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
+ time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+ time_t now_s = now_realtime_sec();
+ for (size_t i=0; i < entries; i++) {
+ time_t start_time_s = header_start_time_s + metric->delta_start_s;
+ time_t end_time_s = header_start_time_s + metric->delta_end_s;
+ time_t update_every_s = (metric->entries > 1) ? ((end_time_s - start_time_s) / (entries - 1)) : 0;
+ update_metric_retention_and_granularity_by_uuid(
+ ctx, &metric->uuid, start_time_s, end_time_s, update_every_s, now_s);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
+ fatal_assert(uuid_compare(metric_list_header->uuid, metric->uuid) == 0);
+ fatal_assert(metric->entries == metric_list_header->entries);
+#endif
+ metric++;
+ }
+
+ journalfile_v2_data_release(journalfile);
+ usec_t ended_ut = now_monotonic_usec();
+
+ info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
+ , ctx->config.tier, journalfile->datafile->fileno
+ , (double)data_size / 1024 / 1024
+ , (double)entries / 1000
+ , ((double)(ended_ut - started_ut) / USEC_PER_MS)
+ );
+}
+
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
int ret, fd;
@@ -983,38 +1025,15 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
return 1;
}
- madvise_dontfork(data_start, journal_v2_file_size);
- madvise_dontdump(data_start, journal_v2_file_size);
-
- usec_t mrg_start_ut = now_monotonic_usec();
- struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
- time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
- time_t now_s = now_realtime_sec();
- for (size_t i=0; i < entries; i++) {
- time_t start_time_s = header_start_time_s + metric->delta_start_s;
- time_t end_time_s = header_start_time_s + metric->delta_end_s;
- time_t update_every_s = (metric->entries > 1) ? ((end_time_s - start_time_s) / (entries - 1)) : 0;
- update_metric_retention_and_granularity_by_uuid(
- ctx, &metric->uuid, start_time_s, end_time_s, update_every_s, now_s);
-
-#ifdef NETDATA_INTERNAL_CHECKS
- struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
- fatal_assert(uuid_compare(metric_list_header->uuid, metric->uuid) == 0);
- fatal_assert(metric->entries == metric_list_header->entries);
-#endif
- metric++;
- }
-
usec_t finished_ut = now_monotonic_usec();
info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
- "mmap: %0.2f ms, validate: %0.2f ms, populate: %0.2f ms"
+ "mmap: %0.2f ms, validate: %0.2f ms"
, path_v2
, (double)journal_v2_file_size / 1024 / 1024
, (double)entries / 1000
, ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS)
- , ((double)(mrg_start_ut - validation_start_ut) / USEC_PER_MS)
- , ((double)(finished_ut - mrg_start_ut) / USEC_PER_MS)
+ , ((double)(finished_ut - validation_start_ut) / USEC_PER_MS)
);
// Initialize the journal file to be able to access the data
@@ -1350,7 +1369,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_realtime_usec() - start_loading) / USEC_PER_MS);
- ctx->disk_space += total_file_size;
+ ctx_current_disk_space_increase(ctx, total_file_size);
freez(uuid_list);
return;
}
@@ -1370,13 +1389,13 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
int ret = truncate(path, (long) resize_file_to);
if (ret < 0) {
- ctx->disk_space += total_file_size;
+ ctx_current_disk_space_increase(ctx, total_file_size);
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
error("DBENGINE: failed to resize file '%s'", path);
}
else
- ctx->disk_space += sizeof(struct journal_v2_header);
+ ctx_current_disk_space_increase(ctx, sizeof(struct journal_v2_header));
}
int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
@@ -1389,7 +1408,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
char path[RRDENG_PATH_MAX];
// Do not try to load the latest file (always rebuild and live migrate)
- if (datafile->fileno != ctx->last_fileno) {
+ if (datafile->fileno != ctx_last_fileno_get(ctx)) {
if (!journalfile_v2_load(ctx, journalfile, datafile)) {
// unmap_journal_file(journalfile);
return 0;
@@ -1422,28 +1441,28 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
journalfile->file = file;
journalfile->pos = file_size;
- journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0, !(datafile->fileno == ctx->last_fileno), NULL);
+ journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0, !(datafile->fileno == ctx_last_fileno_get(ctx)), NULL);
info("DBENGINE: loading journal file '%s' using %s.", path, journalfile->data?"MMAP":"uv_fs_read");
max_id = journalfile_iterate_transactions(ctx, journalfile);
- ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);
+ __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
if (likely(journalfile->data))
netdata_munmap(journalfile->data, file_size);
- bool is_last_file = (ctx->last_fileno == journalfile->datafile->fileno);
+ bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
- ctx->create_new_datafile_pair = false;
+ ctx->loading.create_new_datafile_pair = false;
return 0;
}
- pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->page_type,
+ pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
if (is_last_file)
- ctx->create_new_datafile_pair = true;
+ ctx->loading.create_new_datafile_pair = true;
return 0;
@@ -1458,8 +1477,3 @@ error:
uv_fs_req_cleanup(&req);
return error;
}
-
-void init_commit_log(struct rrdengine_instance *ctx)
-{
- ctx->commit_log.transaction_id = 1;
-}
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 36cba44750..3ef6de7f3d 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -121,13 +121,6 @@ struct journal_v2_header {
#define JOURNAL_V2_HEADER_PADDING_SZ (RRDENG_BLOCK_SIZE - (sizeof(struct journal_v2_header)))
-
-
-/* only one event loop is supported for now */
-struct transaction_commit_log {
- uint64_t transaction_id;
-};
-
struct wal;
void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
@@ -140,7 +133,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
struct rrdengine_datafile *datafile);
-void init_commit_log(struct rrdengine_instance *ctx);
+void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile);
void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
@@ -152,6 +145,6 @@ size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile);
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size);
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s);
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile);
-void journalfile_v2_data_unmount_cleanup(time_t now_s, int storage_tiers);
+void journalfile_v2_data_unmount_cleanup(time_t now_s);
#endif /* NETDATA_JOURNALFILE_H */
\ No newline at end of file
diff --git a/database/engine/metric.c b/database/engine/metric.c
index dc3e49001f..0208e21fb6 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -21,25 +21,31 @@ struct mrg {
ARAL *aral;
netdata_rwlock_t rwlock;
Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
- } index;
+ } index[MRG_PARTITIONS];
struct mrg_statistics stats;
+
+ size_t entries_per_partition[MRG_PARTITIONS];
};
static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) {
__atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_ADDED_METRIC(MRG *mrg) {
+static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
__atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
+
+ __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_DELETED_METRIC(MRG *mrg) {
+static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
__atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
}
static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) {
@@ -54,17 +60,17 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg) {
__atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED);
}
-static inline void mrg_index_read_lock(MRG *mrg) {
- netdata_rwlock_rdlock(&mrg->index.rwlock);
+static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
+ netdata_rwlock_rdlock(&mrg->index[partition].rwlock);
}
-static inline void mrg_index_read_unlock(MRG *mrg) {
- netdata_rwlock_unlock(&mrg->index.rwlock);
+static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
+ netdata_rwlock_unlock(&mrg->index[partition].rwlock);
}
-static inline void mrg_index_write_lock(MRG *mrg) {
- netdata_rwlock_wrlock(&mrg->index.rwlock);
+static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
+ netdata_rwlock_wrlock(&mrg->index[partition].rwlock);
}
-static inline void mrg_index_write_unlock(MRG *mrg) {
- netdata_rwlock_unlock(&mrg->index.rwlock);
+static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
+ netdata_rwlock_unlock(&mrg->index[partition].rwlock);
}
static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) {
@@ -82,12 +88,19 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) {
__atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
}
+static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
+ uint8_t *u = (uint8_t *)uuid;
+ return u[UUID_SZ - 1] % MRG_PARTITIONS;
+}
+
static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
- mrg_index_write_lock(mrg);
+ size_t partition = uuid_partition(mrg, &entry->uuid);
+
+ mrg_index_write_lock(mrg, partition);
size_t mem_before_judyl, mem_after_judyl;
- Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index.uuid_judy, &entry->uuid, sizeof(uuid_t), PJE0);
+ Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, &entry->uuid, sizeof(uuid_t), PJE0);
if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
@@ -104,7 +117,7 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
if(*PValue != NULL) {
METRIC *metric = *PValue;
- mrg_index_write_unlock(mrg);
+ mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = false;
@@ -113,7 +126,7 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
return metric;
}
- METRIC *metric = arrayalloc_mallocz(mrg->index.aral);
+ METRIC *metric = arrayalloc_mallocz(mrg->index[partition].aral);
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
metric->first_time_s = entry->first_time_s;
@@ -123,49 +136,53 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
netdata_spinlock_init(&metric->timestamps_lock);
*PValue = metric;
- mrg_index_write_unlock(mrg);
+ mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = true;
- MRG_STATS_ADDED_METRIC(mrg);
+ MRG_STATS_ADDED_METRIC(mrg, partition);
return metric;
}
static METRIC *metric_get(MRG *mrg, uuid_t *uuid, Word_t section) {
- mrg_index_read_lock(mrg);
+ size_t partition = uuid_partition(mrg, uuid);
+
+ mrg_index_read_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index.uuid_judy, uuid, sizeof(uuid_t));
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr)) {
- mrg_index_read_unlock(mrg);
+ mrg_index_read_unlock(mrg, partition);
MRG_STATS_SEARCH_MISS(mrg);
return NULL;
}
Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
if(unlikely(!PValue)) {
- mrg_index_read_unlock(mrg);
+ mrg_index_read_unlock(mrg, partition);
MRG_STATS_SEARCH_MISS(mrg);
return NULL;
}
METRIC *metric = *PValue;
- mrg_index_read_unlock(mrg);
+ mrg_index_read_unlock(mrg, partition);
MRG_STATS_SEARCH_HIT(mrg);
return metric;
}
static bool metric_del(MRG *mrg, METRIC *metric) {
+ size_t partition = uuid_partition(mrg, &metric->uuid);
+
size_t mem_before_judyl, mem_after_judyl;
- mrg_index_write_lock(mrg);
+ mrg_index_write_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index.uuid_judy, &metric->uuid, sizeof(uuid_t));
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
- mrg_index_write_unlock(mrg);
+ mrg_index_write_unlock(mrg, partition);
MRG_STATS_DELETE_MISS(mrg);
return false;
}
@@ -176,24 +193,24 @@ static bool metric_del(MRG *mrg, METRIC *metric) {
mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
if(unlikely(!rc)) {
- mrg_index_write_unlock(mrg);
+ mrg_index_write_unlock(mrg, partition);
MRG_STATS_DELETE_MISS(mrg);
return false;
}
if(!*sections_judy_pptr) {
- rc = JudyHSDel(&mrg->index.uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+ rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
if(unlikely(!rc))
fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
mrg_stats_size_judyhs_removed_uuid(mrg);
}
// arrayalloc is running lockless here
- arrayalloc_freez(mrg->index.aral, metric);
+ arrayallo