summaryrefslogtreecommitdiffstats
path: root/database/engine/pdc.c
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2024-02-01 13:41:44 +0200
committerGitHub <noreply@github.com>2024-02-01 13:41:44 +0200
commit115d074a6cc7b0ede2f7002c8ee43b6d99cf64e1 (patch)
treec30d39f0ad7106a010322ba3aecd59944a3c4e55 /database/engine/pdc.c
parent2edc0c4890558bea4108ee65c00c1d0da7cb8a99 (diff)
Create a top-level directory to contain source code. (#16896)
* Move ML under src * Move spwan under src * Move cli/ under src/ * move registry/ under src/ * move streaming/ under src/ * Move claim under src. Update docs * Move database/ under src/ * Move libnetdata/ under src/ * Update references to libnetdata * Fix logsmanagement includes * Update generated script path.
Diffstat (limited to 'database/engine/pdc.c')
-rw-r--r--database/engine/pdc.c1334
1 files changed, 0 insertions, 1334 deletions
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
deleted file mode 100644
index 1b57a49ba6..0000000000
--- a/database/engine/pdc.c
+++ /dev/null
@@ -1,1334 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRD_INTERNALS
-#include "pdc.h"
-
-struct extent_page_details_list {
- uv_file file;
- uint64_t extent_offset;
- uint32_t extent_size;
- unsigned number_of_pages_in_JudyL;
- Pvoid_t page_details_by_metric_id_JudyL;
- struct page_details_control *pdc;
- struct rrdengine_datafile *datafile;
-
- struct rrdeng_cmd *cmd;
- bool head_to_datafile_extent_queries_pending_for_extent;
-
- struct {
- struct extent_page_details_list *prev;
- struct extent_page_details_list *next;
- } query;
-};
-
-typedef struct datafile_extent_offset_list {
- uv_file file;
- unsigned fileno;
- Pvoid_t extent_pd_list_by_extent_offset_JudyL;
-} DEOL;
-
-// ----------------------------------------------------------------------------
-// PDC cache
-
-static struct {
- struct {
- ARAL *ar;
- } pdc;
-
- struct {
- ARAL *ar;
- } pd;
-
- struct {
- ARAL *ar;
- } epdl;
-
- struct {
- ARAL *ar;
- } deol;
-} pdc_globals = {};
-
-void pdc_init(void) {
- pdc_globals.pdc.ar = aral_create(
- "dbengine-pdc",
- sizeof(PDC),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-PDC *pdc_get(void) {
- PDC *pdc = aral_mallocz(pdc_globals.pdc.ar);
- memset(pdc, 0, sizeof(PDC));
- return pdc;
-}
-
-static void pdc_release(PDC *pdc) {
- aral_freez(pdc_globals.pdc.ar, pdc);
-}
-
-size_t pdc_cache_size(void) {
- return aral_overhead(pdc_globals.pdc.ar) + aral_structures(pdc_globals.pdc.ar);
-}
-
-// ----------------------------------------------------------------------------
-// PD cache
-
-void page_details_init(void) {
- pdc_globals.pd.ar = aral_create(
- "dbengine-pd",
- sizeof(struct page_details),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-struct page_details *page_details_get(void) {
- struct page_details *pd = aral_mallocz(pdc_globals.pd.ar);
- memset(pd, 0, sizeof(struct page_details));
- return pd;
-}
-
-static void page_details_release(struct page_details *pd) {
- aral_freez(pdc_globals.pd.ar, pd);
-}
-
-size_t pd_cache_size(void) {
- return aral_overhead(pdc_globals.pd.ar) + aral_structures(pdc_globals.pd.ar);
-}
-
-// ----------------------------------------------------------------------------
-// epdl cache
-
-void epdl_init(void) {
- pdc_globals.epdl.ar = aral_create(
- "dbengine-epdl",
- sizeof(EPDL),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-static EPDL *epdl_get(void) {
- EPDL *epdl = aral_mallocz(pdc_globals.epdl.ar);
- memset(epdl, 0, sizeof(EPDL));
- return epdl;
-}
-
-static void epdl_release(EPDL *epdl) {
- aral_freez(pdc_globals.epdl.ar, epdl);
-}
-
-size_t epdl_cache_size(void) {
- return aral_overhead(pdc_globals.epdl.ar) + aral_structures(pdc_globals.epdl.ar);
-}
-
-// ----------------------------------------------------------------------------
-// deol cache
-
-void deol_init(void) {
- pdc_globals.deol.ar = aral_create(
- "dbengine-deol",
- sizeof(DEOL),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-static DEOL *deol_get(void) {
- DEOL *deol = aral_mallocz(pdc_globals.deol.ar);
- memset(deol, 0, sizeof(DEOL));
- return deol;
-}
-
-static void deol_release(DEOL *deol) {
- aral_freez(pdc_globals.deol.ar, deol);
-}
-
-size_t deol_cache_size(void) {
- return aral_overhead(pdc_globals.deol.ar) + aral_structures(pdc_globals.deol.ar);
-}
-
-// ----------------------------------------------------------------------------
-// extent with buffer cache
-
-static struct {
- struct {
- SPINLOCK spinlock;
- struct extent_buffer *available_items;
- size_t available;
- } protected;
-
- struct {
- size_t allocated;
- size_t allocated_bytes;
- } atomics;
-
- size_t max_size;
-
-} extent_buffer_globals = {
- .protected = {
- .spinlock = NETDATA_SPINLOCK_INITIALIZER,
- .available_items = NULL,
- .available = 0,
- },
- .atomics = {
- .allocated = 0,
- .allocated_bytes = 0,
- },
- .max_size = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE,
-};
-
-void extent_buffer_init(void) {
- size_t max_extent_uncompressed = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE;
- size_t max_size = (size_t)LZ4_compressBound(MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE);
- if(max_size < max_extent_uncompressed)
- max_size = max_extent_uncompressed;
-
- extent_buffer_globals.max_size = max_size;
-}
-
-void extent_buffer_cleanup1(void) {
- struct extent_buffer *item = NULL;
-
- if(!spinlock_trylock(&extent_buffer_globals.protected.spinlock))
- return;
-
- if(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) {
- item = extent_buffer_globals.protected.available_items;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, item, cache.prev, cache.next);
- extent_buffer_globals.protected.available--;
- }
-
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-
- if(item) {
- size_t bytes = sizeof(struct extent_buffer) + item->bytes;
- freez(item);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-}
-
-struct extent_buffer *extent_buffer_get(size_t size) {
- internal_fatal(size > extent_buffer_globals.max_size, "DBENGINE: extent size is too big");
-
- struct extent_buffer *eb = NULL;
-
- if(size < extent_buffer_globals.max_size)
- size = extent_buffer_globals.max_size;
-
- spinlock_lock(&extent_buffer_globals.protected.spinlock);
- if(likely(extent_buffer_globals.protected.available_items)) {
- eb = extent_buffer_globals.protected.available_items;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
- extent_buffer_globals.protected.available--;
- }
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-
- if(unlikely(eb && eb->bytes < size)) {
- size_t bytes = sizeof(struct extent_buffer) + eb->bytes;
- freez(eb);
- eb = NULL;
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-
- if(unlikely(!eb)) {
- size_t bytes = sizeof(struct extent_buffer) + size;
- eb = mallocz(bytes);
- eb->bytes = size;
- __atomic_add_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-
- return eb;
-}
-
-void extent_buffer_release(struct extent_buffer *eb) {
- if(unlikely(!eb)) return;
-
- spinlock_lock(&extent_buffer_globals.protected.spinlock);
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
- extent_buffer_globals.protected.available++;
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-}
-
-size_t extent_buffer_cache_size(void) {
- return __atomic_load_n(&extent_buffer_globals.atomics.allocated_bytes, __ATOMIC_RELAXED);
-}
-
-// ----------------------------------------------------------------------------
-// epdl logic
-
-static void epdl_destroy(EPDL *epdl)
-{
- Pvoid_t *pd_by_start_time_s_JudyL;
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- while ((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(
- epdl->page_details_by_metric_id_JudyL,
- &metric_id_index, &metric_id_first)))
- PDCJudyLFreeArray(pd_by_start_time_s_JudyL, PJE0);
-
- PDCJudyLFreeArray(&epdl->page_details_by_metric_id_JudyL, PJE0);
- epdl_release(epdl);
-}
-
-static void epdl_mark_all_not_loaded_pages_as_failed(EPDL *epdl, PDC_PAGE_STATUS tags, size_t *statistics_counter)
-{
- size_t pages_matched = 0;
-
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- Pvoid_t *pd_by_start_time_s_JudyL;
- while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
-
- Word_t start_time_index = 0;
- bool start_time_first = true;
- Pvoid_t *PValue;
- while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
- struct page_details *pd = *PValue;
-
- if(!pd->page && !pdc_page_status_check(pd, PDC_PAGE_FAILED|PDC_PAGE_READY)) {
- pdc_page_status_set(pd, PDC_PAGE_FAILED | tags);
- pages_matched++;
- }
- }
- }
-
- if(pages_matched && statistics_counter)
- __atomic_add_fetch(statistics_counter, pages_matched, __ATOMIC_RELAXED);
-}
-/*
-static bool epdl_check_if_pages_are_already_in_cache(struct rrdengine_instance *ctx, EPDL *epdl, PDC_PAGE_STATUS tags)
-{
- size_t count_remaining = 0;
- size_t found = 0;
-
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- Pvoid_t *pd_by_start_time_s_JudyL;
- while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
-
- Word_t start_time_index = 0;
- bool start_time_first = true;
- Pvoid_t *PValue;
- while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
- struct page_details *pd = *PValue;
- if (pd->page)
- continue;
-
- pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
- if (pd->page) {
- found++;
- pdc_page_status_set(pd, PDC_PAGE_READY | tags);
- }
- else
- count_remaining++;
- }
- }
-
- if(found) {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_preloaded, found, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, found, __ATOMIC_RELAXED);
- }
-
- return count_remaining == 0;
-}
-*/
-
-// ----------------------------------------------------------------------------
-// PDC logic
-
-static void pdc_destroy(PDC *pdc) {
- mrg_metric_release(main_mrg, pdc->metric);
- completion_destroy(&pdc->prep_completion);
- completion_destroy(&pdc->page_completion);
-
- Pvoid_t *PValue;
- struct page_details *pd;
- Word_t time_index = 0;
- bool first_then_next = true;
- size_t unroutable = 0, cancelled = 0;
- while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
- pd = *PValue;
-
- // no need for atomics here - we are done...
- PDC_PAGE_STATUS status = pd->status;
-
- if(status & PDC_PAGE_DATAFILE_ACQUIRED) {
- datafile_release(pd->datafile.ptr, DATAFILE_ACQUIRE_PAGE_DETAILS);
- pd->datafile.ptr = NULL;
- }
-
- internal_fatal(pd->datafile.ptr, "DBENGINE: page details has a datafile.ptr that is not released.");
-
- if(!pd->page && !(status & (PDC_PAGE_READY | PDC_PAGE_FAILED | PDC_PAGE_RELEASED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_CANCELLED))) {
- // pdc_page_status_set(pd, PDC_PAGE_FAILED);
- unroutable++;
- }
- else if(!pd->page && (status & PDC_PAGE_CANCELLED))
- cancelled++;
-
- if(pd->page && !(status & PDC_PAGE_RELEASED)) {
- pgc_page_release(main_cache, pd->page);
- // pdc_page_status_set(pd, PDC_PAGE_RELEASED);
- }
-
- page_details_release(pd);
- }
-
- PDCJudyLFreeArray(&pdc->page_list_JudyL, PJE0);
-
- __atomic_sub_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&pdc->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
- pdc_release(pdc);
-
- if(unroutable)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_unroutable, unroutable, __ATOMIC_RELAXED);
-
- if(cancelled)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_cancelled, cancelled, __ATOMIC_RELAXED);
-}
-
-void pdc_acquire(PDC *pdc) {
- spinlock_lock(&pdc->refcount_spinlock);
-
- if(pdc->refcount < 1)
- fatal("DBENGINE: pdc is not referenced and cannot be acquired");
-
- pdc->refcount++;
- spinlock_unlock(&pdc->refcount_spinlock);
-}
-
-bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) {
- if(unlikely(!pdc))
- return true;
-
- spinlock_lock(&pdc->refcount_spinlock);
-
- if(pdc->refcount <= 0)
- fatal("DBENGINE: pdc is not referenced and cannot be released");
-
- pdc->refcount--;
-
- if (pdc->refcount <= 1 && worker) {
- // when 1 refcount is remaining, and we are a worker,
- // we can mark the job completed:
- // - if the remaining refcount is from the query caller, we will wake it up
- // - if the remaining refcount is from another worker, the query thread is already away
- completion_mark_complete(&pdc->page_completion);
- }
-
- if (pdc->refcount == 0) {
- spinlock_unlock(&pdc->refcount_spinlock);
- pdc_destroy(pdc);
- return true;
- }
-
- spinlock_unlock(&pdc->refcount_spinlock);
- return false;
-}
-
-void epdl_cmd_queued(void *epdl_ptr, struct rrdeng_cmd *cmd) {
- EPDL *epdl = epdl_ptr;
- epdl->cmd = cmd;
-}
-
-void epdl_cmd_dequeued(void *epdl_ptr) {
- EPDL *epdl = epdl_ptr;
- epdl->cmd = NULL;
-}
-
-static struct rrdeng_cmd *epdl_get_cmd(void *epdl_ptr) {
- EPDL *epdl = epdl_ptr;
- return epdl->cmd;
-}
-
-static bool epdl_pending_add(EPDL *epdl) {
- bool added_new;
-
- spinlock_lock(&epdl->datafile->extent_queries.spinlock);
- Pvoid_t *PValue = JudyLIns(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
- internal_fatal(!PValue || PValue == PJERR, "DBENGINE: corrupted pending extent judy");
-
- EPDL *base = *PValue;
-
- if(!base) {
- added_new = true;
- epdl->head_to_datafile_extent_queries_pending_for_extent = true;
- }
- else {
- added_new = false;
- epdl->head_to_datafile_extent_queries_pending_for_extent = false;
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_extent_merged, 1, __ATOMIC_RELAXED);
-
- if(base->pdc->priority > epdl->pdc->priority)
- rrdeng_req_cmd(epdl_get_cmd, base, epdl->pdc->priority);
- }
-
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, epdl, query.prev, query.next);
- *PValue = base;
-
- spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
-
- return added_new;
-}
-
-static void epdl_pending_del(EPDL *epdl) {
- spinlock_lock(&epdl->datafile->extent_queries.spinlock);
- if(epdl->head_to_datafile_extent_queries_pending_for_extent) {
- epdl->head_to_datafile_extent_queries_pending_for_extent = false;
- int rc = JudyLDel(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
- (void) rc;
- internal_fatal(!rc, "DBENGINE: epdl not found in pending list");
- }
- spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
-}
-
-void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list)
-{
- Pvoid_t *PValue;
- Pvoid_t *PValue1;
- Pvoid_t *PValue2;
- Word_t time_index = 0;
- struct page_details *pd = NULL;
-
- // this is the entire page list
- // Lets do some deduplication
- // 1. Per datafile
- // 2. Per extent
- // 3. Pages per extent will be added to the cache either as acquired or not
-
- Pvoid_t JudyL_datafile_list = NULL;
-
- DEOL *deol;
- EPDL *epdl;
-
- if (pdc->page_list_JudyL) {
- bool first_then_next = true;
- while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
- pd = *PValue;
-
- internal_fatal(!pd,
- "DBENGINE: pdc page list has an empty page details entry");
-
- if (!(pd->status & PDC_PAGE_DISK_PENDING))
- continue;
-
- internal_fatal(!(pd->status & PDC_PAGE_DATAFILE_ACQUIRED),
- "DBENGINE: page details has not acquired the datafile");
-
- internal_fatal((pd->status & (PDC_PAGE_READY | PDC_PAGE_FAILED)),
- "DBENGINE: page details has disk pending flag but it is ready/failed");
-
- internal_fatal(pd->page,
- "DBENGINE: page details has a page linked to it, but it is marked for loading");
-
- PValue1 = PDCJudyLIns(&JudyL_datafile_list, pd->datafile.fileno, PJE0);
- if (PValue1 && !*PValue1) {
- *PValue1 = deol = deol_get();
- deol->extent_pd_list_by_extent_offset_JudyL = NULL;
- deol->fileno = pd->datafile.fileno;
- }
- else
- deol = *PValue1;
-
- PValue2 = PDCJudyLIns(&deol->extent_pd_list_by_extent_offset_JudyL, pd->datafile.extent.pos, PJE0);
- if (PValue2 && !*PValue2) {
- *PValue2 = epdl = epdl_get();
- epdl->page_details_by_metric_id_JudyL = NULL;
- epdl->number_of_pages_in_JudyL = 0;
- epdl->file = pd->datafile.file;
- epdl->extent_offset = pd->datafile.extent.pos;
- epdl->extent_size = pd->datafile.extent.bytes;
- epdl->datafile = pd->datafile.ptr;
- }
- else
- epdl = *PValue2;
-
- epdl->number_of_pages_in_JudyL++;
-
- Pvoid_t *pd_by_first_time_s_judyL = PDCJudyLIns(&epdl->page_details_by_metric_id_JudyL, pd->metric_id, PJE0);
- Pvoid_t *pd_pptr = PDCJudyLIns(pd_by_first_time_s_judyL, pd->first_time_s, PJE0);
- *pd_pptr = pd;
- }
-
- size_t extent_list_no = 0;
- Word_t datafile_no = 0;
- first_then_next = true;
- while((PValue = PDCJudyLFirstThenNext(JudyL_datafile_list, &datafile_no, &first_then_next))) {
- deol = *PValue;
-
- bool first_then_next_extent = true;
- Word_t pos = 0;
- while ((PValue = PDCJudyLFirstThenNext(deol->extent_pd_list_by_extent_offset_JudyL, &pos, &first_then_next_extent))) {
- epdl = *PValue;
- internal_fatal(!epdl, "DBENGINE: extent_list is not populated properly");
-
- // The extent page list can be dispatched to a worker
- // It will need to populate the cache with "acquired" pages that are in the list (pd) only
- // the rest of the extent pages will be added to the cache butnot acquired
-
- pdc_acquire(pdc); // we do this for the next worker: do_read_extent_work()
- epdl->pdc = pdc;
-
- if(epdl_pending_add(epdl)) {
- if (extent_list_no++ == 0)
- exec_first_extent_list(ctx, epdl, pdc->priority);
- else
- exec_rest_extent_list(ctx, epdl, pdc->priority);
- }
- }
- PDCJudyLFreeArray(&deol->extent_pd_list_by_extent_offset_JudyL, PJE0);
- deol_release(deol);
- }
- PDCJudyLFreeArray(&JudyL_datafile_list, PJE0);
- }
-
- pdc_release_and_destroy_if_unreferenced(pdc, true, true);
-}
-
-void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
- if(flags & RRDENG_PAGE_PAST_COLLECTION)
- buffer_strcat(wb, "PAST_COLLECTION ");
- if(flags & RRDENG_PAGE_REPEATED_COLLECTION)
- buffer_strcat(wb, "REPEATED_COLLECTION ");
- if(flags & RRDENG_PAGE_BIG_GAP)
- buffer_strcat(wb, "BIG_GAP ");
- if(flags & RRDENG_PAGE_GAP)
- buffer_strcat(wb, "GAP ");
- if(flags & RRDENG_PAGE_FUTURE_POINT)
- buffer_strcat(wb, "FUTURE_POINT ");
- if(flags & RRDENG_PAGE_CREATED_IN_FUTURE)
- buffer_strcat(wb, "CREATED_IN_FUTURE ");
- if(flags & RRDENG_PAGE_COMPLETED_IN_FUTURE)
- buffer_strcat(wb, "COMPLETED_IN_FUTURE ");
- if(flags & RRDENG_PAGE_UNALIGNED)
- buffer_strcat(wb, "UNALIGNED ");
- if(flags & RRDENG_PAGE_CONFLICT)
- buffer_strcat(wb, "CONFLICT ");
- if(flags & RRDENG_PAGE_FULL)
- buffer_strcat(wb, "PAGE_FULL");
- if(flags & RRDENG_PAGE_COLLECT_FINALIZE)
- buffer_strcat(wb, "COLLECT_FINALIZE");
- if(flags & RRDENG_PAGE_UPDATE_EVERY_CHANGE)
- buffer_strcat(wb, "UPDATE_EVERY_CHANGE");
- if(flags & RRDENG_PAGE_STEP_TOO_SMALL)
- buffer_strcat(wb, "STEP_TOO_SMALL");
- if(flags & RRDENG_PAGE_STEP_UNALIGNED)
- buffer_strcat(wb, "STEP_UNALIGNED");
-}
-
-inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) {
- time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
-
- time_t end_time_s = 0;
- size_t entries = 0;
-
- switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- end_time_s = descr->end_time_ut / USEC_PER_SEC;
- entries = 0;
- break;
- case PAGE_GORILLA_METRICS:
- end_time_s = start_time_s + descr->gorilla.delta_time_s;
- entries = descr->gorilla.entries;
- break;
- default:
- // Nothing to do. Validate page will notify the user.
- break;
- }
-
- return validate_page(
- (uuid_t *)descr->uuid,
- start_time_s,
- end_time_s,
- 0,
- descr->page_length,
- descr->type,
- entries,
- now_s,
- overwrite_zero_update_every_s,
- have_read_error,
- "loaded", 0);
-}
-
-VALIDATED_PAGE_DESCRIPTOR validate_page(
- uuid_t *uuid,
- time_t start_time_s,
- time_t end_time_s,
- uint32_t update_every_s, // can be zero, if unknown
- size_t page_length,
- uint8_t page_type,
- size_t entries, // can be zero, if unknown
- time_t now_s, // can be zero, to disable future timestamp check
- uint32_t overwrite_zero_update_every_s, // can be zero, if unknown
- bool have_read_error,
- const char *msg,
- RRDENG_COLLECT_PAGE_FLAGS flags)
-{
- VALIDATED_PAGE_DESCRIPTOR vd = {
- .start_time_s = start_time_s,
- .end_time_s = end_time_s,
- .update_every_s = update_every_s,
- .page_length = page_length,
- .point_size = page_type_size[page_type],
- .type = page_type,
- .is_valid = true,
- };
-
- bool known_page_type = true;
- switch (page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- // always calculate entries by size
- vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
-
- // allow to be called without entries (when loading pages from disk)
- if(!entries)
- entries = vd.entries;
- break;
- case PAGE_GORILLA_METRICS:
- internal_fatal(entries == 0, "0 number of entries found on gorilla page");
- vd.entries = entries;
- break;
- default:
- known_page_type = false;
- break;
- }
-
- // allow to be called without update every (when loading pages from disk)
- if(!update_every_s) {
- vd.update_every_s = (vd.entries > 1) ? ((vd.end_time_s - vd.start_time_s) / (time_t) (vd.entries - 1))
- : overwrite_zero_update_every_s;
-
- update_every_s = vd.update_every_s;
- }
-
- // another such set of checks exists in
- // update_metric_retention_and_granularity_by_uuid()
-
- bool updated = false;
-
- size_t max_page_length = RRDENG_BLOCK_SIZE;
-
- // If gorilla can not compress the data we might end up needing slightly more
- // than 4KiB. However, gorilla pages extend the page length by increments of
- // 512 bytes.
- max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
-
- if (!known_page_type ||
- have_read_error ||
- vd.page_length == 0 ||
- vd.page_length > max_page_length ||
- vd.start_time_s > vd.end_time_s ||
- (now_s && vd.end_time_s > now_s) ||
- vd.start_time_s <= 0 ||
- vd.end_time_s <= 0 ||
- (vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
- (vd.update_every_s == 0 && vd.entries > 1))
- {
- vd.is_valid = false;
- }
- else {
- if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s))
- updated = true;
-
- if (likely(vd.update_every_s)) {
- size_t entries_by_time = page_entries_by_time(vd.start_time_s, vd.end_time_s, vd.update_every_s);
-
- if (vd.entries != entries_by_time) {
- if (overwrite_zero_update_every_s < vd.update_every_s)
- vd.update_every_s = overwrite_zero_update_every_s;
-
- time_t new_end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
-
- if(new_end_time_s <= vd.end_time_s) {
- // end time is wrong
- vd.end_time_s = new_end_time_s;
- }
- else {
- // update every is wrong
- vd.update_every_s = overwrite_zero_update_every_s;
- vd.end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
- }
-
- updated = true;
- }
- }
- else if(overwrite_zero_update_every_s) {
- vd.update_every_s = overwrite_zero_update_every_s;
- updated = true;
- }
- }
-
- if(unlikely(!vd.is_valid || updated)) {
-#ifndef NETDATA_INTERNAL_CHECKS
- nd_log_limit_static_global_var(erl, 1, 0);
-#endif
- char uuid_str[UUID_STR_LEN + 1];
- uuid_unparse(*uuid, uuid_str);
-
- BUFFER *wb = NULL;
-
- if(flags) {
- wb = buffer_create(0, NULL);
- collect_page_flags_to_buffer(wb, flags);
- }
-
- if(!vd.is_valid) {
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(true,
-#else
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
-#endif
- "DBENGINE: metric '%s' %s invalid page of type %u "
- "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)",
- uuid_str, msg, vd.type,
- vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
- );
- }
- else {
- const char *err_valid = "";
- const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
- const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
- const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
- const char *err_length = (vd.page_length == page_length) ? "" : "page length updated, ";
- const char *err_entries = (vd.entries == entries) ? "" : "entries updated, ";
- const char *err_future = (now_s && vd.end_time_s <= now_s) ? "" : "future end time, ";
-
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(true,
-#else
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
-#endif
- "DBENGINE: metric '%s' %s page of type %u "
- "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), "
- "found inconsistent - the right is "
- "from %ld to %ld, update every %u, page length %zu, entries %zu: "
- "%s%s%s%s%s%s%s",
- uuid_str, msg, vd.type,
- start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
- vd.start_time_s, vd.end_time_s, vd.update_every_s, vd.page_length, vd.entries,
- err_valid, err_start, err_end, err_update, err_length, err_entries, err_future
- );
- }
-
- buffer_free(wb);
- }
-
- return vd;
-}
-
-static inline struct page_details *epdl_get_pd_load_link_list_from_metric_start_time(EPDL *epdl, Word_t metric_id, time_t start_time_s) {
-
- if(unlikely(epdl->head_to_datafile_extent_queries_pending_for_extent))
- // stop appending more pages to this epdl
- epdl_pending_del(epdl);
-
- struct page_details *pd_list = NULL;
-
- for(EPDL *ep = epdl; ep ;ep = ep->query.next) {
- Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLGet(ep->page_details_by_metric_id_JudyL, metric_id, PJE0);
- internal_fatal(pd_by_start_time_s_judyL == PJERR, "DBENGINE: corrupted extent metrics JudyL");
-
- if (unlikely(pd_by_start_time_s_judyL && *pd_by_start_time_s_judyL)) {
- Pvoid_t *pd_pptr = PDCJudyLGet(*pd_by_start_time_s_judyL, start_time_s, PJE0);
- internal_fatal(pd_pptr == PJERR, "DBENGINE: corrupted metric page details JudyHS");
-
- if(likely(pd_pptr && *pd_pptr)) {
- struct page_details *pd = *pd_pptr;
- internal_fatal(metric_id != pd->metric_id, "DBENGINE: metric ids do not match");
-
- if(likely(!pd->page)) {
- if (unlikely(__atomic_load_n(&ep->pdc->workers_should_stop, __ATOMIC_RELAXED)))
- pdc_page_status_set(pd, PDC_PAGE_FAILED | PDC_PAGE_CANCELLED);
- else
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(pd_list, pd, load.prev, load.next);
- }
- }
- }
- }
-
- return pd_list;
-}
-
-static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *epdl, struct rrdeng_extent_page_descr *descr, const char *msg) {
- char uuid[UUID_STR_LEN] = "";
- time_t start_time_s = 0;
- time_t end_time_s = 0;
- bool used_epdl = false;
- bool used_descr = false;
-
- if (descr) {
- start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
- switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
- break;
- case PAGE_GORILLA_METRICS:
- end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
- break;
- }
- uuid_unparse_lower(descr->uuid, uuid);
- used_descr = true;
- }
- else {
- struct page_details *pd = NULL;
-
- Word_t start = 0;
- Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLFirst(epdl->page_details_by_metric_id_JudyL, &start, PJE0);
- if(pd_by_start_time_s_judyL) {
- start = 0;
- Pvoid_t *pd_pptr = PDCJudyLFirst(*pd_by_start_time_s_judyL, &start, PJE0);
- if(pd_pptr) {
- pd = *pd_pptr;
- start_time_s = pd->first_time_s;
- end_time_s = pd->last_time_s;
- METRIC *metric = (METRIC *)pd->metric_id;
- uuid_t *u = mrg_metric_uuid(main_mrg, metric);
- uuid_unparse_lower(*u, uuid);
- used_epdl = true;
- }
- }
- }
-
- if(!used_epdl && !used_descr && epdl->pdc) {
- start_time_s = epdl->pdc->start_time_s;
- end_time_s = epdl->pdc->end_time_s;
- }
-
- char start_time_str[LOG_DATE_LENGTH + 1] = "";
- if(start_time_s)
- log_date(start_time_str, LOG_DATE_LENGTH, start_time_s);
-
- char end_time_str[LOG_DATE_LENGTH + 1] = "";
- if(end_time_s)
- log_date(end_time_str, LOG_DATE_LENGTH, end_time_s);
-
- nd_log_limit_static_global_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
- "DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) "
- "%s from %ld (%s) to %ld (%s) %s%s: "
- "%s",
- epdl->datafile->fileno, ctx->config.tier,
- epdl->extent_offset, epdl->extent_size,
- used_epdl ? "to extract page (PD)" : used_descr ? "expected page (DESCR)" : "part of a query (PDC)",
- start_time_s, start_time_str, end_time_s, end_time_str,
- used_epdl || used_descr ? " of metric " : "",