diff options
Diffstat (limited to 'database/engine/rrdengineapi.c')
-rw-r--r-- | database/engine/rrdengineapi.c | 73 |
1 files changed, 41 insertions, 32 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 4771c43b56..02b670c2ac 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -65,7 +65,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page; handle = &rd->state->handle.rrdeng; @@ -74,7 +74,6 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n descr = handle->descr; if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) { if (descr) { - descr->handle = NULL; if (descr->page_length) { int ret; @@ -82,33 +81,33 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); #endif /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - uv_mutex_lock(&descr->mutex); + rrdeng_page_descr_mutex_lock(ctx, descr); ret = pg_cache_try_get_unsafe(descr, 0); - uv_mutex_unlock(&descr->mutex); + rrdeng_page_descr_mutex_unlock(ctx, descr); assert (1 == ret); rrdeng_commit_page(ctx, descr, handle->page_correlation_id); if (handle->prev_descr) { /* unpin old second page */ - pg_cache_put(handle->prev_descr); + pg_cache_put(ctx, handle->prev_descr); } handle->prev_descr = descr; } else { - free(descr->page); + free(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); free(descr); handle->descr = NULL; } } - page = rrdeng_create_page(&handle->page_index->id, &descr); + page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); assert(page); handle->prev_descr = handle->descr; handle->descr = descr; - descr->handle = handle; uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); } - page = descr->page; + page = descr->pg_cache_descr->page; page[descr->page_length / sizeof(number)] = number; descr->end_time = point_in_time; @@ -132,13 +131,12 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; descr = handle->descr; if (descr) { - descr->handle = NULL; if (descr->page_length) { #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); @@ -146,10 +144,11 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) rrdeng_commit_page(ctx, descr, handle->page_correlation_id); if (handle->prev_descr) { /* unpin old second page */ - pg_cache_put(handle->prev_descr); + pg_cache_put(ctx, handle->prev_descr); } } else { - free(descr->page); + free(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); free(descr); } } @@ -180,7 +179,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; storage_number *page, ret; unsigned position; usec_t point_in_time; @@ -204,7 +203,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); handle->descr = NULL; } descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time); @@ -222,7 +221,7 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle ret = SN_EMPTY_SLOT; goto out; } - page = descr->page; + page = descr->pg_cache_descr->page; if (unlikely(descr->start_time == descr->end_time)) { ret = page[0]; goto out; @@ -254,7 +253,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) { struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; handle = &rrdimm_handle->rrdeng; ctx = handle->ctx; @@ -263,7 +262,7 @@ void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle) #ifdef NETDATA_INTERNAL_CHECKS rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1); #endif - pg_cache_put(descr); + pg_cache_put(ctx, descr); } } @@ -289,28 +288,32 @@ time_t rrdeng_metric_oldest_time(RRDDIM *rd) } /* Also gets a reference for the page */ -void *rrdeng_create_page(uuid_t *id, struct rrdeng_page_cache_descr **ret_descr) +void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; void *page; /* TODO: check maximum number of pages in page cache limit */ - page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ descr = pg_cache_create_descr(); - descr->page = page; descr->id = id; /* TODO: add page type: metric, log, something? */ - descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; - descr->refcnt = 1; + page = mallocz(RRDENG_BLOCK_SIZE); /*TODO: add page size */ + rrdeng_page_descr_mutex_lock(ctx, descr); + pg_cache_descr = descr->pg_cache_descr; + pg_cache_descr->page = page; + pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */; + pg_cache_descr->refcnt = 1; debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------"); if(unlikely(debug_flags & D_RRDENGINE)) print_page_cache_descr(descr); + rrdeng_page_descr_mutex_unlock(ctx, descr); *ret_descr = descr; return page; } /* The page must not be empty */ -void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr, +void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, Word_t page_correlation_id) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -328,13 +331,14 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_cache ++pg_cache->commited_page_index.nr_commited_pages; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); - pg_cache_put(descr); + pg_cache_put(ctx, descr); } /* Gets a reference for the page */ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME); @@ -344,14 +348,16 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void ** return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } /* Gets a reference for the page */ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle) { - struct rrdeng_page_cache_descr *descr; + struct rrdeng_page_descr *descr; + struct page_cache_descr *pg_cache_descr; debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------"); descr = pg_cache_lookup(ctx, NULL, id, point_in_time); @@ -361,11 +367,12 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i return NULL; } *handle = descr; + pg_cache_descr = descr->pg_cache_descr; - return descr->page; + return pg_cache_descr->page; } -void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long *array) +void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long *array) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -396,13 +403,15 @@ void rrdeng_get_27_statistics(struct rrdengine_instance *ctx, unsigned long long array[24] = (uint64_t)ctx->stats.datafile_deletions; array[25] = (uint64_t)ctx->stats.journalfile_creations; array[26] = (uint64_t)ctx->stats.journalfile_deletions; + array[27] = (uint64_t)ctx->stats.page_cache_descriptors; + assert(RRDENG_NR_STATS == 28); } /* Releases reference to page */ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle) { (void)ctx; - pg_cache_put((struct rrdeng_page_cache_descr *)handle); + pg_cache_put(ctx, (struct rrdeng_page_descr *)handle); } /* |