summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorEmmanuel Vasilakis <mrzammler@mm.st>2022-07-26 11:37:38 +0300
committerGitHub <noreply@github.com>2022-07-26 11:37:38 +0300
commit98e77284cbcd2fdb001302525db3f1dfe4af9389 (patch)
tree8f28d671ac59c75a3d9c51e23c8c57ec761844c8 /database
parent652083841c106b55399fe239d81e41c7e9fd0282 (diff)
Set value to SN_EMPTY_SLOT if flags is SN_EMPTY_SLOT (#13417)
* set value to SN_EMPTY_SLOT if flags is SN_EMPTY_SLOT * SN_EMPTY_SLOT should be SN_ANOMALOUS_ZERO * added the const attribute to pack_storage_number() * tier1 uses floats * zero should not be empty slot * add unlikely * rename all SN flags to be more meaningful * proper check for zero double value * properly check if pages are full with empty points Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'database')
-rw-r--r--database/engine/rrdengine.c40
-rwxr-xr-xdatabase/engine/rrdengineapi.c48
-rw-r--r--database/ram/rrddim_mem.c6
-rw-r--r--database/rrd.h4
-rw-r--r--database/rrdset.c24
5 files changed, 87 insertions, 35 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 5998a3177c..20d83e6c37 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -235,6 +235,43 @@ void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, str
freez(xt_io_descr);
}
+static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) {
+ switch(type) {
+ case PAGE_METRICS: {
+ storage_number n = pack_storage_number(NAN, SN_FLAG_NONE);
+ storage_number *array = (storage_number *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ case PAGE_TIER: {
+ storage_number_tier1_t n = {
+ .min_value = NAN,
+ .max_value = NAN,
+ .sum_value = NAN,
+ .count = 1,
+ .anomaly_count = 0,
+ };
+ storage_number_tier1_t *array = (storage_number_tier1_t *)page;
+ size_t slots = page_length / sizeof(n);
+ for(size_t i = 0; i < slots ; i++)
+ array[i] = n;
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type);
+ logged = true;
+ }
+ memset(page, 0, page_length);
+ }
+ }
+}
+
void read_extent_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -359,8 +396,7 @@ after_crc_check:
/* care, we don't hold the descriptor mutex */
if (have_read_error) {
- /* Applications should make sure NULL values match 0 as does SN_EMPTY_SLOT */
- memset(page, SN_EMPTY_SLOT, descr->page_length);
+ fill_page_with_nulls(page, descr->page_length, descr->type);
} else if (RRD_NO_COMPRESSION == header->compression_algorithm) {
(void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length);
} else {
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 8102bf3609..694055a48e 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -177,18 +177,38 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
/* The page must be populated and referenced */
static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
{
- unsigned i;
- uint8_t has_only_empty_metrics = 1;
- storage_number *page;
+ switch(descr->type) {
+ case PAGE_METRICS: {
+ size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+ storage_number *array = (storage_number *)descr->pg_cache_descr->page;
+ for (size_t i = 0 ; i < slots; ++i) {
+ if(does_storage_number_exist(array[i]))
+ return 0;
+ }
+ }
+ break;
- page = descr->pg_cache_descr->page;
- for (i = 0 ; i < descr->page_length / PAGE_POINT_SIZE_BYTES(descr); ++i) {
- if (SN_EMPTY_SLOT != page[i]) {
- has_only_empty_metrics = 0;
- break;
+ case PAGE_TIER: {
+ size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
+ storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page;
+ for (size_t i = 0 ; i < slots; ++i) {
+ if(fpclassify(array[i].sum_value) != FP_NAN)
+ return 0;
+ }
+ }
+ break;
+
+ default: {
+ static bool logged = false;
+ if(!logged) {
+ error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type);
+ logged = true;
+ }
+ return 0;
}
}
- return has_only_empty_metrics;
+
+ return 1;
}
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
@@ -222,7 +242,9 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
handle->descr = NULL;
}
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE n,
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
+ usec_t point_in_time,
+ NETDATA_DOUBLE n,
NETDATA_DOUBLE min_value,
NETDATA_DOUBLE max_value,
uint16_t count,
@@ -516,15 +538,15 @@ STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
case PAGE_METRICS: {
storage_number n = handle->page[position];
sp.min = sp.max = sp.sum = unpack_storage_number(n);
- sp.flags = n & SN_ALL_FLAGS;
+ sp.flags = n & SN_USER_FLAGS;
sp.count = 1;
- sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1;
+ sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
}
break;
case PAGE_TIER: {
tier1_value = ((storage_number_tier1_t *)handle->page)[position];
- sp.flags = tier1_value.anomaly_count ? 0 : SN_ANOMALY_BIT;
+ sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
sp.count = tier1_value.count;
sp.anomaly_count = tier1_value.anomaly_count;
sp.min = tier1_value.min_value;
diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c
index 4ab14877fa..3226d3c0de 100644
--- a/database/ram/rrddim_mem.c
+++ b/database/ram/rrddim_mem.c
@@ -15,7 +15,7 @@ void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused)
STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
RRDDIM *rd = (RRDDIM *)db_metric_handle;
- rd->db[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
+ rd->db[rd->rrdset->current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
struct mem_collect_handle *ch = calloc(1, sizeof(struct mem_collect_handle));
ch->rd = rd;
return (STORAGE_COLLECT_HANDLE *)ch;
@@ -191,8 +191,8 @@ STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle) {
h->slot = slot;
h->slot_timestamp += h->dt;
- sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1;
- sp.flags = (n & SN_ALL_FLAGS);
+ sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
+ sp.flags = (n & SN_USER_FLAGS);
sp.min = sp.max = sp.sum = unpack_storage_number(n);
return sp;
diff --git a/database/rrd.h b/database/rrd.h
index b31cfe517f..fbb870b5fc 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -378,7 +378,7 @@ typedef struct storage_point {
(x).min = (x).max = (x).sum = NAN; \
(x).count = 0; \
(x).anomaly_count = 0; \
- (x).flags = SN_EMPTY_SLOT; \
+ (x).flags = SN_FLAG_NONE; \
(x).start_time = 0; \
(x).end_time = 0; \
} while(0)
@@ -387,7 +387,7 @@ typedef struct storage_point {
(x).min = (x).max = (x).sum = NAN; \
(x).count = 1; \
(x).anomaly_count = 0; \
- (x).flags = SN_EMPTY_SLOT; \
+ (x).flags = SN_FLAG_NONE; \
(x).start_time = start_t; \
(x).end_time = end_t; \
} while(0)
diff --git a/database/rrdset.c b/database/rrdset.c
index 9d60b9beb5..9693ee2113 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1041,8 +1041,7 @@ void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, u
NAN,
NAN,
0,
- 0,
- SN_EMPTY_SLOT);
+ 0, SN_FLAG_NONE);
}
t->virtual_point.count = 0;
@@ -1075,7 +1074,7 @@ static void store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
.max = n,
.sum = n,
.count = 1,
- .anomaly_count = (flags & SN_ANOMALY_BIT) ? 0 : 1,
+ .anomaly_count = (flags & SN_FLAG_NOT_ANOMALOUS) ? 0 : 1,
.flags = flags
};
@@ -1110,7 +1109,7 @@ static inline size_t rrdset_done_interpolate(
SN_FLAGS storage_flags = SN_DEFAULT_FLAGS;
if (has_reset_value)
- storage_flags |= SN_EXISTS_RESET;
+ storage_flags |= SN_FLAG_RESET;
for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) {
@@ -1207,8 +1206,7 @@ static inline size_t rrdset_done_interpolate(
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
-// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0);
- store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
+ store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
continue;
}
@@ -1217,10 +1215,9 @@ static inline size_t rrdset_done_interpolate(
if (ml_is_anomalous(rd, new_value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
- dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
+ dim_storage_flags &= ~((storage_number)SN_FLAG_NOT_ANOMALOUS);
}
-// rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, 0, 0, 1, dim_storage_flags, 0);
store_metric(rd, next_store_ut, new_value, dim_storage_flags);
rd->last_stored_value = new_value;
}
@@ -1228,19 +1225,16 @@ static inline size_t rrdset_done_interpolate(
(void) ml_is_anomalous(rd, 0, false);
#ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING "
- , rd->name
- , current_entry
- );
+ rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rd->name, current_entry);
#endif
-// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0);
- store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
+ store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
rd->last_stored_value = NAN;
}
stored_entries++;
}
+
// reset the storage flags for the next point, if any;
storage_flags = SN_DEFAULT_FLAGS;
@@ -1277,7 +1271,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) {
long current_entry = st->current_entry;
for(c = 0; c < entries && next_store_ut <= now_collect_ut ; next_store_ut += update_every_ut, c++) {
- rd->db[current_entry] = SN_EMPTY_SLOT;
+ rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1;
#ifdef NETDATA_INTERNAL_CHECKS