summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
author Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> 2019-10-02 07:12:11 +0300
committer GitHub <noreply@github.com> 2019-10-02 07:12:11 +0300
commit 1d667b145c16811639143afa377293d3ef8d62f0 (patch)
tree e9ecdc8bb6441ac94d96848ec84d20bda8dad576 /database/engine
parent c48d52eeb5b2387ead1e1fa50311750f4ccd328a (diff)
Fix dbengine consistency when a writer modifies a page concurrently with a reader querying its metrics (#6979)
Diffstat (limited to 'database/engine')
-rw-r--r-- database/engine/pagecache.h 38
-rw-r--r-- database/engine/rrdengineapi.c 28
2 files changed, 54 insertions, 12 deletions
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d464211e9a..ab1a5c1ad0 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -183,4 +183,42 @@ extern void free_page_cache(struct rrdengine_instance *ctx);
extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
+static inline void
+ pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
+{ /* Stores a snapshot of descr->end_time in *end_timep and of
+   * descr->page_length in *page_lengthp.
+   *
+   * When descr->extent is NULL the two fields are sampled in a retry loop
+   * that only exits once end_time is unchanged across the page_length read
+   * and its lowest bit is clear. pg_cache_atomic_set_pg_info() sets that bit
+   * before it stores page_length and clears it afterwards, so the loop is
+   * meant to reject a pair sampled while that update is in flight.
+   * When descr->extent is non-NULL both fields are read directly.
+   *
+   * The function takes no lock; it may spin for as long as a writer keeps
+   * the low bit of end_time set.
+   *
+   * NOTE(review): the fields are accessed with plain loads separated by
+   * __sync_synchronize(); whether they are declared volatile/atomic is not
+   * visible here - confirm against struct rrdeng_page_descr. */
+ usec_t end_time, old_end_time;
+ uint32_t page_length;
+
+ if (NULL == descr->extent) {
+ /* this page is currently being modified, get consistent info locklessly
+  * (NOTE(review): "extent == NULL means still being written" is taken from
+  * this original comment; the code that sets descr->extent is not visible) */
+ do {
+ end_time = descr->end_time; /* first sample of end_time */
+ __sync_synchronize(); /* full barrier (GCC builtin): keep the loads in program order */
+ old_end_time = end_time; /* remember the first sample for the comparison below */
+ page_length = descr->page_length; /* candidate length, accepted only if end_time did not move */
+ __sync_synchronize();
+ end_time = descr->end_time; /* second sample of end_time */
+ __sync_synchronize();
+ } while ((end_time != old_end_time || (end_time & 1) != 0)); /* retry if end_time changed or its low bit is set */
+
+ *end_timep = end_time;
+ *page_lengthp = page_length;
+ } else {
+ *end_timep = descr->end_time; /* direct reads, no retry */
+ *page_lengthp = descr->page_length;
+ }
+}
+
+/* The caller must hold a reference to the page and must have already set the new data
+ *
+ * Publishes a new (end_time, page_length) pair in descr for lockless readers
+ * such as pg_cache_atomic_get_pg_info(). The stores happen in three steps,
+ * each separated by a full memory barrier:
+ *   1. set the lowest bit of descr->end_time,
+ *   2. store the new page_length,
+ *   3. store the new end_time, which has the lowest bit clear.
+ *
+ * end_time must have its lowest bit clear; this is only checked by assert().
+ *
+ * NOTE(review): step 1 is a non-atomic read-modify-write, so this presumably
+ * relies on a single writer per page descriptor - confirm with the callers.
+ */
+static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
+{
+ assert(!(end_time & 1)); /* bit 0 is the marker readers test, so a published end_time must be even */
+ __sync_synchronize(); /* full barrier (GCC builtin) before the marker is set; the header comment says the new data was stored before this call */
+ descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond (sets bit 0 of the current value) */
+ __sync_synchronize();
+ descr->page_length = page_length; /* stored while bit 0 of end_time is set */
+ __sync_synchronize();
+ descr->end_time = end_time; /* mark end of uncertainty period */
+}
+
#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 26fbe8f7ef..80a11c9f05 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -184,8 +184,8 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
}
page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
- descr->end_time = point_in_time;
- descr->page_length += sizeof(number);
+ pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number));
+
if (perfect_page_alignment)
rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
@@ -470,7 +470,8 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position, entries;
- usec_t next_page_time, current_position_time;
+ usec_t next_page_time, current_position_time, page_end_time;
+ uint32_t page_length;
handle = &rrdimm_handle->rrdeng;
if (unlikely(INVALID_TIME == handle->next_page_time)) {
@@ -480,15 +481,17 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
if (unlikely(NULL == (descr = handle->descr))) {
/* it's the first call */
next_page_time = handle->next_page_time * USEC_PER_SEC;
+ } else {
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
}
position = handle->position + 1;
if (unlikely(NULL == descr ||
- position >= (descr->page_length / sizeof(storage_number)))) {
+ position >= (page_length / sizeof(storage_number)))) {
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
- handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
+ handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
@@ -508,26 +511,27 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
handle->descr = descr;
+ pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
if (unlikely(INVALID_TIME == descr->start_time ||
- INVALID_TIME == descr->end_time)) {
+ INVALID_TIME == page_end_time)) {
goto no_more_metrics;
}
- if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
+ if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
/* we're in the middle of the page somewhere */
- entries = descr->page_length / sizeof(storage_number);
- position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
- (descr->end_time - descr->start_time + 1);
+ entries = page_length / sizeof(storage_number);
+ position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+ (page_end_time - descr->start_time);
} else {
position = 0;
}
}
page = descr->pg_cache_descr->page;
ret = page[position];
- entries = descr->page_length / sizeof(storage_number);
+ entries = page_length / sizeof(storage_number);
if (entries > 1) {
usec_t dt;
- dt = (descr->end_time - descr->start_time) / (entries - 1);
+ dt = (page_end_time - descr->start_time) / (entries - 1);
current_position_time = descr->start_time + position * dt;
} else {
current_position_time = descr->start_time;