// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"
/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
/* Appends a page descriptor to the tail of the LRU replacement queue.
 * The caller must hold the replaceQ lock. */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    struct page_cache_descr *old_tail = pg_cache->replaceQ.tail;

    /* Link behind the current tail, if the queue is non-empty. */
    if (likely(old_tail != NULL)) {
        pg_cache_descr->prev = old_tail;
        old_tail->next = pg_cache_descr;
    }
    /* First element also becomes the head. */
    if (unlikely(pg_cache->replaceQ.head == NULL))
        pg_cache->replaceQ.head = pg_cache_descr;

    pg_cache->replaceQ.tail = pg_cache_descr;
}
/* Unlinks a page descriptor from the LRU replacement queue and clears its
 * list pointers. The caller must hold the replaceQ lock. */
static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    struct page_cache_descr *prev = pg_cache_descr->prev;
    struct page_cache_descr *next = pg_cache_descr->next;

    /* Bridge the neighbours over the node being removed. */
    if (likely(prev != NULL))
        prev->next = next;
    if (likely(next != NULL))
        next->prev = prev;

    /* Fix up the endpoints if the node sat at either end of the queue. */
    if (unlikely(pg_cache->replaceQ.head == pg_cache_descr))
        pg_cache->replaceQ.head = next;
    if (unlikely(pg_cache->replaceQ.tail == pg_cache_descr))
        pg_cache->replaceQ.tail = prev;

    pg_cache_descr->prev = NULL;
    pg_cache_descr->next = NULL;
}
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
pg_cache_replaceQ_insert_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
pg_cache_replaceQ_delete_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
struct rrdeng_page_descr *descr)
{
struct page_cache *pg_cache = &ctx->pg_cache;
uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
pg_cache_replaceQ_delete_unsafe(ctx, descr);
pg_cache_replaceQ_insert_unsafe(ctx, descr);
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
/* Allocates a new page descriptor with every field reset to its "empty"
 * value (NULL pointers, zero length/state, INVALID_TIME timestamps).
 * The caller owns the returned descriptor. */
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr = mallocz(sizeof(*descr));

    descr->id = NULL;
    descr->extent = NULL;
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->pg_cache_descr = NULL;
    descr->pg_cache_descr_state = 0;

    return descr;
}
/* Wakes every thread blocked on this descriptor's condition variable.
 * The caller must hold the page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_descr->waiters)
        return;
    uv_cond_broadcast(&pg_cache_descr->cond);
}
/*
 * Blocks until this descriptor's condition variable is signalled, tracking
 * itself in the waiter count so wakers know a broadcast is needed.
 * The caller must hold the page descriptor lock (pg_cache_descr->mutex).
 * The lock is released while waiting and re-acquired before returning.
 * The descriptor is not guaranteed to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->waiters++;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    pg_cache_descr->waiters--;
}
/*
* Returns page flags.
* The lock will be released and re-acquired. The descriptor is not guaranteed
* to exist after this function returns.
*/
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
unsigned long flags;
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wait_event_unsafe(descr);
flags = pg_cache_descr->flags;
rrdeng_page_descr_mutex_unlock(ctx, descr);
return