summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengine.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-23 22:18:44 +0200
committerGitHub <noreply@github.com>2023-01-23 22:18:44 +0200
commitdd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch)
treefecf5514eda33c0a96f4d359f30fd07229d12cf7 /database/engine/rrdengine.h
parentc2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff)
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers * when there is a discrepancy between update every, fix it * fix the other occurences of metric update every mismatch * allow resetting the same timestamp * validate flushed pages before committing them to disk * initialize collection with the latest time in mrg * these should be static functions * acquire metrics for writing to detect multiple data collections of the same metric * print the uuid of the metric that is collected twice * log the discrepancies of completed pages * 1 second tolerance * unify validation of pages and related logging across dbengine * make do_flush_pages() thread safe * flush pages runs on libuv workers * added uv events to tp workers * dont cross datafile spinlock and rwlock * should be unlock * prevent the creation of multiple datafiles * break an infinite replication loop * do not log the epxansion of the replication window due to start streaming * log all invalid pages with internal checks * do not shutdown event loop threads * add information about collected page events, to find the root cause of invalid collected pages * rewrite of the gap filling to fix the invalid collected pages problem * handle multiple collections of the same metric gracefully * added log about main cache page conflicts; fix gap filling once again... * keep track of the first metric writer * it should be an internal fatal - it does not harm users * do not check of future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks * prevent negative replication completion percentage * internal error for the discrepancy of mrg * better logging of dbengine new metrics collection * without internal checks it is unused * prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread * renames and atomics everywhere * if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it * Debug for context load * rrdcontext uuid debug * rrddim uuid debug * rrdeng uuid debug * Revert "rrdeng uuid debug" This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b. * Revert "rrddim uuid debug" This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8. * Revert "rrdcontext uuid debug" This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0. * Revert "Debug for context load" This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b. * do not use legacy uuids on multihost dbs * thread safety for journafile size * handle other cases of inconsistent collected pages * make health thread check if it should be running in key loops * do not log uuids Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/rrdengine.h')
-rw-r--r--database/engine/rrdengine.h93
1 files changed, 71 insertions, 22 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index f134f401b5..e5732251da 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -169,17 +169,36 @@ struct jv2_page_info {
typedef enum __attribute__ ((__packed__)) {
RRDENG_CHO_UNALIGNED = (1 << 0), // set when this metric is not page aligned according to page alignment
RRDENG_FIRST_PAGE_ALLOCATED = (1 << 1), // set when this metric has allocated its first page
+ RRDENG_1ST_METRIC_WRITER = (1 << 2),
} RRDENG_COLLECT_HANDLE_OPTIONS;
+typedef enum __attribute__ ((__packed__)) {
+ RRDENG_PAGE_PAST_COLLECTION = (1 << 0),
+ RRDENG_PAGE_REPEATED_COLLECTION = (1 << 1),
+ RRDENG_PAGE_BIG_GAP = (1 << 2),
+ RRDENG_PAGE_GAP = (1 << 3),
+ RRDENG_PAGE_FUTURE_POINT = (1 << 4),
+ RRDENG_PAGE_CREATED_IN_FUTURE = (1 << 5),
+ RRDENG_PAGE_COMPLETED_IN_FUTURE = (1 << 6),
+ RRDENG_PAGE_UNALIGNED = (1 << 7),
+ RRDENG_PAGE_CONFLICT = (1 << 8),
+ RRDENG_PAGE_FULL = (1 << 9),
+ RRDENG_PAGE_COLLECT_FINALIZE = (1 << 10),
+ RRDENG_PAGE_UPDATE_EVERY_CHANGE = (1 << 11),
+ RRDENG_PAGE_STEP_TOO_SMALL = (1 << 12),
+ RRDENG_PAGE_STEP_UNALIGNED = (1 << 13),
+} RRDENG_COLLECT_PAGE_FLAGS;
+
struct rrdeng_collect_handle {
struct metric *metric;
struct pgc_page *page;
struct pg_alignment *alignment;
RRDENG_COLLECT_HANDLE_OPTIONS options;
uint8_t type;
- // 2 bytes remaining here for future use
+ RRDENG_COLLECT_PAGE_FLAGS page_flags;
uint32_t page_entries_max;
uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it
+ usec_t page_start_time_ut;
usec_t page_end_time_ut;
usec_t update_every_ut;
};
@@ -222,18 +241,18 @@ enum rrdeng_opcode {
/* can be used to return empty status or flush the command queue */
RRDENG_OPCODE_NOOP = 0,
+ RRDENG_OPCODE_QUERY,
+ RRDENG_OPCODE_EXTENT_WRITE,
RRDENG_OPCODE_EXTENT_READ,
- RRDENG_OPCODE_PREP_QUERY,
- RRDENG_OPCODE_FLUSH_PAGES,
RRDENG_OPCODE_FLUSHED_TO_OPEN,
+ RRDENG_OPCODE_DATABASE_ROTATE,
+ RRDENG_OPCODE_JOURNAL_INDEX,
RRDENG_OPCODE_FLUSH_INIT,
RRDENG_OPCODE_EVICT_INIT,
- //RRDENG_OPCODE_DATAFILE_CREATE,
- RRDENG_OPCODE_JOURNAL_FILE_INDEX,
- RRDENG_OPCODE_DATABASE_ROTATE,
RRDENG_OPCODE_CTX_SHUTDOWN,
RRDENG_OPCODE_CTX_QUIESCE,
RRDENG_OPCODE_CTX_POPULATE_MRG,
+ RRDENG_OPCODE_CLEANUP,
RRDENG_OPCODE_MAX
};
@@ -309,35 +328,23 @@ void wal_release(WAL *wal);
* They only describe operations since DB engine instance load time.
*/
struct rrdengine_statistics {
- rrdeng_stats_t metric_API_producers;
- rrdeng_stats_t metric_API_consumers;
- rrdeng_stats_t pg_cache_insertions;
- rrdeng_stats_t pg_cache_deletions;
- rrdeng_stats_t pg_cache_hits;
- rrdeng_stats_t pg_cache_misses;
- rrdeng_stats_t pg_cache_backfills;
- rrdeng_stats_t pg_cache_evictions;
rrdeng_stats_t before_decompress_bytes;
rrdeng_stats_t after_decompress_bytes;
rrdeng_stats_t before_compress_bytes;
rrdeng_stats_t after_compress_bytes;
+
rrdeng_stats_t io_write_bytes;
rrdeng_stats_t io_write_requests;
rrdeng_stats_t io_read_bytes;
rrdeng_stats_t io_read_requests;
- rrdeng_stats_t io_write_extent_bytes;
- rrdeng_stats_t io_write_extents;
- rrdeng_stats_t io_read_extent_bytes;
- rrdeng_stats_t io_read_extents;
+
rrdeng_stats_t datafile_creations;
rrdeng_stats_t datafile_deletions;
rrdeng_stats_t journalfile_creations;
rrdeng_stats_t journalfile_deletions;
- rrdeng_stats_t page_cache_descriptors;
+
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
- rrdeng_stats_t pg_cache_over_half_dirty_events;
- rrdeng_stats_t flushing_pressure_page_deletions;
};
/* I/O errors global counter */
@@ -352,6 +359,8 @@ extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of dele
struct rrdengine_instance {
struct {
+ bool legacy; // true when the db is autonomous for a single host
+
int tier; // the tier of this ctx
uint8_t page_type; // default page type for this context
@@ -370,6 +379,8 @@ struct rrdengine_instance {
unsigned last_fileno; // newest index of datafile and journalfile
unsigned last_flush_fileno; // newest index of datafile received data
+ size_t collectors_running;
+ size_t collectors_running_duplicate;
size_t inflight_queries; // the number of queries currently running
uint64_t current_disk_space; // the current disk space size used
@@ -402,6 +413,26 @@ struct rrdengine_instance {
#define ctx_current_disk_space_increase(ctx, size) __atomic_add_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
#define ctx_current_disk_space_decrease(ctx, size) __atomic_sub_fetch(&(ctx)->atomic.current_disk_space, size, __ATOMIC_RELAXED)
+static inline void ctx_io_read_op_bytes(struct rrdengine_instance *ctx, size_t bytes) {
+ __atomic_add_fetch(&ctx->stats.io_read_bytes, bytes, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.io_read_requests, 1, __ATOMIC_RELAXED);
+}
+
+static inline void ctx_io_write_op_bytes(struct rrdengine_instance *ctx, size_t bytes) {
+ __atomic_add_fetch(&ctx->stats.io_write_bytes, bytes, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.io_write_requests, 1, __ATOMIC_RELAXED);
+}
+
+static inline void ctx_io_error(struct rrdengine_instance *ctx) {
+ __atomic_add_fetch(&ctx->stats.io_errors, 1, __ATOMIC_RELAXED);
+ rrd_stat_atomic_add(&global_io_errors, 1);
+}
+
+static inline void ctx_fs_error(struct rrdengine_instance *ctx) {
+ __atomic_add_fetch(&ctx->stats.fs_errors, 1, __ATOMIC_RELAXED);
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+}
+
#define ctx_last_fileno_get(ctx) __atomic_load_n(&(ctx)->atomic.last_fileno, __ATOMIC_RELAXED)
#define ctx_last_fileno_increment(ctx) __atomic_add_fetch(&(ctx)->atomic.last_fileno, 1, __ATOMIC_RELAXED)
@@ -460,7 +491,7 @@ typedef struct validated_page_descriptor {
size_t point_size;
size_t entries;
uint8_t type;
- bool data_on_disk_valid;
+ bool is_valid;
} VALIDATED_PAGE_DESCRIPTOR;
#define page_entries_by_time(start_time_s, end_time_s, update_every_s) \
@@ -469,7 +500,21 @@ typedef struct validated_page_descriptor {
#define page_entries_by_size(page_length_in_bytes, point_size_in_bytes) \
((page_length_in_bytes) / (point_size_in_bytes))
+VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid,
+ time_t start_time_s,
+ time_t end_time_s,
+ time_t update_every_s,
+ size_t page_length,
+ uint8_t page_type,
+ size_t entries,
+ time_t now_s,
+ time_t overwrite_zero_update_every_s,
+ bool have_read_error,
+ bool minimize_invalid_size,
+ const char *msg,
+ RRDENG_COLLECT_PAGE_FLAGS flags);
VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error);
+void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags);
typedef enum {
PAGE_IS_IN_THE_PAST = -1,
@@ -479,4 +524,8 @@ typedef enum {
TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s);
+static inline time_t max_acceptable_collected_time(void) {
+ return now_realtime_sec() + 1;
+}
+
#endif /* NETDATA_RRDENGINE_H */