diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-23 22:18:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-23 22:18:44 +0200 |
commit | dd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch) | |
tree | fecf5514eda33c0a96f4d359f30fd07229d12cf7 /database/engine/rrdengine.h | |
parent | c2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff) |
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers
* when there is a discrepancy between update every, fix it
* fix the other occurrences of metric update every mismatch
* allow resetting the same timestamp
* validate flushed pages before committing them to disk
* initialize collection with the latest time in mrg
* these should be static functions
* acquire metrics for writing to detect multiple data collections of the same metric
* print the uuid of the metric that is collected twice
* log the discrepancies of completed pages
* 1 second tolerance
* unify validation of pages and related logging across dbengine
* make do_flush_pages() thread safe
* flush pages runs on libuv workers
* added uv events to tp workers
* dont cross datafile spinlock and rwlock
* should be unlock
* prevent the creation of multiple datafiles
* break an infinite replication loop
* do not log the expansion of the replication window due to start streaming
* log all invalid pages with internal checks
* do not shutdown event loop threads
* add information about collected page events, to find the root cause of invalid collected pages
* rewrite of the gap filling to fix the invalid collected pages problem
* handle multiple collections of the same metric gracefully
* added log about main cache page conflicts; fix gap filling once again...
* keep track of the first metric writer
* it should be an internal fatal - it does not harm users
* do not check for future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks
* prevent negative replication completion percentage
* internal error for the discrepancy of mrg
* better logging of dbengine new metrics collection
* without internal checks it is unused
* prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread
* renames and atomics everywhere
* if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it
* Debug for context load
* rrdcontext uuid debug
* rrddim uuid debug
* rrdeng uuid debug
* Revert "rrdeng uuid debug"
This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b.
* Revert "rrddim uuid debug"
This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8.
* Revert "rrdcontext uuid debug"
This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0.
* Revert "Debug for context load"
This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b.
* do not use legacy uuids on multihost dbs
* thread safety for journalfile size
* handle other cases of inconsistent collected pages
* make health thread check if it should be running in key loops
* do not log uuids
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r-- | database/engine/rrdengine.h | 93 |
1 files changed, 71 insertions, 22 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index f134f401b5..e5732251da 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -169,17 +169,36 @@ struct jv2_page_info { typedef enum __attribute__ ((__packed__)) { RRDENG_CHO_UNALIGNED = (1 << 0), // set when this metric is not page aligned according to page alignment RRDENG_FIRST_PAGE_ALLOCATED = (1 << 1), // set when this metric has allocated its first page + RRDENG_1ST_METRIC_WRITER = (1 << 2), } RRDENG_COLLECT_HANDLE_OPTIONS; +typedef enum __attribute__ ((__packed__)) { + RRDENG_PAGE_PAST_COLLECTION = (1 << 0), + RRDENG_PAGE_REPEATED_COLLECTION = (1 << 1), + RRDENG_PAGE_BIG_GAP = (1 << 2), + RRDENG_PAGE_GAP = (1 << 3), + RRDENG_PAGE_FUTURE_POINT = (1 << 4), + RRDENG_PAGE_CREATED_IN_FUTURE = (1 << 5), + RRDENG_PAGE_COMPLETED_IN_FUTURE = (1 << 6), + RRDENG_PAGE_UNALIGNED = (1 << 7), + RRDENG_PAGE_CONFLICT = (1 << 8), + RRDENG_PAGE_FULL = (1 << 9), + RRDENG_PAGE_COLLECT_FINALIZE = (1 << 10), + RRDENG_PAGE_UPDATE_EVERY_CHANGE = (1 << 11), + RRDENG_PAGE_STEP_TOO_SMALL = (1 << 12), + RRDENG_PAGE_STEP_UNALIGNED = (1 << 13), +} RRDENG_COLLECT_PAGE_FLAGS; + struct rrdeng_collect_handle { struct metric *metric; struct pgc_page *page; struct pg_alignment *alignment; RRDENG_COLLECT_HANDLE_OPTIONS options; uint8_t type; - // 2 bytes remaining here for future use + RRDENG_COLLECT_PAGE_FLAGS page_flags; uint32_t page_entries_max; uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it + usec_t page_start_time_ut; usec_t page_end_time_ut; usec_t update_every_ut; }; @@ -222,18 +241,18 @@ enum rrdeng_opcode { /* can be used to return empty status or flush the command queue */ RRDENG_OPCODE_NOOP = 0, + RRDENG_OPCODE_QUERY, + RRDENG_OPCODE_EXTENT_WRITE, RRDENG_OPCODE_EXTENT_READ, - RRDENG_OPCODE_PREP_QUERY, - RRDENG_OPCODE_FLUSH_PAGES, RRDENG_OPCODE_FLUSHED_TO_OPEN, + RRDENG_OPCODE_DATABASE_ROTATE, + RRDENG_OPCODE_JOURNAL_INDEX, 
RRDENG_OPCODE_FLUSH_INIT, RRDENG_OPCODE_EVICT_INIT, - //RRDENG_OPCODE_DATAFILE_CREATE, - RRDENG_OPCODE_JOURNAL_FILE_INDEX, - RRDENG_OPCODE_DATABASE_ROTATE, RRDENG_OPCODE_CTX_SHUTDOWN, RRDENG_OPCODE_CTX_QUIESCE, RRDENG_OPCODE_CTX_POPULATE_MRG, + RRDENG_OPCODE_CLEANUP, RRDENG_OPCODE_MAX }; @@ -309,35 +328,23 @@ void wal_release(WAL *wal); * They only describe operations since DB engine instance load time. */ struct rrdengine_statistics { - rrdeng_stats_t metric_API_producers; - rrdeng_stats_t metric_API_consumers; - rrdeng_stats_t pg_cache_insertions; - rrdeng_stats_t pg_cache_deletions; - rrdeng_stats_t pg_cache_hits; - rrdeng_stats_t pg_cache_misses; - rrdeng_stats_t pg_cache_backfills; - rrdeng_stats_t pg_cache_evictions; rrdeng_stats_t before_decompress_bytes; rrdeng_stats_t after_decompress_bytes; rrdeng_stats_t before_compress_bytes; rrdeng_stats_t after_compress_bytes; + rrdeng_stats_t io_write_bytes; rrdeng_stats_t io_write_requests; rrdeng_stats_t io_read_bytes; rrdeng_stats_t io_read_requests; - rrdeng_stats_t io_write_extent_bytes; - rrdeng_stats_t io_write_extents; - rrdeng_stats_t io_read_extent_bytes; - rrdeng_stats_t io_read_extents; + rrdeng_stats_t datafile_creations; rrdeng_stats_t datafile_deletions; rrdeng_stats_t journalfile_creations; rrdeng_stats_t journalfile_deletions; - rrdeng_stats_t page_cache_descriptors; + rrdeng_stats_t io_errors; rrdeng_stats_t fs_errors; - rrdeng_stats_t pg_cache_over_half_dirty_events; - rrdeng_stats_t flushing_pressure_page_deletions; }; /* I/O errors global counter */ @@ -352,6 +359,8 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele struct rrdengine_instance { struct { + bool legacy; // true when the db is autonomous for a single host + int tier; // the tier of this ctx uint8_t page_type; // default page type for this context @@ -370,6 +379,8 @@ struct rrdengine_instance { unsigned last_fileno; // newest index of datafile and journalfile unsigned last_flush_fileno; // newest index 
of datafile received data + size_t collectors_running; + size_t collectors_running_duplicate; size_t inflight_queries; // the number of queries currently running uint64_t current_disk_space; // the current disk space size used @@ -402,6 +413,26 @@ struct rrdengine_instance { #define ctx_current_disk_space_increase(ctx, size) __atomic_add_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED) #define ctx_current_disk_space_decrease(ctx, size) __atomic_sub_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED) +static inline void ctx_io_read_op_bytes(struct rrdengine_instance *ctx, size_t bytes) { + __atomic_add_fetch(&ctx->stats.io_read_bytes, bytes, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.io_read_requests, 1, __ATOMIC_RELAXED); +} + +static inline void ctx_io_write_op_bytes(struct rrdengine_instance *ctx, size_t bytes) { + __atomic_add_fetch(&ctx->stats.io_write_bytes, bytes, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.io_write_requests, 1, __ATOMIC_RELAXED); +} + +static inline void ctx_io_error(struct rrdengine_instance *ctx) { + __atomic_add_fetch(&ctx->stats.io_errors, 1, __ATOMIC_RELAXED); + rrd_stat_atomic_add(&global_io_errors, 1); +} + +static inline void ctx_fs_error(struct rrdengine_instance *ctx) { + __atomic_add_fetch(&ctx->stats.fs_errors, 1, __ATOMIC_RELAXED); + rrd_stat_atomic_add(&global_fs_errors, 1); +} + #define ctx_last_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_fileno, __ATOMIC_RELAXED) #define ctx_last_fileno_increment(ctx) __atomic_add_fetch(&(ctx)->atomic.last_fileno, 1, __ATOMIC_RELAXED) @@ -460,7 +491,7 @@ typedef struct validated_page_descriptor { size_t point_size; size_t entries; uint8_t type; - bool data_on_disk_valid; + bool is_valid; } VALIDATED_PAGE_DESCRIPTOR; #define page_entries_by_time(start_time_s, end_time_s, update_every_s) \ @@ -469,7 +500,21 @@ typedef struct validated_page_descriptor { #define page_entries_by_size(page_length_in_bytes, point_size_in_bytes) \ 
((page_length_in_bytes) / (point_size_in_bytes)) +VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid, + time_t start_time_s, + time_t end_time_s, + time_t update_every_s, + size_t page_length, + uint8_t page_type, + size_t entries, + time_t now_s, + time_t overwrite_zero_update_every_s, + bool have_read_error, + bool minimize_invalid_size, + const char *msg, + RRDENG_COLLECT_PAGE_FLAGS flags); VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error); +void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags); typedef enum { PAGE_IS_IN_THE_PAST = -1, @@ -479,4 +524,8 @@ typedef enum { TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s); +static inline time_t max_acceptable_collected_time(void) { + return now_realtime_sec() + 1; +} + #endif /* NETDATA_RRDENGINE_H */ |