diff options
author | Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com> | 2022-05-03 04:34:15 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-03 11:34:15 +0300 |
commit | d92890b5f180f13b5f680b3bd345e3674b8f8e8c (patch) | |
tree | 1ec4ccc7b409e2bdb2fe12fdbb6954470a75e3f5 | |
parent | 5850810715a9b2fc9413a2b43ae2dc1d1a5b4bf6 (diff) |
Configurable storage engine for Netdata agents: step 1 (#12776)
* rrd: move API structures out of rrddim_volatile
In C, unlike C++, it's not possible to reference a nested structure
from outside this structure.
Since we later want to use rrddim_query_ops and rrddim_collect_ops
separately from rrddim_volatile, move these nested structures out.
* rrd: use opaque handle types for different memory modes
-rw-r--r-- | database/engine/rrdengine.h | 16 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 33 | ||||
-rw-r--r-- | database/rrd.h | 131 | ||||
-rw-r--r-- | database/rrddim.c | 28 | ||||
-rw-r--r-- | ml/Dimension.h | 2 | ||||
-rw-r--r-- | ml/Query.h | 2 | ||||
-rw-r--r-- | web/api/queries/query.c | 10 |
7 files changed, 112 insertions, 110 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 7aba98e451..fae911e747 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -34,6 +34,22 @@ struct rrdengine_instance; #define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" #define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" +struct rrdeng_collect_handle { + struct rrdeng_page_descr *descr, *prev_descr; + unsigned long page_correlation_id; + struct rrdengine_instance *ctx; + // set to 1 when this dimension is not page aligned with the other dimensions in the chart + uint8_t unaligned_page; +}; + +struct rrdeng_query_handle { + struct rrdeng_page_descr *descr; + struct rrdengine_instance *ctx; + struct pg_cache_page_index *page_index; + time_t next_page_time; + time_t now; + unsigned position; +}; typedef enum { RRDENGINE_STATUS_UNINITIALIZED = 0, diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index e94f6e003d..a9e4eb644a 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd) struct pg_cache_page_index *page_index; ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); - handle = &rd->state->handle.rrdeng; - handle->ctx = ctx; + handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->ctx = ctx; handle->descr = NULL; handle->prev_descr = NULL; handle->unaligned_page = 0; + rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle; page_index = rd->state->page_index; uv_rwlock_wrlock(&page_index->lock); @@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; if (unlikely(!ctx)) return; @@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { - struct rrdeng_collect_handle *handle; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; struct rrdeng_page_descr *descr; storage_number *page; uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0; - handle = &rd->state->handle.rrdeng; ctx = handle->ctx; pg_cache = &ctx->pg_cache; descr = handle->descr; @@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) struct pg_cache_page_index *page_index; uint8_t can_delete_metric = 0; - handle = &rd->state->handle.rrdeng; + handle = (struct rrdeng_collect_handle *)rd->state->handle; ctx = handle->ctx; page_index = rd->state->page_index; rrdeng_store_metric_flush_current_page(rd); @@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd) can_delete_metric = 1; } uv_rwlock_wrunlock(&page_index->lock); + freez(handle); return can_delete_metric; } @@ -535,12 +536,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost); rrdimm_handle->start_time = start_time; rrdimm_handle->end_time = end_time; - handle = &rrdimm_handle->rrdeng; + + handle = calloc(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; handle->position = 0; handle->ctx = ctx; handle->descr = NULL; + rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle; pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) @@ -551,7 +554,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand /* Returns the metric and sets its timestamp into current_time */ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { - struct rrdeng_query_handle *handle; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; struct rrdengine_instance *ctx; struct rrdeng_page_descr *descr; storage_number *page, ret; @@ -559,7 +562,6 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle usec_t next_page_time = 0, current_position_time, page_end_time = 0; uint32_t page_length; - handle = &rrdimm_handle->rrdeng; if (unlikely(INVALID_TIME == handle->next_page_time)) { return SN_EMPTY_SLOT; } @@ -641,9 +643,7 @@ no_more_metrics: int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - - handle = &rrdimm_handle->rrdeng; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; return (INVALID_TIME == handle->next_page_time); } @@ -652,13 +652,10 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) */ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { - struct rrdeng_query_handle *handle; - struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; + struct rrdengine_instance *ctx = handle->ctx; + struct rrdeng_page_descr *descr = handle->descr; - handle = &rrdimm_handle->rrdeng; - ctx = handle->ctx; - descr = handle->descr; if (descr) { #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); diff --git a/database/rrd.h b/database/rrd.h index 371f1af6af..137f3228e4 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -326,53 +326,56 @@ struct rrddim { }; // ---------------------------------------------------------------------------- -// iterator state for RRD dimension data collection -union rrddim_collect_handle { - struct { - long slot; - long entries; - } slotted; // state the legacy code uses -#ifdef ENABLE_DBENGINE - struct rrdeng_collect_handle { - struct rrdeng_page_descr *descr, *prev_descr; - unsigned long page_correlation_id; - struct rrdengine_instance *ctx; - // set to 1 when this dimension is not page aligned with the other dimensions in the chart - uint8_t unaligned_page; - } rrdeng; // state the database engine uses -#endif -}; +// engine-specific iterator state for dimension data collection +typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE; // ---------------------------------------------------------------------------- -// iterator state for RRD dimension data queries - -#ifdef ENABLE_DBENGINE -struct rrdeng_query_handle { - struct rrdeng_page_descr *descr; - struct rrdengine_instance *ctx; - struct pg_cache_page_index *page_index; - time_t next_page_time; - time_t now; - unsigned position; -}; -#endif +// engine-specific iterator state for dimension data queries +typedef struct storage_query_handle STORAGE_QUERY_HANDLE; +// ---------------------------------------------------------------------------- +// iterator state for RRD dimension data queries struct rrddim_query_handle { RRDDIM *rd; time_t start_time; time_t end_time; - union { - struct { - long slot; - long last_slot; - uint8_t finished; - } slotted; // state the legacy code uses -#ifdef ENABLE_DBENGINE - struct rrdeng_query_handle rrdeng; // state the database engine uses -#endif - }; + STORAGE_QUERY_HANDLE* handle; }; +// ------------------------------------------------------------------------ +// function pointers that handle data collection +struct rrddim_collect_ops { + // an initialization function to run before starting collection + void (*init)(RRDDIM *rd); + + // run this to store each metric into the database + void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number); + + // an finalization function to run after collection is over + // returns 1 if it's safe to delete the dimension + int (*finalize)(RRDDIM *rd); +}; + +// function pointers that handle database queries +struct rrddim_query_ops { + // run this before starting a series of next_metric() database queries + void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time); + + // run this to load each metric number from the database + storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time); + + // run this to test if the series of next_metric() database queries is finished + int (*is_finished)(struct rrddim_query_handle *handle); + + // run this after finishing a series of load_metric() database queries + void (*finalize)(struct rrddim_query_handle *handle); + + // get the timestamp of the last entry of this metric + time_t (*latest_time)(RRDDIM *rd); + + // get the timestamp of the first entry of this metric + time_t (*oldest_time)(RRDDIM *rd); +}; // ---------------------------------------------------------------------------- // volatile state per RRD dimension @@ -385,42 +388,9 @@ struct rrddim_volatile { int aclk_live_status; #endif uuid_t metric_uuid; // global UUID for this metric (unique_across hosts) - union rrddim_collect_handle handle; - // ------------------------------------------------------------------------ - // function pointers that handle data collection - struct rrddim_collect_ops { - // an initialization function to run before starting collection - void (*init)(RRDDIM *rd); - - // run this to store each metric into the database - void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number); - - // an finalization function to run after collection is over - // returns 1 if it's safe to delete the dimension - int (*finalize)(RRDDIM *rd); - } collect_ops; - - // function pointers that handle database queries - struct rrddim_query_ops { - // run this before starting a series of next_metric() database queries - void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time); - - // run this to load each metric number from the database - storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time); - - // run this to test if the series of next_metric() database queries is finished - int (*is_finished)(struct rrddim_query_handle *handle); - - // run this after finishing a series of load_metric() database queries - void (*finalize)(struct rrddim_query_handle *handle); - - // get the timestamp of the last entry of this metric - time_t (*latest_time)(RRDDIM *rd); - - // get the timestamp of the first entry of this metric - time_t (*oldest_time)(RRDDIM *rd); - } query_ops; - + STORAGE_COLLECT_HANDLE* handle; + struct rrddim_collect_ops collect_ops; + struct rrddim_query_ops query_ops; ml_dimension_t ml_dimension; }; @@ -435,6 +405,19 @@ struct rrdset_volatile { bool is_ar_chart; }; +// RRDDIM legacy data collection structures + +struct mem_collect_handle { + long slot; + long entries; +}; + +struct mem_query_handle { + long slot; + long last_slot; + uint8_t finished; +}; + // ---------------------------------------------------------------------------- // these loop macros make sure the linked list is accessed with the right lock diff --git a/database/rrddim.c b/database/rrddim.c index 3c013e1ded..d4e232eb50 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -99,6 +99,7 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor) // RRDDIM legacy data collection functions static void rrddim_collect_init(RRDDIM *rd) { + rd->state->handle = callocz(1, sizeof(struct mem_collect_handle)); rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT; } static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) { @@ -107,11 +108,11 @@ static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storag rd->values[rd->rrdset->current_entry] = number; } static int rrddim_collect_finalize(RRDDIM *rd) { - (void)rd; - + freez(rd->state->handle); return 0; } + // ---------------------------------------------------------------------------- // RRDDIM legacy database query functions @@ -119,34 +120,37 @@ static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, ti handle->rd = rd; handle->start_time = start_time; handle->end_time = end_time; - handle->slotted.slot = rrdset_time2slot(rd->rrdset, start_time); - handle->slotted.last_slot = rrdset_time2slot(rd->rrdset, end_time); - handle->slotted.finished = 0; + struct mem_query_handle* mem_handle = callocz(1, sizeof(struct mem_query_handle)); + mem_handle->slot = rrdset_time2slot(rd->rrdset, start_time); + mem_handle->last_slot = rrdset_time2slot(rd->rrdset, end_time); + mem_handle->finished = 0; + handle->handle = (STORAGE_QUERY_HANDLE *)mem_handle; } static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) { RRDDIM *rd = handle->rd; + struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle; long entries = rd->rrdset->entries; - long slot = handle->slotted.slot; + long slot = mem_handle->slot; (void)current_time; - if (unlikely(handle->slotted.slot == handle->slotted.last_slot)) - handle->slotted.finished = 1; + if (unlikely(mem_handle->slot == mem_handle->last_slot)) + mem_handle->finished = 1; storage_number n = rd->values[slot++]; if(unlikely(slot >= entries)) slot = 0; - handle->slotted.slot = slot; + mem_handle->slot = slot; return n; } static int rrddim_query_is_finished(struct rrddim_query_handle *handle) { - return handle->slotted.finished; + struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle; + return mem_handle->finished; } static void rrddim_query_finalize(struct rrddim_query_handle *handle) { - (void)handle; - + freez(handle->handle); return; } diff --git a/ml/Dimension.h b/ml/Dimension.h index 44b348e9b2..b2af323a4b 100644 --- a/ml/Dimension.h +++ b/ml/Dimension.h @@ -45,7 +45,7 @@ private: RRDDIM *RD; RRDDIM *AnomalyRateRD; - struct rrddim_volatile::rrddim_query_ops *Ops; + struct rrddim_query_ops *Ops; std::string ID; }; diff --git a/ml/Query.h b/ml/Query.h index cbaf6c2977..8b84bb73e2 100644 --- a/ml/Query.h +++ b/ml/Query.h @@ -40,7 +40,7 @@ public: private: RRDDIM *RD; - struct rrddim_volatile::rrddim_query_ops *Ops; + struct rrddim_query_ops *Ops; struct rrddim_query_handle Handle; }; diff --git a/web/api/queries/query.c b/web/api/queries/query.c index 9308d10bf5..0190a6cfb6 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -580,9 +580,10 @@ static inline void do_dimension_fixedstep( // read the value from the database //storage_number n = rd->values[slot]; #ifdef NETDATA_INTERNAL_CHECKS + struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle.handle; if ((rd->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) && - (rrdset_time2slot(st, now) != (long unsigned)handle.slotted.slot)) { - error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)handle.slotted.slot, rrdset_time2slot(st, now)); + (rrdset_time2slot(st, now) != (long unsigned)(mem_handle->slot))) { + error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)mem_handle->slot, rrdset_time2slot(st, now)); } #endif db_now = now; // this is needed to set db_now in case the next_metric implementation does not set it @@ -601,8 +602,9 @@ static inline void do_dimension_fixedstep( calculated_number value = NAN; if(likely(now >= db_now && does_storage_number_exist(n))) { #if defined(NETDATA_INTERNAL_CHECKS) && defined(ENABLE_DBENGINE) - if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != handle.rrdeng.now)) { - error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now); + struct rrdeng_query_handle* rrd_handle = (struct rrdeng_query_handle*)handle.handle; + if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != rrd_handle->now)) { + error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)rrd_handle->now, (long)now); } #endif if (options & RRDR_OPTION_ANOMALY_BIT) |