summaryrefslogtreecommitdiffstats
path: root/database/engine/pagecache.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/pagecache.c')
-rw-r--r--database/engine/pagecache.c569
1 files changed, 402 insertions, 167 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index d65cb35a57..7738db8527 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -134,6 +134,9 @@ struct rrdeng_page_descr *pg_cache_create_descr(void)
descr->pg_cache_descr_state = 0;
descr->pg_cache_descr = NULL;
descr->update_every_s = 0;
+ descr->extent_entry = NULL;
+ descr->type = 0;
+ descr->file = -1;
return descr;
}
@@ -158,13 +161,41 @@ void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
+#ifdef NETDATA_INTERNAL_CHECKS
+void pg_cache_wait_event_unsafe_with_trace(struct rrdeng_page_descr *descr, const char *function, size_t line)
+#else
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
+#endif
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(pg_cache_descr->owner.tid != gettid())
+ fatal("DBENGINE: pg_cache_descr is not locked by me in %s(). It is locked by thread %u, I am %u",
+ __FUNCTION__, (unsigned)pg_cache_descr->owner.tid, (unsigned)gettid());
+
+ struct pg_cache_waiter w = {
+ .line = line,
+ .function = function,
+ .tid = gettid(),
+ .next = NULL,
+ .prev = NULL,
+ };
+
+ DOUBLE_LINKED_LIST_PREPEND_UNSAFE(pg_cache_descr->wait_list, &w, prev, next);
+#endif
+
++pg_cache_descr->waiters;
uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
--pg_cache_descr->waiters;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pg_cache_descr->wait_list, &w, prev, next);
+
+ pg_cache_descr->owner.function = function;
+ pg_cache_descr->owner.line = line;
+ pg_cache_descr->owner.tid = gettid();
+#endif
}
/*
@@ -173,15 +204,43 @@ void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
* to exist after this function returns.
* Returns UV_ETIMEDOUT if timeout_sec seconds pass.
*/
+#ifdef NETDATA_INTERNAL_CHECKS
+int pg_cache_timedwait_event_unsafe_with_trace(struct rrdeng_page_descr *descr, uint64_t timeout_sec, const char *function, size_t line)
+#else
int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
+#endif
{
int ret;
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(pg_cache_descr->owner.tid != gettid())
+ fatal("DBENGINE: pg_cache_descr is not locked by me in %s(). It is locked by thread %u, I am %u",
+ __FUNCTION__, (unsigned)pg_cache_descr->owner.tid, (unsigned)gettid());
+
+ struct pg_cache_waiter w = {
+ .line = line,
+ .function = function,
+ .tid = gettid(),
+ .next = NULL,
+ .prev = NULL,
+ };
+
+ DOUBLE_LINKED_LIST_PREPEND_UNSAFE(pg_cache_descr->wait_list, &w, prev, next);
+#endif
+
++pg_cache_descr->waiters;
ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
--pg_cache_descr->waiters;
+#ifdef NETDATA_INTERNAL_CHECKS
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(pg_cache_descr->wait_list, &w, prev, next);
+
+ pg_cache_descr->owner.function = function;
+ pg_cache_descr->owner.line = line;
+ pg_cache_descr->owner.tid = gettid();
+#endif
+
return ret;
}
@@ -295,6 +354,11 @@ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}
+unsigned long pg_cache_warn_limit(struct rrdengine_instance *ctx)
+{
+ return ctx->cache_pages_warn_watermark + (unsigned long)ctx->metric_API_max_producers;
+}
+
/*
* This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
* cache.
@@ -322,9 +386,10 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
number);
+
while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
- if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
+ if (!(pg_cache_try_evict_one_page_unsafe(ctx))) {
/* failed to evict */
struct completion compl;
struct rrdeng_cmd cmd;
@@ -390,7 +455,6 @@ static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned n
ret = 1; /* success */
}
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-
return ret;
}
@@ -413,8 +477,8 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_
* If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
* or it sets it to NULL if no write-back is in progress.
*
- * Returns 1 on success and 0 on failure.
- */
+* Returns 1 on success and 0 on failure.
+*/
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -458,10 +522,16 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
* @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
* @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
* NULL. Otherwise, metric_id is not set.
+ * @spin True to keep trying to release the page, false to try once
* @return 1 if it's safe to delete the metric, 0 otherwise.
*/
-uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
- uint8_t is_exclusive_holder, uuid_t *metric_id)
+uint8_t pg_cache_punch_hole(
+ struct rrdengine_instance *ctx,
+ struct rrdeng_page_descr *descr,
+ uint8_t remove_dirty,
+ uint8_t is_exclusive_holder,
+ uuid_t(*metric_id),
+ bool update_page_duration)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
@@ -485,7 +555,8 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
}
goto destroy;
}
- --page_index->page_count;
+ if (update_page_duration)
+ --page_index->page_count;
if (!page_index->writers && !page_index->page_count) {
can_delete_metric = 1;
if (metric_id) {
@@ -497,7 +568,11 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
++ctx->stats.pg_cache_deletions;
- --pg_cache->page_descriptors;
+ if (update_page_duration)
+ --pg_cache->page_descriptors;
+
+ if (is_descr_journal_v2(descr))
+ --pg_cache->active_descriptors;
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
rrdeng_page_descr_mutex_lock(ctx, descr);
@@ -512,7 +587,7 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
}
}
if (remove_dirty) {
- pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
+ pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_INVALID);
} else {
/* even a locked page could be dirty */
while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
@@ -540,7 +615,8 @@ uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_d
}
destroy:
rrdeng_page_descr_freez(descr);
- pg_cache_update_metric_times(page_index);
+ if (update_page_duration)
+ pg_cache_update_metric_times(page_index);
return can_delete_metric;
}
@@ -556,33 +632,272 @@ static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t
(pg_start >= start_time && pg_start <= end_time);
}
-static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
+static uint32_t find_matching_page(struct journal_page_header *page_list_header, uint32_t delta_start_time_s)
{
- return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut);
+ uint32_t left = 0;
+ uint32_t right = page_list_header->entries;
+
+ while (left < right) {
+ struct journal_page_list *page_list = (struct journal_page_list *) ((uint8_t *) page_list_header + sizeof(*page_list_header));
+ struct journal_page_list *page_entry;
+
+ uint32_t middle_delta_start_s;
+ uint32_t middle_delta_end_s;
+
+ uint32_t middle = (left + right) >> 1;
+
+ page_entry = &page_list[middle];
+ middle_delta_start_s = page_entry->delta_start_s;
+ middle_delta_end_s = page_entry->delta_end_s;
+
+ if (delta_start_time_s >= middle_delta_start_s && delta_start_time_s <= middle_delta_end_s)
+ return middle;
+
+ if (delta_start_time_s < middle_delta_end_s)
+ right = middle;
+ else if(delta_start_time_s > middle_delta_end_s)
+ left = middle + 1;
+ else
+ return middle;
+ }
+ return right;
}
-/* The caller must hold the page index lock */
-static inline struct rrdeng_page_descr *
- find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
+bool descr_exists_unsafe( struct pg_cache_page_index *page_index, time_t start_time_s)
{
+ return (NULL != JudyLGet(page_index->JudyL_array, start_time_s, PJE0));
+}
+
+void mark_journalfile_descriptor( struct page_cache *pg_cache, struct rrdengine_journalfile *journalfile, uint32_t page_offset, uint32_t Index)
+{
+ Pvoid_t *PValue;
+
+ uv_rwlock_wrlock(&pg_cache->v2_lock);
+ PValue = JudyLIns(&journalfile->JudyL_array, (Word_t)page_offset, PJE0);
+ *(uint32_t *)PValue = (Index + 1);
+ journalfile->last_access = now_realtime_sec();
+ uv_rwlock_wrunlock(&pg_cache->v2_lock);
+}
+
+static void update_journal_access_time(struct rrdengine_journalfile *journalfile, struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
+{
+ if (journalfile) {
+ journalfile->last_access = now_realtime_sec();
+ return;
+ }
+
+ if (unlikely(!page_index || !descr))
+ return;
+
+ if (!is_descr_journal_v2(descr))
+ return;
+
+ struct rrdengine_instance *ctx = page_index->ctx;
+
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ struct rrdengine_datafile *datafile = ctx->datafiles.first;
+ while (datafile) {
+ journalfile = datafile->journalfile;
+ if (!journalfile->journal_data) {
+ datafile = datafile->next;
+ continue;
+ }
+ if (datafile->file == descr->file) {
+ journalfile->last_access = now_realtime_sec();
+ break;
+ }
+ datafile = datafile->next;
+ }
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+}
+
+// Note: We have read lock on page index
+// We release and escalate to write lock
+// Return to read lock when done
+static struct rrdeng_page_descr *add_pages_from_timerange(
+ struct journal_page_header *page_list_header,
+ uint32_t delta_start_time_s,
+ uint32_t delta_end_time_s,
+ usec_t journal_start_time_ut,
+ struct pg_cache_page_index *page_index,
+ struct journal_extent_list *extent_list,
+ struct rrdengine_datafile *datafile,
+ uint32_t cache_pages)
+{
+
+ struct rrdengine_instance *ctx = page_index->ctx;
+ struct page_cache *pg_cache = &ctx->pg_cache;
+ time_t journal_start_time_s = (time_t)(journal_start_time_ut / USEC_PER_SEC);
+ struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header));
+
+ uint32_t pos = find_matching_page(page_list_header, delta_start_time_s);
+
+ // This is the page offset that we will store to check for v2 descriptors later on
+ uint32_t page_offset = (uint8_t *) page_list_header - (uint8_t *) datafile->journalfile->journal_data;
+ uint32_t entries = page_list_header->entries;
+ uint32_t pages_to_cache = MIN(pos + cache_pages, entries);
+
struct rrdeng_page_descr *descr = NULL;
+
+ bool journal_updated = false;
+ bool rw_lock_acquired = false;
+
+ // We will cache pages_to_cache pages or until our end time is out of range
+ for (uint32_t x = pos; x < pages_to_cache; x++) {
+
+ struct journal_page_list *page_entry = &page_list[x];
+
+ if (page_entry->extent_index == UINT32_MAX)
+ continue;
+
+ if (delta_end_time_s < page_entry->delta_start_s)
+ break;
+
+ time_t index_time_s = (time_t) (journal_start_time_s + page_entry->delta_start_s);
+
+ if (!descr_exists_unsafe(page_index, index_time_s)) {
+ struct rrdeng_page_descr *new_descr = pg_cache_create_descr();
+ new_descr->page_length = page_entry->page_length;
+ new_descr->start_time_ut = index_time_s * USEC_PER_SEC;
+ new_descr->end_time_ut = (journal_start_time_s + page_entry->delta_end_s) * USEC_PER_SEC;
+ new_descr->id = &page_index->id;
+ new_descr->extent = NULL;
+ new_descr->extent_entry = &extent_list[page_entry->extent_index];
+ new_descr->type = page_entry->type;
+ new_descr->update_every_s = page_entry->update_every_s;
+ new_descr->file = datafile->file;
+
+ if (false == rw_lock_acquired) {
+ uv_rwlock_rdunlock(&page_index->lock);
+ uv_rwlock_wrlock(&page_index->lock);
+ rw_lock_acquired = true;
+ }
+
+ struct rrdeng_page_descr *added_descr = pg_cache_insert(ctx, page_index, new_descr, false);
+ if (unlikely(added_descr != new_descr))
+ rrdeng_page_descr_freez(new_descr);
+
+ if (!descr) {
+ descr = added_descr;
+ // Mark the area to check
+ mark_journalfile_descriptor(pg_cache, datafile->journalfile, page_offset, x);
+ journal_updated = true;
+ }
+ }
+ }
+
+ if (!journal_updated)
+ update_journal_access_time(datafile->journalfile, NULL, NULL);
+
+ // Check if we have switched to rw lock for the page index and switch back
+ if (rw_lock_acquired) {
+ uv_rwlock_wrunlock(&page_index->lock);
+ uv_rwlock_rdlock(&page_index->lock);
+ }
+ return descr;
+};
+
+static int journal_metric_uuid_compare(const void *key, const void *metric)
+{
+ return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
+}
+
+//
+// Steps
+// 1. Find which journal has the start time within its range
+// 2. Find the UUID in that journal
+// 3. Find the array of times for that UUID (convert from the journal header to the offset needed)
+// Note: We have page_index lock
+// cache pages is the maximum pages to fetch (create metadata for)
+// This will be limited by the end_time or if we run out of pages in the matching journal
+// pages that could be precached but exist in another journal will not be precached
+static struct rrdeng_page_descr *populate_page_index(
+ struct pg_cache_page_index *page_index,
+ usec_t start_time_ut,
+ usec_t end_time_ut,
+ uint32_t cache_pages)
+{
+ struct rrdengine_instance *ctx = page_index->ctx;
+
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+
+ struct rrdengine_datafile *datafile = ctx->datafiles.first;
+ while (datafile) {
+ struct journal_v2_header *journal_header = (struct journal_v2_header *) datafile->journalfile->journal_data;
+ if (!journal_header) {
+ datafile = datafile->next;
+ continue;
+ }
+ if (start_time_ut >= journal_header->start_time_ut && start_time_ut <= journal_header->end_time_ut) {
+
+ struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) journal_header + journal_header->metric_offset);
+
+ struct journal_metric_list *uuid_entry = bsearch(
+ &page_index->id,
+ uuid_list,
+ (size_t)journal_header->metric_count,
+ sizeof(struct journal_metric_list),
+ journal_metric_uuid_compare);
+
+ uint32_t delta_start_time = (start_time_ut - journal_header->start_time_ut) / USEC_PER_SEC;
+ uint32_t delta_end_time = (end_time_ut - journal_header->start_time_ut) / USEC_PER_SEC;
+
+ if (uuid_entry && ((delta_start_time >= uuid_entry->delta_start && delta_start_time <= uuid_entry->delta_end))) {
+
+ struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) journal_header + uuid_entry->page_offset);
+ struct rrdeng_page_descr *descr = add_pages_from_timerange(
+ page_list_header,
+ delta_start_time,
+ delta_end_time,
+ journal_header->start_time_ut,
+ page_index,
+ (void *)((uint8_t *)journal_header + journal_header->extent_offset),
+ datafile,
+ cache_pages);
+
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+ return descr;
+ }
+ }
+ datafile = datafile->next;
+ }
+
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
+ return NULL;
+}
+
+/* The caller must hold the page index lock */
+static inline struct rrdeng_page_descr *find_first_page_in_time_range(
+ struct pg_cache_page_index *page_index,
+ usec_t start_time_ut,
+ usec_t end_time_ut,
+ uint32_t cache_pages)
+{
+ struct rrdeng_page_descr *descr= NULL;
+
Pvoid_t *PValue;
Word_t Index;
- Index = (Word_t)(start_time / USEC_PER_SEC);
+ Index = (Word_t) (start_time_ut / USEC_PER_SEC);
PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
if (likely(NULL != PValue)) {
descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
+ if (is_page_in_time_range(descr, start_time_ut, end_time_ut)) {
+ update_journal_access_time(NULL, page_index, descr);
return descr;
}
}
- Index = (Word_t)(start_time / USEC_PER_SEC);
+ descr = populate_page_index(page_index, start_time_ut, end_time_ut, cache_pages);
+ if (descr)
+ return descr;
+
+ Index = (Word_t) (start_time_ut / USEC_PER_SEC);
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
if (likely(NULL != PValue)) {
descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
+ if (is_page_in_time_range(descr, start_time_ut, end_time_ut)) {
+ update_journal_access_time(NULL, page_index, descr);
return descr;
}
}
@@ -638,9 +953,13 @@ void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
page_index->latest_time_ut = latest_time;
}
+
/* If index is NULL lookup by UUID (descr->id) */
-void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_descr *descr)
+struct rrdeng_page_descr *pg_cache_insert(
+ struct rrdengine_instance *ctx,
+ struct pg_cache_page_index *index,
+ struct rrdeng_page_descr *descr,
+ bool lock_and_count)
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
@@ -669,44 +988,34 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
page_index = index;
}
- uv_rwlock_wrlock(&page_index->lock);
+ if (lock_and_count)
+ uv_rwlock_wrlock(&page_index->lock);
+
PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
+ fatal_assert(NULL != PValue);
+
+ if (unlikely(*PValue) && !lock_and_count)
+ return *PValue;
+
*PValue = descr;
- ++page_index->page_count;
- pg_cache_add_new_metric_time(page_index, descr);
- uv_rwlock_wrunlock(&page_index->lock);
+ if (lock_and_count)
+ ++page_index->page_count;
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- ++ctx->stats.pg_cache_insertions;
- ++pg_cache->page_descriptors;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-}
+ pg_cache_add_new_metric_time(page_index, descr);
-usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
+ if (lock_and_count)
+ uv_rwlock_wrunlock(&page_index->lock);
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return INVALID_TIME;
+ uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ if (lock_and_count) {
+ ++ctx->stats.pg_cache_insertions;
+ ++pg_cache->page_descriptors;
}
+ if (is_descr_journal_v2(descr))
+ ++pg_cache->active_descriptors;
+ uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
- if (NULL == descr) {
- uv_rwlock_rdunlock(&page_index->lock);
- return INVALID_TIME;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- return descr->start_time_ut;
+ return descr;
}
/**
@@ -758,8 +1067,11 @@ void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_c
* 2. It did not succeed to get a reference.
* 3. It did not succeed to reserve a spot in the page cache.
*/
-struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
- usec_t start_time_ut)
+struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(
+ struct rrdengine_instance *ctx,
+ uuid_t(*id),
+ usec_t start_time_ut,
+ struct pg_alignment *alignment)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct rrdeng_page_descr *descr = NULL;
@@ -776,6 +1088,11 @@ struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ if (page_index && page_index->alignment && alignment && page_index->alignment != alignment) {
+ if (pg_cache->populated_pages >= pg_cache_warn_limit(ctx))
+ return NULL;
+ }
+
if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
/* Failed to find page or failed to reserve a spot in the cache */
return NULL;
@@ -855,7 +1172,7 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
}
uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut, PAGE_CACHE_MAX_PRELOAD_PAGES);
if (NULL == descr) {
uv_rwlock_rdunlock(&page_index->lock);
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
@@ -869,12 +1186,18 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
*page_info_arrayp = mallocz(page_info_array_max_size);
}
+ struct rrdeng_page_descr *last_descr = NULL;
for (count = 0, preload_count = 0 ;
descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ;
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
/* Iterate all pages in range */
+ if (last_descr == descr)
+ break;
+
+ last_descr = descr;
+
if (unlikely(0 == descr->page_length))
continue;
if (page_info_arrayp) {
@@ -907,7 +1230,6 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
}
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
-
}
uv_rwlock_rdunlock(&page_index->lock);
@@ -933,7 +1255,8 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
if (NULL == next) {
continue;
}
- if (descr->extent == next->extent) {
+ if ((descr->extent && descr->extent == next->extent) ||
+ ((descr->extent_entry && descr->extent_entry == next->extent_entry))) {
/* same extent, consolidate */
if (!pg_cache_try_reserve_pages(ctx, 1)) {
failed_to_reserve = 1;
@@ -969,113 +1292,6 @@ unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t sta
}
/*
- * Searches for a page and gets a reference.
- * When point_in_time is INVALID_TIME get any page.
- * If index is NULL lookup by UUID (id).
- */
-struct rrdeng_page_descr *
- pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t point_in_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- struct page_cache_descr *pg_cache_descr = NULL;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- Word_t Index;
- uint8_t page_not_in_cache;
-
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return NULL;
- }
- } else {
- page_index = index;
- }
- pg_cache_reserve_pages(ctx, 1);
-
- page_not_in_cache = 0;
- uv_rwlock_rdlock(&page_index->lock);
- while (1) {
- Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- }
- if (NULL == PValue ||
- 0 == descr->page_length ||
- (INVALID_TIME != point_in_time_ut &&
- !is_point_in_time_in_page(descr, point_in_time_ut))) {
- /* non-empty page not found */
- uv_rwlock_rdunlock(&page_index->lock);
-
- pg_cache_release_pages(ctx, 1);
- return NULL;
- }
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- flags = pg_cache_descr->flags;
- if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
- /* success */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
- break;
- }
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- struct rrdeng_cmd cmd;
-
- uv_rwlock_rdunlock(&page_index->lock);
-
- cmd.opcode = RRDENG_READ_PAGE;
- cmd.read_page.page_cache_descr = descr;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
-
- debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
- pg_cache_wait_event_unsafe(descr);
- }
- /* success */
- /* Downgrade exclusive reference to allow other readers */
- pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
- pg_cache_wake_up_waiters_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- return descr;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- if (!(flags & RRD_PAGE_POPULATED))
- page_not_in_cache = 1;
- pg_cache_wait_event_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
-
- /* reset scan to find again */
- uv_rwlock_rdlock(&page_index->lock);
- }
- uv_rwlock_rdunlock(&page_index->lock);
-
- if (!(flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_set_hot(ctx, descr);
- pg_cache_release_pages(ctx, 1);
- if (page_not_in_cache)
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- else
- rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
- return descr;
-}
-
-/*
* Searches for the first page between start_time and end_time and gets a reference.
* start_time and end_time are inclusive.
* If index is NULL lookup by UUID (id).
@@ -1111,7 +1327,7 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
uv_rwlock_rdlock(&page_index->lock);
int retry_count = 0;
while (1) {
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
+ descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut, PAGE_CACHE_MAX_PRELOAD_PAGES);
if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
/* non-empty page not found */
if (retry_count == default_rrdeng_page_fetch_retries)
@@ -1124,6 +1340,22 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
flags = pg_cache_descr->flags;
+
+ if ((flags & RRD_PAGE_INVALID)) {
+ bool can_drop_page = pg_cache_try_get_unsafe(descr, 1);
+ rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ uv_rwlock_rdunlock(&page_index->lock);
+ pg_cache_release_pages(ctx, 1);
+
+ if (likely(can_drop_page)) {
+ info("Dropping invalid page descr=%lu - pg_cache=%lu - Ref=%u", descr->pg_cache_descr_state,
+ descr->pg_cache_descr->flags, descr->pg_cache_descr->refcnt);
+ pg_cache_punch_hole(ctx, descr, 0, 1, NULL, false);
+ }
+ return NULL;
+ }
+
if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
/* success */
rrdeng_page_descr_mutex_unlock(ctx, descr);
@@ -1165,7 +1397,6 @@ pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index
++retry_count;
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
-
/* reset scan to find again */
uv_rwlock_rdlock(&page_index->lock);
}
@@ -1186,9 +1417,9 @@ struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_insta
struct pg_cache_page_index *page_index;
page_index = mallocz(sizeof(*page_index));
+ fatal_assert(0 == uv_rwlock_init(&page_index->lock));
page_index->JudyL_array = (Pvoid_t) NULL;
uuid_copy(page_index->id, *id);
- fatal_assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time_ut = INVALID_TIME;
page_index->latest_time_ut = INVALID_TIME;
page_index->prev = NULL;
@@ -1235,12 +1466,15 @@ void init_page_cache(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->page_descriptors = 0;
+ pg_cache->active_descriptors = 0;
pg_cache->populated_pages = 0;
fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
init_metrics_index(ctx);
init_replaceQ(ctx);
init_committed_page_index(ctx);
+
+ fatal_assert(0 == uv_rwlock_init(&pg_cache->v2_lock));
}
void free_page_cache(struct rrdengine_instance *ctx)
@@ -1304,3 +1538,4 @@ void free_page_cache(struct rrdengine_instance *ctx)
fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes);
}
+