summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2019-05-30 15:27:45 +0300
committerChris Akritidis <43294513+cakrit@users.noreply.github.com>2019-05-30 14:27:45 +0200
commit2f72bfafba57cd30a7b5bcf1b5ccc3281d713759 (patch)
tree7f4d57c8d6d76baf7b5cca5e6a6f3657fef13ee9
parentcaf7b1919496c3e8806546dbfcf940668c901955 (diff)
Add empty page detection in DB engine (#6173)
-rw-r--r--database/engine/pagecache.c27
-rw-r--r--database/engine/pagecache.h2
-rw-r--r--database/engine/rrdengine.c2
-rw-r--r--database/engine/rrdengineapi.c64
-rw-r--r--database/engine/rrdenginelib.c3
5 files changed, 69 insertions, 29 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index beb5d4e9bc..d512a34081 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -221,7 +221,7 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
- debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
+ debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
number);
while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
@@ -263,7 +263,7 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
debug(D_RRDENGINE,
- "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
+ "==Page cache full. Trying to reserve %u pages.==",
number);
do {
if (!pg_cache_try_evict_one_page_unsafe(ctx))
@@ -337,10 +337,7 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 0;
}
-/*
- * TODO: last waiter frees descriptor ?
- */
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
@@ -382,12 +379,14 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
- /* even a locked page could be dirty */
- while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
+ if (!remove_dirty) {
+ /* even a locked page could be dirty */
+ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
+ debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
}
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
/* only after locking can it be safely deleted from LRU */
@@ -403,7 +402,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
}
destroy:
- assert(0 == descr->pg_cache_descr_state);
+ if (!remove_dirty) {
+ assert(0 == descr->pg_cache_descr_state);
+ }
freez(descr);
pg_cache_update_metric_times(page_index);
}
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 17cdd661f7..0447d14e69 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -145,7 +145,7 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
struct rrdeng_page_descr *descr);
-extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
extern struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
extern struct rrdeng_page_descr *
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index bde9920115..43922caaf5 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -482,7 +482,7 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 0);
}
next = extent->next;
free(extent);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 02b670c2ac..6d703eeb2f 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -60,6 +60,24 @@ void rrdeng_store_metric_init(RRDDIM *rd)
handle->page_index = page_index;
}
+/* The page must be populated and referenced */
+static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
+{
+ unsigned i;
+ uint8_t has_only_empty_metrics = 1;
+ storage_number *page;
+
+ page = descr->pg_cache_descr->page;
+ for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) {
+ if (SN_EMPTY_SLOT != page[i]) {
+ has_only_empty_metrics = 0;
+ break;
+ }
+ }
+ return has_only_empty_metrics;
+}
+
+
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle;
@@ -75,23 +93,32 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
if (descr) {
if (descr->page_length) {
- int ret;
+ int ret, page_is_empty;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
#endif
- /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- rrdeng_page_descr_mutex_lock(ctx, descr);
- ret = pg_cache_try_get_unsafe(descr, 0);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- assert (1 == ret);
-
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ handle->descr = NULL;
+ } else {
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ assert (1 == ret);
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ }
if (handle->prev_descr) {
/* unpin old second page */
pg_cache_put(ctx, handle->prev_descr);
}
- handle->prev_descr = descr;
} else {
free(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
@@ -138,10 +165,21 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
descr = handle->descr;
if (descr) {
if (descr->page_length) {
+ int page_is_empty;
+
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
#endif
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ } else {
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ }
if (handle->prev_descr) {
/* unpin old second page */
pg_cache_put(ctx, handle->prev_descr);
@@ -304,7 +342,7 @@ void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrde
pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
pg_cache_descr->refcnt = 1;
- debug(D_RRDENGINE, "-----------------\nCreated new page:\n-----------------");
+ debug(D_RRDENGINE, "Created new page:");
if(unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
rrdeng_page_descr_mutex_unlock(ctx, descr);
@@ -340,7 +378,7 @@ void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
if (NULL == descr) {
*handle = NULL;
@@ -359,7 +397,7 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
- debug(D_RRDENGINE, "----------------------\nReading existing page:\n----------------------");
+ debug(D_RRDENGINE, "Reading existing page:");
descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
if (NULL == descr) {
*handle = NULL;
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 776e56d1b2..176a01dcc1 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -23,8 +23,9 @@ void print_page_cache_descr(struct rrdeng_page_descr *descr)
} else {
pos += snprintfz(str + pos, BUFSIZE - pos, "%"PRIu64, descr->extent->offset);
}
+
snprintfz(str + pos, BUFSIZE - pos, " flags:0x%2.2lX refcnt:%u\n\n", pg_cache_descr->flags, pg_cache_descr->refcnt);
- fputs(str, stderr);
+ debug(D_RRDENGINE, "%s", str);
}
void print_page_descr(struct rrdeng_page_descr *descr)