diff options
author | Emmanuel Vasilakis <mrzammler@mm.st> | 2022-07-26 11:37:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-26 11:37:38 +0300 |
commit | 98e77284cbcd2fdb001302525db3f1dfe4af9389 (patch) | |
tree | 8f28d671ac59c75a3d9c51e23c8c57ec761844c8 /database | |
parent | 652083841c106b55399fe239d81e41c7e9fd0282 (diff) |
Set value to SN_EMPTY_SLOT if flags is SN_EMPTY_SLOT (#13417)
* set value to SN_EMPTY_SLOT if flags is SN_EMPTY_SLOT
* SN_EMPTY_SLOT should be SN_ANOMALOUS_ZERO
* added the const attribute to pack_storage_number()
* tier1 uses floats
* zero should not be empty slot
* add unlikely
* rename all SN flags to be more meaningful
* proper check for zero double value
* properly check if pages are full with empty points
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/rrdengine.c | 40 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 48 | ||||
-rw-r--r-- | database/ram/rrddim_mem.c | 6 | ||||
-rw-r--r-- | database/rrd.h | 4 | ||||
-rw-r--r-- | database/rrdset.c | 24 |
5 files changed, 87 insertions, 35 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 5998a3177c..20d83e6c37 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -235,6 +235,43 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str freez(xt_io_descr); } +static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) { + switch(type) { + case PAGE_METRICS: { + storage_number n = pack_storage_number(NAN, SN_FLAG_NONE); + storage_number *array = (storage_number *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + case PAGE_TIER: { + storage_number_tier1_t n = { + .min_value = NAN, + .max_value = NAN, + .sum_value = NAN, + .count = 1, + .anomaly_count = 0, + }; + storage_number_tier1_t *array = (storage_number_tier1_t *)page; + size_t slots = page_length / sizeof(n); + for(size_t i = 0; i < slots ; i++) + array[i] = n; + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type); + logged = true; + } + memset(page, 0, page_length); + } + } +} + void read_extent_cb(uv_fs_t* req) { struct rrdengine_worker_config* wc = req->loop->data; @@ -359,8 +396,7 @@ after_crc_check: /* care, we don't hold the descriptor mutex */ if (have_read_error) { - /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */ - memset(page, SN_EMPTY_SLOT, descr->page_length); + fill_page_with_nulls(page, descr->page_length, descr->type); } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); } else { diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 8102bf3609..694055a48e 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -177,18 +177,38 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri /* The page must be populated and referenced */ static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr) { - unsigned i; - uint8_t has_only_empty_metrics = 1; - storage_number *page; + switch(descr->type) { + case PAGE_METRICS: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number *array = (storage_number *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(does_storage_number_exist(array[i])) + return 0; + } + } + break; - page = descr->pg_cache_descr->page; - for (i = 0 ; i < descr->page_length / PAGE_POINT_SIZE_BYTES(descr); ++i) { - if (SN_EMPTY_SLOT != page[i]) { - has_only_empty_metrics = 0; - break; + case PAGE_TIER: { + size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr); + storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page; + for (size_t i = 0 ; i < slots; ++i) { + if(fpclassify(array[i].sum_value) != FP_NAN) + return 0; + } + } + break; + + default: { + static bool logged = false; + if(!logged) { + error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type); + logged = true; + } + return 0; } } - return has_only_empty_metrics; + + return 1; } void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { @@ -222,7 +242,9 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h handle->descr = NULL; } -void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE n, +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, + usec_t point_in_time, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, uint16_t count, @@ -516,15 +538,15 @@ STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) case PAGE_METRICS: { storage_number n = handle->page[position]; sp.min = sp.max = sp.sum = unpack_storage_number(n); - sp.flags = n & SN_ALL_FLAGS; + sp.flags = n & SN_USER_FLAGS; sp.count = 1; - sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1; + sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; } break; case PAGE_TIER: { tier1_value = ((storage_number_tier1_t *)handle->page)[position]; - sp.flags = tier1_value.anomaly_count ? 0 : SN_ANOMALY_BIT; + sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; sp.count = tier1_value.count; sp.anomaly_count = tier1_value.anomaly_count; sp.min = tier1_value.min_value; diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c index 4ab14877fa..3226d3c0de 100644 --- a/database/ram/rrddim_mem.c +++ b/database/ram/rrddim_mem.c @@ -15,7 +15,7 @@ void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused) STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle) { RRDDIM *rd = (RRDDIM *)db_metric_handle; - rd->db[rd->rrdset->current_entry] = SN_EMPTY_SLOT; + rd->db[rd->rrdset->current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); struct mem_collect_handle *ch = calloc(1, sizeof(struct mem_collect_handle)); ch->rd = rd; return (STORAGE_COLLECT_HANDLE *)ch; @@ -191,8 +191,8 @@ STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle) { h->slot = slot; h->slot_timestamp += h->dt; - sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1; - sp.flags = (n & SN_ALL_FLAGS); + sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + sp.flags = (n & SN_USER_FLAGS); sp.min = sp.max = sp.sum = unpack_storage_number(n); return sp; diff --git a/database/rrd.h b/database/rrd.h index b31cfe517f..fbb870b5fc 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -378,7 +378,7 @@ typedef struct storage_point { (x).min = (x).max = (x).sum = NAN; \ (x).count = 0; \ (x).anomaly_count = 0; \ - (x).flags = SN_EMPTY_SLOT; \ + (x).flags = SN_FLAG_NONE; \ (x).start_time = 0; \ (x).end_time = 0; \ } while(0) @@ -387,7 +387,7 @@ typedef struct storage_point { (x).min = (x).max = (x).sum = NAN; \ (x).count = 1; \ (x).anomaly_count = 0; \ - (x).flags = SN_EMPTY_SLOT; \ + (x).flags = SN_FLAG_NONE; \ (x).start_time = start_t; \ (x).end_time = end_t; \ } while(0) diff --git a/database/rrdset.c b/database/rrdset.c index 9d60b9beb5..9693ee2113 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1041,8 +1041,7 @@ void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, u NAN, NAN, 0, - 0, - SN_EMPTY_SLOT); + 0, SN_FLAG_NONE); } t->virtual_point.count = 0; @@ -1075,7 +1074,7 @@ static void store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, .max = n, .sum = n, .count = 1, - .anomaly_count = (flags & SN_ANOMALY_BIT) ? 0 : 1, + .anomaly_count = (flags & SN_FLAG_NOT_ANOMALOUS) ? 0 : 1, .flags = flags }; @@ -1110,7 +1109,7 @@ static inline size_t rrdset_done_interpolate( SN_FLAGS storage_flags = SN_DEFAULT_FLAGS; if (has_reset_value) - storage_flags |= SN_EXISTS_RESET; + storage_flags |= SN_FLAG_RESET; for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { @@ -1207,8 +1206,7 @@ static inline size_t rrdset_done_interpolate( if(unlikely(!store_this_entry)) { (void) ml_is_anomalous(rd, 0, false); -// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0); - store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT); + store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); continue; } @@ -1217,10 +1215,9 @@ static inline size_t rrdset_done_interpolate( if (ml_is_anomalous(rd, new_value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous - dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT); + dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS); } -// rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, 0, 0, 1, dim_storage_flags, 0); store_metric(rd, next_store_ut, new_value, dim_storage_flags); rd->last_stored_value = new_value; } @@ -1228,19 +1225,16 @@ static inline size_t rrdset_done_interpolate( (void) ml_is_anomalous(rd, 0, false); #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING " - , rd->name - , current_entry - ); + rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rd->name, current_entry); #endif -// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0); - store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT); + store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); rd->last_stored_value = NAN; } stored_entries++; } + // reset the storage flags for the next point, if any; storage_flags = SN_DEFAULT_FLAGS; @@ -1277,7 +1271,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) { long current_entry = st->current_entry; for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) { - rd->db[current_entry] = SN_EMPTY_SLOT; + rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1; #ifdef NETDATA_INTERNAL_CHECKS |