summaryrefslogtreecommitdiffstats
path: root/database/rrd.h
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrd.h')
-rw-r--r--database/rrd.h258
1 files changed, 211 insertions, 47 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 8bca004b79..d1fbca66b0 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -109,12 +109,20 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name);
typedef struct storage_query_handle STORAGE_QUERY_HANDLE;
+typedef enum __attribute__ ((__packed__)) {
+ STORAGE_ENGINE_BACKEND_RRDDIM = 1,
+ STORAGE_ENGINE_BACKEND_DBENGINE = 2,
+} STORAGE_ENGINE_BACKEND;
+
+#define is_valid_backend(backend) ((backend) >= STORAGE_ENGINE_BACKEND_RRDDIM && (backend) <= STORAGE_ENGINE_BACKEND_DBENGINE)
+
// iterator state for RRD dimension data queries
struct storage_engine_query_handle {
time_t start_time_s;
time_t end_time_s;
STORAGE_PRIORITY priority;
- STORAGE_QUERY_HANDLE* handle;
+ STORAGE_ENGINE_BACKEND backend;
+ STORAGE_QUERY_HANDLE *handle;
};
// ----------------------------------------------------------------------------
@@ -162,11 +170,11 @@ extern time_t rrdset_free_obsolete_time_s;
#if defined(ENV32BIT)
#define MIN_LIBUV_WORKER_THREADS 8
-#define MAX_LIBUV_WORKER_THREADS 64
+#define MAX_LIBUV_WORKER_THREADS 128
#define RESERVED_LIBUV_WORKER_THREADS 3
#else
#define MIN_LIBUV_WORKER_THREADS 16
-#define MAX_LIBUV_WORKER_THREADS 128
+#define MAX_LIBUV_WORKER_THREADS 1024
#define RESERVED_LIBUV_WORKER_THREADS 6
#endif
@@ -301,19 +309,20 @@ bool exporting_labels_filter_callback(const char *name, const char *value, RRDLA
// ----------------------------------------------------------------------------
// engine-specific iterator state for dimension data collection
-typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE;
+typedef struct storage_collect_handle {
+ STORAGE_ENGINE_BACKEND backend;
+} STORAGE_COLLECT_HANDLE;
// ----------------------------------------------------------------------------
// Storage tier data for every dimension
struct rrddim_tier {
STORAGE_POINT virtual_point;
+ STORAGE_ENGINE_BACKEND backend;
size_t tier_grouping;
time_t next_point_end_time_s;
STORAGE_METRIC_HANDLE *db_metric_handle; // the metric handle inside the database
STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle
- struct storage_engine_collect_ops *collect_ops;
- struct storage_engine_query_ops *query_ops;
};
void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s);
@@ -412,56 +421,214 @@ size_t rrddim_memory_file_header_size(void);
void rrddim_memory_file_save(RRDDIM *rd);
// ------------------------------------------------------------------------
-// function pointers that handle data collection
-struct storage_engine_collect_ops {
- // an initialization function to run before starting collection
- STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+// DATA COLLECTION STORAGE OPS
- // run this to store each metric into the database
- void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value,
- NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
+static inline STORAGE_METRICS_GROUP *storage_engine_metrics_group_get(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // run this to flush / reset the current data collection sequence
- void (*flush)(STORAGE_COLLECT_HANDLE *collection_handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metrics_group_get(db_instance, uuid);
+#endif
+ return rrddim_metrics_group_get(db_instance, uuid);
+}
- // a finalization function to run after collection is over
- // returns 1 if it's safe to delete the dimension
- int (*finalize)(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+static inline void storage_engine_metrics_group_release(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- void (*change_collection_frequency)(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_metrics_group_release(db_instance, smg);
+ else
+#endif
+ rrddim_metrics_group_release(db_instance, smg);
+}
+
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+static inline STORAGE_COLLECT_HANDLE *storage_metric_store_init(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_init(db_metric_handle, update_every, smg);
+#endif
+ return rrddim_collect_init(db_metric_handle, update_every, smg);
+}
+
+void rrdeng_store_metric_next(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
+
+void rrddim_collect_store_metric(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);
+
+static inline void storage_engine_store_metric(
+ STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut,
+ NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value,
+ uint16_t count, uint16_t anomaly_count, SN_FLAGS flags) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_next(collection_handle, point_in_time_ut,
+ n, min_value, max_value,
+ count, anomaly_count, flags);
+#endif
+ return rrddim_collect_store_metric(collection_handle, point_in_time_ut,
+ n, min_value, max_value,
+ count, anomaly_count, flags);
+}
+
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
+void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle);
+static inline void storage_engine_store_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
+ if(unlikely(!collection_handle))
+ return;
+
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_store_metric_flush_current_page(collection_handle);
+ else
+#endif
+ rrddim_store_metric_flush(collection_handle);
+}
+
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+// a finalization function to run after collection is over
+// returns 1 if it's safe to delete the dimension
+static inline int storage_engine_store_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_store_metric_finalize(collection_handle);
+#endif
+
+ return rrddim_collect_finalize(collection_handle);
+}
+
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
+static inline void storage_engine_store_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
+ internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_store_metric_change_collection_frequency(collection_handle, update_every);
+ else
+#endif
+ rrddim_store_metric_change_collection_frequency(collection_handle, update_every);
+}
- STORAGE_METRICS_GROUP *(*metrics_group_get)(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
- void (*metrics_group_release)(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *sa);
-};
// ----------------------------------------------------------------------------
+// STORAGE ENGINE QUERY OPS
+
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrddim_query_oldest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline time_t storage_engine_oldest_time_s(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metric_oldest_time(db_metric_handle);
+#endif
+ return rrddim_query_oldest_time_s(db_metric_handle);
+}
-// function pointers that handle database queries
-struct storage_engine_query_ops {
- // run this before starting a series of next_metric() database queries
- void (*init)(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+time_t rrddim_query_latest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline time_t storage_engine_latest_time_s(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // run this to load each metric number from the database
- STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_metric_latest_time(db_metric_handle);
+#endif
+ return rrddim_query_latest_time_s(db_metric_handle);
+}
- // run this to test if the series of next_metric() database queries is finished
- int (*is_finished)(struct storage_engine_query_handle *handle);
+void rrdeng_load_metric_init(
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
- // run this after finishing a series of load_metric() database queries
- void (*finalize)(struct storage_engine_query_handle *handle);
+void rrddim_query_init(
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
- // get the timestamp of the last entry of this metric
- time_t (*latest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle);
+static inline void storage_engine_query_init(
+ STORAGE_ENGINE_BACKEND backend,
+ STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle,
+ time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) {
+ internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend");
- // get the timestamp of the first entry of this metric
- time_t (*oldest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle);
+#ifdef ENABLE_DBENGINE
+ if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_load_metric_init(db_metric_handle, handle, start_time_s, end_time_s, priority);
+ else
+#endif
+ rrddim_query_init(db_metric_handle, handle, start_time_s, end_time_s, priority);
+}
- // adapt 'before' timestamp to the optimal for the query
- // can only move 'before' ahead (to the future)
- time_t (*align_to_optimal_before)(struct storage_engine_query_handle *handle);
-};
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
+STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *handle);
+static inline STORAGE_POINT storage_engine_query_next_metric(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_metric_next(handle);
+#endif
+ return rrddim_query_next_metric(handle);
+}
+
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle);
+int rrddim_query_is_finished(struct storage_engine_query_handle *handle);
+static inline int storage_engine_query_is_finished(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_metric_is_finished(handle);
+#endif
+ return rrddim_query_is_finished(handle);
+}
+
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle);
+void rrddim_query_finalize(struct storage_engine_query_handle *handle);
+static inline void storage_engine_query_finalize(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ rrdeng_load_metric_finalize(handle);
+ else
+#endif
+ rrddim_query_finalize(handle);
+}
-typedef struct storage_engine STORAGE_ENGINE;
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+static inline time_t storage_engine_align_to_optimal_before(struct storage_engine_query_handle *handle) {
+ internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend");
+
+#ifdef ENABLE_DBENGINE
+ if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE))
+ return rrdeng_load_align_to_optimal_before(handle);
+#endif
+ return rrddim_query_align_to_optimal_before(handle);
+}
// ------------------------------------------------------------------------
// function pointers for all APIs provided by a storage engine
@@ -472,17 +639,14 @@ typedef struct storage_engine_api {
void (*metric_release)(STORAGE_METRIC_HANDLE *);
STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *);
bool (*metric_retention_by_uuid)(STORAGE_INSTANCE *db_instance, uuid_t *uuid, time_t *first_entry_s, time_t *last_entry_s);
-
- // operations
- struct storage_engine_collect_ops collect_ops;
- struct storage_engine_query_ops query_ops;
} STORAGE_ENGINE_API;
-struct storage_engine {
+typedef struct storage_engine {
+ STORAGE_ENGINE_BACKEND backend;
RRD_MEMORY_MODE id;
const char* name;
STORAGE_ENGINE_API api;
-};
+} STORAGE_ENGINE;
STORAGE_ENGINE* storage_engine_get(RRD_MEMORY_MODE mmode);
STORAGE_ENGINE* storage_engine_find(const char* name);