summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2020-02-06 21:58:13 +0200
committerGitHub <noreply@github.com>2020-02-06 21:58:13 +0200
commit6b119d9170fce726e9a5720edc83f6d9ac88e7ce (patch)
tree90dc7094ba92af299f5e7c0532e519f706b47d92
parentb2b3c182548fe81e6d1c9a599b2571dabfdabcaa (diff)
Drop dirty dbengine pages if disk cannot keep up (#7777)
* Introduce dirty page pressure handling in the dbengine page cache that invalidates pages when the disk cannot keep up with the flushing speed.
-rw-r--r--configs.signatures2
-rw-r--r--daemon/global_statistics.c86
-rw-r--r--daemon/unit_test.c3
-rw-r--r--database/engine/pagecache.c31
-rw-r--r--database/engine/pagecache.h4
-rw-r--r--database/engine/rrdengine.c257
-rw-r--r--database/engine/rrdengine.h21
-rwxr-xr-xdatabase/engine/rrdengineapi.c44
-rw-r--r--database/engine/rrdengineapi.h5
-rw-r--r--database/engine/rrdenginelib.c12
-rw-r--r--health/health.d/dbengine.conf26
11 files changed, 375 insertions, 116 deletions
diff --git a/configs.signatures b/configs.signatures
index ef4b99aa60..994b62a80e 100644
--- a/configs.signatures
+++ b/configs.signatures
@@ -381,7 +381,7 @@ declare -A configs_signatures=(
['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf'
['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf'
['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf'
- ['8edc8c73a8f3ca40b32e27fe452c70f3']='health.d/dbengine.conf'
+ ['82f1dc0a477a175ae31d7b815411e44e']='health.d/dbengine.conf'
['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf'
['80266bddd3df374923c750a6de91d120']='health.d/apache.conf'
['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf'
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index 5197dcc100..e5aeb6e128 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -544,7 +544,7 @@ void global_statistics_charts(void) {
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
++hosts_with_dbengine;
/* get localhost's DB engine's statistics */
- rrdeng_get_35_statistics(host->rrdeng_ctx, local_stats_array);
+ rrdeng_get_37_statistics(host->rrdeng_ctx, local_stats_array);
for (i = 0 ; i < RRDENG_NR_STATS ; ++i) {
/* aggregate statistics across hosts */
stats_array[i] += local_stats_array[i];
@@ -558,6 +558,8 @@ void global_statistics_charts(void) {
stats_array[30] = local_stats_array[30];
stats_array[31] = local_stats_array[31];
stats_array[32] = local_stats_array[32];
+ stats_array[34] = local_stats_array[34];
+ stats_array[36] = local_stats_array[36];
// ----------------------------------------------------------------
@@ -642,7 +644,6 @@ void global_statistics_charts(void) {
old_misses = misses;
if (hits_delta + misses_delta) {
- // allow negative savings
ratio = (hits_delta * 100 * 1000) / (hits_delta + misses_delta);
} else {
ratio = 0;
@@ -658,11 +659,10 @@ void global_statistics_charts(void) {
static RRDSET *st_pg_cache_pages = NULL;
static RRDDIM *rd_descriptors = NULL;
static RRDDIM *rd_populated = NULL;
- static RRDDIM *rd_committed = NULL;
- static RRDDIM *rd_insertions = NULL;
- static RRDDIM *rd_deletions = NULL;
+ static RRDDIM *rd_dirty = NULL;
static RRDDIM *rd_backfills = NULL;
static RRDDIM *rd_evictions = NULL;
+ static RRDDIM *rd_used_by_collectors = NULL;
if (unlikely(!st_pg_cache_pages)) {
st_pg_cache_pages = rrdset_create_localhost(
@@ -671,7 +671,7 @@ void global_statistics_charts(void) {
, NULL
, "dbengine"
, NULL
- , "NetData DB engine page statistics"
+ , "NetData dbengine page cache statistics"
, "pages"
, "netdata"
, "stats"
@@ -682,28 +682,69 @@ void global_statistics_charts(void) {
rd_descriptors = rrddim_add(st_pg_cache_pages, "descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_populated = rrddim_add(st_pg_cache_pages, "populated", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- rd_committed = rrddim_add(st_pg_cache_pages, "committed", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- rd_insertions = rrddim_add(st_pg_cache_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
- rd_deletions = rrddim_add(st_pg_cache_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_dirty = rrddim_add(st_pg_cache_pages, "dirty", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_backfills = rrddim_add(st_pg_cache_pages, "backfills", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_evictions = rrddim_add(st_pg_cache_pages, "evictions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_used_by_collectors = rrddim_add(st_pg_cache_pages, "used_by_collectors", NULL, 1, 1,
+ RRD_ALGORITHM_ABSOLUTE);
}
else
rrdset_next(st_pg_cache_pages);
rrddim_set_by_pointer(st_pg_cache_pages, rd_descriptors, (collected_number)stats_array[27]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_populated, (collected_number)stats_array[3]);
- rrddim_set_by_pointer(st_pg_cache_pages, rd_committed, (collected_number)stats_array[4]);
- rrddim_set_by_pointer(st_pg_cache_pages, rd_insertions, (collected_number)stats_array[5]);
- rrddim_set_by_pointer(st_pg_cache_pages, rd_deletions, (collected_number)stats_array[6]);
+ rrddim_set_by_pointer(st_pg_cache_pages, rd_dirty, (collected_number)stats_array[0] + stats_array[4]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_backfills, (collected_number)stats_array[9]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_evictions, (collected_number)stats_array[10]);
+ rrddim_set_by_pointer(st_pg_cache_pages, rd_used_by_collectors, (collected_number)stats_array[0]);
rrdset_done(st_pg_cache_pages);
}
// ----------------------------------------------------------------
{
+ static RRDSET *st_long_term_pages = NULL;
+ static RRDDIM *rd_total = NULL;
+ static RRDDIM *rd_insertions = NULL;
+ static RRDDIM *rd_deletions = NULL;
+ static RRDDIM *rd_flushing_pressure_deletions = NULL;
+
+ if (unlikely(!st_long_term_pages)) {
+ st_long_term_pages = rrdset_create_localhost(
+ "netdata"
+ , "dbengine_long_term_page_stats"
+ , NULL
+ , "dbengine"
+ , NULL
+ , "NetData dbengine long-term page statistics"
+ , "pages"
+ , "netdata"
+ , "stats"
+ , 130505
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+
+ rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_flushing_pressure_deletions = rrddim_add(st_long_term_pages, "flushing_pressure_deletions", NULL, -1,
+ 1, RRD_ALGORITHM_INCREMENTAL);
+ }
+ else
+ rrdset_next(st_long_term_pages);
+
+ rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]);
+ rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]);
+ rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]);
+ rrddim_set_by_pointer(st_long_term_pages, rd_flushing_pressure_deletions,
+ (collected_number)stats_array[36]);
+ rrdset_done(st_long_term_pages);
+ }
+
+ // ----------------------------------------------------------------
+
+ {
static RRDSET *st_io_stats = NULL;
static RRDDIM *rd_reads = NULL;
static RRDDIM *rd_writes = NULL;
@@ -719,7 +760,7 @@ void global_statistics_charts(void) {
, "MiB/s"
, "netdata"
, "stats"
- , 130505
+ , 130506
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -753,7 +794,7 @@ void global_statistics_charts(void) {
, "operations/s"
, "netdata"
, "stats"
- , 130506
+ , 130507
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -775,7 +816,7 @@ void global_statistics_charts(void) {
static RRDSET *st_errors = NULL;
static RRDDIM *rd_fs_errors = NULL;
static RRDDIM *rd_io_errors = NULL;
- static RRDDIM *rd_flushing_errors = NULL;
+ static RRDDIM *pg_cache_over_half_dirty_events = NULL;
if (unlikely(!st_errors)) {
st_errors = rrdset_create_localhost(
@@ -788,21 +829,22 @@ void global_statistics_charts(void) {
, "errors/s"
, "netdata"
, "stats"
- , 130507
+ , 130508
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
- rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
- rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
- rd_flushing_errors = rrddim_add(st_errors, "flushing errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_io_errors = rrddim_add(st_errors, "io_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_fs_errors = rrddim_add(st_errors, "fs_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ pg_cache_over_half_dirty_events = rrddim_add(st_errors, "pg_cache_over_half_dirty_events", NULL, 1, 1,
+ RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(st_errors);
rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]);
rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]);
- rrddim_set_by_pointer(st_errors, rd_flushing_errors, (collected_number)stats_array[34]);
+ rrddim_set_by_pointer(st_errors, pg_cache_over_half_dirty_events, (collected_number)stats_array[34]);
rrdset_done(st_errors);
}
@@ -824,7 +866,7 @@ void global_statistics_charts(void) {
, "descriptors"
, "netdata"
, "stats"
- , 130508
+ , 130509
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -863,7 +905,7 @@ void global_statistics_charts(void) {
, "MiB"
, "netdata"
, "stats"
- , 130509
+ , 130510
, localhost->rrd_update_every
, RRDSET_TYPE_STACKED
);
diff --git a/daemon/unit_test.c b/daemon/unit_test.c
index 87d281d766..323ae285a3 100644
--- a/daemon/unit_test.c
+++ b/daemon/unit_test.c
@@ -1491,6 +1491,9 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number
static RRDHOST *dbengine_rrdhost_find_or_create(char *name)
{
+ /* We don't want to drop metrics when generating load, we prefer to block data generation itself */
+ rrdeng_drop_metrics_under_page_cache_pressure = 0;
+
return rrdhost_find_or_create(
name
, name
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 60fd7a5f89..e90ac13fe4 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -217,7 +217,6 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
/*
* This function returns the maximum number of pages allowed in the page cache.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
@@ -228,7 +227,6 @@ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
/*
* This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
* number of pages below that number.
- * The caller must hold the page cache lock.
*/
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
@@ -237,6 +235,16 @@ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
}
/*
+ * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
+ * cache.
+ */
+unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
+{
+ /* We remove the active pages of the producers from the calculation and only allow 50% of the extra pinned pages */
+ return ctx->cache_pages_low_watermark + (unsigned long)ctx->stats.metric_API_producers / 2;
+}
+
+/*
* This function will block until it reserves #number populated pages.
* It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
*/
@@ -375,7 +383,11 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 0;
}
-void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
+/*
+ * Callers of this function need to make sure they're not deleting the same descriptor concurrently
+ */
+void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
@@ -408,11 +420,14 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
+ if (!is_exclusive_holder) {
+ /* If we don't hold an exclusive page reference get one */
+ while (!pg_cache_try_get_unsafe(descr, 1)) {
+ debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
+ if (unlikely(debug_flags & D_RRDENGINE))
+ print_page_cache_descr(descr);
+ pg_cache_wait_event_unsafe(descr);
+ }
}
if (remove_dirty) {
pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 9fd9991b5e..3722a7e1cc 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -163,7 +163,8 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
struct rrdeng_page_descr *descr);
-extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
+extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
+ uint8_t is_exclusive_holder);
extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
usec_t start_time, usec_t end_time);
extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
@@ -185,6 +186,7 @@ extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index,
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
+extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 3d49379048..586f2e0b48 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -6,7 +6,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
-rrdeng_stats_t global_flushing_errors = 0;
+rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
+rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
static void sanity_check(void)
{
@@ -248,6 +249,109 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty
}
}
+static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ int error;
+
+ error = uv_thread_join(wc->now_invalidating_dirty_pages);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+}
+
+static void invalidate_oldest_committed(void *arg)
+{
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config *wc = &ctx->worker_config;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ int ret;
+ struct rrdeng_page_descr *descr;
+ struct page_cache_descr *pg_cache_descr;
+ Pvoid_t *PValue;
+ Word_t Index;
+ unsigned nr_committed_pages;
+
+ do {
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ for (Index = 0,
+ PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue;
+
+ descr != NULL;
+
+ PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
+ descr = unlikely(NULL == PValue) ? NULL : *PValue) {
+ assert(0 != descr->page_length);
+
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_descr = descr->pg_cache_descr;
+ if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
+ assert(1 == ret);
+ break;
+ }
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ }
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+
+ if (!descr) {
+ info("Failed to invalidate any dirty pages to relieve page cache pressure.");
+
+ goto out;
+ }
+ pg_cache_punch_hole(ctx, descr, 1, 1);
+
+ uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
+ rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
+
+ } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
+out:
+ wc->cleanup_thread_invalidating_dirty_pages = 1;
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
+}
+
+void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
+{
+ struct rrdengine_instance *ctx = wc->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ unsigned nr_committed_pages;
+ int error;
+
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
+ nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
+
+ if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
+ /* delete the oldest page in memory */
+ if (wc->now_invalidating_dirty_pages) {
+ /* already deleting a page */
+ return;
+ }
+ errno = 0;
+ error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
+ "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
+
+ wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+
+ error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_invalidating_dirty_pages);
+ wc->now_invalidating_dirty_pages = NULL;
+ }
+ }
+}
+
void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -294,6 +398,7 @@ void flush_pages_cb(uv_fs_t* req)
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
pg_cache->committed_page_index.nr_committed_pages -= count;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ wc->inflight_dirty_pages -= count;
}
/*
@@ -366,6 +471,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
complete(completion);
return 0;
}
+ wc->inflight_dirty_pages += count;
+
xt_io_descr = mallocz(sizeof(*xt_io_descr));
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
switch (compression_algorithm) {
@@ -466,17 +573,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
return ALIGN_BYTES_CEILING(size_bytes);
}
-static void after_delete_old_data(uv_work_t *req, int status)
+static void after_delete_old_data(struct rrdengine_worker_config* wc)
{
- struct rrdengine_instance *ctx = req->data;
- struct rrdengine_worker_config* wc = &ctx->worker_config;
+ struct rrdengine_instance *ctx = wc->ctx;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
- int ret;
+ int ret, error;
char path[RRDENG_PATH_MAX];
- (void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
datafile_bytes = datafile->pos;
@@ -503,15 +608,24 @@ static void after_delete_old_data(uv_work_t *req, int status)
ctx->disk_space -= deleted_bytes;
info("Reclaimed %u bytes of disk space.", deleted_bytes);
+ error = uv_thread_join(wc->now_deleting_files);
+ if (error) {
+ error("uv_thread_join(): %s", uv_strerror(error));
+ }
+ freez(wc->now_deleting_files);
/* unfreeze command processing */
- wc->now_deleting.data = NULL;
- /* wake up event loop */
- assert(0 == uv_async_send(&wc->async));
+ wc->now_deleting_files = NULL;
+
+ wc->cleanup_thread_deleting_files = 0;
+
+ /* interrupt event loop */
+ uv_stop(wc->loop);
}
-static void delete_old_data(uv_work_t *req)
+static void delete_old_data(void *arg)
{
- struct rrdengine_instance *ctx = req->data;
+ struct rrdengine_instance *ctx = arg;
+ struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
struct rrdeng_page_descr *descr;
@@ -524,11 +638,14 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
- pg_cache_punch_hole(ctx, descr, 0);
+ pg_cache_punch_hole(ctx, descr, 0, 0);
}
next = extent->next;
freez(extent);
}
+ wc->cleanup_thread_deleting_files = 1;
+ /* wake up event loop */
+ assert(0 == uv_async_send(&wc->async));
}
void rrdeng_test_quota(struct rrdengine_worker_config* wc)
@@ -537,7 +654,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
- int ret;
+ int ret, error;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@@ -559,7 +676,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
if (unlikely(out_of_space)) {
/* delete old data */
- if (wc->now_deleting.data) {
+ if (wc->now_deleting_files) {
/* already deleting data */
return;
}
@@ -571,8 +688,33 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
- wc->now_deleting.data = ctx;
- assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
+ wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
+ wc->cleanup_thread_deleting_files = 0;
+
+ error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ freez(wc->now_deleting_files);
+ wc->now_deleting_files = NULL;
+ }
+ }
+}
+
+static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
+{
+ if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
+ return 1;
+ }
+ return 0;
+}
+
+static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
+{
+ if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
+ after_invalidate_oldest_committed(wc);
+ }
+ if (unlikely(wc->cleanup_thread_deleting_files)) {
+ after_delete_old_data(wc);
}
}
@@ -662,34 +804,37 @@ void timer_cb(uv_timer_t* handle)
uv_update_time(handle->loop);
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
- if (likely(!wc->now_deleting.data)) {
- /* There is free space so we can write to disk */
+ if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
+ /* There is free space so we can write to disk and we are not actively deleting dirty buffers */
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
high_watermark;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
+ uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
producers = ctx->stats.metric_API_producers;
/* are flushable pages more than 25% of the maximum page cache size */
high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
- if (nr_committed_pages > producers &&
- /* committed to be written pages are more than the produced number */
- nr_committed_pages - producers > high_watermark) {
- /* Flushing speed must increase to stop page cache from filling with dirty pages */
- bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
- }
- bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
+ /* Flush more pages only if disk can keep up */
+ if (wc->inflight_dirty_pages < high_watermark + producers) {
+ if (nr_committed_pages > producers &&
+ /* committed to be written pages are more than the produced number */
+ nr_committed_pages - producers > high_watermark) {
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
+ bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
+ }
+ bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
- debug(D_RRDENGINE, "Flushing pages to disk.");
- for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
- bytes_written && (total_bytes < bytes_to_write) ;
- total_bytes += bytes_written) {
- bytes_written = do_flush_pages(wc, 0, NULL);
+ debug(D_RRDENGINE, "Flushing pages to disk.");
+ for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
+ bytes_written && (total_bytes < bytes_to_write);
+ total_bytes += bytes_written) {
+ bytes_written = do_flush_pages(wc, 0, NULL);
+ }
}
}
#ifdef NETDATA_INTERNAL_CHECKS
@@ -730,7 +875,12 @@ void rrdeng_worker(void* arg)
}
wc->async.data = wc;
- wc->now_deleting.data = NULL;
+ wc->now_deleting_files = NULL;
+ wc->cleanup_thread_deleting_files = 0;
+
+ wc->now_invalidating_dirty_pages = NULL;
+ wc->cleanup_thread_invalidating_dirty_pages = 0;
+ wc->inflight_dirty_pages = 0;
/* dirty page flushing timer */
ret = uv_timer_init(loop, &timer_req);
@@ -746,8 +896,9 @@ void rrdeng_worker(void* arg)
assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
- while (shutdown == 0 || uv_loop_alive(loop)) {
+ while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
uv_run(loop, UV_RUN_DEFAULT);
+ rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
cmd_batch_size = 0;
@@ -769,14 +920,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&wc->async, NULL);
- assert(0 == uv_timer_stop(&timer_req));
- uv_close((uv_handle_t *)&timer_req, NULL);
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -788,16 +931,16 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
- unsigned bytes_written;
-
- /* First I/O should be enough to call completion */
- bytes_written = do_flush_pages(wc, 1, cmd.completion);
- if (bytes_written) {
- while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
- ; /* Force flushing of all committed pages if there is free space. */
- }
+ if (wc->now_invalidating_dirty_pages) {
+ /* Do not flush if the disk cannot keep up */
+ complete(cmd.completion);
+ } else {
+ (void)do_flush_pages(wc, 1, cmd.completion);
}
break;
+ case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
+ rrdeng_invalidate_oldest_committed(wc);
+ break;
}
default:
debug(D_RRDENGINE, "%s: default.", __func__);
@@ -805,11 +948,19 @@ void rrdeng_worker(void* arg)
}
} while (opcode != RRDENG_NOOP);
}
+
/* cleanup operations of the event loop */
- if (unlikely(wc->now_deleting.data)) {
- info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
- }
info("Shutting down RRD engine event loop.");
+
+ /*
+ * uv_async_send after uv_close does not seem to crash in linux at the moment,
+ * it is however undocumented behaviour and we need to be aware if this becomes
+ * an issue in the future.
+ */
+ uv_close((uv_handle_t *)&wc->async, NULL);
+ assert(0 == uv_timer_stop(&timer_req));
+ uv_close((uv_handle_t *)&timer_req, NULL);
+
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all committed pages. */
}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 67f1b41c3d..22c3d33bef 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -49,6 +49,7 @@ enum rrdeng_opcode {
RRDENG_COMMIT_PAGE,
RRDENG_FLUSH_PAGES,
RRDENG_SHUTDOWN,
+ RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
RRDENG_MAX_OPCODE
};
@@ -102,7 +103,16 @@ struct rrdengine_worker_config {
uv_thread_t thread;
uv_loop_t* loop;
uv_async_t async;
- uv_work_t now_deleting;
+
+ /* file deletion thread */
+ uv_thread_t *now_deleting_files;
+ unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
+
+ /* dirty page deletion thread */
+ uv_thread_t *now_invalidating_dirty_pages;
+ /* set to 0 when now_invalidating_dirty_pages is still running */
+ unsigned long cleanup_thread_invalidating_dirty_pages;
+ unsigned inflight_dirty_pages;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
@@ -145,7 +155,8 @@ struct rrdengine_statistics {
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
- rrdeng_stats_t flushing_errors;
+ rrdeng_stats_t pg_cache_over_half_dirty_events;
+ rrdeng_stats_t flushing_pressure_page_deletions;
};
/* I/O errors global counter