summaryrefslogtreecommitdiffstats
path: root/database/engine/journalfile.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /database/engine/journalfile.c
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'database/engine/journalfile.c')
-rw-r--r--database/engine/journalfile.c1007
1 files changed, 815 insertions, 192 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 500dd78800..5ad383d850 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,96 +1,116 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
+
+// DBENGINE2: Helper
+
+static void update_metric_retention_and_granularity_by_uuid(
+ struct rrdengine_instance *ctx, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s)
+{
+ if(last_time_s > now_s) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
+
+ if(first_time_s > last_time_s) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
+
+ first_time_s = last_time_s;
+ }
+
+ if(first_time_s == 0 ||
+ last_time_s == 0
+ ) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
+
+ MRG_ENTRY entry = {
+ .section = (Word_t)ctx,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = update_every_s
+ };
+ uuid_copy(entry.uuid, *uuid);
+
+ bool added;
+ METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+
+ if (likely(!added))
+ mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ mrg_metric_release(main_mrg, metric);
+}
+
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
- struct generic_io_descriptor *io_descr = req->data;
- struct rrdengine_worker_config* wc = req->loop->data;
- struct rrdengine_instance *ctx = wc->ctx;
+ worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
+
+ WAL *wal = req->data;
+ struct generic_io_descriptor *io_descr = &wal->io_descr;
+ struct rrdengine_instance *ctx = io_descr->ctx;
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
- error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
}
uv_fs_req_cleanup(req);
- posix_memfree(io_descr->buf);
- freez(io_descr);
+ wal_release(wal);
+
+ __atomic_sub_fetch(&ctx->worker_config.atomics.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
+
+ worker_is_idle();
}
/* Careful to always call this before creating a new journal file */
-void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
+void wal_flush_transaction_buffer(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
- struct rrdengine_instance *ctx = wc->ctx;
int ret;
struct generic_io_descriptor *io_descr;
- unsigned pos, size;
- struct rrdengine_journalfile *journalfile;
+ struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
- return;
- }
- /* care with outstanding transactions when switching journal files */
- journalfile = ctx->datafiles.last->journalfile;
-
- io_descr = mallocz(sizeof(*io_descr));
- pos = ctx->commit_log.buf_pos;
- size = ctx->commit_log.buf_size;
- if (pos < size) {
+ io_descr = &wal->io_descr;
+ io_descr->ctx = ctx;
+ if (wal->size < wal->buf_size) {
/* simulate an empty transaction to skip the rest of the block */
- *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
+ *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING;
}
- io_descr->buf = ctx->commit_log.buf;
- io_descr->bytes = size;
+ io_descr->buf = wal->buf;
+ io_descr->bytes = wal->buf_size;
io_descr->pos = journalfile->pos;
- io_descr->req.data = io_descr;
+ io_descr->req.data = wal;
+ io_descr->data = journalfile;
io_descr->completion = NULL;
- io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
- ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
+ io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
+ ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
journalfile->pos, flush_transaction_buffer_cb);
fatal_assert(-1 != ret);
- journalfile->pos += RRDENG_BLOCK_SIZE;
- ctx->disk_space += RRDENG_BLOCK_SIZE;
- ctx->commit_log.buf = NULL;
- ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
+ journalfile->pos += wal->buf_size;
+ ctx->disk_space += wal->buf_size;
+ ctx->stats.io_write_bytes += wal->buf_size;
++ctx->stats.io_write_requests;
}
-void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
+void generate_journalfilepath_v2(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
- struct rrdengine_instance *ctx = wc->ctx;
- int ret;
- unsigned buf_pos = 0, buf_size;
-
- fatal_assert(size);
- if (ctx->commit_log.buf) {
- unsigned remaining;
-
- buf_pos = ctx->commit_log.buf_pos;
- buf_size = ctx->commit_log.buf_size;
- remaining = buf_size - buf_pos;
- if (size > remaining) {
- /* we need a new buffer */
- wal_flush_transaction_buffer(wc);
- }
- }
- if (NULL == ctx->commit_log.buf) {
- buf_size = ALIGN_BYTES_CEILING(size);
- ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
- if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
- }
- memset(ctx->commit_log.buf, 0, buf_size);
- buf_pos = ctx->commit_log.buf_pos = 0;
- ctx->commit_log.buf_size = buf_size;
- }
- ctx->commit_log.buf_pos += size;
-
- return ctx->commit_log.buf + buf_pos;
+ (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
+ datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
@@ -104,28 +124,51 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->file = (uv_file)0;
journalfile->pos = 0;
journalfile->datafile = datafile;
+ SET_JOURNAL_DATA(journalfile, 0);
+ SET_JOURNAL_DATA_SIZE(journalfile, 0);
+ journalfile->data = NULL;
}
-int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
{
- struct rrdengine_instance *ctx = datafile->ctx;
- uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
- generate_journalfilepath(datafile, path, sizeof(path));
-
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ uv_fs_t req;
+ ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
- ++ctx->stats.fs_errors;
+ generate_journalfilepath(datafile, path, sizeof(path));
+ error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ ++datafile->ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
-
return ret;
}
+int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+{
+ struct rrdengine_instance *ctx = datafile->ctx;
+ char path[RRDENG_PATH_MAX];
+
+ void *journal_data = GET_JOURNAL_DATA(journalfile);
+ size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile);
+
+ if (likely(journal_data)) {
+ if (munmap(journal_data, journal_data_size)) {
+ generate_journalfilepath_v2(datafile, path, sizeof(path));
+ error("DBENGINE: failed to unmap journal index file for %s", path);
+ ++ctx->stats.fs_errors;
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ }
+ SET_JOURNAL_DATA(journalfile, 0);
+ SET_JOURNAL_DATA_SIZE(journalfile, 0);
+ return 0;
+ }
+
+ return close_uv_file(datafile, journalfile->file);
+}
+
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
{
struct rrdengine_datafile *datafile = journalfile->datafile;
@@ -138,7 +181,7 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -149,26 +192,32 @@ int unlink_journal_file(struct rrdengine_journalfile *journalfile)
return ret;
}
-int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
+int destroy_journal_file_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
+ char path_v2[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
+ generate_journalfilepath_v2(datafile, path_v2, sizeof(path));
+ if (journalfile->file) {
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
+ (void) close_uv_file(datafile, journalfile->file);
+ }
- ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
+ // This is the new journal v2 index file
+ ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
- error("uv_fs_close(%s): %s", path, uv_strerror(ret));
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
@@ -176,13 +225,23 @@ int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrden
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
+ ++ctx->stats.journalfile_deletions;
+
+ void *journal_data = GET_JOURNAL_DATA(journalfile);
+ size_t journal_data_size = GET_JOURNAL_DATA_SIZE(journalfile);
+
+ if (journal_data) {
+ if (munmap(journal_data, journal_data_size)) {
+ error("DBENGINE: failed to unmap index file %s", path_v2);
+ }
+ }
return ret;
}
@@ -209,7 +268,7 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
}
memset(superblock, 0, sizeof(*superblock));
(void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
@@ -220,14 +279,14 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("uv_fs_write: %s", uv_strerror(ret));
+ error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
- destroy_journal_file(journalfile, datafile);
+ destroy_journal_file_unsafe(journalfile, datafile);
return ret;
}
@@ -247,13 +306,13 @@ static int check_journal_file_superblock(uv_file file)
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
}
iov = uv_buf_init((void *)superblock, sizeof(*superblock));
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("uv_fs_read: %s", uv_strerror(ret));
+ error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -262,7 +321,7 @@ static int check_journal_file_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- error("File has invalid superblock.");
+ error("DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -272,15 +331,10 @@ static int check_journal_file_superblock(uv_file file)
return ret;
}
-static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
- void *buf, unsigned max_size)
+static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
static BITMAP256 page_error_map;
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned i, count, payload_length, descr_size, valid_pages;
- struct rrdeng_page_descr *descr;
- struct extent_info *extent;
- /* persistent structures */
+ unsigned i, count, payload_length, descr_size;
struct rrdeng_jf_store_data *jf_metric_data;
jf_metric_data = buf;
@@ -288,117 +342,63 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
descr_size = sizeof(*jf_metric_data->descr) * count;
payload_length = sizeof(*jf_metric_data) + descr_size;
if (payload_length > max_size) {
- error("Corrupted transaction payload.");
+ error("DBENGINE: corrupted transaction payload.");
return;
}
- extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
- extent->offset = jf_metric_data->extent_offset;
- extent->size = jf_metric_data->extent_size;
- extent->datafile = journalfile->datafile;
- extent->next = NULL;
-
- for (i = 0, valid_pages = 0 ; i < count ; ++i) {
+ time_t now_s = now_realtime_sec();
+ for (i = 0; i < count ; ++i) {
uuid_t *temp_id;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
uint8_t page_type = jf_metric_data->descr[i].type;
if (page_type > PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
- error("Unknown page type %d encountered.", page_type);
+ error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
}
continue;
}
- uint64_t start_time_ut = jf_metric_data->descr[i].start_time_ut;
- uint64_t end_time_ut = jf_metric_data->descr[i].end_time_ut;
- size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
- time_t update_every_s = (entries > 1) ? ((end_time_ut - start_time_ut) / USEC_PER_SEC / (entries - 1)) : 0;
-
- if (unlikely(start_time_ut > end_time_ut)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut = end_time_ut;
- continue;
- }
- if (unlikely(start_time_ut == end_time_ut && entries != 1)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut = end_time_ut;
- continue;
- }
+ temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
+ METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx);
- if (unlikely(!entries)) {
- ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut = end_time_ut;
- continue;
- }
+ struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i];
+ VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
+ descr, now_s,
+ (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
+ false);
- if(entries > 1 && update_every_s == 0) {
- ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut = end_time_ut;
+ if(!vd.data_on_disk_valid) {
+ mrg_metric_release(main_mrg, metric);
continue;
}
- if(start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1) != end_time_ut) {
- ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter++;
- if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut < end_time_ut)
- ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut = end_time_ut;
-
- // let this be
- // end_time_ut = start_time_ut + update_every_s * USEC_PER_SEC * (entries - 1);
+ bool update_metric_time = true;
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = (Word_t)ctx,
+ .first_time_s = vd.start_time_s,
+ .last_time_s = vd.end_time_s,
+ .latest_update_every_s = vd.update_every_s,
+ };
+ uuid_copy(entry.uuid, *temp_id);
+
+ bool added;
+ metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ if(added)
+ update_metric_time = false;
}
+ Word_t metric_id = mrg_metric_id(main_mrg, metric);
- temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- /* First time we see the UUID */
- uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
- fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
- *PValue = page_index = create_page_index(temp_id, ctx);
- page_index->prev = pg_cache->metrics_index.last_page_index;
- pg_cache->metrics_index.last_page_index = page_index;
- uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
- }
-
- descr = pg_cache_create_descr();
- descr->page_length = jf_metric_data->descr[i].page_length;
- descr->start_time_ut = start_time_ut;
- descr->end_time_ut = end_time_ut;
- descr->update_every_s = (update_every_s > 0) ? (uint32_t)update_every_s : (page_index->latest_update_every_s);
- descr->id = &page_index->id;
- descr->extent = extent;
- descr->type = page_type;
- extent->pages[valid_pages++] = descr;
- pg_cache_insert(ctx, page_index, descr);
-
- if(page_index->latest_time_ut == descr->end_time_ut)
- page_index->latest_update_every_s = descr->update_every_s;
-
- if(descr->update_every_s == 0)
- fatal(
- "DBENGINE: page descriptor update every is zero, end_time_ut = %llu, start_time_ut = %llu, entries = %zu",
- (unsigned long long)end_time_ut, (unsigned long long)start_time_ut, entries);
- }
+ if (update_metric_time)
+ mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s);
- extent->number_of_pages = valid_pages;
+ pgc_open_add_hot_page(
+ (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s,
+ journalfile->datafile,
+ jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length);
- if (likely(valid_pages))
- df_extent_insert(extent);
- else {
- freez(extent);
- ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter++;
+ mrg_metric_release(main_mrg, metric);
}
}
@@ -424,14 +424,14 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
return 0;
}
if (sizeof(*jf_header) > max_size) {
- error("Corrupted transaction record, skipping.");
+ error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
*id = jf_header->id;
payload_length = jf_header->payload_length;
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
if (size_bytes > max_size) {
- error("Corrupted transaction record, skipping.");
+ error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
jf_trailer = buf + sizeof(*jf_header) + payload_length;
@@ -440,7 +440,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
ret = crc32cmp(jf_trailer->checksum, crc);
debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
if (unlikely(ret)) {
- error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+ error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
return size_bytes;
}
switch (jf_header->type) {
@@ -449,7 +449,7 @@ static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdeng
restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
break;
default:
- error("Unknown transaction type. Skipping record.");
+ error("DBENGINE: unknown transaction type, skipping record.");
break;
}
@@ -483,7 +483,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
if (unlikely(!journal_is_mmapped)) {
ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
if (unlikely(ret))
- fatal("posix_memalign:%s", strerror(ret));
+ fatal("DBENGINE: posix_memalign:%s", strerror(ret));
}
else
buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
@@ -493,7 +493,7 @@ static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrde
iov = uv_buf_init(buf, size_bytes);
ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
if (ret < 0) {
- error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
+ error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto skip_file;
}
@@ -524,6 +524,609 @@ skip_file:
return max_id;
}
+// Checks that the extent list checksum is valid
+static int check_journal_v2_extent_list (void *data_start, size_t file_size)
+{
+ UNUSED(file_size);
+ uLong crc;
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
+ if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
+ error("DBENGINE: extent list CRC32 check: FAILED");
+ return 1;
+ }
+
+ return 0;
+}
+
+// Checks that the metric list (UUIDs) checksum is valid
+static int check_journal_v2_metric_list(void *data_start, size_t file_size)
+{
+ UNUSED(file_size);
+ uLong crc;
+
+ struct journal_v2_header *j2_header = (void *) data_start;
+ struct journal_v2_block_trailer *journal_v2_trailer;
+
+ journal_v2_trailer = (struct journal_v2_block_trailer *