summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-26 12:49:57 +0300
committerGitHub <noreply@github.com>2023-06-26 12:49:57 +0300
commitf90d56f18d29c2835bc278f6a22e840230b9ca86 (patch)
tree8e6b156baf5769353deba3552df4e7ed5fb17e26 /database
parentff0b64dce2abb1f4e77cffaeb4a954fd5f985326 (diff)
Relax jnfv2 caching (#15224)
* readers should be able to recursively acquire the lock, even when there is a writer waiting * dont madvise dontneed and random * dont validate extents and metrics on jnfv2 * dont validate crc * Delay journal metric check * added MRG stress test --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database')
-rw-r--r--database/engine/journalfile.c79
-rw-r--r--database/engine/journalfile.h2
-rw-r--r--database/engine/metric.c322
-rw-r--r--database/engine/metric.h6
4 files changed, 201 insertions, 208 deletions
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index e95ce23838..07766f12aa 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,57 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-
-// DBENGINE2: Helper
-
-static void update_metric_retention_and_granularity_by_uuid(
- struct rrdengine_instance *ctx, uuid_t *uuid,
- time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
-{
- if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
- last_time_s = now_s;
- }
-
- if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
-
- first_time_s = last_time_s;
- }
-
- if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
- }
-
- bool added = false;
- METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
- if (!metric) {
- MRG_ENTRY entry = {
- .section = (Word_t) ctx,
- .first_time_s = first_time_s,
- .last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
- };
- uuid_copy(entry.uuid, *uuid);
- metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
- }
-
- if (likely(!added))
- mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
-
- mrg_metric_release(main_mrg, metric);
-}
-
static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -265,8 +214,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
- madvise_random(journalfile->mmap.data, journalfile->mmap.size);
- madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
+// madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory);
+// madvise_random(journalfile->mmap.data, journalfile->mmap.size);
+// madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
@@ -459,6 +409,7 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd,
struct journal_v2_header *j2_header = journalfile->mmap.data;
journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
+ // journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list);
journalfile_v2_mounted_data_unmount(journalfile, true, true);
@@ -957,12 +908,12 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
if (rc) return 1;
- rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
- if (rc) return 1;
-
if (!db_engine_journal_check)
return 0;
+ rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
+ if (rc) return 1;
+
// Verify complete UUID chain
struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
@@ -1027,6 +978,15 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
uint8_t *data_start = (uint8_t *)j2_header;
uint32_t entries = j2_header->metric_count;
+ if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK;
+ if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE;
+ // needs rebuild
+ return;
+ }
+ }
+
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
time_t now_s = max_acceptable_collected_time();
@@ -1034,8 +994,8 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
- update_metric_retention_and_granularity_by_uuid(
- ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
metric++;
}
@@ -1138,6 +1098,9 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
);
// Initialize the journal file to be able to access the data
+
+ if (!db_engine_journal_check)
+ journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK;
journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
ctx_current_disk_space_increase(ctx, journal_v2_file_size);
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index 5ffd839e67..62e8f55ee7 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -21,6 +21,7 @@ typedef enum __attribute__ ((__packed__)) {
JOURNALFILE_FLAG_IS_AVAILABLE = (1 << 0),
JOURNALFILE_FLAG_IS_MOUNTED = (1 << 1),
JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION = (1 << 2),
+ JOURNALFILE_FLAG_METRIC_CRC_CHECK = (1 << 3),
} JOURNALFILE_FLAGS;
/* only one event loop is supported for now */
@@ -39,6 +40,7 @@ struct rrdengine_journalfile {
time_t first_time_s;
time_t last_time_s;
time_t not_needed_since_s;
+ // uint32_t size_of_directory;
} v2;
struct {
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 75969ec5a6..331d881527 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -188,7 +188,7 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
return ret;
}
-static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, &entry->uuid);
METRIC *allocation = aral_mallocz(mrg->aral[partition]);
@@ -230,6 +230,7 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
}
METRIC *metric = allocation;
+ // memcpy(metric->uuid, entry->uuid, sizeof(uuid_t));
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
metric->first_time_s = MAX(0, entry->first_time_s);
@@ -254,7 +255,7 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
return metric;
}
-static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
mrg_index_read_lock(mrg, partition);
@@ -283,8 +284,8 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
return metric;
}
-static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = uuid_partition(mrg, &metric->uuid);
+static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
+ size_t partition = metric->partition;
size_t mem_before_judyl, mem_after_judyl;
@@ -333,7 +334,7 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
// ----------------------------------------------------------------------------
// public API
-MRG *mrg_create(void) {
+inline MRG *mrg_create(void) {
MRG *mrg = callocz(1, sizeof(MRG));
for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
@@ -354,15 +355,15 @@ MRG *mrg_create(void) {
return mrg;
}
-size_t mrg_aral_structures(void) {
+inline size_t mrg_aral_structures(void) {
return aral_structures_from_stats(&mrg_aral_statistics);
}
-size_t mrg_aral_overhead(void) {
+inline size_t mrg_aral_overhead(void) {
return aral_overhead_from_stats(&mrg_aral_statistics);
}
-void mrg_destroy(MRG *mrg __maybe_unused) {
+inline void mrg_destroy(MRG *mrg __maybe_unused) {
// no destruction possible
// we can't traverse the metrics list
@@ -372,43 +373,43 @@ void mrg_destroy(MRG *mrg __maybe_unused) {
;
}
-METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
return metric_add_and_acquire(mrg, &entry, ret);
}
-METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
return metric_get_and_acquire(mrg, uuid, section);
}
-bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
return acquired_metric_del(mrg, metric);
}
-METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
metric_acquire(mrg, metric, false);
return metric;
}
-bool mrg_metric_release(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
return metric_release_and_can_be_deleted(mrg, metric);
}
-Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
return (Word_t)metric;
}
-uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
return &metric->uuid;
}
-Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
return metric->section;
}
-bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(first_time_s < 0))
@@ -422,7 +423,7 @@ bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t
return true;
}
-void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
@@ -460,7 +461,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
spinlock_unlock(&metric->spinlock);
}
-bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
bool ret = false;
@@ -476,7 +477,7 @@ bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metr
return ret;
}
-time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t first_time_s;
spinlock_lock(&metric->spinlock);
@@ -496,7 +497,7 @@ time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
return first_time_s;
}
-void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
spinlock_lock(&metric->spinlock);
if(unlikely(!metric->first_time_s)) {
@@ -514,7 +515,7 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f
spinlock_unlock(&metric->spinlock);
}
-bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(latest_time_s < 0))
@@ -539,7 +540,7 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric,
}
// returns true when metric still has retention
-bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
Word_t section = mrg_metric_section(mrg, metric);
bool do_again = false;
size_t countdown = 5;
@@ -590,7 +591,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
return ret;
}
-bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
@@ -610,7 +611,7 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t
return true;
}
-time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t max;
spinlock_lock(&metric->spinlock);
max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
@@ -618,7 +619,7 @@ time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
return max;
}
-bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
@@ -631,7 +632,7 @@ bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t
return true;
}
-bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
@@ -645,7 +646,7 @@ bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metr
return true;
}
-time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t update_every_s;
spinlock_lock(&metric->spinlock);
@@ -655,7 +656,7 @@ time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
return update_every_s;
}
-bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
bool done = false;
spinlock_lock(&metric->spinlock);
if(!metric->writer) {
@@ -669,7 +670,7 @@ bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
return done;
}
-bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
bool done = false;
spinlock_lock(&metric->spinlock);
if(metric->writer) {
@@ -681,7 +682,56 @@ bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
return done;
}
-void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
+inline void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s)
+{
+ if(unlikely(last_time_s > now_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
+
+ if (unlikely(first_time_s > last_time_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
+
+ first_time_s = last_time_s;
+ }
+
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
+
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = section,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = (uint32_t) update_every_s
+ };
+ // memcpy(entry.uuid, *uuid, sizeof(uuid_t));
+ uuid_copy(entry.uuid, *uuid);
+ metric = mrg_metric_add_and_acquire(mrg, entry, &added);
+ }
+
+ if (likely(!added))
+ mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ mrg_metric_release(mrg, metric);
+}
+
+inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
memset(s, 0, sizeof(struct mrg_statistics));
for(int i = 0; i < MRG_PARTITIONS ;i++) {
@@ -707,111 +757,55 @@ void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
// ----------------------------------------------------------------------------
// unit test
-#ifdef MRG_STRESS_TEST
-
-static void mrg_stress(MRG *mrg, size_t entries, size_t sections) {
- bool ret;
+struct mrg_stress_entry {
+ uuid_t uuid;
+ time_t after;
+ time_t before;
+};
- info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections);
+struct mrg_stress {
+ MRG *mrg;
+ bool stop;
+ size_t entries;
+ struct mrg_stress_entry *array;
+ size_t updates;
+};
- METRIC *array[entries][sections];
- for(size_t i = 0; i < entries ; i++) {
- MRG_ENTRY e = {
- .first_time_s = (time_t)(i + 1),
- .latest_time_s = (time_t)(i + 2),
- .latest_update_every_s = (time_t)(i + 3),
- };
- uuid_generate_random(e.uuid);
+static void *mrg_stress(void *ptr) {
+ struct mrg_stress *t = ptr;
+ MRG *mrg = t->mrg;
- for(size_t section = 0; section < sections ;section++) {
- e.section = section;
- array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret);
- if(!ret)
- fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section);
+ ssize_t start = 0;
+ ssize_t end = (ssize_t)t->entries;
+ ssize_t step = 1;
- if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section])
- fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric");
+ if(gettid() % 2) {
+ start = (ssize_t)t->entries - 1;
+ end = -1;
+ step = -1;
+ }
- if(ret)
- fatal("DBENGINE METRIC: adding the same metric twice, returns success");
+ while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
+ for (ssize_t i = start; i != end; i += step) {
+ struct mrg_stress_entry *e = &t->array[i];
- if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section])
- fatal("DBENGINE METRIC: cannot get back the same metric");
+ time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
+ time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
- if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0)
- fatal("DBENGINE METRIC: uuids do not match");
- }
- }
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, 0x01,
+ &e->uuid,
+ after,
+ before,
+ 1,
+ before);
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- uuid_t uuid;
- uuid_generate_random(uuid);
-
- if(mrg_metric_get_and_acquire(mrg, &uuid, section))
- fatal("DBENGINE METRIC: found non-existing uuid");
-
- if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section])
- fatal("DBENGINE METRIC: metric id does not match");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
-
- if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2)))
- fatal("DBENGINE METRIC: cannot set first time");
- if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3)))
- fatal("DBENGINE METRIC: cannot set latest time");
- if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4)))
- fatal("DBENGINE METRIC: cannot set update every");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4))
- fatal("DBENGINE METRIC: wrong latest time returned");
+ __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
}
}
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- if(!mrg_metric_release_and_delete(mrg, array[i][section]))
- fatal("DBENGINE METRIC: failed to delete metric");
- }
- }
-}
-
-static void *mrg_stress_test_thread1(void *ptr) {
- MRG *mrg = ptr;
-
- for(int i = 0; i < 5 ; i++)
- mrg_stress(mrg, 10000, 5);
-
- return ptr;
-}
-
-static void *mrg_stress_test_thread2(void *ptr) {
- MRG *mrg = ptr;
-
- for(int i = 0; i < 10 ; i++)
- mrg_stress(mrg, 500, 50);
-
- return ptr;
-}
-
-static void *mrg_stress_test_thread3(void *ptr) {
- MRG *mrg = ptr;
-
- for(int i = 0; i < 50 ; i++)
- mrg_stress(mrg, 5000, 1);
-
return ptr;
}
-#endif
int mrg_unittest(void) {
MRG *mrg = mrg_create();
@@ -894,47 +888,75 @@ int mrg_unittest(void) {
if(s.entries != 0)
fatal("DBENGINE METRIC: invalid entries counter");
-#ifdef MRG_STRESS_TEST
- usec_t started_ut = now_monotonic_usec();
- pthread_t thread1;
- netdata_thread_create(&thread1, "TH1",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread1, mrg);
+ size_t entries = 1000000;
+ size_t threads = MRG_PARTITIONS / 3 + 1;
+ size_t tiers = 3;
+ size_t run_for_secs = 5;
+ info("preparing stress test of %zu entries...", entries);
+ struct mrg_stress t = {
+ .mrg = mrg,
+ .entries = entries,
+ .array = callocz(entries, sizeof(struct mrg_stress_entry)),
+ };
+
+ time_t now = max_acceptable_collected_time();
+ for(size_t i = 0; i < entries ;i++) {
+ uuid_generate_random(t.array[i].uuid);
+ t.array[i].after = now / 3;
+ t.array[i].before = now / 2;
+ }
+ info("stress test is populating MRG with 3 tiers...");
+ for(size_t i = 0; i < entries ;i++) {
+ struct mrg_stress_entry *e = &t.array[i];
+ for(size_t tier = 1; tier <= tiers ;tier++) {
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, tier,
+ &e->uuid,
+ e->after,
+ e->before,
+ 1,
+ e->before);
+ }
+ }
+ info("stress test ready to run...");
- pthread_t thread2;
- netdata_thread_create(&thread2, "TH2",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread2, mrg);
+ usec_t started_ut = now_monotonic_usec();
- pthread_t thread3;
- netdata_thread_create(&thread3, "TH3",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread3, mrg);
+ pthread_t th[threads];
+ for(size_t i = 0; i < threads ; i++) {
+ char buf[15 + 1];
+ snprintfz(buf, 15, "TH[%zu]", i);
+ netdata_thread_create(&th[i], buf,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ mrg_stress, &t);
+ }
+ sleep_usec(run_for_secs * USEC_PER_SEC);
+ __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED);
- sleep_usec(5 * USEC_PER_SEC);
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_cancel(th[i]);
- netdata_thread_cancel(thread1);
- netdata_thread_cancel(thread2);
- netdata_thread_cancel(thread3);
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_join(th[i], NULL);
- netdata_thread_join(thread1, NULL);
- netdata_thread_join(thread2, NULL);
- netdata_thread_join(thread3, NULL);
usec_t ended_ut = now_monotonic_usec();
+ struct mrg_statistics stats;
+ mrg_get_statistics(mrg, &stats);
+
info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
"%zu deletions, %zu wrong deletions, "
"%zu successful searches, %zu wrong searches, "
- "%zu successful pointer validations, %zu wrong pointer validations "
"in %llu usecs",
- mrg->stats.additions, mrg->stats.additions_duplicate,
- mrg->stats.deletions, mrg->stats.delete_misses,
- mrg->stats.search_hits, mrg->stats.search_misses,
- mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses,
+ stats.additions, stats.additions_duplicate,
+ stats.deletions, stats.delete_misses,
+ stats.search_hits, stats.search_misses,
ended_ut - started_ut);
-#endif
+ info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0,
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads);
mrg_destroy(mrg);
diff --git a/database/engine/metric.h b/database/engine/metric.h
index e539e3f05d..7ed9d7984b 100644
--- a/database/engine/metric.h
+++ b/database/engine/metric.h
@@ -87,4 +87,10 @@ void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s);
size_t mrg_aral_structures(void);
size_t mrg_aral_overhead(void);
+
+void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s);
+
#endif // DBENGINE_METRIC_H