path: root/database/engine/pdc.c
author     Costa Tsaousis <costa@netdata.cloud>   2023-01-10 19:59:21 +0200
committer  GitHub <noreply@github.com>            2023-01-10 19:59:21 +0200
commit     368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
tree       b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /database/engine/pdc.c
parent     b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are 
exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. 
* cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. 
Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL 
instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'database/engine/pdc.c')
-rw-r--r--   database/engine/pdc.c   1221
1 file changed, 1221 insertions, 0 deletions
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
new file mode 100644
index 0000000000..491109b3c0
--- /dev/null
+++ b/database/engine/pdc.c
@@ -0,0 +1,1221 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#define NETDATA_RRD_INTERNALS
+#include "pdc.h"
+
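+// An EPDL (extent page details list) groups the page details of a single
+// on-disk extent, keyed first by metric id and then by page start time in
+// nested JudyL arrays, so that one extent read can satisfy all of them.
+// The cache.prev/cache.next pointers link released objects into the
+// free-list kept by epdl_globals below.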
+struct extent_page_details_list {
+ uv_file file;
+ uint64_t extent_offset;
+ uint32_t extent_size;
+ unsigned number_of_pages_in_JudyL;
+ Pvoid_t page_details_by_metric_id_JudyL;
+ struct page_details_control *pdc;
+ struct rrdengine_datafile *datafile;
+
+ struct {
+ struct extent_page_details_list *prev;
+ struct extent_page_details_list *next;
+ } cache;
+};
+
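+// A DEOL (datafile extent offset list) is a per-datafile container, keyed by
+// extent offset in a JudyL array. pdc_to_epdl_router() uses it to deduplicate
+// the extents that have to be read from each datafile.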
+typedef struct datafile_extent_offset_list {
+ uv_file file;
+ unsigned fileno;
+ Pvoid_t extent_pd_list_by_extent_offset_JudyL;
+
+ struct {
+ struct datafile_extent_offset_list *prev;
+ struct datafile_extent_offset_list *next;
+ } cache;
+} DEOL;
+
+// ----------------------------------------------------------------------------
+// PDC cache
+
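+// All the caches in this file follow the same pattern: a spinlock-protected
+// LIFO free-list of released objects ("protected"), plus an atomic counter of
+// total allocations ("atomics.allocated"). The *_get() helpers pop a zeroed
+// object from the free-list or malloc a new one, *_release() pushes it back,
+// and *_cleanup() trims the free-list down to a small bound so that idle
+// memory is eventually returned to the allocator.
+//
+// Illustrative lifecycle (a sketch, not code from this file; pdc_release() is
+// static and is only reached through pdc_destroy()):
+//
+//     PDC *pdc = pdc_get();        // reused or freshly allocated, always zeroed
+//     ... fill it in, run the query ...
+//     pdc_release_and_destroy_if_unreferenced(pdc, false, false);
+//     pdc_cleanup();               // called periodically to trim the free-list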
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ PDC *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ } atomics;
+} pdc_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ },
+};
+
+void pdc_cleanup(void) {
+ netdata_spinlock_lock(&pdc_globals.protected.spinlock);
+
+ while(pdc_globals.protected.available_items && pdc_globals.protected.available > (size_t)libuv_worker_threads) {
+ PDC *item = pdc_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pdc_globals.protected.available_items, item, cache.prev, cache.next);
+ freez(item);
+ pdc_globals.protected.available--;
+ __atomic_sub_fetch(&pdc_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ netdata_spinlock_unlock(&pdc_globals.protected.spinlock);
+}
+
+PDC *pdc_get(void) {
+ PDC *pdc = NULL;
+
+ netdata_spinlock_lock(&pdc_globals.protected.spinlock);
+
+ if(likely(pdc_globals.protected.available_items)) {
+ pdc = pdc_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pdc_globals.protected.available_items, pdc, cache.prev, cache.next);
+ pdc_globals.protected.available--;
+ }
+
+ netdata_spinlock_unlock(&pdc_globals.protected.spinlock);
+
+ if(unlikely(!pdc)) {
+ pdc = mallocz(sizeof(PDC));
+ __atomic_add_fetch(&pdc_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ memset(pdc, 0, sizeof(PDC));
+ return pdc;
+}
+
+static void pdc_release(PDC *pdc) {
+ if(unlikely(!pdc)) return;
+
+ netdata_spinlock_lock(&pdc_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(pdc_globals.protected.available_items, pdc, cache.prev, cache.next);
+ pdc_globals.protected.available++;
+ netdata_spinlock_unlock(&pdc_globals.protected.spinlock);
+}
+
+size_t pdc_cache_size(void) {
+ return __atomic_load_n(&pdc_globals.atomics.allocated, __ATOMIC_RELAXED) * sizeof(PDC);
+}
+
+// ----------------------------------------------------------------------------
+// PD cache
+
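+// The same free-list pattern, for struct page_details: one object per page a
+// query needs. Cleanup keeps up to 2 * libuv_worker_threads entries cached.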
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ struct page_details *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ } atomics;
+} page_details_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ },
+};
+
+void page_details_cleanup(void) {
+ netdata_spinlock_lock(&page_details_globals.protected.spinlock);
+
+ while(page_details_globals.protected.available_items && page_details_globals.protected.available > (size_t)libuv_worker_threads * 2) {
+ struct page_details *item = page_details_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(page_details_globals.protected.available_items, item, cache.prev, cache.next);
+ freez(item);
+ page_details_globals.protected.available--;
+ __atomic_sub_fetch(&page_details_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ netdata_spinlock_unlock(&page_details_globals.protected.spinlock);
+}
+
+struct page_details *page_details_get(void) {
+ struct page_details *pd = NULL;
+
+ netdata_spinlock_lock(&page_details_globals.protected.spinlock);
+
+ if(likely(page_details_globals.protected.available_items)) {
+ pd = page_details_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(page_details_globals.protected.available_items, pd, cache.prev, cache.next);
+ page_details_globals.protected.available--;
+ }
+
+ netdata_spinlock_unlock(&page_details_globals.protected.spinlock);
+
+ if(unlikely(!pd)) {
+ pd = mallocz(sizeof(struct page_details));
+ __atomic_add_fetch(&page_details_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ memset(pd, 0, sizeof(struct page_details));
+ return pd;
+}
+
+static void page_details_release(struct page_details *pd) {
+ if(unlikely(!pd)) return;
+
+ netdata_spinlock_lock(&page_details_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(page_details_globals.protected.available_items, pd, cache.prev, cache.next);
+ page_details_globals.protected.available++;
+ netdata_spinlock_unlock(&page_details_globals.protected.spinlock);
+}
+
+size_t pd_cache_size(void) {
+ return __atomic_load_n(&page_details_globals.atomics.allocated, __ATOMIC_RELAXED) * sizeof(struct page_details);
+}
+
+// ----------------------------------------------------------------------------
+// epdl cache
+
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ EPDL *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ } atomics;
+} epdl_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ },
+};
+
+void epdl_cleanup(void) {
+ netdata_spinlock_lock(&epdl_globals.protected.spinlock);
+
+ while(epdl_globals.protected.available_items && epdl_globals.protected.available > 100) {
+ EPDL *item = epdl_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(epdl_globals.protected.available_items, item, cache.prev, cache.next);
+ freez(item);
+ epdl_globals.protected.available--;
+ __atomic_sub_fetch(&epdl_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ netdata_spinlock_unlock(&epdl_globals.protected.spinlock);
+}
+
+static EPDL *epdl_get(void) {
+ EPDL *epdl = NULL;
+
+ netdata_spinlock_lock(&epdl_globals.protected.spinlock);
+
+ if(likely(epdl_globals.protected.available_items)) {
+ epdl = epdl_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(epdl_globals.protected.available_items, epdl, cache.prev, cache.next);
+ epdl_globals.protected.available--;
+ }
+
+ netdata_spinlock_unlock(&epdl_globals.protected.spinlock);
+
+ if(unlikely(!epdl)) {
+ epdl = mallocz(sizeof(EPDL));
+ __atomic_add_fetch(&epdl_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ memset(epdl, 0, sizeof(EPDL));
+ return epdl;
+}
+
+static void epdl_release(EPDL *epdl) {
+ if(unlikely(!epdl)) return;
+
+ netdata_spinlock_lock(&epdl_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(epdl_globals.protected.available_items, epdl, cache.prev, cache.next);
+ epdl_globals.protected.available++;
+ netdata_spinlock_unlock(&epdl_globals.protected.spinlock);
+}
+
+size_t epdl_cache_size(void) {
+ return __atomic_load_n(&epdl_globals.atomics.allocated, __ATOMIC_RELAXED) * sizeof(EPDL);
+}
+
+// ----------------------------------------------------------------------------
+// deol cache
+
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ DEOL *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ } atomics;
+} deol_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ },
+};
+
+void deol_cleanup(void) {
+ netdata_spinlock_lock(&deol_globals.protected.spinlock);
+
+ while(deol_globals.protected.available_items && deol_globals.protected.available > 100) {
+ DEOL *item = deol_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(deol_globals.protected.available_items, item, cache.prev, cache.next);
+ freez(item);
+ deol_globals.protected.available--;
+ __atomic_sub_fetch(&deol_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ netdata_spinlock_unlock(&deol_globals.protected.spinlock);
+}
+
+static DEOL *deol_get(void) {
+ DEOL *deol = NULL;
+
+ netdata_spinlock_lock(&deol_globals.protected.spinlock);
+
+ if(likely(deol_globals.protected.available_items)) {
+ deol = deol_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(deol_globals.protected.available_items, deol, cache.prev, cache.next);
+ deol_globals.protected.available--;
+ }
+
+ netdata_spinlock_unlock(&deol_globals.protected.spinlock);
+
+ if(unlikely(!deol)) {
+ deol = mallocz(sizeof(DEOL));
+ __atomic_add_fetch(&deol_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ }
+
+ memset(deol, 0, sizeof(DEOL));
+ return deol;
+}
+
+static void deol_release(DEOL *deol) {
+ if(unlikely(!deol)) return;
+
+ netdata_spinlock_lock(&deol_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(deol_globals.protected.available_items, deol, cache.prev, cache.next);
+ deol_globals.protected.available++;
+ netdata_spinlock_unlock(&deol_globals.protected.spinlock);
+}
+
+size_t deol_cache_size(void) {
+ return __atomic_load_n(&deol_globals.atomics.allocated, __ATOMIC_RELAXED) * sizeof(DEOL);
+}
+
+// ----------------------------------------------------------------------------
+// extent with buffer cache
+
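+// Unlike the fixed-size caches above, extent buffers are variable-sized
+// (header plus payload), so this cache also tracks allocated_bytes and its
+// cleanup keeps at most one buffer around.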
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ struct extent_buffer *available_items;
+ size_t available;
+ } protected;
+
+ struct {
+ size_t allocated;
+ size_t allocated_bytes;
+ } atomics;
+
+ size_t max_size;
+
+} extent_buffer_globals = {
+ .protected = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .available_items = NULL,
+ .available = 0,
+ },
+ .atomics = {
+ .allocated = 0,
+ .allocated_bytes = 0,
+ },
+ .max_size = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE,
+};
+
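+// The shared buffer size must be able to hold either a full uncompressed
+// extent or its worst-case LZ4-compressed form, whichever is larger, so that
+// any cached buffer can be reused for any extent read.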
+void extent_buffer_init(void) {
+ size_t max_extent_uncompressed = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE;
+ size_t max_size = (size_t)LZ4_compressBound(MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE);
+ if(max_size < max_extent_uncompressed)
+ max_size = max_extent_uncompressed;
+
+ extent_buffer_globals.max_size = max_size;
+}
+
+void extent_buffer_cleanup(void) {
+ netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock);
+
+ while(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) {
+ struct extent_buffer *item = extent_buffer_globals.protected.available_items;
+ size_t bytes = sizeof(struct extent_buffer) + item->bytes;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(extent_buffer_globals.protected.available_items, item, cache.prev, cache.next);
+ freez(item);
+ extent_buffer_globals.protected.available--;
+ __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
+ }
+
+ netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+}
+
+struct extent_buffer *extent_buffer_get(size_t size) {
+ internal_fatal(size > extent_buffer_globals.max_size, "DBENGINE: extent size is too big");
+
+ struct extent_buffer *eb = NULL;
+
+ if(size < extent_buffer_globals.max_size)
+ size = extent_buffer_globals.max_size;
+
+ netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock);
+ if(likely(extent_buffer_globals.protected.available_items)) {
+ eb = extent_buffer_globals.protected.available_items;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
+ extent_buffer_globals.protected.available--;
+ }
+ netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+
+ if(unlikely(eb && eb->bytes < size)) {
+ size_t bytes = sizeof(struct extent_buffer) + eb->bytes;
+ freez(eb);
+ eb = NULL;
+ __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
+ }
+
+ if(unlikely(!eb)) {
+ size_t bytes = sizeof(struct extent_buffer) + size;
+ eb = mallocz(bytes);
+ eb->bytes = size;
+ __atomic_add_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
+ }
+
+ return eb;
+}
+
+void extent_buffer_release(struct extent_buffer *eb) {
+ if(unlikely(!eb)) return;
+
+ netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
+ extent_buffer_globals.protected.available++;
+ netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+}
+
+size_t extent_buffer_cache_size(void) {
+ return __atomic_load_n(&extent_buffer_globals.atomics.allocated_bytes, __ATOMIC_RELAXED);
+}
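+
+// Illustrative use (a sketch; the actual callers live elsewhere in dbengine,
+// and the payload member is declared in pdc.h):
+//
+//     struct extent_buffer *eb = extent_buffer_get(extent_size);
+//     // ... read or uncompress the extent into the buffer ...
+//     extent_buffer_release(eb);   // returned to the cache for reuse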
+
+// ----------------------------------------------------------------------------
+// epdl logic
+
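+// Page details inside an EPDL are stored in two levels of JudyL arrays:
+// metric id -> (page start time -> struct page_details *). The helpers below
+// therefore always walk an outer and an inner JudyL with
+// PDCJudyLFirstThenNext(), and have to free both levels on teardown.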
+static void epdl_destroy(EPDL *epdl)
+{
+ Pvoid_t *pd_by_start_time_s_JudyL;
+ Word_t metric_id_index = 0;
+ bool metric_id_first = true;
+ while ((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(
+ epdl->page_details_by_metric_id_JudyL,
+ &metric_id_index, &metric_id_first)))
+ PDCJudyLFreeArray(pd_by_start_time_s_JudyL, PJE0);
+
+ PDCJudyLFreeArray(&epdl->page_details_by_metric_id_JudyL, PJE0);
+ epdl_release(epdl);
+}
+
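+// Walk every page detail in the epdl; any page that has not been loaded is
+// flagged PDC_PAGE_FAILED together with the caller-supplied tags, and the
+// number of affected pages is added to the optional statistics counter.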
+static void epdl_mark_all_not_loaded_pages_as_failed(EPDL *epdl, PDC_PAGE_STATUS tags, size_t *statistics_counter)
+{
+ size_t pages_matched = 0;
+
+ Word_t metric_id_index = 0;
+ bool metric_id_first = true;
+ Pvoid_t *pd_by_start_time_s_JudyL;
+ while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
+
+ Word_t start_time_index = 0;
+ bool start_time_first = true;
+ Pvoid_t *PValue;
+ while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
+ struct page_details *pd = *PValue;
+
+ if(!pd->page) {
+ pdc_page_status_set(pd, PDC_PAGE_FAILED | tags);
+ pages_matched++;
+ }
+ }
+ }
+
+ if(pages_matched && statistics_counter)
+ __atomic_add_fetch(statistics_counter, pages_matched, __ATOMIC_RELAXED);
+}
+
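+// Try to satisfy every still-pending page of this epdl straight from the main
+// page cache. Returns true when nothing remains to be loaded from disk, in
+// which case the extent does not need to be read at all.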
+static bool epdl_check_if_pages_are_already_in_cache(struct rrdengine_instance *ctx, EPDL *epdl, PDC_PAGE_STATUS tags)
+{
+ size_t count_remaining = 0;
+ size_t found = 0;
+
+ Word_t metric_id_index = 0;
+ bool metric_id_first = true;
+ Pvoid_t *pd_by_start_time_s_JudyL;
+ while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
+
+ Word_t start_time_index = 0;
+ bool start_time_first = true;
+ Pvoid_t *PValue;
+ while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
+ struct page_details *pd = *PValue;
+ if (pd->page)
+ continue;
+
+ pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
+ if (pd->page) {
+ found++;
+ pdc_page_status_set(pd, PDC_PAGE_READY | tags);
+ }
+ else
+ count_remaining++;
+ }
+ }
+
+ if(found) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_preloaded, found, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, found, __ATOMIC_RELAXED);
+ }
+
+ return count_remaining == 0;
+}
+
+// ----------------------------------------------------------------------------
+// PDC logic
+
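+// A PDC (page details control) holds the full list of pages a single query
+// needs, in a JudyL keyed by time. It is reference-counted: the query thread
+// and every dbengine worker that still has work to do on it hold one
+// reference each, and the last one to let go destroys it here.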
+static void pdc_destroy(PDC *pdc) {
+ mrg_metric_release(main_mrg, pdc->metric);
+ completion_destroy(&pdc->prep_completion);
+ completion_destroy(&pdc->page_completion);
+
+ Pvoid_t *PValue;
+ struct page_details *pd;
+ Word_t time_index = 0;
+ bool first_then_next = true;
+ size_t unroutable = 0;
+ while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
+ pd = *PValue;
+
+ // no need for atomics here - we are done...
+ PDC_PAGE_STATUS status = pd->status;
+
+ if(status & PDC_PAGE_DATAFILE_ACQUIRED) {
+ datafile_release(pd->datafile.ptr, DATAFILE_ACQUIRE_PAGE_DETAILS);
+ pd->datafile.ptr = NULL;
+ }
+
+ internal_fatal(pd->datafile.ptr, "DBENGINE: page details has a datafile.ptr that is not released.");
+
+ if(!pd->page && !(status & (PDC_PAGE_READY | PDC_PAGE_FAILED | PDC_PAGE_RELEASED | PDC_PAGE_SKIP | PDC_PAGE_INVALID))) {
+ // pdc_page_status_set(pd, PDC_PAGE_FAILED);
+ unroutable++;
+ }
+
+ if(pd->page && !(status & PDC_PAGE_RELEASED)) {
+ pgc_page_release(main_cache, pd->page);
+ // pdc_page_status_set(pd, PDC_PAGE_RELEASED);
+ }
+
+ page_details_release(pd);
+ }
+
+ PDCJudyLFreeArray(&pdc->page_list_JudyL, PJE0);
+
+ __atomic_sub_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&pdc->ctx->inflight_queries, 1, __ATOMIC_RELAXED);
+ pdc_release(pdc);
+
+ if(unroutable)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_unroutable, unroutable, __ATOMIC_RELAXED);
+}
+
+void pdc_acquire(PDC *pdc) {
+ netdata_spinlock_lock(&pdc->refcount_spinlock);
+
+ if(pdc->refcount < 1)
+ fatal("DBENGINE: pdc is not referenced and cannot be acquired");
+
+ pdc->refcount++;
+ netdata_spinlock_unlock(&pdc->refcount_spinlock);
+}
+
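+// Drop one reference. When a worker leaves the last remaining reference to
+// the query caller, it signals page_completion so the caller can wake up;
+// when the count reaches zero the PDC is destroyed.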
+bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) {
+ netdata_spinlock_lock(&pdc->refcount_spinlock);
+
+ if(pdc->refcount <= 0)
+ fatal("DBENGINE: pdc is not referenced and cannot be released");
+
+ pdc->refcount--;
+
+ if (pdc->refcount <= 1 && worker) {
+ // when 1 refcount is remaining, and we are a worker,
+ // we can mark the job completed:
+ // - if the remaining refcount is from the query caller, we will wake it up
+ // - if the remaining refcount is from another worker, the query thread is already away
+ completion_mark_complete(&pdc->page_completion);
+ }
+
+ if (pdc->refcount == 0) {
+ netdata_spinlock_unlock(&pdc->refcount_spinlock);
+ pdc_destroy(pdc);
+ return true;
+ }
+
+ netdata_spinlock_unlock(&pdc->refcount_spinlock);
+ return false;
+}
+
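+// Convert the flat, time-ordered page list of a PDC into per-extent work
+// units: group the pages that still need disk I/O first by datafile (DEOL)
+// and then by extent offset (EPDL), then dispatch the first extent through
+// exec_first_extent_list() and the rest through exec_rest_extent_list(),
+// taking one extra PDC reference per dispatched extent.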
+void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list)
+{
+ Pvoid_t *PValue;
+ Pvoid_t *PValue1;
+ Pvoid_t *PValue2;
+ Word_t time_index = 0;
+ struct page_details *pd = NULL;
+
+ // This is the entire page list of the query.
+ // Let's do some deduplication:
+ // 1. per datafile
+ // 2. per extent
+ // 3. pages per extent will be added to the cache, either as acquired or not
+
+ Pvoid_t JudyL_datafile_list = NULL;
+
+ DEOL *deol;
+ EPDL *epdl;
+
+ if (pdc->page_list_JudyL) {
+ bool first_then_next = true;
+ while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
+ pd = *PValue;
+
+ internal_fatal(!pd,
+ "DBENGINE: pdc page list has an empty page details entry");
+
+ if (!(pd->status & PDC_PAGE_DISK_PENDING))
+ continue;
+
+ internal_fatal(!(pd->status & PDC_PAGE_DATAFILE_ACQUIRED),
+ "DBENGINE: page details has not acquired the datafile");
+
+ internal_fatal((pd->status & (PDC_PAGE_READY | PDC_PAGE_FAILED)),
+ "DBENGINE: page details has disk pending flag but it is ready/failed");
+
+ internal_fatal(pd->page,
+ "DBENGINE: page details has a page linked to it, but it is marked for loading");
+
+ PValue1 = PDCJudyLIns(&JudyL_datafile_list, pd->datafile.fileno, PJE0);
+ if (PValue1 && !*PValue1) {
+ *PValue1 = deol = deol_get();
+ deol->extent_pd_list_by_extent_offset_JudyL = NULL;
+ deol->fileno = pd->datafile.fileno;
+ }
+ else
+ deol = *PValue1;
+
+ PValue2 = PDCJudyLIns(&deol->extent_pd_list_by_extent_offset_JudyL, pd->datafile.extent.pos, PJE0);
+ if (PValue2 && !*PValue2) {
+ *PValue2 = epdl = epdl_get();
+ epdl->page_details_by_metric_id_JudyL = NULL;
+ epdl->number_of_pages_in_JudyL = 0;
+ epdl->file = pd->datafile.file;
+ epdl->extent_offset = pd->datafile.extent.pos;
+ epdl->extent_size = pd->datafile.extent.bytes;
+ epdl->datafile = pd->datafile.ptr;
+ }
+ else
+ epdl = *PValue2;
+
+ epdl->number_of_pages_in_JudyL++;
+
+ Pvoid_t *pd_by_first_time_s_judyL = PDCJudyLIns(&epdl->page_details_by_metric_id_JudyL, pd->metric_id, PJE0);
+ Pvoid_t *pd_pptr = PDCJudyLIns(pd_by_first_time_s_judyL, pd->first_time_s, PJE0);
+ *pd_pptr = pd;
+ }
+
+ size_t extent_list_no = 0;
+ Word_t datafile_no = 0;
+ first_then_next = true;
+ while((PValue = PDCJudyLFirstThenNext(JudyL_datafile_list, &datafile_no, &first_then_next))) {
+ deol = *PValue;
+
+ bool first_then_next_extent = true;
+ Word_t pos = 0;
+ while ((PValue = PDCJudyLFirstThenNext(deol->extent_pd_list_by_extent_offset_JudyL, &pos, &first_then_next_extent))) {
+ epdl = *PValue;
+ internal_fatal(!epdl, "DBENGINE: extent_list is not populated properly");
+
+ // The extent page list can be dispatched to a worker.
+ // It will need to populate the cache with "acquired" pages that are in the list (pd) only;
+ // the rest of the extent pages will be added to the cache but not acquired.
+
+ pdc_acquire(pdc); // we do this for the next worker: do_read_extent_work()
+ epdl->pdc = pdc;
+
+ if(extent_list_no++ == 0)
+ exec_first_extent_list(ctx, epdl, pdc->priority);
+ else
+ exec_rest_extent_list(ctx, epdl, pdc->priority);
+ }
+ PDCJudyLFreeArray(&deol->extent_pd_list_by_extent_offset_JudyL, PJE0);
+ deol_release(deol);
+ }
+ PDCJudyLFreeArray(&JudyL_datafile_list, PJE0);
+ }
+
+ pdc_release_and_destroy_if_unreferenced(pdc, true, true);
+}
+
+static bool datafile_get_exclusive_access_to_extent(EPDL *epdl) {