summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r--database/engine/rrdengineapi.c72
1 files changed, 52 insertions, 20 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 90f00ded10..a87ce6d648 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -55,6 +55,8 @@ void rrdeng_store_metric_init(RRDDIM *rd)
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
+ page_index->prev = pg_cache->metrics_index.last_page_index;
+ pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
@@ -119,9 +121,9 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
handle->prev_descr = descr;
}
} else {
- free(descr->pg_cache_descr->page);
+ freez(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
- free(descr);
+ freez(descr);
}
handle->descr = NULL;
}
@@ -434,7 +436,13 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return pg_cache_descr->page;
}
-void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+/*
+ * Gathers Database Engine statistics.
+ * Careful when modifying this function.
+ * You must not change the indices of the statistics or user code will break.
+ * You must not exceed RRDENG_NR_STATS or it will crash.
+ */
+void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -466,7 +474,12 @@ void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
- assert(RRDENG_NR_STATS == 28);
+ array[28] = (uint64_t)ctx->stats.io_errors;
+ array[29] = (uint64_t)ctx->stats.fs_errors;
+ array[30] = (uint64_t)global_io_errors;
+ array[31] = (uint64_t)global_fs_errors;
+ array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
+ assert(RRDENG_NR_STATS == 33);
}
/* Releases reference to page */
@@ -477,14 +490,29 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
}
/*
- * Returns 0 on success, 1 on error
+ * Returns 0 on success, negative on error
*/
int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
+ uint32_t max_open_files;
sanity_check();
+
+ max_open_files = rlimit_nofile.rlim_cur / 4;
+
+ /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
+ if (rrdeng_reserved_file_descriptors > max_open_files) {
+ error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
+ (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
+
+ rrd_stat_atomic_add(&global_fs_errors, 1);
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EMFILE;
+ }
+
if (NULL == ctxp) {
/* for testing */
ctx = &default_global_ctx;
@@ -492,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
- if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
- return 1;
- }
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
@@ -514,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
init_commit_log(ctx);
error = init_rrd_files(ctx);
if (error) {
- ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
- if (ctx != &default_global_ctx) {
- freez(ctx);
- }
- return 1;
+ goto error_after_init_rrd_files;
}
init_completion(&ctx->rrdengine_completion);
@@ -526,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
-
- ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
+ if (ctx->worker_config.error) {
+ goto error_after_rrdeng_worker;
+ }
return 0;
+
+error_after_rrdeng_worker:
+ finalize_rrd_files(ctx);
+error_after_init_rrd_files:
+ free_page_cache(ctx);
+ if (ctx != &default_global_ctx) {
+ freez(ctx);
+ *ctxp = NULL;
+ }
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
+ return UV_EIO;
}
/*
@@ -539,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
struct rrdeng_cmd cmd;
if (NULL == ctx) {
- /* TODO: move to per host basis */
- ctx = &default_global_ctx;
- }
- if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
return 1;
}
@@ -552,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
assert(0 == uv_thread_join(&ctx->worker_config.thread));
+ finalize_rrd_files(ctx);
+ free_page_cache(ctx);
+
if (ctx != &default_global_ctx) {
freez(ctx);
}
+ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
} \ No newline at end of file