diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2019-10-02 07:12:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-02 07:12:11 +0300 |
commit | 1d667b145c16811639143afa377293d3ef8d62f0 (patch) | |
tree | e9ecdc8bb6441ac94d96848ec84d20bda8dad576 /database/engine | |
parent | c48d52eeb5b2387ead1e1fa50311750f4ccd328a (diff) |
Fix dbengine consistency when a writer modifies a page concurrently with a reader querying its metrics (#6979)
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/pagecache.h | 38 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 28 |
2 files changed, 54 insertions, 12 deletions
```diff
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index d464211e9a..ab1a5c1ad0 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -183,4 +183,42 @@ extern void free_page_cache(struct rrdengine_instance *ctx);
 extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
 extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
 
+static inline void
+    pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
+{
+    usec_t end_time, old_end_time;
+    uint32_t page_length;
+
+    if (NULL == descr->extent) {
+        /* this page is currently being modified, get consistent info locklessly */
+        do {
+            end_time = descr->end_time;
+            __sync_synchronize();
+            old_end_time = end_time;
+            page_length = descr->page_length;
+            __sync_synchronize();
+            end_time = descr->end_time;
+            __sync_synchronize();
+        } while ((end_time != old_end_time || (end_time & 1) != 0));
+
+        *end_timep = end_time;
+        *page_lengthp = page_length;
+    } else {
+        *end_timep = descr->end_time;
+        *page_lengthp = descr->page_length;
+    }
+}
+
+/* The caller must hold a reference to the page and must have already set the new data */
+static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
+{
+    assert(!(end_time & 1));
+    __sync_synchronize();
+    descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */
+    __sync_synchronize();
+    descr->page_length = page_length;
+    __sync_synchronize();
+    descr->end_time = end_time; /* mark end of uncertainty period */
+}
+
 #endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 26fbe8f7ef..80a11c9f05 100644
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -184,8 +184,8 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
     }
     page = descr->pg_cache_descr->page;
     page[descr->page_length / sizeof(number)] = number;
-    descr->end_time = point_in_time;
-    descr->page_length += sizeof(number);
+    pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number));
+
     if (perfect_page_alignment)
         rd->rrdset->rrddim_page_alignment = descr->page_length;
     if (unlikely(INVALID_TIME == descr->start_time)) {
@@ -470,7 +470,8 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
     struct rrdeng_page_descr *descr;
     storage_number *page, ret;
     unsigned position, entries;
-    usec_t next_page_time, current_position_time;
+    usec_t next_page_time, current_position_time, page_end_time;
+    uint32_t page_length;
 
     handle = &rrdimm_handle->rrdeng;
     if (unlikely(INVALID_TIME == handle->next_page_time)) {
@@ -480,15 +481,17 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
     if (unlikely(NULL == (descr = handle->descr))) {
         /* it's the first call */
         next_page_time = handle->next_page_time * USEC_PER_SEC;
+    } else {
+        pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
     }
     position = handle->position + 1;
 
     if (unlikely(NULL == descr ||
-                 position >= (descr->page_length / sizeof(storage_number)))) {
+                 position >= (page_length / sizeof(storage_number)))) {
         /* We need to get a new page */
         if (descr) {
             /* Drop old page's reference */
-            handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
+            handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
             if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
                 goto no_more_metrics;
             }
@@ -508,26 +511,27 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
         rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
 #endif
         handle->descr = descr;
+        pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
         if (unlikely(INVALID_TIME == descr->start_time ||
-                     INVALID_TIME == descr->end_time)) {
+                     INVALID_TIME == page_end_time)) {
             goto no_more_metrics;
         }
-        if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
+        if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
             /* we're in the middle of the page somewhere */
-            entries = descr->page_length / sizeof(storage_number);
-            position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
-                       (descr->end_time - descr->start_time + 1);
+            entries = page_length / sizeof(storage_number);
+            position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
+                       (page_end_time - descr->start_time);
         } else {
             position = 0;
         }
     }
     page = descr->pg_cache_descr->page;
     ret = page[position];
-    entries = descr->page_length / sizeof(storage_number);
+    entries = page_length / sizeof(storage_number);
     if (entries > 1) {
         usec_t dt;
 
-        dt = (descr->end_time - descr->start_time) / (entries - 1);
+        dt = (page_end_time - descr->start_time) / (entries - 1);
         current_position_time = descr->start_time + position * dt;
     } else {
         current_position_time = descr->start_time;
```