diff options
Diffstat (limited to 'database/engine/pagecache.c')
-rw-r--r-- | database/engine/pagecache.c | 569 |
1 files changed, 402 insertions, 167 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index d65cb35a57..7738db8527 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -134,6 +134,9 @@ struct rrdeng_page_descr *pg_cache_create_descr(void) descr->pg_cache_descr_state = 0; descr->pg_cache_descr = NULL; descr->update_every_s = 0; + descr->extent_entry = NULL; + descr->type = 0; + descr->file = -1; return descr; } @@ -158,13 +161,41 @@ void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page * The lock will be released and re-acquired. The descriptor is not guaranteed * to exist after this function returns. */ +#ifdef NETDATA_INTERNAL_CHECKS +void pg_cache_wait_event_unsafe_with_trace(struct rrdeng_page_descr *descr, const char *function, size_t line) +#else void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) +#endif { struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; +#ifdef NETDATA_INTERNAL_CHECKS + if(pg_cache_descr->owner.tid != gettid()) + fatal("DBENGINE: pg_cache_descr is not locked by me in %s(). It is locked by thread %u, I am %u", + __FUNCTION__, (unsigned)pg_cache_descr->owner.tid, (unsigned)gettid()); + + struct pg_cache_waiter w = { + .line = line, + .function = function, + .tid = gettid(), + .next = NULL, + .prev = NULL, + }; + + DOUBLE_LINKED_LIST_PREPEND_UNSAFE(pg_cache_descr->wait_list, &w, prev, next); +#endif + ++pg_cache_descr->waiters; uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex); --pg_cache_descr->waiters; + +#ifdef NETDATA_INTERNAL_CHECKS + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pg_cache_descr->wait_list, &w, prev, next); + + pg_cache_descr->owner.function = function; + pg_cache_descr->owner.line = line; + pg_cache_descr->owner.tid = gettid(); +#endif } /* @@ -173,15 +204,43 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) * to exist after this function returns. * Returns UV_ETIMEDOUT if timeout_sec seconds pass. */ +#ifdef NETDATA_INTERNAL_CHECKS +int pg_cache_timedwait_event_unsafe_with_trace(struct rrdeng_page_descr *descr, uint64_t timeout_sec, const char *function, size_t line) +#else int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) +#endif { int ret; struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; +#ifdef NETDATA_INTERNAL_CHECKS + if(pg_cache_descr->owner.tid != gettid()) + fatal("DBENGINE: pg_cache_descr is not locked by me in %s(). It is locked by thread %u, I am %u", + __FUNCTION__, (unsigned)pg_cache_descr->owner.tid, (unsigned)gettid()); + + struct pg_cache_waiter w = { + .line = line, + .function = function, + .tid = gettid(), + .next = NULL, + .prev = NULL, + }; + + DOUBLE_LINKED_LIST_PREPEND_UNSAFE(pg_cache_descr->wait_list, &w, prev, next); +#endif + ++pg_cache_descr->waiters; ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); --pg_cache_descr->waiters; +#ifdef NETDATA_INTERNAL_CHECKS + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pg_cache_descr->wait_list, &w, prev, next); + + pg_cache_descr->owner.function = function; + pg_cache_descr->owner.line = line; + pg_cache_descr->owner.tid = gettid(); +#endif + return ret; } @@ -295,6 +354,11 @@ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; } +unsigned long pg_cache_warn_limit(struct rrdengine_instance *ctx) +{ + return ctx->cache_pages_warn_watermark + (unsigned long)ctx->metric_API_max_producers; +} + /* * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page * cache. @@ -322,9 +386,10 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==", number); + while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) { - if (!pg_cache_try_evict_one_page_unsafe(ctx)) { + if (!(pg_cache_try_evict_one_page_unsafe(ctx))) { /* failed to evict */ struct completion compl; struct rrdeng_cmd cmd; @@ -390,7 +455,6 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n ret = 1; /* success */ } uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - return ret; } @@ -413,8 +477,8 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_ * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, * or it sets it to NULL if no write-back is in progress. * - * Returns 1 on success and 0 on failure. - */ +* Returns 1 on success and 0 on failure. +*/ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) { struct page_cache *pg_cache = &ctx->pg_cache; @@ -458,10 +522,16 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not * NULL. Otherwise, metric_id is not set. + * @spin True to keep trying to release the page, false to try once * @return 1 if it's safe to delete the metric, 0 otherwise. */ -uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, - uint8_t is_exclusive_holder, uuid_t *metric_id) +uint8_t pg_cache_punch_hole( + struct rrdengine_instance *ctx, + struct rrdeng_page_descr *descr, + uint8_t remove_dirty, + uint8_t is_exclusive_holder, + uuid_t(*metric_id), + bool update_page_duration) { struct page_cache *pg_cache = &ctx->pg_cache; struct page_cache_descr *pg_cache_descr = NULL; @@ -485,7 +555,8 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d } goto destroy; } - --page_index->page_count; + if (update_page_duration) + --page_index->page_count; if (!page_index->writers && !page_index->page_count) { can_delete_metric = 1; if (metric_id) { @@ -497,7 +568,11 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); ++ctx->stats.pg_cache_deletions; - --pg_cache->page_descriptors; + if (update_page_duration) + --pg_cache->page_descriptors; + + if (is_descr_journal_v2(descr)) + --pg_cache->active_descriptors; uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); rrdeng_page_descr_mutex_lock(ctx, descr); @@ -512,7 +587,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d } } if (remove_dirty) { - pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; + pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_INVALID); } else { /* even a locked page could be dirty */ while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { @@ -540,7 +615,8 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d } destroy: rrdeng_page_descr_freez(descr); - pg_cache_update_metric_times(page_index); + if (update_page_duration) + pg_cache_update_metric_times(page_index); return can_delete_metric; } @@ -556,33 +632,272 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t (pg_start >= start_time && pg_start <= end_time); } -static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) +static uint32_t find_matching_page(struct journal_page_header *page_list_header, uint32_t delta_start_time_s) { - return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut); + uint32_t left = 0; + uint32_t right = page_list_header->entries; + + while (left < right) { + struct journal_page_list *page_list = (struct journal_page_list *) ((uint8_t *) page_list_header + sizeof(*page_list_header)); + struct journal_page_list *page_entry; + + uint32_t middle_delta_start_s; + uint32_t middle_delta_end_s; + + uint32_t middle = (left + right) >> 1; + + page_entry = &page_list[middle]; + middle_delta_start_s = page_entry->delta_start_s; + middle_delta_end_s = page_entry->delta_end_s; + + if (delta_start_time_s >= middle_delta_start_s && delta_start_time_s <= middle_delta_end_s) + return middle; + + if (delta_start_time_s < middle_delta_end_s) + right = middle; + else if(delta_start_time_s > middle_delta_end_s) + left = middle + 1; + else + return middle; + } + return right; } -/* The caller must hold the page index lock */ -static inline struct rrdeng_page_descr * - find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) +bool descr_exists_unsafe( struct pg_cache_page_index *page_index, time_t start_time_s) { + return (NULL != JudyLGet(page_index->JudyL_array, start_time_s, PJE0)); +} + +void mark_journalfile_descriptor( struct page_cache *pg_cache, struct rrdengine_journalfile *journalfile, uint32_t page_offset, uint32_t Index) +{ + Pvoid_t *PValue; + + uv_rwlock_wrlock(&pg_cache->v2_lock); + PValue = JudyLIns(&journalfile->JudyL_array, (Word_t)page_offset, PJE0); + *(uint32_t *)PValue = (Index + 1); + journalfile->last_access = now_realtime_sec(); + uv_rwlock_wrunlock(&pg_cache->v2_lock); +} + +static void update_journal_access_time(struct rrdengine_journalfile *journalfile, struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) +{ + if (journalfile) { + journalfile->last_access = now_realtime_sec(); + return; + } + + if (unlikely(!page_index || !descr)) + return; + + if (!is_descr_journal_v2(descr)) + return; + + struct rrdengine_instance *ctx = page_index->ctx; + + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + struct rrdengine_datafile *datafile = ctx->datafiles.first; + while (datafile) { + journalfile = datafile->journalfile; + if (!journalfile->journal_data) { + datafile = datafile->next; + continue; + } + if (datafile->file == descr->file) { + journalfile->last_access = now_realtime_sec(); + break; + } + datafile = datafile->next; + } + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); +} + +// Note: We have read lock on page index +// We release and escalate to write lock +// Return to read lock when done +static struct rrdeng_page_descr *add_pages_from_timerange( + struct journal_page_header *page_list_header, + uint32_t delta_start_time_s, + uint32_t delta_end_time_s, + usec_t journal_start_time_ut, + struct pg_cache_page_index *page_index, + struct journal_extent_list *extent_list, + struct rrdengine_datafile *datafile, + uint32_t cache_pages) +{ + + struct rrdengine_instance *ctx = page_index->ctx; + struct page_cache *pg_cache = &ctx->pg_cache; + time_t journal_start_time_s = (time_t)(journal_start_time_ut / USEC_PER_SEC); + struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header)); + + uint32_t pos = find_matching_page(page_list_header, delta_start_time_s); + + // This is the page offset that we will store to check for v2 descriptors later on + uint32_t page_offset = (uint8_t *) page_list_header - (uint8_t *) datafile->journalfile->journal_data; + uint32_t entries = page_list_header->entries; + uint32_t pages_to_cache = MIN(pos + cache_pages, entries); + struct rrdeng_page_descr *descr = NULL; + + bool journal_updated = false; + bool rw_lock_acquired = false; + + // We will cache pages_to_cache pages or until our end time is out of range + for (uint32_t x = pos; x < pages_to_cache; x++) { + + struct journal_page_list *page_entry = &page_list[x]; + + if (page_entry->extent_index == UINT32_MAX) + continue; + + if (delta_end_time_s < page_entry->delta_start_s) + break; + + time_t index_time_s = (time_t) (journal_start_time_s + page_entry->delta_start_s); + + if (!descr_exists_unsafe(page_index, index_time_s)) { + struct rrdeng_page_descr *new_descr = pg_cache_create_descr(); + new_descr->page_length = page_entry->page_length; + new_descr->start_time_ut = index_time_s * USEC_PER_SEC; + new_descr->end_time_ut = (journal_start_time_s + page_entry->delta_end_s) * USEC_PER_SEC; + new_descr->id = &page_index->id; + new_descr->extent = NULL; + new_descr->extent_entry = &extent_list[page_entry->extent_index]; + new_descr->type = page_entry->type; + new_descr->update_every_s = page_entry->update_every_s; + new_descr->file = datafile->file; + + if (false == rw_lock_acquired) { + uv_rwlock_rdunlock(&page_index->lock); + uv_rwlock_wrlock(&page_index->lock); + rw_lock_acquired = true; + } + + struct rrdeng_page_descr *added_descr = pg_cache_insert(ctx, page_index, new_descr, false); + if (unlikely(added_descr != new_descr)) + rrdeng_page_descr_freez(new_descr); + + if (!descr) { + descr = added_descr; + // Mark the area to check + mark_journalfile_descriptor(pg_cache, datafile->journalfile, page_offset, x); + journal_updated = true; + } + } + } + + if (!journal_updated) + update_journal_access_time(datafile->journalfile, NULL, NULL); + + // Check if we have switched to rw lock for the page index and switch back + if (rw_lock_acquired) { + uv_rwlock_wrunlock(&page_index->lock); + uv_rwlock_rdlock(&page_index->lock); + } + return descr; +}; + +static int journal_metric_uuid_compare(const void *key, const void *metric) +{ + return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid); +} + +// +// Steps +// 1. Find which journal has the start time within its range +// 2. Find the UUID in that journal +// 3. Find the array of times for that UUID (convert from the journal header to the offset needed) +// Note: We have page_index lock +// cache pages is the maximum pages to fetch (create metadata for) +// This will be limited by the end_time or if we run out of pages in the matching journal +// pages that could be precached but exist in another journal will not be precached +static struct rrdeng_page_descr *populate_page_index( + struct pg_cache_page_index *page_index, + usec_t start_time_ut, + usec_t end_time_ut, + uint32_t cache_pages) +{ + struct rrdengine_instance *ctx = page_index->ctx; + + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + + struct rrdengine_datafile *datafile = ctx->datafiles.first; + while (datafile) { + struct journal_v2_header *journal_header = (struct journal_v2_header *) datafile->journalfile->journal_data; + if (!journal_header) { + datafile = datafile->next; + continue; + } + if (start_time_ut >= journal_header->start_time_ut && start_time_ut <= journal_header->end_time_ut) { + + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) journal_header + journal_header->metric_offset); + + struct journal_metric_list *uuid_entry = bsearch( + &page_index->id, + uuid_list, + (size_t)journal_header->metric_count, + sizeof(struct journal_metric_list), + journal_metric_uuid_compare); + + uint32_t delta_start_time = (start_time_ut - journal_header->start_time_ut) / USEC_PER_SEC; + uint32_t delta_end_time = (end_time_ut - journal_header->start_time_ut) / USEC_PER_SEC; + + if (uuid_entry && ((delta_start_time >= uuid_entry->delta_start && delta_start_time <= uuid_entry->delta_end))) { + + struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) journal_header + uuid_entry->page_offset); + struct rrdeng_page_descr *descr = add_pages_from_timerange( + page_list_header, + delta_start_time, + delta_end_time, + journal_header->start_time_ut, + page_index, + (void *)((uint8_t *)journal_header + journal_header->extent_offset), + datafile, + cache_pages); + + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + return descr; + } + } + datafile = datafile->next; + } + + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + return NULL; +} + +/* The caller must hold the page index lock */ +static inline struct rrdeng_page_descr *find_first_page_in_time_range( + struct pg_cache_page_index *page_index, + usec_t start_time_ut, + usec_t end_time_ut, + uint32_t cache_pages) +{ + struct rrdeng_page_descr *descr= NULL; + Pvoid_t *PValue; Word_t Index; - Index = (Word_t)(start_time / USEC_PER_SEC); + Index = (Word_t) (start_time_ut / USEC_PER_SEC); PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); if (likely(NULL != PValue)) { descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { + if (is_page_in_time_range(descr, start_time_ut, end_time_ut)) { + update_journal_access_time(NULL, page_index, descr); return descr; } } - Index = (Word_t)(start_time / USEC_PER_SEC); + descr = populate_page_index(page_index, start_time_ut, end_time_ut, cache_pages); + if (descr) + return descr; + + Index = (Word_t) (start_time_ut / USEC_PER_SEC); PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); if (likely(NULL != PValue)) { descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { + if (is_page_in_time_range(descr, start_time_ut, end_time_ut)) { + update_journal_access_time(NULL, page_index, descr); return descr; } } @@ -638,9 +953,13 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) page_index->latest_time_ut = latest_time; } + /* If index is NULL lookup by UUID (descr->id) */ -void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, - struct rrdeng_page_descr *descr) +struct rrdeng_page_descr *pg_cache_insert( + struct rrdengine_instance *ctx, + struct pg_cache_page_index *index, + struct rrdeng_page_descr *descr, + bool lock_and_count) { struct page_cache *pg_cache = &ctx->pg_cache; Pvoid_t *PValue; @@ -669,44 +988,34 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index page_index = index; } - uv_rwlock_wrlock(&page_index->lock); + if (lock_and_count) + uv_rwlock_wrlock(&page_index->lock); + PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); + fatal_assert(NULL != PValue); + + if (unlikely(*PValue) && !lock_and_count) + return *PValue; + *PValue = descr; - ++page_index->page_count; - pg_cache_add_new_metric_time(page_index, descr); - uv_rwlock_wrunlock(&page_index->lock); + if (lock_and_count) + ++page_index->page_count; - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - ++ctx->stats.pg_cache_insertions; - ++pg_cache->page_descriptors; - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); -} + pg_cache_add_new_metric_time(page_index, descr); -usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; + if (lock_and_count) + uv_rwlock_wrunlock(&page_index->lock); - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - return INVALID_TIME; + uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + if (lock_and_count) { + ++ctx->stats.pg_cache_insertions; + ++pg_cache->page_descriptors; } + if (is_descr_journal_v2(descr)) + ++pg_cache->active_descriptors; + uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); - if (NULL == descr) { - uv_rwlock_rdunlock(&page_index->lock); - return INVALID_TIME; - } - uv_rwlock_rdunlock(&page_index->lock); - return descr->start_time_ut; + return descr; } /** @@ -758,8 +1067,11 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c * 2. It did not succeed to get a reference. * 3. It did not succeed to reserve a spot in the page cache. */ -struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, - usec_t start_time_ut) +struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock( + struct rrdengine_instance *ctx, + uuid_t(*id), + usec_t start_time_ut, + struct pg_alignment *alignment) { struct page_cache *pg_cache = &ctx->pg_cache; struct rrdeng_page_descr *descr = NULL; @@ -776,6 +1088,11 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_ } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + if (page_index && page_index->alignment && alignment && page_index->alignment != alignment) { + if (pg_cache->populated_pages >= pg_cache_warn_limit(ctx)) + return NULL; + } + if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) { /* Failed to find page or failed to reserve a spot in the cache */ return NULL; @@ -855,7 +1172,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta } uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut, PAGE_CACHE_MAX_PRELOAD_PAGES); if (NULL == descr) { uv_rwlock_rdunlock(&page_index->lock); debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); @@ -869,12 +1186,18 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta *page_info_arrayp = mallocz(page_info_array_max_size); } + struct rrdeng_page_descr *last_descr = NULL; for (count = 0, preload_count = 0 ; descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ; PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), descr = unlikely(NULL == PValue) ? NULL : *PValue) { /* Iterate all pages in range */ + if (last_descr == descr) + break; + + last_descr = descr; + if (unlikely(0 == descr->page_length)) continue; if (page_info_arrayp) { @@ -907,7 +1230,6 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta } } rrdeng_page_descr_mutex_unlock(ctx, descr); - } uv_rwlock_rdunlock(&page_index->lock); @@ -933,7 +1255,8 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta if (NULL == next) { continue; } - if (descr->extent == next->extent) { + if ((descr->extent && descr->extent == next->extent) || + ((descr->extent_entry && descr->extent_entry == next->extent_entry))) { /* same extent, consolidate */ if (!pg_cache_try_reserve_pages(ctx, 1)) { failed_to_reserve = 1; @@ -969,113 +1292,6 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta } /* - * Searches for a page and gets a reference. - * When point_in_time is INVALID_TIME get any page. - * If index is NULL lookup by UUID (id). - */ -struct rrdeng_page_descr * - pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t point_in_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - struct page_cache_descr *pg_cache_descr = NULL; - unsigned long flags; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - Word_t Index; - uint8_t page_not_in_cache; - - if (unlikely(NULL == index)) { - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - return NULL; - } - } else { - page_index = index; - } - pg_cache_reserve_pages(ctx, 1); - - page_not_in_cache = 0; - uv_rwlock_rdlock(&page_index->lock); - while (1) { - Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); - PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - } - if (NULL == PValue || - 0 == descr->page_length || - (INVALID_TIME != point_in_time_ut && - !is_point_in_time_in_page(descr, point_in_time_ut))) { - /* non-empty page not found */ - uv_rwlock_rdunlock(&page_index->lock); - - pg_cache_release_pages(ctx, 1); - return NULL; - } - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - flags = pg_cache_descr->flags; - if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { - /* success */ - rrdeng_page_descr_mutex_unlock(ctx, descr); - debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); - break; - } - if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - struct rrdeng_cmd cmd; - - uv_rwlock_rdunlock(&page_index->lock); - - cmd.opcode = RRDENG_READ_PAGE; - cmd.read_page.page_cache_descr = descr; - rrdeng_enq_cmd(&ctx->worker_config, &cmd); - - debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { - pg_cache_wait_event_unsafe(descr); - } - /* success */ - /* Downgrade exclusive reference to allow other readers */ - pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; - pg_cache_wake_up_waiters_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - return descr; - } - uv_rwlock_rdunlock(&page_index->lock); - debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - if (!(flags & RRD_PAGE_POPULATED)) - page_not_in_cache = 1; - pg_cache_wait_event_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - - /* reset scan to find again */ - uv_rwlock_rdlock(&page_index->lock); - } - uv_rwlock_rdunlock(&page_index->lock); - - if (!(flags & RRD_PAGE_DIRTY)) - pg_cache_replaceQ_set_hot(ctx, descr); - pg_cache_release_pages(ctx, 1); - if (page_not_in_cache) - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - else - rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); - return descr; -} - -/* * Searches for the first page between start_time and end_time and gets a reference. * start_time and end_time are inclusive. * If index is NULL lookup by UUID (id). @@ -1111,7 +1327,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index uv_rwlock_rdlock(&page_index->lock); int retry_count = 0; while (1) { - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); + descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut, PAGE_CACHE_MAX_PRELOAD_PAGES); if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) { /* non-empty page not found */ if (retry_count == default_rrdeng_page_fetch_retries) @@ -1124,6 +1340,22 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; flags = pg_cache_descr->flags; + + if ((flags & RRD_PAGE_INVALID)) { + bool can_drop_page = pg_cache_try_get_unsafe(descr, 1); + rrdeng_page_descr_mutex_unlock(ctx, descr); + + uv_rwlock_rdunlock(&page_index->lock); + pg_cache_release_pages(ctx, 1); + + if (likely(can_drop_page)) { + info("Dropping invalid page descr=%lu - pg_cache=%lu - Ref=%u", descr->pg_cache_descr_state, + descr->pg_cache_descr->flags, descr->pg_cache_descr->refcnt); + pg_cache_punch_hole(ctx, descr, 0, 1, NULL, false); + } + return NULL; + } + if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { /* success */ rrdeng_page_descr_mutex_unlock(ctx, descr); @@ -1165,7 +1397,6 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index ++retry_count; } rrdeng_page_descr_mutex_unlock(ctx, descr); - /* reset scan to find again */ uv_rwlock_rdlock(&page_index->lock); } @@ -1186,9 +1417,9 @@ struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_insta struct pg_cache_page_index *page_index; page_index = mallocz(sizeof(*page_index)); + fatal_assert(0 == uv_rwlock_init(&page_index->lock)); page_index->JudyL_array = (Pvoid_t) NULL; uuid_copy(page_index->id, *id); - fatal_assert(0 == uv_rwlock_init(&page_index->lock)); page_index->oldest_time_ut = INVALID_TIME; page_index->latest_time_ut = INVALID_TIME; page_index->prev = NULL; @@ -1235,12 +1466,15 @@ void init_page_cache(struct rrdengine_instance *ctx) struct page_cache *pg_cache = &ctx->pg_cache; pg_cache->page_descriptors = 0; + pg_cache->active_descriptors = 0; pg_cache->populated_pages = 0; fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); init_metrics_index(ctx); init_replaceQ(ctx); init_committed_page_index(ctx); + + fatal_assert(0 == uv_rwlock_init(&pg_cache->v2_lock)); } void free_page_cache(struct rrdengine_instance *ctx) @@ -1304,3 +1538,4 @@ void free_page_cache(struct rrdengine_instance *ctx) fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes); } + |