// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
/* Default global database instance */
static struct rrdengine_instance default_global_ctx;
int default_rrdeng_page_cache_mb = 32;
int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
/*
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
void rrdeng_store_metric_init(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct page_cache *pg_cache;
struct rrdengine_instance *ctx;
uuid_t temp_id;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index;
EVP_MD_CTX *evpctx;
unsigned char hash_value[EVP_MAX_MD_SIZE];
unsigned int hash_len;
//&default_global_ctx; TODO: test this use case or remove it?
ctx = rd->rrdset->rrdhost->rrdeng_ctx;
pg_cache = &ctx->pg_cache;
handle = &rd->state->handle.rrdeng;
handle->ctx = ctx;
evpctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
EVP_DigestUpdate(evpctx, rd->id, strlen(rd->id));
EVP_DigestUpdate(evpctx, rd->rrdset->id, strlen(rd->rrdset->id));
EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
EVP_MD_CTX_destroy(evpctx);
assert(hash_len > sizeof(temp_id));
memcpy(&temp_id, hash_value, sizeof(temp_id));
handle->descr = NULL;
handle->prev_descr = NULL;
handle->unaligned_page = 0;
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t));
if (likely(NULL != PValue)) {
page_index = *PValue;
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
if (NULL == PValue) {
/* First time we see the UUID */
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
handle->page_index = page_index;
}
/* The page must be populated and referenced */
static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
{
unsigned i;
uint8_t has_only_empty_metrics = 1;
storage_number *page;
page = descr->pg_cache_descr->page;
for (i = 0 ; i < descr->page_length / sizeof(storage_number); ++i) {
if (SN_EMPTY_SLOT != page[i]) {
has_only_empty_metrics = 0;
break;
}
}
return has_only_empty_metrics;
}
void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
{
struct rrdeng_collect_handle *handle;
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
descr = handle->descr;
if (unlikely(NULL == descr)) {
return;
}
if (likely(descr->page_length)) {
int ret, page_is_empty;
rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
if (handle->prev_descr) {
/* unpin old second page */
pg_cache_put(ctx, handle->prev_descr);
}
page_is_empty = page_has_only_empty_metrics(descr);
if (page_is_empty) {
debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_put(ctx, descr);
pg_cache_punch_hole(ctx, descr, 1);
handle->prev_descr = NULL;
} else {
/* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
rrdeng_page_descr_mutex_lock(ctx, descr);
ret = pg_cache_try_get_unsafe(descr, 0);
rrdeng_page_descr_mutex_unlock(ctx, descr);
assert (1 == ret);
rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
handle->prev_descr = descr;
}
} else {
freez(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);