summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-20 23:56:33 +0200
committerGitHub <noreply@github.com>2023-01-20 23:56:33 +0200
commitdc9f81ccfe611410f5a710dafcc14c6c9f030aa2 (patch)
treefe922ca2647abb9def5b903c6e1e2e83c2d59066 /database/engine/rrdengine.h
parentc036541019018e363f0aa7bd705534baeb015b09 (diff)
DBENGINE v2 - improvements part 6 (#14299)
* query preparation runs before extent reads * populate mrg in parallel * fix formatting warning * first search for a metric then add it if it does not exist * Revert "first search for a metric then add it if it does not exist" This reverts commit 4afa6461fcce859d03f1c9cf56dd3b5933ee5ebc. * Revert "fix formatting warning" This reverts commit 49473493f7f1c3399b5635a573d3c6ed2b6e46f3. * Revert "populate mrg in parallel" This reverts commit a40166708d4222f6329904f109114c47c44ca666. * merge journalfiles metrics before committing them to MRG * Revert "merge journalfiles metrics before committing them to MRG" This reverts commit 50c8934e23a0a09ea4da80e3f88290e46496ad92. * Revert "Revert "populate mrg in parallel"" This reverts commit f4c149d2ab7a8c9af24a10f95438a0d662a5cf8a. * Revert "Revert "fix formatting warning"" This reverts commit 78298ff9efc49806ded029f5f1e868cc42e8f6eb. * Revert "Revert "first search for a metric then add it if it does not exist"" This reverts commit 997b9c813b290882ba18a8c44bf73f9ee5480adf. * preload first and last journal files v2 * fix formatting warning * parallel loading of tiers; cleanup of ctx structures * use half the cores * add partitions to metrics registry * revert accidental change * parallel processing according to MRG partitions; dont recalculate retention on exit
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r--database/engine/rrdengine.h101
1 files changed, 64 insertions, 37 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index df77b018a9..f134f401b5 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -233,6 +233,7 @@ enum rrdeng_opcode {
RRDENG_OPCODE_DATABASE_ROTATE,
RRDENG_OPCODE_CTX_SHUTDOWN,
RRDENG_OPCODE_CTX_QUIESCE,
+ RRDENG_OPCODE_CTX_POPULATE_MRG,
RRDENG_OPCODE_MAX
};
@@ -303,16 +304,6 @@ typedef struct wal {
WAL *wal_get(struct rrdengine_instance *ctx, unsigned size);
void wal_release(WAL *wal);
-struct rrdengine_worker_config {
- bool now_deleting_files;
- bool migration_to_v2_running;
-
- struct {
- // non-zero until we commit data to disk (both datafile and journal file)
- unsigned extents_currently_being_flushed;
- } atomics;
-};
-
/*
* Debug statistics not used by code logic.
* They only describe operations since DB engine instance load time.
@@ -359,37 +350,73 @@ extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
-#define NO_QUIESCE (0) /* initial state when all operations function normally */
-#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */
-#define QUIESCED (2) /* is set after all threads have finished running */
-
struct rrdengine_instance {
- struct rrdengine_worker_config worker_config;
- struct completion rrdengine_completion;
- bool journal_initialization;
- uint8_t global_compress_alg;
- struct transaction_commit_log commit_log;
- struct rrdengine_datafile_list datafiles;
- RRDHOST *host; /* the legacy host, or NULL for multi-host DB */
- char dbfiles_path[FILENAME_MAX + 1];
- char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */
- uint64_t disk_space;
- uint64_t max_disk_space;
- int tier;
- unsigned last_fileno; /* newest index of datafile and journalfile */
- unsigned last_flush_fileno;
-
- bool create_new_datafile_pair;
- uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */
- uint8_t page_type; /* Default page type for this context */
-
- struct completion quiesce_completion;
-
- size_t inflight_queries;
+ struct {
+ int tier; // the tier of this ctx
+ uint8_t page_type; // default page type for this context
+
+ uint64_t max_disk_space; // the max disk space this ctx is allowed to use
+ uint8_t global_compress_alg; // the wanted compression algorithm
+
+ char dbfiles_path[FILENAME_MAX + 1];
+ } config;
+
+ struct {
+ uv_rwlock_t rwlock; // the linked list of datafiles is protected by this lock
+ struct rrdengine_datafile *first; // oldest - the newest with ->first->prev
+ } datafiles;
+
+ struct {
+ unsigned last_fileno; // newest index of datafile and journalfile
+ unsigned last_flush_fileno; // newest index of datafile received data
+
+ size_t inflight_queries; // the number of queries currently running
+ uint64_t current_disk_space; // the current disk space size used
+
+ uint64_t transaction_id; // the transaction id of the next extent flushing
+
+ bool migration_to_v2_running;
+ bool now_deleting_files;
+ unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file)
+ } atomic;
+
+ struct {
+ bool exit_mode;
+ bool enabled; // when set (before shutdown), queries are prohibited
+ struct completion completion;
+ } quiesce;
+
+ struct {
+ struct {
+ size_t size;
+ struct completion *array;
+ } populate_mrg;
+
+ bool create_new_datafile_pair;
+ } loading;
+
struct rrdengine_statistics stats;
};
-#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce, __ATOMIC_RELAXED) == NO_QUIESCE)
+#define ctx_current_disk_space_get(ctx) __atomic_load_n(&(ctx)->atomic.current_disk_space, __ATOMIC_RELAXED)
+#define ctx_current_disk_space_increase(ctx, size) __atomic_add_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
+#define ctx_current_disk_space_decrease(ctx, size) __atomic_sub_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
+
+#define ctx_last_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_fileno, __ATOMIC_RELAXED)
+#define ctx_last_fileno_increment(ctx) __atomic_add_fetch(&(ctx)->atomic.last_fileno, 1, __ATOMIC_RELAXED)
+
+#define ctx_last_flush_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_flush_fileno, __ATOMIC_RELAXED)
+static inline void ctx_last_flush_fileno_set(struct rrdengine_instance *ctx, unsigned fileno) {
+ unsigned old_fileno = ctx_last_flush_fileno_get(ctx);
+
+ do {
+ if(old_fileno >= fileno)
+ return;
+
+ } while(!__atomic_compare_exchange_n(&ctx->atomic.last_flush_fileno, &old_fileno, fileno, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+}
+
+#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce.enabled, __ATOMIC_RELAXED) == false && __atomic_load_n(&(ctx)->quiesce.exit_mode, __ATOMIC_RELAXED) == false)
void *dbengine_page_alloc(size_t size);
void dbengine_page_free(void *page, size_t size);