summaryrefslogtreecommitdiffstats
path: root/database/engine/pdc.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-25 01:56:49 +0200
committerGitHub <noreply@github.com>2023-01-25 01:56:49 +0200
commit3a430c181e7655a8460b40e9864395694f223e46 (patch)
tree538c69896374d1a8587d6f6719033c160014e650 /database/engine/pdc.c
parent0c1fbbe591d5b99f747877feb02557354ff621b2 (diff)
DBENGINE v2 - improvements part 8 (#14319)
* cache 100 pages for each size our tiers need * smarter page caching * account the caching structures * dynamic max number of cached pages * make variables const to ensure they are not changed * make sure replication timestamps do not go to the future * replication now sends chart and dimension states atomically; replication receivers ignores chart and dimension states when rbegin is also ignored * make sure all pages are flushed on shutdown * take into account empty points too * when recalculating retention update first_time_s on metrics only when they are bigger * Report the datafile number we use to recalculate retention * Report the datafile number we use to recalculate retention * rotate db at startup * make query plans overlap * Calculate properly first time s * updated event labels * negative page caching fix * Atempt to create missing tables on query failure * Atempt to create missing tables on query failure (part 2) * negative page caching for all gaps, to eliminate jv2 scans * Fix unittest Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/engine/pdc.c')
-rw-r--r--database/engine/pdc.c110
1 files changed, 29 insertions, 81 deletions
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
index 5e642828e6..d0daaa5c12 100644
--- a/database/engine/pdc.c
+++ b/database/engine/pdc.c
@@ -790,43 +790,6 @@ void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent
pdc_release_and_destroy_if_unreferenced(pdc, true, true);
}
-static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) {
- switch(type) {
- case PAGE_METRICS: {
- storage_number n = pack_storage_number(NAN, SN_FLAG_NONE);
- storage_number *array = (storage_number *)page;
- size_t slots = page_length / sizeof(n);
- for(size_t i = 0; i < slots ; i++)
- array[i] = n;
- }
- break;
-
- case PAGE_TIER: {
- storage_number_tier1_t n = {
- .min_value = NAN,
- .max_value = NAN,
- .sum_value = NAN,
- .count = 1,
- .anomaly_count = 0,
- };
- storage_number_tier1_t *array = (storage_number_tier1_t *)page;
- size_t slots = page_length / sizeof(n);
- for(size_t i = 0; i < slots ; i++)
- array[i] = n;
- }
- break;
-
- default: {
- static bool logged = false;
- if(!logged) {
- error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type);
- logged = true;
- }
- memset(page, 0, page_length);
- }
- }
-}
-
void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
if(flags & RRDENG_PAGE_PAST_COLLECTION)
buffer_strcat(wb, "PAST_COLLECTION ");
@@ -870,7 +833,6 @@ inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_
now_s,
overwrite_zero_update_every_s,
have_read_error,
- true,
"loaded", 0);
}
@@ -885,7 +847,6 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
time_t now_s, // can be zero, to disable future timestamp check
time_t overwrite_zero_update_every_s, // can be zero, if unknown
bool have_read_error,
- bool minimize_invalid_size,
const char *msg,
RRDENG_COLLECT_PAGE_FLAGS flags) {
@@ -988,21 +949,6 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
uuid_str, msg, vd.type,
vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
);
-
- if(minimize_invalid_size) {
- // since the page is going to be loaded
- // let's minimize the memory the invalid page will occupy
-
- if(vd.start_time_s == vd.end_time_s) {
- vd.page_length = vd.point_size;
- vd.entries = 1;
- }
- else {
- vd.page_length = vd.point_size * 2;
- vd.update_every_s = vd.end_time_s - vd.start_time_s;
- vd.entries = 2;
- }
- }
}
else {
const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
@@ -1138,7 +1084,7 @@ static bool epdl_populate_pages_from_extent_data(
}
if(worker)
- worker_is_busy(UV_EVENT_EXT_DECOMPRESSION);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);
if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
// find the uncompressed extent size
@@ -1169,7 +1115,7 @@ static bool epdl_populate_pages_from_extent_data(
}
if(worker)
- worker_is_busy(UV_EVENT_PAGE_LOOKUP);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);
size_t stats_data_from_main_cache = 0;
size_t stats_data_from_extent = 0;
@@ -1211,33 +1157,35 @@ static bool epdl_populate_pages_from_extent_data(
have_read_error);
if(worker)
- worker_is_busy(UV_EVENT_PAGE_POPULATION);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_POPULATION);
- void *page_data = dbengine_page_alloc(vd.page_length);
+ void *page_data;
if (unlikely(!vd.is_valid)) {
- fill_page_with_nulls(page_data, vd.page_length, vd.type);
+ page_data = DBENGINE_EMPTY_PAGE;
stats_load_invalid_page++;
}
-
- else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length);
- stats_load_uncompressed++;
- }
-
else {
- if(unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
- error_limit_static_global_var(erl, 10, 0);
- error_limit(&erl,
- "DBENGINE: page %u offset %u + page length %zu exceeds the uncompressed buffer size %u",
- i, page_offset, vd.page_length, uncompressed_payload_length);
-
- fill_page_with_nulls(page_data, vd.page_length, vd.type);
- stats_load_invalid_page++;
+ if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ page_data = dbengine_page_alloc(vd.page_length);
+ memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length);
+ stats_load_uncompressed++;
}
else {
- memcpy(page_data, uncompressed_buf + page_offset, vd.page_length);
- stats_load_compressed++;
+ if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
+ error_limit_static_global_var(erl, 10, 0);
+ error_limit(&erl,
+ "DBENGINE: page %u offset %u + page length %zu exceeds the uncompressed buffer size %u",
+ i, page_offset, vd.page_length, uncompressed_payload_length);
+
+ page_data = DBENGINE_EMPTY_PAGE;
+ stats_load_invalid_page++;
+ }
+ else {
+ page_data = dbengine_page_alloc(vd.page_length);
+ memcpy(page_data, uncompressed_buf + page_offset, vd.page_length);
+ stats_load_compressed++;
+ }
}
}
@@ -1248,7 +1196,7 @@ static bool epdl_populate_pages_from_extent_data(
.start_time_s = vd.start_time_s,
.end_time_s = vd.end_time_s,
.update_every_s = vd.update_every_s,
- .size = (size_t) vd.page_length,
+ .size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length),
.data = page_data
};
@@ -1269,13 +1217,13 @@ static bool epdl_populate_pages_from_extent_data(
pd->page = page;
pd->page_length = pgc_page_data_size(main_cache, page);
- pdc_page_status_set(pd, PDC_PAGE_READY | tags);
+ pdc_page_status_set(pd, PDC_PAGE_READY | tags | ((page_data == DBENGINE_EMPTY_PAGE) ? PDC_PAGE_EMPTY : 0));
pd = pd->load.next;
} while(pd);
if(worker)
- worker_is_busy(UV_EVENT_PAGE_LOOKUP);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);
}
if(stats_data_from_main_cache)
@@ -1332,7 +1280,7 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e
}
if(worker)
- worker_is_busy(UV_EVENT_EXTENT_CACHE);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
bool extent_found_in_cache = false;
@@ -1353,7 +1301,7 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e
}
else {
if(worker)
- worker_is_busy(UV_EVENT_EXTENT_MMAP);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_MMAP);
off_t map_start = ALIGN_BYTES_FLOOR(epdl->extent_offset);
size_t length = ALIGN_BYTES_CEILING(epdl->extent_offset + epdl->extent_size) - map_start;
@@ -1369,7 +1317,7 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e
fatal_assert(0 == ret);
if(worker)
- worker_is_busy(UV_EVENT_EXTENT_CACHE);
+ worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
bool added = false;
extent_cache_page = pgc_page_add_and_acquire(extent_cache, (PGC_ENTRY) {