summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdrien BĂ©raud <adrien.beraud@savoirfairelinux.com>2022-05-03 04:34:15 -0400
committerGitHub <noreply@github.com>2022-05-03 11:34:15 +0300
commitd92890b5f180f13b5f680b3bd345e3674b8f8e8c (patch)
tree1ec4ccc7b409e2bdb2fe12fdbb6954470a75e3f5
parent5850810715a9b2fc9413a2b43ae2dc1d1a5b4bf6 (diff)
Configurable storage engine for Netdata agents: step 1 (#12776)
* rrd: move API structures out of rrddim_volatile In C, unlike C++, it's not possible to reference a nested structure from outside this structure. Since we later want to use rrddim_query_ops and rrddim_collect_ops separately from rrddim_volatile, move these nested structures out. * rrd: use opaque handle types for different memory modes
-rw-r--r--database/engine/rrdengine.h16
-rwxr-xr-xdatabase/engine/rrdengineapi.c33
-rw-r--r--database/rrd.h131
-rw-r--r--database/rrddim.c28
-rw-r--r--ml/Dimension.h2
-rw-r--r--ml/Query.h2
-rw-r--r--web/api/queries/query.c10
7 files changed, 112 insertions, 110 deletions
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 7aba98e451..fae911e747 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -34,6 +34,22 @@ struct rrdengine_instance;
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
+struct rrdeng_collect_handle {
+ struct rrdeng_page_descr *descr, *prev_descr;
+ unsigned long page_correlation_id;
+ struct rrdengine_instance *ctx;
+ // set to 1 when this dimension is not page aligned with the other dimensions in the chart
+ uint8_t unaligned_page;
+};
+
+struct rrdeng_query_handle {
+ struct rrdeng_page_descr *descr;
+ struct rrdengine_instance *ctx;
+ struct pg_cache_page_index *page_index;
+ time_t next_page_time;
+ time_t now;
+ unsigned position;
+};
typedef enum {
RRDENGINE_STATUS_UNINITIALIZED = 0,
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index e94f6e003d..a9e4eb644a 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd)
struct pg_cache_page_index *page_index;
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
- handle = &rd->state->handle.rrdeng;
- handle->ctx = ctx;
+ handle = callocz(1, sizeof(struct rrdeng_collect_handle));
+ handle->ctx = ctx;
handle->descr = NULL;
handle->prev_descr = NULL;
handle->unaligned_page = 0;
+ rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle;
page_index = rd->state->page_index;
uv_rwlock_wrlock(&page_index->lock);
@@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
if (unlikely(!ctx))
return;
@@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
- struct rrdeng_collect_handle *handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
struct rrdeng_page_descr *descr;
storage_number *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
- handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
@@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
struct pg_cache_page_index *page_index;
uint8_t can_delete_metric = 0;
- handle = &rd->state->handle.rrdeng;
+ handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
page_index = rd->state->page_index;
rrdeng_store_metric_flush_current_page(rd);
@@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
can_delete_metric = 1;
}
uv_rwlock_wrunlock(&page_index->lock);
+ freez(handle);
return can_delete_metric;
}
@@ -535,12 +536,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
- handle = &rrdimm_handle->rrdeng;
+
+ handle = calloc(1, sizeof(struct rrdeng_query_handle));
handle->next_page_time = start_time;
handle->now = start_time;
handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
+ rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
@@ -551,7 +554,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
/* Returns the metric and sets its timestamp into current_time */
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
{
- struct rrdeng_query_handle *handle;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
storage_number *page, ret;
@@ -559,7 +562,6 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
usec_t next_page_time = 0, current_position_time, page_end_time = 0;
uint32_t page_length;
- handle = &rrdimm_handle->rrdeng;
if (unlikely(INVALID_TIME == handle->next_page_time)) {
return SN_EMPTY_SLOT;
}
@@ -641,9 +643,7 @@ no_more_metrics:
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
-
- handle = &rrdimm_handle->rrdeng;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
return (INVALID_TIME == handle->next_page_time);
}
@@ -652,13 +652,10 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
*/
void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
- struct rrdeng_query_handle *handle;
- struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
+ struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdeng_page_descr *descr = handle->descr;
- handle = &rrdimm_handle->rrdeng;
- ctx = handle->ctx;
- descr = handle->descr;
if (descr) {
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
diff --git a/database/rrd.h b/database/rrd.h
index 371f1af6af..137f3228e4 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -326,53 +326,56 @@ struct rrddim {
};
// ----------------------------------------------------------------------------
-// iterator state for RRD dimension data collection
-union rrddim_collect_handle {
- struct {
- long slot;
- long entries;
- } slotted; // state the legacy code uses
-#ifdef ENABLE_DBENGINE
- struct rrdeng_collect_handle {
- struct rrdeng_page_descr *descr, *prev_descr;
- unsigned long page_correlation_id;
- struct rrdengine_instance *ctx;
- // set to 1 when this dimension is not page aligned with the other dimensions in the chart
- uint8_t unaligned_page;
- } rrdeng; // state the database engine uses
-#endif
-};
+// engine-specific iterator state for dimension data collection
+typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE;
// ----------------------------------------------------------------------------
-// iterator state for RRD dimension data queries
-
-#ifdef ENABLE_DBENGINE
-struct rrdeng_query_handle {
- struct rrdeng_page_descr *descr;
- struct rrdengine_instance *ctx;
- struct pg_cache_page_index *page_index;
- time_t next_page_time;
- time_t now;
- unsigned position;
-};
-#endif
+// engine-specific iterator state for dimension data queries
+typedef struct storage_query_handle STORAGE_QUERY_HANDLE;
+// ----------------------------------------------------------------------------
+// iterator state for RRD dimension data queries
struct rrddim_query_handle {
RRDDIM *rd;
time_t start_time;
time_t end_time;
- union {
- struct {
- long slot;
- long last_slot;
- uint8_t finished;
- } slotted; // state the legacy code uses
-#ifdef ENABLE_DBENGINE
- struct rrdeng_query_handle rrdeng; // state the database engine uses
-#endif
- };
+ STORAGE_QUERY_HANDLE* handle;
};
+// ------------------------------------------------------------------------
+// function pointers that handle data collection
+struct rrddim_collect_ops {
+ // an initialization function to run before starting collection
+ void (*init)(RRDDIM *rd);
+
+ // run this to store each metric into the database
+ void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
+
+ // an finalization function to run after collection is over
+ // returns 1 if it's safe to delete the dimension
+ int (*finalize)(RRDDIM *rd);
+};
+
+// function pointers that handle database queries
+struct rrddim_query_ops {
+ // run this before starting a series of next_metric() database queries
+ void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
+
+ // run this to load each metric number from the database
+ storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
+
+ // run this to test if the series of next_metric() database queries is finished
+ int (*is_finished)(struct rrddim_query_handle *handle);
+
+ // run this after finishing a series of load_metric() database queries
+ void (*finalize)(struct rrddim_query_handle *handle);
+
+ // get the timestamp of the last entry of this metric
+ time_t (*latest_time)(RRDDIM *rd);
+
+ // get the timestamp of the first entry of this metric
+ time_t (*oldest_time)(RRDDIM *rd);
+};
// ----------------------------------------------------------------------------
// volatile state per RRD dimension
@@ -385,42 +388,9 @@ struct rrddim_volatile {
int aclk_live_status;
#endif
uuid_t metric_uuid; // global UUID for this metric (unique_across hosts)
- union rrddim_collect_handle handle;
- // ------------------------------------------------------------------------
- // function pointers that handle data collection
- struct rrddim_collect_ops {
- // an initialization function to run before starting collection
- void (*init)(RRDDIM *rd);
-
- // run this to store each metric into the database
- void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
-
- // an finalization function to run after collection is over
- // returns 1 if it's safe to delete the dimension
- int (*finalize)(RRDDIM *rd);
- } collect_ops;
-
- // function pointers that handle database queries
- struct rrddim_query_ops {
- // run this before starting a series of next_metric() database queries
- void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
-
- // run this to load each metric number from the database
- storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
-
- // run this to test if the series of next_metric() database queries is finished
- int (*is_finished)(struct rrddim_query_handle *handle);
-
- // run this after finishing a series of load_metric() database queries
- void (*finalize)(struct rrddim_query_handle *handle);
-
- // get the timestamp of the last entry of this metric
- time_t (*latest_time)(RRDDIM *rd);
-
- // get the timestamp of the first entry of this metric
- time_t (*oldest_time)(RRDDIM *rd);
- } query_ops;
-
+ STORAGE_COLLECT_HANDLE* handle;
+ struct rrddim_collect_ops collect_ops;
+ struct rrddim_query_ops query_ops;
ml_dimension_t ml_dimension;
};
@@ -435,6 +405,19 @@ struct rrdset_volatile {
bool is_ar_chart;
};
+// RRDDIM legacy data collection structures
+
+struct mem_collect_handle {
+ long slot;
+ long entries;
+};
+
+struct mem_query_handle {
+ long slot;
+ long last_slot;
+ uint8_t finished;
+};
+
// ----------------------------------------------------------------------------
// these loop macros make sure the linked list is accessed with the right lock
diff --git a/database/rrddim.c b/database/rrddim.c
index 3c013e1ded..d4e232eb50 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -99,6 +99,7 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
// RRDDIM legacy data collection functions
static void rrddim_collect_init(RRDDIM *rd) {
+ rd->state->handle = callocz(1, sizeof(struct mem_collect_handle));
rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
}
static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) {
@@ -107,11 +108,11 @@ static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storag
rd->values[rd->rrdset->current_entry] = number;
}
static int rrddim_collect_finalize(RRDDIM *rd) {
- (void)rd;
-
+ freez(rd->state->handle);
return 0;
}
+
// ----------------------------------------------------------------------------
// RRDDIM legacy database query functions
@@ -119,34 +120,37 @@ static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, ti
handle->rd = rd;
handle->start_time = start_time;
handle->end_time = end_time;
- handle->slotted.slot = rrdset_time2slot(rd->rrdset, start_time);
- handle->slotted.last_slot = rrdset_time2slot(rd->rrdset, end_time);
- handle->slotted.finished = 0;
+ struct mem_query_handle* mem_handle = callocz(1, sizeof(struct mem_query_handle));
+ mem_handle->slot = rrdset_time2slot(rd->rrdset, start_time);
+ mem_handle->last_slot = rrdset_time2slot(rd->rrdset, end_time);
+ mem_handle->finished = 0;
+ handle->handle = (STORAGE_QUERY_HANDLE *)mem_handle;
}
static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) {
RRDDIM *rd = handle->rd;
+ struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle;
long entries = rd->rrdset->entries;
- long slot = handle->slotted.slot;
+ long slot = mem_handle->slot;
(void)current_time;
- if (unlikely(handle->slotted.slot == handle->slotted.last_slot))
- handle->slotted.finished = 1;
+ if (unlikely(mem_handle->slot == mem_handle->last_slot))
+ mem_handle->finished = 1;
storage_number n = rd->values[slot++];
if(unlikely(slot >= entries)) slot = 0;
- handle->slotted.slot = slot;
+ mem_handle->slot = slot;
return n;
}
static int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
- return handle->slotted.finished;
+ struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle;
+ return mem_handle->finished;
}
static void rrddim_query_finalize(struct rrddim_query_handle *handle) {
- (void)handle;
-
+ freez(handle->handle);
return;
}
diff --git a/ml/Dimension.h b/ml/Dimension.h
index 44b348e9b2..b2af323a4b 100644
--- a/ml/Dimension.h
+++ b/ml/Dimension.h
@@ -45,7 +45,7 @@ private:
RRDDIM *RD;
RRDDIM *AnomalyRateRD;
- struct rrddim_volatile::rrddim_query_ops *Ops;
+ struct rrddim_query_ops *Ops;
std::string ID;
};
diff --git a/ml/Query.h b/ml/Query.h
index cbaf6c2977..8b84bb73e2 100644
--- a/ml/Query.h
+++ b/ml/Query.h
@@ -40,7 +40,7 @@ public:
private:
RRDDIM *RD;
- struct rrddim_volatile::rrddim_query_ops *Ops;
+ struct rrddim_query_ops *Ops;
struct rrddim_query_handle Handle;
};
diff --git a/web/api/queries/query.c b/web/api/queries/query.c
index 9308d10bf5..0190a6cfb6 100644
--- a/web/api/queries/query.c
+++ b/web/api/queries/query.c
@@ -580,9 +580,10 @@ static inline void do_dimension_fixedstep(
// read the value from the database
//storage_number n = rd->values[slot];
#ifdef NETDATA_INTERNAL_CHECKS
+ struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle.handle;
if ((rd->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) &&
- (rrdset_time2slot(st, now) != (long unsigned)handle.slotted.slot)) {
- error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)handle.slotted.slot, rrdset_time2slot(st, now));
+ (rrdset_time2slot(st, now) != (long unsigned)(mem_handle->slot))) {
+ error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)mem_handle->slot, rrdset_time2slot(st, now));
}
#endif
db_now = now; // this is needed to set db_now in case the next_metric implementation does not set it
@@ -601,8 +602,9 @@ static inline void do_dimension_fixedstep(
calculated_number value = NAN;
if(likely(now >= db_now && does_storage_number_exist(n))) {
#if defined(NETDATA_INTERNAL_CHECKS) && defined(ENABLE_DBENGINE)
- if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != handle.rrdeng.now)) {
- error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now);
+ struct rrdeng_query_handle* rrd_handle = (struct rrdeng_query_handle*)handle.handle;
+ if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != rrd_handle->now)) {
+ error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)rrd_handle->now, (long)now);
}
#endif
if (options & RRDR_OPTION_ANOMALY_BIT)