summaryrefslogtreecommitdiffstats
path: root/database/engine/rrdengineapi.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r--database/engine/rrdengineapi.c73
1 files changed, 41 insertions, 32 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 4771c43b56..02b670c2ac 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -65,7 +65,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page;
handle = &rd->state->handle.rrdeng;
@@ -74,7 +74,6 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
descr = handle->descr;
if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
if (descr) {
- descr->handle = NULL;
if (descr->page_length) {
int ret;
@@ -82,33 +81,33 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
#endif
/* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- uv_mutex_lock(&descr->mutex);
+ rrdeng_page_descr_mutex_lock(ctx, descr);
ret = pg_cache_try_get_unsafe(descr, 0);
- uv_mutex_unlock(&descr->mutex);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
assert (1 == ret);
rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
if (handle->prev_descr) {
/* unpin old second page */
- pg_cache_put(handle->prev_descr);
+ pg_cache_put(ctx, handle->prev_descr);
}
handle->prev_descr = descr;
} else {
- free(descr->page);
+ free(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
free(descr);
handle->descr = NULL;
}
}
- page = rrdeng_create_page(&handle->page_index->id, &descr);
+ page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
assert(page);
handle->prev_descr = handle->descr;
handle->descr = descr;
- descr->handle = handle;
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
}
- page = descr->page;
+ page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
descr->end_time = point_in_time;
@@ -132,13 +131,12 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
descr = handle->descr;
if (descr) {
- descr->handle = NULL;
if (descr->page_length) {
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
@@ -146,10 +144,11 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
if (handle->prev_descr) {
/* unpin old second page */
- pg_cache_put(handle->prev_descr);
+ pg_cache_put(ctx, handle->prev_descr);
}
} else {
- free(descr->page);
+ free(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
free(descr);
}
}
@@ -180,7 +179,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position;
usec_t point_in_time;
@@ -204,7 +203,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
handle->descr = NULL;
}
descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
@@ -222,7 +221,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
ret = SN_EMPTY_SLOT;
goto out;
}
- page = descr->page;
+ page = descr->pg_cache_descr->page;
if (unlikely(descr->start_time == descr->end_time)) {
ret = page[0];
goto out;
@@ -254,7 +253,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
handle = &rrdimm_handle->rrdeng;
ctx = handle->ctx;
@@ -263,7 +262,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
}
@@ -289,28 +288,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd)
}
/* Also gets a reference for the page */
-void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr)
+void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
void *page;
/* TODO: check maximum number of pages in page cache limit */
- page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
descr = pg_cache_create_descr();
- descr->page = page;
descr->id = id; /* TODO: add page type: metric, log, something? */
- descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
- descr->refcnt = 1;
+ page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ pg_cache_descr->page = page;
+ pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
+ pg_cache_descr->refcnt = 1;
debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
if(unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
*ret_descr = descr;
return page;
}
/* The page must not be empty */
-void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr,
+void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
Word_t page_correlation_id)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -328,13 +331,14 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache
++pg_cache->commited_page_index.nr_commited_pages;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
- pg_cache_put(descr);
+ pg_cache_put(ctx, descr);
}
/* Gets a reference for the page */
void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
@@ -344,14 +348,16 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
/* Gets a reference for the page */
void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
{
- struct rrdeng_page_cache_descr *descr;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
@@ -361,11 +367,12 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return NULL;
}
*handle = descr;
+ pg_cache_descr = descr->pg_cache_descr;
- return descr->page;
+ return pg_cache_descr->page;
}
-void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
+void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -396,13 +403,15 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long
array[24] = (uint64_t)ctx->stats.datafile_deletions;
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
+ array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
+ assert(RRDENG_NR_STATS == 28);
}
/* Releases reference to page */
void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
{
(void)ctx;
- pg_cache_put((struct rrdeng_page_cache_descr *)handle);
+ pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
}
/*