summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
authorthiagoftsm <thiagoftsm@gmail.com>2019-12-18 16:37:02 +0000
committerGitHub <noreply@github.com>2019-12-18 16:37:02 +0000
commitc8ded37b25237ec4aed8604888bd4281c33087a6 (patch)
tree087e14bd3c5eefb170b91c5fe100c470ac626239 /database/engine
parent4d9ed56f1b04b88ee73e4d359f478cac25c194c7 (diff)
Fix race condition in dbengine (#7565)
* fix_db_race_condition: unit test Adjust unit test for dbengine * fix_db_race_condition: page cache Fix database * fix_db_race_condition: Missing function call This commit brings the correct function call inside rrdengine.c
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/pagecache.c38
-rw-r--r--database/engine/pagecache.h1
-rw-r--r--database/engine/rrdengine.c10
-rwxr-xr-xdatabase/engine/rrdengineapi.c11
-rw-r--r--database/engine/rrdenglocking.c36
5 files changed, 55 insertions, 41 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 7ac4512d69..60fd7a5f89 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -101,6 +101,13 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
uv_cond_broadcast(&pg_cache_descr->cond);
}
+void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+{
+ rrdeng_page_descr_mutex_lock(ctx, descr);
+ pg_cache_wake_up_waiters_unsafe(descr);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+}
+
/*
* The caller must hold page descriptor lock.
* The lock will be released and re-acquired. The descriptor is not guaranteed
@@ -135,10 +142,8 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_
/*
* The caller must hold page descriptor lock.
- * Gets a reference to the page descriptor.
- * Returns 1 on success and 0 on failure.
*/
-int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
@@ -146,25 +151,25 @@ int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_acces
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
- if (exclusive_access)
- pg_cache_descr->flags |= RRD_PAGE_LOCKED;
- ++pg_cache_descr->refcnt;
return 1;
}
/*
* The caller must hold page descriptor lock.
- * Same return values as pg_cache_try_get_unsafe() without doing anything.
+ * Gets a reference to the page descriptor.
+ * Returns 1 on success and 0 on failure.
*/
-int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
+int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && pg_cache_descr->refcnt)) {
+ if (!pg_cache_can_get_unsafe(descr, exclusive_access))
return 0;
- }
+
+ if (exclusive_access)
+ pg_cache_descr->flags |= RRD_PAGE_LOCKED;
+ ++pg_cache_descr->refcnt;
return 1;
}
@@ -409,7 +414,9 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
- if (!remove_dirty) {
+ if (remove_dirty) {
+ pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
+ } else {
/* even a locked page could be dirty */
while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
@@ -429,8 +436,11 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
pg_cache_put(ctx, descr);
- if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED)
- rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
+ rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
+ (void)sleep_usec(1000); /* 1 msec */
+ }
destroy:
freez(descr);
pg_cache_update_metric_times(page_index);
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 7d8fa2a1df..9fd9991b5e 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -148,6 +148,7 @@ struct page_cache { /* TODO: add statistics */
};
extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
+extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 9615665f85..3d49379048 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -121,19 +121,19 @@ after_crc_check:
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
- pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
- debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+ pg_cache_replaceQ_insert(ctx, descr);
if (xt_io_descr->release_descr) {
- pg_cache_put_unsafe(descr);
+ pg_cache_put(ctx, descr);
} else {
- pg_cache_wake_up_waiters_unsafe(descr);
+ debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
+ pg_cache_wake_up_waiters(ctx, descr);
}
- rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index f824728798..fd41a0cd89 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -473,17 +473,18 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
- handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
- if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
- goto no_more_metrics;
- }
- next_page_time = handle->next_page_time * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
+ handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
+ if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
+ goto no_more_metrics;
+ }
+ next_page_time = handle->next_page_time * USEC_PER_SEC;
}
+
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
if (NULL == descr) {
diff --git a/database/engine/rrdenglocking.c b/database/engine/rrdenglocking.c
index 754d802a23..0c1d26f25a 100644
--- a/database/engine/rrdenglocking.c
+++ b/database/engine/rrdenglocking.c
@@ -20,12 +20,7 @@ struct page_cache_descr *rrdeng_create_pg_cache_descr(struct rrdengine_instance
void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_cache_descr *pg_cache_descr)
{
- /* Flush any lock and condition variable users */
- uv_mutex_lock(&pg_cache_descr->mutex);
- uv_mutex_unlock(&pg_cache_descr->mutex);
-
uv_cond_destroy(&pg_cache_descr->cond);
-
uv_mutex_destroy(&pg_cache_descr->mutex);
freez(pg_cache_descr);
rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
@@ -100,7 +95,7 @@ void rrdeng_page_descr_mutex_lock(struct rrdengine_instance *ctx, struct rrdeng_
void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
unsigned long old_state, new_state, ret_state, old_users;
- struct page_cache_descr *pg_cache_descr;
+ struct page_cache_descr *pg_cache_descr, *delete_pg_cache_descr = NULL;
uint8_t we_locked;
uv_mutex_unlock(&descr->pg_cache_descr->mutex);
@@ -116,7 +111,8 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, 0);
if (old_state == ret_state) {
/* success */
- break;
+ rrdeng_destroy_pg_cache_descr(ctx, delete_pg_cache_descr);
+ return;
}
continue; /* spin */
}
@@ -128,12 +124,15 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
pg_cache_descr = descr->pg_cache_descr;
/* caller is the only page cache descriptor user and there are no pending references on the page */
if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&
- !pg_cache_descr->flags && !pg_cache_descr->refcnt && !pg_cache_descr->waiters) {
+ !pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ assert(!pg_cache_descr->waiters);
+
new_state = PG_CACHE_DESCR_LOCKED;
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
we_locked = 1;
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
+ delete_pg_cache_descr = pg_cache_descr;
+ descr->pg_cache_descr = NULL;
/* retry */
continue;
}
@@ -150,7 +149,6 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
}
/* spin */
}
-
}
/*
@@ -161,11 +159,11 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
unsigned long old_state, new_state, ret_state, old_users;
- struct page_cache_descr *pg_cache_descr;
- uint8_t just_locked, we_freed, must_unlock;
+ struct page_cache_descr *pg_cache_descr = NULL;
+ uint8_t just_locked, can_free, must_unlock;
just_locked = 0;
- we_freed = 0;
+ can_free = 0;
must_unlock = 0;
while (1) { /* spin */
old_state = descr->pg_cache_descr_state;
@@ -177,9 +175,11 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct
must_unlock = 1;
just_locked = 0;
/* Try deallocate if there are no pending references on the page */
- if (!pg_cache_descr->flags && !pg_cache_descr->refcnt && !pg_cache_descr->waiters) {
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
- we_freed = 1;
+ if (!pg_cache_descr->flags && !pg_cache_descr->refcnt) {
+ assert(!pg_cache_descr->waiters);
+
+ descr->pg_cache_descr = NULL;
+ can_free = 1;
/* success */
continue;
}
@@ -188,7 +188,7 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct
if (unlikely(must_unlock)) {
assert(0 == old_users);
- if (we_freed) {
+ if (can_free) {
/* success */
new_state = 0;
} else {
@@ -198,6 +198,8 @@ void rrdeng_try_deallocate_pg_cache_descr(struct rrdengine_instance *ctx, struct
ret_state = ulong_compare_and_swap(&descr->pg_cache_descr_state, old_state, new_state);
if (old_state == ret_state) {
/* unlocked */
+ if (can_free)
+ rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
return;
}
continue; /* spin */