diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2019-06-04 18:20:38 +0300 |
---|---|---|
committer | Chris Akritidis <43294513+cakrit@users.noreply.github.com> | 2019-06-04 17:20:38 +0200 |
commit | 118534bd8b6c900c6b72144aedc253deeff7f06b (patch) | |
tree | 1e548714ee08040bafc9a62f2e56ab69f8efe282 /database | |
parent | 74952493fbe90abfde1ffd9d302a46c64406690c (diff) |
Fill chart gaps efficiently. (#6216)
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/rrdengineapi.c | 117 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 1 | ||||
-rw-r--r-- | database/rrdset.c | 21 |
3 files changed, 76 insertions, 63 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 6d703eeb2f..8e9e0d8bbb 100644 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -77,6 +77,53 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) return has_only_empty_metrics; } +void rrdeng_store_metric_flush_current_page(RRDDIM *rd) +{ + struct rrdeng_collect_handle *handle; + struct rrdengine_instance *ctx; + struct rrdeng_page_descr *descr; + + handle = &rd->state->handle.rrdeng; + ctx = handle->ctx; + descr = handle->descr; + if (unlikely(NULL == descr)) { + return; + } + if (likely(descr->page_length)) { + int ret, page_is_empty; + +#ifdef NETDATA_INTERNAL_CHECKS + rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); +#endif + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); + } + page_is_empty = page_has_only_empty_metrics(descr); + if (page_is_empty) { + debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); + if(unlikely(debug_flags & D_RRDENGINE)) + print_page_cache_descr(descr); + pg_cache_put(ctx, descr); + pg_cache_punch_hole(ctx, descr, 1); + handle->prev_descr = NULL; + } else { + /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ + rrdeng_page_descr_mutex_lock(ctx, descr); + ret = pg_cache_try_get_unsafe(descr, 0); + rrdeng_page_descr_mutex_unlock(ctx, descr); + assert (1 == ret); + + rrdeng_commit_page(ctx, descr, handle->page_correlation_id); + } + handle->prev_descr = descr; + } else { + free(descr->pg_cache_descr->page); + rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); + free(descr); + } + handle->descr = NULL; +} void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) { @@ -91,45 +138,13 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n pg_cache = &ctx->pg_cache; descr = handle->descr; if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) { - if (descr) { - if (descr->page_length) { - int ret, page_is_empty; + rrdeng_store_metric_flush_current_page(rd); -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - page_is_empty = page_has_only_empty_metrics(descr); - if (page_is_empty) { - debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_put(ctx, descr); - pg_cache_punch_hole(ctx, descr, 1); - handle->descr = NULL; - } else { - /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */ - rrdeng_page_descr_mutex_lock(ctx, descr); - ret = pg_cache_try_get_unsafe(descr, 0); - rrdeng_page_descr_mutex_unlock(ctx, descr); - assert (1 == ret); - - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - } - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } - } else { - free(descr->pg_cache_descr->page); - rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); - free(descr); - handle->descr = NULL; - } - } page = rrdeng_create_page(ctx, &handle->page_index->id, &descr); assert(page); - handle->prev_descr = handle->descr; + handle->descr = descr; + uv_rwlock_wrlock(&pg_cache->commited_page_index.lock); handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++; uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock); @@ -158,37 +173,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd) { struct rrdeng_collect_handle *handle; struct rrdengine_instance *ctx; - struct rrdeng_page_descr *descr; handle = &rd->state->handle.rrdeng; ctx = handle->ctx; - descr = handle->descr; - if (descr) { - if (descr->page_length) { - int page_is_empty; - -#ifdef NETDATA_INTERNAL_CHECKS - rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1); -#endif - page_is_empty = page_has_only_empty_metrics(descr); - if (page_is_empty) { - debug(D_RRDENGINE, "Page has empty metrics only, deleting:"); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr); - pg_cache_put(ctx, descr); - pg_cache_punch_hole(ctx, descr, 1); - } else { - rrdeng_commit_page(ctx, descr, handle->page_correlation_id); - } - if (handle->prev_descr) { - /* unpin old second page */ - pg_cache_put(ctx, handle->prev_descr); - } - } else { - free(descr->pg_cache_descr->page); - rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr); - free(descr); - } + rrdeng_store_metric_flush_current_page(rd); + if (handle->prev_descr) { + /* unpin old second page */ + pg_cache_put(ctx, handle->prev_descr); } } diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 3351f8e457..895f998088 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -20,6 +20,7 @@ extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle); extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle); extern void rrdeng_store_metric_init(RRDDIM *rd); +extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); extern void rrdeng_store_metric_finalize(RRDDIM *rd); extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, diff --git a/database/rrdset.c b/database/rrdset.c index fed799f7e3..5c01337e87 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -258,6 +258,11 @@ void rrdset_reset(RRDSET *st) { rd->last_collected_time.tv_usec = 0; rd->collections_counter = 0; // memset(rd->values, 0, rd->entries * sizeof(storage_number)); +#ifdef ENABLE_DBENGINE + if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode) { + rrdeng_store_metric_flush_current_page(rd); + } +#endif } } @@ -1273,6 +1278,22 @@ void rrdset_done(RRDSET *st) { first_entry = 1; } +#ifdef ENABLE_DBENGINE + // check if we will re-write the entire page + if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && + dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) { + info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec); + rrdset_reset(st); + rrdset_init_last_updated_time(st); + + st->usec_since_last_update = update_every_ut; + + // the first entry should not be stored + store_this_entry = 0; + first_entry = 1; + } +#endif + // these are the 3 variables that will help us in interpolation // last_stored_ut = the last time we added a value to the storage // now_collect_ut = the time the current value has been collected |