author     Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>  2022-11-15 23:00:53 +0200
committer  GitHub <noreply@github.com>  2022-11-15 23:00:53 +0200
commit     224b051a2b2bab39a4b536e531ab9ca590bf31bb (patch)
tree       adb3ca35d6d6a4d4f1b7aad50542619c3efb38c0
parent     b4a0298bd48f217c4a6f2eaf729e0684966ea7a3 (diff)
New journal disk based indexing for agent memory reduction (#13885)
* Add a read-only option to netdata_mmap so files are accessed using PROT_READ (a hedged sketch of this pattern follows the list)
* Initial functions to write the new journal file and switch to the new indexing
* Cleanup code. Add parameters to pg_cache_punch_hole to avoid updating page latest/oldest times. pg_cache insert takes a parameter indicating whether page index locking is needed. Page eviction functions will try to deallocate the descriptor as well (pg_cache_punch_hole without page_index time updates). Cleanup messages during startup
* Cleanup messages during startup
* Disable extent caching for now; add placeholder for journal indexing and activation while the agent is running
* Add main function to populate descriptors by checking the new journal indexing
* Prevent crash
* Fix for binary search crash
* Avoid time-of-check/time-of-use filesystem race condition
* Always add a page
* Populate fixes - still incomplete
* pg_cache_insert returns the descriptor that ends up in the page_index
* Add populate next (fix 1)
* Fix compilation warnings; reactivate extent caching
* Add populate next (fix 2)
* Add populate next (fix 3): switch to the next entry or journal file when asked to populate the descriptor with next
* Fix resource leak and wrong sizeof
* Rework page population (part 1)
* Additional checksums added / journal validation
* Cleanup (part 1)
* Locking added and cleanup (part 2)
* Close journal file after new journal index activation
* Skip warning when compiling without NETDATA_INTERNAL_CHECKS
* Ignore empty index file (header and trailer but no metrics)
* Try to remove all evicted descriptors (may prevent slight memory increase)
* Evict pages also when we successfully do try_reserve
* Precache pages and cleanup
* Add a separate cleanup thread to release unused descriptors
* Check existence of key correctly
* Fix total file size calculation
* Statistics for journal descriptors
* Track and release journal v2 descriptors
* Do not try to allocate pages for locality if under pressure
* Do not track v2 descriptors when populating the page_index
* Track page descriptors as they are inserted in the page index (per journal file). Scan journal files for pending items to clean up. Clean up v2 descriptors only if they are not populated. Check before adding to the page cache to avoid memory allocation/free
* Close journal file that has been processed and migrated to the new index. Check for a valid file before trying to truncate/close; this file has been closed during startup
* Better calculation of the number of prefetched data pages based on the query end time. Code cleanup and comments. Add v2 populated descriptor expiration based on journal access time
* Code cleanup
* Faster indexing. Better journal validation (more sanity checks). Detect new datafile/journal creation and trigger index generation. Switch to the new index / mark descriptors in memory as needed. Update journal access time when a descriptor is returned. Code cleanup (part 1)
* Reactivate descriptor cleanup; code cleanup
* Allow locality precaching
* Allow locality precaching for the same page alignment
* Descriptor cleanup internals changed
* Disable locality precaching
* Precache only if not under pressure / internal cleanup at 60 seconds
* Remove unused functions
* Migrate on startup always. Make sure the metric uuid is valid (we have a page_index). Prevent crash if no datafile is available when logging an error. Remove unused functions
* New warn limit for precaching. Stress test v2 descriptor cleanup: every 1s clean up if it doesn't exist in cache; 60s cache eviction
* Arrayalloc internal checks on free activated with NETDATA_ARRAYALLOC_INTERNAL_CHECKS. Ability to set DESCRIPTOR_EXPIRATION_TIME and DESCRIPTOR_INTERVAL_CLEANUP at compile time; defaults are DESCRIPTOR_INTERVAL_CLEANUP = 60 and DESCRIPTOR_EXPIRATION_TIME = 600
* Look up page index correctly
* Calculate index time once
* Detect a duplicate page when doing cache insert and during flushing of pages
* Better logging
* Descriptor validation (extent vs page index) when building an index file while the agent is running
* Mark invalid entries in the journal v2 file
* Schedule an index rebuild if a descriptor is found without an extent in the time range we are processing. Release descriptor lock to prevent random shutdown locks
* Proper unlock
* Skip descriptor cleanup when journal file v2 migration is running
* Fix page cache statistics. Remove multiple entries of the page_index from the page cache. Cleanup
* Adjust preload pages on pg_cache_next. Handle invalid descriptor properly. Unlock properly
* Better handling of invalid pages. Journal indexing during runtime will scan all files to find potential ones to index
* Reactivate migration on startup. Evict descriptors to cause migration. Don't count the entries in the page index (calculate when processing the extent list). Check for a valid extent since we may set the extent to NULL on startup if it is invalid. Better structure init. Address valgrind issues
* Add don't fork/dump option
* Add separate lock to protect access to a datafile's extent list. Comment out some unused code (for now). Abort descriptor cleanup if we are force flushing pages (page cache under pressure)
* Check for index and schedule when data flush completes. Configure max datafile size during compilation. Keep a separate JudyL array for descriptors. Skip quota test if we are deleting descriptors or explicitly flushing pages under pressure
* Fix
* Set function when waiters are woken up
* Add the line number to trace the deadlock
* Add thread id
* Add wait list
* Init to zero
* Disable thread cancelability inside dbengine rrdeng_load_page_next()
* Make sure the owner is the thread
* Disable thread cancelability for replication as a whole
* Check and queue indexing after first page flush
* Queue indexing after a small delay to allow some time for page flushing
* Tracing of waiters only when compiled with internal checks
* Mark descr with extent_entry
* Return page timeout
* Check if a journalfile is ready to be indexed; migrate the descriptors or evict if possible. Compilation warning fix
* Use page index if indexing during startup. Mark whether a journalfile should be checked depending on whether we can migrate or delete a page during indexing
* Require 3x max message size as sender buffer
* Fix for the message about the adaptive buffer size
* Fix for the message about duplicate replication commands
* Disable descriptor deletion during migration
* Detect descriptor with the same start page time
* Sender sorts replication requests before fulfilling them; receiver does not send duplicate replication requests
* dbengine never allows past timestamps to be collected
* Do not accept values the same as the last data point stored in dbengine
* Replicate non-overlapping ranges
* A better replication logic to avoid sending overlapping data to parents
* Do not start journal migration in parallel
* Always update page index times
* Fix page index first/last times on load
* Internal log when replication responses do not match the requests or when replication commands are sent while others are in flight
* Do not log out-of-bounds RBEGIN if it is the last replication command we sent
* Better checking of past data collection points
* Better checking of past data collection points - optimized
* Fix corruption during decompression of streaming
* Add config to disable journal indexing. Add config parameter for a detailed journal integrity check (metric chain validation during startup). pg cache insert: drop check for existing page. Fix CRC calculation for metric headers
* Children disable compression globally only when the compression gives an error
* Turn boolean member into RRDHOST OPTION
* Compilation warnings
* Remove unused code
* Replication sender statistics
* Replication sender statistics set to 100% when no replication requests are pending
* Fix casting warning

Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
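The read-only netdata_mmap in the first bullet is the heart of the memory reduction: a finalized journal v2 index stays on disk and is mapped with PROT_READ, so its pages are file-backed and reclaimable by the kernel instead of living on the heap. Below is a minimal sketch of that pattern under stated assumptions; map_index_readonly and its error handling are illustrative, not the actual netdata_mmap API.

#include <fcntl.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

/* Sketch only: map a finished index file read-only, so any stray write
 * through the mapping faults instead of corrupting the index. */
static void *map_index_readonly(const char *path, size_t *size_out)
{
    int fd = open(path, O_RDONLY);
    if (fd == -1)
        return NULL;

    struct stat sb;
    if (fstat(fd, &sb) == -1 || sb.st_size == 0) {
        close(fd);
        return NULL;
    }

    void *data = mmap(NULL, (size_t)sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
    close(fd); /* the mapping keeps the file contents referenced */

    if (data == MAP_FAILED)
        return NULL;

    *size_out = (size_t)sb.st_size;
    return data;
}

The close_journal_file() hunk in the diff below releases such a mapping with munmap(journalfile->journal_data, journalfile->journal_data_size) once the index is no longer needed.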
-rw-r--r--  collectors/plugins.d/pluginsd_parser.c     32
-rw-r--r--  daemon/global_statistics.c                  3
-rw-r--r--  daemon/main.c                               3
-rw-r--r--  database/engine/datafile.c                 41
-rw-r--r--  database/engine/datafile.h                 14
-rw-r--r--  database/engine/journalfile.c            1126
-rw-r--r--  database/engine/journalfile.h              88
-rw-r--r--  database/engine/pagecache.c               569
-rw-r--r--  database/engine/pagecache.h                63
-rw-r--r--  database/engine/rrdengine.c               570
-rw-r--r--  database/engine/rrdengine.h                45
-rwxr-xr-x  database/engine/rrdengineapi.c             87
-rw-r--r--  database/engine/rrdengineapi.h              4
-rw-r--r--  database/engine/rrdenglocking.c            24
-rw-r--r--  database/engine/rrdenglocking.h             7
-rw-r--r--  database/rrd.h                             14
-rw-r--r--  database/rrddim.c                           6
-rw-r--r--  database/rrdhost.c                         13
-rw-r--r--  database/rrdset.c                           4
-rw-r--r--  libnetdata/arrayalloc/arrayalloc.c          4
-rw-r--r--  libnetdata/dictionary/dictionary.c          9
-rw-r--r--  libnetdata/dictionary/dictionary.h          8
-rw-r--r--  libnetdata/libnetdata.c                     5
-rw-r--r--  libnetdata/libnetdata.h                     2
-rw-r--r--  streaming/compression.c                     1
-rw-r--r--  streaming/receiver.c                       25
-rw-r--r--  streaming/replication.c                    16
-rw-r--r--  streaming/rrdpush.h                         2
-rw-r--r--  streaming/sender.c                        184
-rw-r--r--  web/api/web_api_v1.c                        2
30 files changed, 2320 insertions, 651 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 3e0f83422c..e92830fe20 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -293,9 +293,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
// rrdhost_hostname(host), rrdset_id(st),
// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+
+ ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
+ last_entry_child, 0, 0);
+ }
+ else {
+ internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
+ }
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
@@ -875,6 +891,11 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
time_t start_time = strtol(start_time_str, NULL, 0);
time_t end_time = strtol(end_time_str, NULL, 0);
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY: received a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);
+
if(start_time && end_time) {
if (start_time > end_time) {
error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
@@ -1135,11 +1156,18 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->counter++;
st->counter_done++;
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
if (start_streaming) {
if (st->update_every != update_every_child)
rrdset_set_update_every(st, update_every_child);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
return PARSER_RC_OK;
}
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index e81fc49d11..6d6c6dd5b8 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -865,6 +865,7 @@ static void dbengine_statistics_charts(void) {
{
static RRDSET *st_long_term_pages = NULL;
+ static RRDDIM *rd_memory = NULL;
static RRDDIM *rd_total = NULL;
static RRDDIM *rd_insertions = NULL;
static RRDDIM *rd_deletions = NULL;
@@ -885,6 +886,7 @@ static void dbengine_statistics_charts(void) {
localhost->rrd_update_every,
RRDSET_TYPE_LINE);
+ rd_memory = rrddim_add(st_long_term_pages, "journal v2 descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -893,6 +895,7 @@ static void dbengine_statistics_charts(void) {
} else
rrdset_next(st_long_term_pages);
+ rrddim_set_by_pointer(st_long_term_pages, rd_memory, (collected_number)stats_array[37]);
rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]);
rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]);
rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]);
diff --git a/daemon/main.c b/daemon/main.c
index 5c437d208d..6c189fead7 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -680,6 +680,9 @@ static void get_netdata_configured_variables() {
db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO);
default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb);
+ db_engine_journal_indexing = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal indexing", CONFIG_BOOLEAN_YES);
+ db_engine_journal_check = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal integrity check", CONFIG_BOOLEAN_NO);
+
if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) {
error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB);
default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
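Going by the config_get_boolean() calls above, both switches should be tunable in netdata.conf; a sketch of the relevant stanza, assuming CONFIG_SECTION_DB corresponds to the [db] section, with the defaults read above:

[db]
    dbengine enable journal indexing = yes
    dbengine enable journal integrity check = no

Indexing defaults to on; the integrity check (metric chain validation during startup, per the commit message) defaults to off, presumably because it makes startup more expensive.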
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 9c70068d9f..e534d9c673 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,9 +1,29 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
+void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile)
+{
+ struct extent_info *extent = datafile->extents.first, *next_extent;
+
+ char path[RRDENG_PATH_MAX];
+
+ generate_journalfilepath_v2(datafile, path, sizeof(path));
+ internal_error(true, "Deleting extents of file %s", path);
+ unsigned count = 0;
+ while (extent) {
+ next_extent = extent->next;
+ freez(extent);
+ count++;
+ extent = next_extent;
+ }
+ datafile->extents.first = NULL;
+ internal_error(true, "Deleted %u extents of file %s", count, path);
+}
+
void df_extent_insert(struct extent_info *extent)
{
struct rrdengine_datafile *datafile = extent->datafile;
+ uv_rwlock_wrlock(&datafile->extent_rwlock);
if (likely(NULL != datafile->extents.last)) {
datafile->extents.last->next = extent;
@@ -12,10 +32,14 @@ void df_extent_insert(struct extent_info *extent)
datafile->extents.first = extent;
}
datafile->extents.last = extent;
+
+ uv_rwlock_wrunlock(&datafile->extent_rwlock);
}
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
+ uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+
if (likely(NULL != ctx->datafiles.last)) {
ctx->datafiles.last->next = datafile;
}
@@ -23,12 +47,13 @@ void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_dataf
ctx->datafiles.first = datafile;
}
ctx->datafiles.last = datafile;
+
+ uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
-void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
struct rrdengine_datafile *next;
-
next = datafile->next;
fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
ctx->datafiles.first = next;
@@ -44,6 +69,7 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->file = (uv_file)0;
datafile->pos = 0;
datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
+ fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock));
datafile->journalfile = NULL;
datafile->next = NULL;
datafile->ctx = ctx;
@@ -97,7 +123,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
return ret;
}
-int destroy_data_file(struct rrdengine_datafile *datafile)
+int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -176,7 +202,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_data_file(datafile);
+ destroy_data_file_unsafe(datafile);
return ret;
}
@@ -304,10 +330,8 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
- info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
- info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -337,6 +361,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
+ journalfile->file_index = i;
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
if (!must_delete_pair) /* If datafile is still open close it */
@@ -346,6 +371,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
+ // TODO: Also delete the version 2
error("Deleting invalid data and journal file pair.");
ret = unlink_journal_file(journalfile);
if (!ret) {
@@ -407,7 +433,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi
return 0;
error_after_journalfile:
- destroy_data_file(datafile);
+ destroy_data_file_unsafe(datafile);
freez(journalfile);
error_after_datafile:
freez(datafile);
@@ -421,6 +447,7 @@ int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
+ fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
error("Failed to scan path \"%s\".", ctx->dbfiles_path);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index 1cf256aff4..48d72623c3 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -13,7 +13,13 @@ struct rrdengine_instance;
#define DATAFILE_PREFIX "datafile-"
#define DATAFILE_EXTENSION ".ndf"
+#ifndef MAX_DATAFILE_SIZE
#define MAX_DATAFILE_SIZE (1073741824LU)
+#endif
+
#define MIN_DATAFILE_SIZE (4194304LU)
+#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
+#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
+#endif
#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (20)
@@ -26,6 +32,7 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
+ uint32_t index; // This is the extent index for version 2
struct rrdeng_page_descr *pages[];
};
@@ -41,6 +48,7 @@ struct rrdengine_datafile {
unsigned fileno;
uv_file file;
uint64_t pos;
+ uv_rwlock_t extent_rwlock;
struct rrdengine_instance *ctx;
struct rrdengine_df_extents extents;
struct rrdengine_journalfile *journalfile;
@@ -48,20 +56,22 @@ struct rrdengine_datafile {
};
struct rrdengine_datafile_list {
+ uv_rwlock_t rwlock;
struct rrdengine_datafile *first; /* oldest */
struct rrdengine_datafile *last; /* newest */
};
void df_extent_insert(struct extent_info *extent);
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
-void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
-int destroy_data_file(struct rrdengine_datafile *datafile);
+int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
+void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile);
#endif /* NETDATA_DATAFILE_H */ \ No newline at end of file
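The #ifndef guard above lets MAX_DATAFILE_SIZE be overridden at compile time (the commit message's "configure max datafile size during compilation"), e.g. via a -DMAX_DATAFILE_SIZE=... compiler flag. A standalone sketch of the pattern; note that the sanity check is only meaningful once both macros are defined, since an undefined identifier evaluates to 0 inside #if:

/* Sketch of the compile-time override pattern, not the Netdata header itself. */
#ifndef MAX_DATAFILE_SIZE
#define MAX_DATAFILE_SIZE (1073741824LU)   /* 1 GiB default, overridable with -D */
#endif

#define MIN_DATAFILE_SIZE (4194304LU)      /* 4 MiB */

/* placed after both definitions so the comparison sees real values */
#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
#endif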
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 500dd78800..037603b9b6 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,6 +1,25 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
+void queue_journalfile_v2_migration(struct rrdengine_worker_config *wc)
+{
+ struct rrdeng_work *work_request;
+
+ if (unlikely(wc->running_journal_migration))
+ return;
+
+ work_request = mallocz(sizeof(*work_request));
+ work_request->req.data = work_request;
+ work_request->wc = wc;
+ work_request->count = 0;
+ work_request->rerun = false;
+ wc->running_journal_migration = 1;
+ if (unlikely(uv_queue_work(wc->loop, &work_request->req, start_journal_indexing, after_journal_indexing))) {
+ freez(work_request);
+ wc->running_journal_migration = 0;
+ }
+}
+
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
struct generic_io_descriptor *io_descr = req->data;
@@ -47,6 +66,7 @@ void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
io_descr->bytes = size;
io_descr->pos = journalfile->pos;
io_descr->req.data = io_descr;
+ io_descr->data = journalfile;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
@@ -93,6 +113,12 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
+void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
+{
+ (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
+}
+
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
@@ -104,28 +130,51 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->file = (uv_file)0;
journalfile->pos = 0;
journalfile->datafile = datafile;
+ journalfile->journal_data = NULL;
+ journalfile->journal_data_size = 0;
+ journalfile->JudyL_array = (Pvoid_t) NULL;
+ journalfile->data = NULL;
+ journalfile->file_index = 0;
+ journalfile->last_access = 0;
}
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
{
- struct rrdengine_instance *ctx = datafile->ctx;
- uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
-
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ uv_fs_t req;
+ ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
+ generate_journalfilepath(datafile, path, sizeof(path));
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
+ ++datafile->ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
-
return ret;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ char path[RRDENG_PATH_MAX];
+
+ if (likely(journalfile->journal_data)) {
+ if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
+ generate_journalfilepath_v2(datafile, path, sizeof(path));
+ error("Failed to unmap journal index file for %s", path);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ journalfile->journal_data = NULL;
+ journalfile->journal_data_size = 0;
+ return 0;
+ }
+
+ return close_uv_file(datafile, journalfile->file);
+}
+
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
{
struct rrdengine_datafile *datafile = journalfile->datafile;
@@ -149,14 +198,16 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
+ char path_v2[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
+ generate_journalfilepath_v2(datafile, path_v2, sizeof(path_v2));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
@@ -166,9 +217,12 @@ int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrden
}
uv_fs_req_cleanup(&req);
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ (void) close_uv_file(datafile, journalfile->file);
+
+ // This is the new journal v2 index file
+ ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -183,6 +237,13 @@ int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrden
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
+ ++ctx->stats.journalfile_deletions;
+
+ if (journalfile->journal_data) {
+ if (munmap(journalfile->journal_data, journalfile->journal_data_size)) {
+ error("Failed to unmap index file %s", path_v2);
+ }
+ }
return ret;
}
@@ -227,7 +288,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file(journalfile, datafile);
+ destroy_journal_file_unsafe(journalfile, datafile);
return ret;
}
@@ -366,36 +427,67 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
+ *PValue = page_index = create_page_index(temp_id, ctx);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
- descr = pg_cache_create_descr();
+ // Lookup this descriptor
+ Word_t p_time = start_time_ut / USEC_PER_SEC;
+ uv_rwlock_rdlock(&page_index->lock);
+ PValue = JudyLFirst(page_index->JudyL_array, &p_time, PJE0);
+ descr = (NULL == PValue) ? NULL: *PValue;
+ uv_rwlock_rdunlock(&page_index->lock);
+
+ bool descr_found = false;
+ if (unlikely(descr && descr->start_time_ut == start_time_ut)) {
+ // We have this descriptor already
+ descr_found = true;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char uuid_str[UUID_STR_LEN];
+ uuid_unparse_lower(page_index->id, uuid_str);
+ internal_error(true, "REMOVING UUID %s with %lu, %lu length=%u (fileno=%u), extent database offset = %lu, size = %u",
+ uuid_str, start_time_ut, end_time_ut, jf_metric_data->des