summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /database/engine/rrdengine.h
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r--database/engine/rrdengine.h411
1 files changed, 281 insertions, 130 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 521d2521a2..3973281d0c 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -19,11 +19,9 @@
#include "journalfile.h"
#include "rrdengineapi.h"
#include "pagecache.h"
-#include "rrdenglocking.h"
-
-#ifdef NETDATA_RRD_INTERNALS
-
-#endif /* NETDATA_RRD_INTERNALS */
+#include "metric.h"
+#include "cache.h"
+#include "pdc.h"
extern unsigned rrdeng_pages_per_extent;
@@ -32,153 +30,288 @@ struct rrdengine_instance;
#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
+#define GET_JOURNAL_DATA(x) __atomic_load_n(&(x)->journal_data, __ATOMIC_ACQUIRE)
+#define GET_JOURNAL_DATA_SIZE(x) __atomic_load_n(&(x)->journal_data_size, __ATOMIC_ACQUIRE)
+#define SET_JOURNAL_DATA(x, y) __atomic_store_n(&(x)->journal_data, (y), __ATOMIC_RELEASE)
+#define SET_JOURNAL_DATA_SIZE(x, y) __atomic_store_n(&(x)->journal_data_size, (y), __ATOMIC_RELEASE)
+
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+typedef struct page_details_control {
+ struct rrdengine_instance *ctx;
+ struct metric *metric;
+
+ struct completion prep_completion;
+ struct completion page_completion; // sync between the query thread and the workers
+
+ Pvoid_t page_list_JudyL; // the list of page details
+ unsigned completed_jobs; // the number of jobs completed last time the query thread checked
+ bool preload_all_extent_pages; // true to preload all the pages on each extent involved in the query
+ bool workers_should_stop; // true when the query thread left and the workers should stop
+ bool prep_done;
+
+ SPINLOCK refcount_spinlock; // spinlock to protect refcount
+ int32_t refcount; // the number of workers currently working on this request + 1 for the query thread
+ size_t executed_with_gaps;
+
+ time_t start_time_s;
+ time_t end_time_s;
+ STORAGE_PRIORITY priority;
+
+ time_t optimal_end_time_s;
+
+ struct {
+ struct page_details_control *prev;
+ struct page_details_control *next;
+ } cache;
+} PDC;
+
+PDC *pdc_get(void);
+
+typedef enum __attribute__ ((__packed__)) {
+ // final status for all pages
+ // if a page does not have one of these, it is considered unroutable
+ PDC_PAGE_READY = (1 << 0), // ready to be processed (pd->page is not null)
+ PDC_PAGE_FAILED = (1 << 1), // failed to be loaded (pd->page is null)
+ PDC_PAGE_SKIP = (1 << 2), // don't use this page, it is not good for us
+ PDC_PAGE_INVALID = (1 << 3), // don't use this page, it is invalid
+
+ // other statuses for tracking issues
+ PDC_PAGE_PREPROCESSED = (1 << 4), // used during preprocessing
+ PDC_PAGE_PROCESSED = (1 << 5), // processed by the query caller
+ PDC_PAGE_RELEASED = (1 << 6), // already released
+
+ // data found in cache (preloaded) or on disk?
+ PDC_PAGE_PRELOADED = (1 << 7), // data found in memory
+ PDC_PAGE_DISK_PENDING = (1 << 8), // data need to be loaded from disk
+
+ // worker related statuses
+ PDC_PAGE_FAILED_INVALID_EXTENT = (1 << 9),
+ PDC_PAGE_FAILED_UUID_NOT_IN_EXTENT = (1 << 10),
+ PDC_PAGE_FAILED_TO_MAP_EXTENT = (1 << 11),
+ PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE= (1 << 12),
+
+ PDC_PAGE_LOADED_FROM_EXTENT_CACHE = (1 << 13),
+ PDC_PAGE_LOADED_FROM_DISK = (1 << 14),
+
+ PDC_PAGE_PRELOADED_PASS1 = (1 << 15),
+ PDC_PAGE_PRELOADED_PASS4 = (1 << 16),
+ PDC_PAGE_PRELOADED_WORKER = (1 << 17),
+
+ PDC_PAGE_SOURCE_MAIN_CACHE = (1 << 19),
+ PDC_PAGE_SOURCE_OPEN_CACHE = (1 << 19),
+ PDC_PAGE_SOURCE_JOURNAL_V2 = (1 << 20),
+
+ // datafile acquired
+ PDC_PAGE_DATAFILE_ACQUIRED = (1 << 30),
+} PDC_PAGE_STATUS;
+
+struct page_details {
+ struct {
+ struct rrdengine_datafile *ptr;
+ uv_file file;
+ unsigned fileno;
+
+ struct {
+ uint64_t pos;
+ uint32_t bytes;
+ } extent;
+ } datafile;
+
+ struct pgc_page *page;
+ Word_t metric_id;
+ time_t first_time_s;
+ time_t last_time_s;
+ uint32_t update_every_s;
+ uint16_t page_length;
+ PDC_PAGE_STATUS status;
+
+ struct {
+ struct page_details *prev;
+ struct page_details *next;
+ } cache;
+};
+
+struct page_details *page_details_get(void);
+
+#define pdc_page_status_check(pd, flag) (__atomic_load_n(&((pd)->status), __ATOMIC_ACQUIRE) & (flag))
+#define pdc_page_status_set(pd, flag) __atomic_or_fetch(&((pd)->status), flag, __ATOMIC_RELEASE)
+#define pdc_page_status_clear(pd, flag) __atomic_and_fetch(&((od)->status), ~(flag), __ATOMIC_RELEASE)
+
+struct jv2_extents_info {
+ size_t index;
+ uint64_t pos;
+ unsigned bytes;
+ size_t number_of_pages;
+};
+
+struct jv2_metrics_info {
+ uuid_t *uuid;
+ uint32_t page_list_header;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t number_of_pages;
+ Pvoid_t JudyL_pages_by_start_time;
+};
+
+struct jv2_page_info {
+ time_t start_time_s;
+ time_t end_time_s;
+ time_t update_every_s;
+ size_t page_length;
+ uint32_t extent_index;
+ void *custom_data;
+
+ // private
+ struct pgc_page *page;
+};
+
+typedef enum __attribute__ ((__packed__)) {
+ RRDENG_CHO_UNALIGNED = (1 << 0), // set when this metric is not page aligned according to page alignment
+ RRDENG_FIRST_PAGE_ALLOCATED = (1 << 1), // set when this metric has allocated its first page
+} RRDENG_COLLECT_HANDLE_OPTIONS;
+
struct rrdeng_collect_handle {
- struct pg_cache_page_index *page_index;
- struct rrdeng_page_descr *descr;
- unsigned long page_correlation_id;
- // set to 1 when this dimension is not page aligned with the other dimensions in the chart
- uint8_t unaligned_page;
+ struct metric *metric;
+ struct pgc_page *page;
struct pg_alignment *alignment;
+ RRDENG_COLLECT_HANDLE_OPTIONS options;
+ uint8_t type;
+ // 2 bytes remaining here for future use
+ uint32_t page_entries_max;
+ uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it
+ usec_t page_end_time_ut;
+ usec_t update_every_ut;
};
struct rrdeng_query_handle {
- struct rrdeng_page_descr *descr;
+ struct metric *metric;
+ struct pgc_page *page;
struct rrdengine_instance *ctx;
- struct pg_cache_page_index *page_index;
- time_t wanted_start_time_s;
+ storage_number *metric_data;
+ struct page_details_control *pdc;
+
+ // the request
+ time_t start_time_s;
+ time_t end_time_s;
+ STORAGE_PRIORITY priority;
+
+ // internal data
time_t now_s;
+ time_t dt_s;
+
unsigned position;
unsigned entries;
- storage_number *page;
- usec_t page_end_time_ut;
- uint32_t page_length;
- time_t dt_s;
+
+ struct {
+ struct rrdeng_query_handle *prev;
+ struct rrdeng_query_handle *next;
+ } cache;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ usec_t started_time_s;
+ pid_t query_pid;
+ struct rrdeng_query_handle *prev, *next;
+#endif
};
-typedef enum {
- RRDENGINE_STATUS_UNINITIALIZED = 0,
- RRDENGINE_STATUS_INITIALIZING,
- RRDENGINE_STATUS_INITIALIZED
-} rrdengine_state_t;
+struct rrdeng_query_handle *rrdeng_query_handle_get(void);
+void rrdeng_query_handle_release(struct rrdeng_query_handle *handle);
enum rrdeng_opcode {
/* can be used to return empty status or flush the command queue */
- RRDENG_NOOP = 0,
-
- RRDENG_READ_PAGE,
- RRDENG_READ_EXTENT,
- RRDENG_COMMIT_PAGE,
- RRDENG_FLUSH_PAGES,
- RRDENG_SHUTDOWN,
- RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
- RRDENG_QUIESCE,
-
- RRDENG_MAX_OPCODE
-};
-
-struct rrdeng_read_page {
- struct rrdeng_page_descr *page_cache_descr;
-};
-
-struct rrdeng_read_extent {
- struct rrdeng_page_descr *page_cache_descr[MAX_PAGES_PER_EXTENT];
- int page_count;
+ RRDENG_OPCODE_NOOP = 0,
+
+ RRDENG_OPCODE_EXTENT_READ,
+ RRDENG_OPCODE_PREP_QUERY,
+ RRDENG_OPCODE_FLUSH_PAGES,
+ RRDENG_OPCODE_FLUSHED_TO_OPEN,
+ RRDENG_OPCODE_FLUSH_INIT,
+ RRDENG_OPCODE_EVICT_INIT,
+ //RRDENG_OPCODE_DATAFILE_CREATE,
+ RRDENG_OPCODE_JOURNAL_FILE_INDEX,
+ RRDENG_OPCODE_DATABASE_ROTATE,
+ RRDENG_OPCODE_CTX_SHUTDOWN,
+ RRDENG_OPCODE_CTX_QUIESCE,
+
+ RRDENG_OPCODE_MAX
};
-struct rrdeng_cmd {
- enum rrdeng_opcode opcode;
- union {
- struct rrdeng_read_page read_page;
- struct rrdeng_read_extent read_extent;
- struct completion *completion;
- };
-};
-
-#define RRDENG_CMD_Q_MAX_SIZE (2048)
-
-struct rrdeng_cmdqueue {
- unsigned head, tail;
- struct rrdeng_cmd cmd_array[RRDENG_CMD_Q_MAX_SIZE];
+// WORKERS IDS:
+// RRDENG_MAX_OPCODE : reserved for the cleanup
+// RRDENG_MAX_OPCODE + opcode : reserved for the callbacks of each opcode
+// RRDENG_MAX_OPCODE + RRDENG_MAX_OPCODE : reserved for the timer
+#define RRDENG_TIMER_CB (RRDENG_OPCODE_MAX + RRDENG_OPCODE_MAX)
+#define RRDENG_FLUSH_TRANSACTION_BUFFER_CB (RRDENG_TIMER_CB + 1)
+#define RRDENG_OPCODES_WAITING (RRDENG_TIMER_CB + 2)
+#define RRDENG_WORKS_DISPATCHED (RRDENG_TIMER_CB + 3)
+#define RRDENG_WORKS_EXECUTING (RRDENG_TIMER_CB + 4)
+
+struct extent_io_data {
+ unsigned fileno;
+ uv_file file;
+ uint64_t pos;
+ unsigned bytes;
+ uint16_t page_length;
};
struct extent_io_descriptor {
- uv_fs_t req;
- uv_work_t req_worker;
+ struct rrdengine_instance *ctx;
+ uv_fs_t uv_fs_request;
uv_buf_t iov;
uv_file file;
void *buf;
- void *map_base;
- size_t map_length;
+ struct wal *wal;
uint64_t pos;
unsigned bytes;
struct completion *completion;
unsigned descr_count;
- int release_descr;
- struct rrdeng_page_descr *descr_array[MAX_PAGES_PER_EXTENT];
- struct rrdeng_page_descr descr_read_array[MAX_PAGES_PER_EXTENT];
- Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT];
+ struct page_descr_with_data *descr_array[MAX_PAGES_PER_EXTENT];
+ struct rrdengine_datafile *datafile;
struct extent_io_descriptor *next; /* multiple requests to be served by the same cached extent */
+
+ struct {
+ struct extent_io_descriptor *prev;
+ struct extent_io_descriptor *next;
+ } cache;
};
struct generic_io_descriptor {
+ struct rrdengine_instance *ctx;
uv_fs_t req;
uv_buf_t iov;
void *buf;
+ void *data;
uint64_t pos;
unsigned bytes;
struct completion *completion;
};
-struct extent_cache_element {
- struct extent_info *extent; /* The ABA problem is avoided with the help of fileno below */
- unsigned fileno;
- struct extent_cache_element *prev; /* LRU */
- struct extent_cache_element *next; /* LRU */
- struct extent_io_descriptor *inflight_io_descr; /* I/O descriptor for in-flight extent */
- uint8_t pages[MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE];
-};
-
-#define MAX_CACHED_EXTENTS 16 /* cannot be over 32 to fit in 32-bit architectures */
+typedef struct wal {
+ uint64_t transaction_id;
+ void *buf;
+ size_t size;
+ size_t buf_size;
+ struct generic_io_descriptor io_descr;
-/* Initialize by setting the structure to zero */
-struct extent_cache {
- struct extent_cache_element extent_array[MAX_CACHED_EXTENTS];
- unsigned allocation_bitmap; /* 1 if the corresponding position in the extent_array is allocated */
- unsigned inflight_bitmap; /* 1 if the corresponding position in the extent_array is waiting for I/O */
+ struct {
+ struct wal *prev;
+ struct wal *next;
+ } cache;
+} WAL;
- struct extent_cache_element *replaceQ_head; /* LRU */
- struct extent_cache_element *replaceQ_tail; /* MRU */
-};
+WAL *wal_get(struct rrdengine_instance *ctx, unsigned size);
+void wal_release(WAL *wal);
struct rrdengine_worker_config {
- struct rrdengine_instance *ctx;
-
- uv_thread_t thread;
- uv_loop_t* loop;
- uv_async_t async;
-
- /* file deletion thread */
- uv_thread_t *now_deleting_files;
- unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
-
- /* dirty page deletion thread */
- uv_thread_t *now_invalidating_dirty_pages;
- /* set to 0 when now_invalidating_dirty_pages is still running */
- unsigned long cleanup_thread_invalidating_dirty_pages;
- unsigned inflight_dirty_pages;
-
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
- volatile unsigned queue_size;
- struct rrdeng_cmdqueue cmd_queue;
+ bool now_deleting_files;
+ bool migration_to_v2_running;
- struct extent_cache xt_cache;
-
- int error;
+ struct {
+ // non-zero until we commit data to disk (both datafile and journal file)
+ unsigned extents_currently_being_flushed;
+ } atomics;
};
/*
@@ -231,20 +364,10 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele
#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
#define QUIESCED (2) /* is set after all threads have finished running */
-typedef enum {
- LOAD_ERRORS_PAGE_FLIPPED_TIME = 0,
- LOAD_ERRORS_PAGE_EQUAL_TIME = 1,
- LOAD_ERRORS_PAGE_ZERO_ENTRIES = 2,
- LOAD_ERRORS_PAGE_UPDATE_ZERO = 3,
- LOAD_ERRORS_PAGE_FLEXY_TIME = 4,
- LOAD_ERRORS_DROPPED_EXTENT = 5,
-} INVALID_PAGE_ID;
-
struct rrdengine_instance {
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
- struct page_cache pg_cache;
- uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
+ bool journal_initialization;
uint8_t global_compress_alg;
struct transaction_commit_log commit_log;
struct rrdengine_datafile_list datafiles;
@@ -255,29 +378,57 @@ struct rrdengine_instance {
uint64_t max_disk_space;
int tier;
unsigned last_fileno; /* newest index of datafile and journalfile */
- unsigned long max_cache_pages;
- unsigned long cache_pages_low_watermark;
+ unsigned last_flush_fileno;
unsigned long metric_API_max_producers;
- uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
+ bool create_new_datafile_pair;
+ uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
uint8_t page_type; /* Default page type for this context */
- struct rrdengine_statistics stats;
+ struct completion quiesce_completion;
- struct {
- size_t counter;
- usec_t latest_end_time_ut;
- } load_errors[6];
+ size_t inflight_queries;
+ struct rrdengine_statistics stats;
};
-void *dbengine_page_alloc(void);
+#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce, __ATOMIC_RELAXED) == NO_QUIESCE)
+
+void *dbengine_page_alloc(struct rrdengine_instance *ctx, size_t size);
void dbengine_page_free(void *page);
int init_rrd_files(struct rrdengine_instance *ctx);
void finalize_rrd_files(struct rrdengine_instance *ctx);
-void rrdeng_test_quota(struct rrdengine_worker_config* wc);
-void rrdeng_worker(void* arg);
-void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);
-struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc);
+bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx);
+void dbengine_event_loop(void *arg);
+void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion, enum storage_priority priority);
+
+void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc);
+void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc);
+
+void pdc_acquire(PDC *pdc);
+bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router);
+
+unsigned rrdeng_target_data_file_size(struct rrdengine_instance *ctx);
+
+struct page_descr_with_data *page_descriptor_get(void);
+
+typedef struct validated_page_descriptor {
+ time_t start_time_s;
+ time_t end_time_s;
+ time_t update_every_s;
+ size_t page_length;
+ size_t point_size;
+ size_t entries;
+ uint8_t type;
+ bool data_on_disk_valid;
+} VALIDATED_PAGE_DESCRIPTOR;
+
+#define page_entries_by_time(start_time_s, end_time_s, update_every_s) \
+ ((update_every_s) ? (((end_time_s) - ((start_time_s) - (update_every_s))) / (update_every_s)) : 1)
+
+#define page_entries_by_size(page_length_in_bytes, point_size_in_bytes) \
+ ((page_length_in_bytes) / (point_size_in_bytes))
+
+VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error);
#endif /* NETDATA_RRDENGINE_H */