diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-20 23:56:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-20 23:56:33 +0200 |
commit | dc9f81ccfe611410f5a710dafcc14c6c9f030aa2 (patch) | |
tree | fe922ca2647abb9def5b903c6e1e2e83c2d59066 /database/engine/rrdengine.h | |
parent | c036541019018e363f0aa7bd705534baeb015b09 (diff) |
DBENGINE v2 - improvements part 6 (#14299)
* query preparation runs before extent reads
* populate mrg in parallel
* fix formatting warning
* first search for a metric then add it if it does not exist
* Revert "first search for a metric then add it if it does not exist"
This reverts commit 4afa6461fcce859d03f1c9cf56dd3b5933ee5ebc.
* Revert "fix formatting warning"
This reverts commit 49473493f7f1c3399b5635a573d3c6ed2b6e46f3.
* Revert "populate mrg in parallel"
This reverts commit a40166708d4222f6329904f109114c47c44ca666.
* merge journalfiles metrics before committing them to MRG
* Revert "merge journalfiles metrics before committing them to MRG"
This reverts commit 50c8934e23a0a09ea4da80e3f88290e46496ad92.
* Revert "Revert "populate mrg in parallel""
This reverts commit f4c149d2ab7a8c9af24a10f95438a0d662a5cf8a.
* Revert "Revert "fix formatting warning""
This reverts commit 78298ff9efc49806ded029f5f1e868cc42e8f6eb.
* Revert "Revert "first search for a metric then add it if it does not exist""
This reverts commit 997b9c813b290882ba18a8c44bf73f9ee5480adf.
* preload first and last journal files v2
* fix formatting warning
* parallel loading of tiers; cleanup of ctx structures
* use half the cores
* add partitions to metrics registry
* revert accidental change
* parallel processing according to MRG partitions; dont recalculate retention on exit
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r-- | database/engine/rrdengine.h | 101 |
1 files changed, 64 insertions, 37 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index df77b018a9..f134f401b5 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -233,6 +233,7 @@ enum rrdeng_opcode { RRDENG_OPCODE_DATABASE_ROTATE, RRDENG_OPCODE_CTX_SHUTDOWN, RRDENG_OPCODE_CTX_QUIESCE, + RRDENG_OPCODE_CTX_POPULATE_MRG, RRDENG_OPCODE_MAX }; @@ -303,16 +304,6 @@ typedef struct wal { WAL *wal_get(struct rrdengine_instance *ctx, unsigned size); void wal_release(WAL *wal); -struct rrdengine_worker_config { - bool now_deleting_files; - bool migration_to_v2_running; - - struct { - // non-zero until we commit data to disk (both datafile and journal file) - unsigned extents_currently_being_flushed; - } atomics; -}; - /* * Debug statistics not used by code logic. * They only describe operations since DB engine instance load time. @@ -359,37 +350,73 @@ extern rrdeng_stats_t rrdeng_reserved_file_descriptors; extern rrdeng_stats_t global_pg_cache_over_half_dirty_events; extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */ -#define NO_QUIESCE (0) /* initial state when all operations function normally */ -#define SET_QUIESCE (1) /* set it before shutting down the instance, quiesce long running operations */ -#define QUIESCED (2) /* is set after all threads have finished running */ - struct rrdengine_instance { - struct rrdengine_worker_config worker_config; - struct completion rrdengine_completion; - bool journal_initialization; - uint8_t global_compress_alg; - struct transaction_commit_log commit_log; - struct rrdengine_datafile_list datafiles; - RRDHOST *host; /* the legacy host, or NULL for multi-host DB */ - char dbfiles_path[FILENAME_MAX + 1]; - char machine_guid[GUID_LEN + 1]; /* the unique ID of the corresponding host, or localhost for multihost DB */ - uint64_t disk_space; - uint64_t max_disk_space; - int tier; - unsigned last_fileno; /* newest index of datafile and journalfile */ - unsigned last_flush_fileno; - - bool create_new_datafile_pair; - uint8_t quiesce; /* set to SET_QUIESCE before shutdown of the engine */ - uint8_t page_type; /* Default page type for this context */ - - struct completion quiesce_completion; - - size_t inflight_queries; + struct { + int tier; // the tier of this ctx + uint8_t page_type; // default page type for this context + + uint64_t max_disk_space; // the max disk space this ctx is allowed to use + uint8_t global_compress_alg; // the wanted compression algorithm + + char dbfiles_path[FILENAME_MAX + 1]; + } config; + + struct { + uv_rwlock_t rwlock; // the linked list of datafiles is protected by this lock + struct rrdengine_datafile *first; // oldest - the newest with ->first->prev + } datafiles; + + struct { + unsigned last_fileno; // newest index of datafile and journalfile + unsigned last_flush_fileno; // newest index of datafile received data + + size_t inflight_queries; // the number of queries currently running + uint64_t current_disk_space; // the current disk space size used + + uint64_t transaction_id; // the transaction id of the next extent flushing + + bool migration_to_v2_running; + bool now_deleting_files; + unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file) + } atomic; + + struct { + bool exit_mode; + bool enabled; // when set (before shutdown), queries are prohibited + struct completion completion; + } quiesce; + + struct { + struct { + size_t size; + struct completion *array; + } populate_mrg; + + bool create_new_datafile_pair; + } loading; + struct rrdengine_statistics stats; }; -#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce, __ATOMIC_RELAXED) == NO_QUIESCE) +#define ctx_current_disk_space_get(ctx) __atomic_load_n(&(ctx)->atomic.current_disk_space, __ATOMIC_RELAXED) +#define ctx_current_disk_space_increase(ctx, size) __atomic_add_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED) +#define ctx_current_disk_space_decrease(ctx, size) __atomic_sub_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED) + +#define ctx_last_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_fileno, __ATOMIC_RELAXED) +#define ctx_last_fileno_increment(ctx) __atomic_add_fetch(&(ctx)->atomic.last_fileno, 1, __ATOMIC_RELAXED) + +#define ctx_last_flush_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_flush_fileno, __ATOMIC_RELAXED) +static inline void ctx_last_flush_fileno_set(struct rrdengine_instance *ctx, unsigned fileno) { + unsigned old_fileno = ctx_last_flush_fileno_get(ctx); + + do { + if(old_fileno >= fileno) + return; + + } while(!__atomic_compare_exchange_n(&ctx->atomic.last_flush_fileno, &old_fileno, fileno, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); +} + +#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce.enabled, __ATOMIC_RELAXED) == false && __atomic_load_n(&(ctx)->quiesce.exit_mode, __ATOMIC_RELAXED) == false) void *dbengine_page_alloc(size_t size); void dbengine_page_free(void *page, size_t size); |