diff options
Diffstat (limited to 'database/rrd.h')
-rw-r--r-- | database/rrd.h | 258 |
1 files changed, 211 insertions, 47 deletions
diff --git a/database/rrd.h b/database/rrd.h index 8bca004b79..d1fbca66b0 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -109,12 +109,20 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name); typedef struct storage_query_handle STORAGE_QUERY_HANDLE; +typedef enum __attribute__ ((__packed__)) { + STORAGE_ENGINE_BACKEND_RRDDIM = 1, + STORAGE_ENGINE_BACKEND_DBENGINE = 2, +} STORAGE_ENGINE_BACKEND; + +#define is_valid_backend(backend) ((backend) >= STORAGE_ENGINE_BACKEND_RRDDIM && (backend) <= STORAGE_ENGINE_BACKEND_DBENGINE) + // iterator state for RRD dimension data queries struct storage_engine_query_handle { time_t start_time_s; time_t end_time_s; STORAGE_PRIORITY priority; - STORAGE_QUERY_HANDLE* handle; + STORAGE_ENGINE_BACKEND backend; + STORAGE_QUERY_HANDLE *handle; }; // ---------------------------------------------------------------------------- @@ -162,11 +170,11 @@ extern time_t rrdset_free_obsolete_time_s; #if defined(ENV32BIT) #define MIN_LIBUV_WORKER_THREADS 8 -#define MAX_LIBUV_WORKER_THREADS 64 +#define MAX_LIBUV_WORKER_THREADS 128 #define RESERVED_LIBUV_WORKER_THREADS 3 #else #define MIN_LIBUV_WORKER_THREADS 16 -#define MAX_LIBUV_WORKER_THREADS 128 +#define MAX_LIBUV_WORKER_THREADS 1024 #define RESERVED_LIBUV_WORKER_THREADS 6 #endif @@ -301,19 +309,20 @@ bool exporting_labels_filter_callback(const char *name, const char *value, RRDLA // ---------------------------------------------------------------------------- // engine-specific iterator state for dimension data collection -typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE; +typedef struct storage_collect_handle { + STORAGE_ENGINE_BACKEND backend; +} STORAGE_COLLECT_HANDLE; // ---------------------------------------------------------------------------- // Storage tier data for every dimension struct rrddim_tier { STORAGE_POINT virtual_point; + STORAGE_ENGINE_BACKEND backend; size_t tier_grouping; time_t next_point_end_time_s; STORAGE_METRIC_HANDLE *db_metric_handle; // the metric handle inside the database STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle - struct storage_engine_collect_ops *collect_ops; - struct storage_engine_query_ops *query_ops; }; void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s); @@ -412,56 +421,214 @@ size_t rrddim_memory_file_header_size(void); void rrddim_memory_file_save(RRDDIM *rd); // ------------------------------------------------------------------------ -// function pointers that handle data collection -struct storage_engine_collect_ops { - // an initialization function to run before starting collection - STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +// DATA COLLECTION STORAGE OPS - // run this to store each metric into the database - void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value, - NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +static inline STORAGE_METRICS_GROUP *storage_engine_metrics_group_get(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, uuid_t *uuid) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // run this to flush / reset the current data collection sequence - void (*flush)(STORAGE_COLLECT_HANDLE *collection_handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metrics_group_get(db_instance, uuid); +#endif + return rrddim_metrics_group_get(db_instance, uuid); +} - // a finalization function to run after collection is over - // returns 1 if it's safe to delete the dimension - int (*finalize)(STORAGE_COLLECT_HANDLE *collection_handle); +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +static inline void storage_engine_metrics_group_release(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - void (*change_collection_frequency)(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_metrics_group_release(db_instance, smg); + else +#endif + rrddim_metrics_group_release(db_instance, smg); +} + +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +static inline STORAGE_COLLECT_HANDLE *storage_metric_store_init(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_init(db_metric_handle, update_every, smg); +#endif + return rrddim_collect_init(db_metric_handle, update_every, smg); +} + +void rrdeng_store_metric_next( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); + +void rrddim_collect_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); + +static inline void storage_engine_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_next(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, flags); +#endif + return rrddim_collect_store_metric(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, flags); +} + +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle); +static inline void storage_engine_store_flush(STORAGE_COLLECT_HANDLE *collection_handle) { + if(unlikely(!collection_handle)) + return; + + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_store_metric_flush_current_page(collection_handle); + else +#endif + rrddim_store_metric_flush(collection_handle); +} + +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +// a finalization function to run after collection is over +// returns 1 if it's safe to delete the dimension +static inline int storage_engine_store_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_finalize(collection_handle); +#endif + + return rrddim_collect_finalize(collection_handle); +} + +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +static inline void storage_engine_store_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_store_metric_change_collection_frequency(collection_handle, update_every); + else +#endif + rrddim_store_metric_change_collection_frequency(collection_handle, update_every); +} - STORAGE_METRICS_GROUP *(*metrics_group_get)(STORAGE_INSTANCE *db_instance, uuid_t *uuid); - void (*metrics_group_release)(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *sa); -}; // ---------------------------------------------------------------------------- +// STORAGE ENGINE QUERY OPS + +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrddim_query_oldest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline time_t storage_engine_oldest_time_s(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metric_oldest_time(db_metric_handle); +#endif + return rrddim_query_oldest_time_s(db_metric_handle); +} -// function pointers that handle database queries -struct storage_engine_query_ops { - // run this before starting a series of next_metric() database queries - void (*init)(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); +time_t rrddim_query_latest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline time_t storage_engine_latest_time_s(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // run this to load each metric number from the database - STORAGE_POINT (*next_metric)(struct storage_engine_query_handle *handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metric_latest_time(db_metric_handle); +#endif + return rrddim_query_latest_time_s(db_metric_handle); +} - // run this to test if the series of next_metric() database queries is finished - int (*is_finished)(struct storage_engine_query_handle *handle); +void rrdeng_load_metric_init( + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); - // run this after finishing a series of load_metric() database queries - void (*finalize)(struct storage_engine_query_handle *handle); +void rrddim_query_init( + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); - // get the timestamp of the last entry of this metric - time_t (*latest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle); +static inline void storage_engine_query_init( + STORAGE_ENGINE_BACKEND backend, + STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, + time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // get the timestamp of the first entry of this metric - time_t (*oldest_time_s)(STORAGE_METRIC_HANDLE *db_metric_handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_load_metric_init(db_metric_handle, handle, start_time_s, end_time_s, priority); + else +#endif + rrddim_query_init(db_metric_handle, handle, start_time_s, end_time_s, priority); +} - // adapt 'before' timestamp to the optimal for the query - // can only move 'before' ahead (to the future) - time_t (*align_to_optimal_before)(struct storage_engine_query_handle *handle); -}; +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle); +STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *handle); +static inline STORAGE_POINT storage_engine_query_next_metric(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_metric_next(handle); +#endif + return rrddim_query_next_metric(handle); +} + +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle); +int rrddim_query_is_finished(struct storage_engine_query_handle *handle); +static inline int storage_engine_query_is_finished(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_metric_is_finished(handle); +#endif + return rrddim_query_is_finished(handle); +} + +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle); +void rrddim_query_finalize(struct storage_engine_query_handle *handle); +static inline void storage_engine_query_finalize(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_load_metric_finalize(handle); + else +#endif + rrddim_query_finalize(handle); +} -typedef struct storage_engine STORAGE_ENGINE; +time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle); +time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle); +static inline time_t storage_engine_align_to_optimal_before(struct storage_engine_query_handle *handle) { + internal_fatal(!is_valid_backend(handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_load_align_to_optimal_before(handle); +#endif + return rrddim_query_align_to_optimal_before(handle); +} // ------------------------------------------------------------------------ // function pointers for all APIs provided by a storage engine @@ -472,17 +639,14 @@ typedef struct storage_engine_api { void (*metric_release)(STORAGE_METRIC_HANDLE *); STORAGE_METRIC_HANDLE *(*metric_dup)(STORAGE_METRIC_HANDLE *); bool (*metric_retention_by_uuid)(STORAGE_INSTANCE *db_instance, uuid_t *uuid, time_t *first_entry_s, time_t *last_entry_s); - - // operations - struct storage_engine_collect_ops collect_ops; - struct storage_engine_query_ops query_ops; } STORAGE_ENGINE_API; -struct storage_engine { +typedef struct storage_engine { + STORAGE_ENGINE_BACKEND backend; RRD_MEMORY_MODE id; const char* name; STORAGE_ENGINE_API api; -}; +} STORAGE_ENGINE; STORAGE_ENGINE* storage_engine_get(RRD_MEMORY_MODE mmode); STORAGE_ENGINE* storage_engine_find(const char* name); |