summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2019-06-04 18:20:38 +0300
committerChris Akritidis <43294513+cakrit@users.noreply.github.com>2019-06-04 17:20:38 +0200
commit118534bd8b6c900c6b72144aedc253deeff7f06b (patch)
tree1e548714ee08040bafc9a62f2e56ab69f8efe282 /database
parent74952493fbe90abfde1ffd9d302a46c64406690c (diff)
Fill chart gaps efficiently. (#6216)
Diffstat (limited to 'database')
-rw-r--r--database/engine/rrdengineapi.c117
-rw-r--r--database/engine/rrdengineapi.h1
-rw-r--r--database/rrdset.c21
3 files changed, 76 insertions, 63 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 6d703eeb2f..8e9e0d8bbb 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -77,6 +77,53 @@ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
return has_only_empty_metrics;
}
+void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
+{
+ struct rrdeng_collect_handle *handle;
+ struct rrdengine_instance *ctx;
+ struct rrdeng_page_descr *descr;
+
+ handle = &rd->state->handle.rrdeng;
+ ctx = handle->ctx;
+ descr = handle->descr;
+ if (unlikely(NULL == descr)) {
+ return;
+ }
+ if (likely(descr->page_length)) {
+ int ret, page_is_empty;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
+#endif
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
+ }
+ page_is_empty = page_has_only_empty_metrics(descr);
+ if (page_is_empty) {
+ debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
+ if(unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_put(ctx, descr);
+ pg_cache_punch_hole(ctx, descr, 1);
+ handle->prev_descr = NULL;
+ } else {
+ /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ ret = pg_cache_try_get_unsafe(descr, 0);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ assert (1 == ret);
+
+ rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
+ }
+ handle->prev_descr = descr;
+ } else {
+ free(descr->pg_cache_descr->page);
+ rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
+ free(descr);
+ }
+ handle->descr = NULL;
+}
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
@@ -91,45 +138,13 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
pg_cache = &ctx->pg_cache;
descr = handle->descr;
if (unlikely(NULL == descr || descr->page_length + sizeof(number) > RRDENG_BLOCK_SIZE)) {
- if (descr) {
- if (descr->page_length) {
- int ret, page_is_empty;
+ rrdeng_store_metric_flush_current_page(rd);
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- page_is_empty = page_has_only_empty_metrics(descr);
- if (page_is_empty) {
- debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_put(ctx, descr);
- pg_cache_punch_hole(ctx, descr, 1);
- handle->descr = NULL;
- } else {
- /* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
- rrdeng_page_descr_mutex_lock(ctx, descr);
- ret = pg_cache_try_get_unsafe(descr, 0);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- assert (1 == ret);
-
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- }
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(ctx, handle->prev_descr);
- }
- } else {
- free(descr->pg_cache_descr->page);
- rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
- free(descr);
- handle->descr = NULL;
- }
- }
page = rrdeng_create_page(ctx, &handle->page_index->id, &descr);
assert(page);
- handle->prev_descr = handle->descr;
+
handle->descr = descr;
+
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
@@ -158,37 +173,13 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
- struct rrdeng_page_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
- descr = handle->descr;
- if (descr) {
- if (descr->page_length) {
- int page_is_empty;
-
-#ifdef NETDATA_INTERNAL_CHECKS
- rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
-#endif
- page_is_empty = page_has_only_empty_metrics(descr);
- if (page_is_empty) {
- debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_put(ctx, descr);
- pg_cache_punch_hole(ctx, descr, 1);
- } else {
- rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
- }
- if (handle->prev_descr) {
- /* unpin old second page */
- pg_cache_put(ctx, handle->prev_descr);
- }
- } else {
- free(descr->pg_cache_descr->page);
- rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
- free(descr);
- }
+ rrdeng_store_metric_flush_current_page(rd);
+ if (handle->prev_descr) {
+ /* unpin old second page */
+ pg_cache_put(ctx, handle->prev_descr);
}
}
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 3351f8e457..895f998088 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -20,6 +20,7 @@ extern void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id,
extern void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle);
extern void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle);
extern void rrdeng_store_metric_init(RRDDIM *rd);
+extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
extern void rrdeng_store_metric_finalize(RRDDIM *rd);
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
diff --git a/database/rrdset.c b/database/rrdset.c
index fed799f7e3..5c01337e87 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -258,6 +258,11 @@ void rrdset_reset(RRDSET *st) {
rd->last_collected_time.tv_usec = 0;
rd->collections_counter = 0;
// memset(rd->values, 0, rd->entries * sizeof(storage_number));
+#ifdef ENABLE_DBENGINE
+ if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode) {
+ rrdeng_store_metric_flush_current_page(rd);
+ }
+#endif
}
}
@@ -1273,6 +1278,22 @@ void rrdset_done(RRDSET *st) {
first_entry = 1;
}
+#ifdef ENABLE_DBENGINE
+ // check if we will re-write the entire page
+ if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
+ dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) {
+ info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
+ rrdset_reset(st);
+ rrdset_init_last_updated_time(st);
+
+ st->usec_since_last_update = update_every_ut;
+
+ // the first entry should not be stored
+ store_this_entry = 0;
+ first_entry = 1;
+ }
+#endif
+
// these are the 3 variables that will help us in interpolation
// last_stored_ut = the last time we added a value to the storage
// now_collect_ut = the time the current value has been collected