summaryrefslogtreecommitdiffstats
path: root/database/engine/metric.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/metric.c')
-rw-r--r--database/engine/metric.c265
1 files changed, 212 insertions, 53 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c
index d16bc063d9..bad15b50c2 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -3,6 +3,10 @@
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
+typedef enum __attribute__ ((__packed__)) {
+ METRIC_FLAG_HAS_RETENTION = (1 << 0),
+} METRIC_FLAGS;
+
struct metric {
uuid_t uuid; // never changes
Word_t section; // never changes
@@ -12,6 +16,8 @@ struct metric {
time_t latest_time_s_hot; // latest time of the currently collected page
uint32_t latest_update_every_s; //
pid_t writer;
+ METRIC_FLAGS flags;
+ REFCOUNT refcount;
SPINLOCK spinlock; // protects all variable members
// THIS IS allocated with malloc()
@@ -19,8 +25,9 @@ struct metric {
};
struct mrg {
+ ARAL *aral[MRG_PARTITIONS];
+
struct pgc_index {
- ARAL *aral;
netdata_rwlock_t rwlock;
Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
} index[MRG_PARTITIONS];
@@ -95,9 +102,75 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
return u[UUID_SZ - 1] % MRG_PARTITIONS;
}
-static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
+ bool has_retention = (metric->first_time_s || metric->latest_time_s_clean || metric->latest_time_s_hot);
+
+ if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
+ metric->flags |= METRIC_FLAG_HAS_RETENTION;
+ __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ }
+ else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
+ metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
+ __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ }
+
+ return has_retention;
+}
+
+static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+ REFCOUNT refcount;
+
+ if(!having_spinlock)
+ netdata_spinlock_lock(&metric->spinlock);
+
+ if(unlikely(metric->refcount < 0))
+ fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
+
+ refcount = ++metric->refcount;
+
+ // update its retention flags
+ metric_has_retention_unsafe(mrg, metric);
+
+ if(!having_spinlock)
+ netdata_spinlock_unlock(&metric->spinlock);
+
+ if(refcount == 1)
+ __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+ __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+
+ return refcount;
+}
+
+static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
+ bool ret = true;
+ REFCOUNT refcount;
+
+ netdata_spinlock_lock(&metric->spinlock);
+
+ if(unlikely(metric->refcount <= 0))
+ fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
+
+ refcount = --metric->refcount;
+
+ if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
+ ret = false;
+
+ netdata_spinlock_unlock(&metric->spinlock);
+
+ if(unlikely(!refcount))
+ __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+
+ return ret;
+}
+
+static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, &entry->uuid);
+ METRIC *allocation = aral_mallocz(mrg->aral[partition]);
+
mrg_index_write_lock(mrg, partition);
size_t mem_before_judyl, mem_after_judyl;
@@ -117,18 +190,22 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
if(unlikely(!PValue || PValue == PJERR))
fatal("DBENGINE METRIC: corrupted section JudyL array");
- if(*PValue != NULL) {
+ if(unlikely(*PValue != NULL)) {
METRIC *metric = *PValue;
+
+ metric_acquire(mrg, metric, false);
mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = false;
+ aral_freez(mrg->aral[partition], allocation);
+
MRG_STATS_DUPLICATE_ADD(mrg);
return metric;
}
- METRIC *metric = arrayalloc_mallocz(mrg->index[partition].aral);
+ METRIC *metric = allocation;
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
metric->first_time_s = entry->first_time_s;
@@ -136,7 +213,10 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
+ metric->refcount = 0;
+ metric->flags = 0;
netdata_spinlock_init(&metric->spinlock);
+ metric_acquire(mrg, metric, true); // no spinlock use required here
*PValue = metric;
mrg_index_write_unlock(mrg, partition);
@@ -149,7 +229,7 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
return metric;
}
-static METRIC *metric_get(MRG *mrg, uuid_t *uuid, Word_t section) {
+static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
mrg_index_read_lock(mrg, partition);
@@ -170,19 +250,27 @@ static METRIC *metric_get(MRG *mrg, uuid_t *uuid, Word_t section) {
METRIC *metric = *PValue;
+ metric_acquire(mrg, metric, false);
+
mrg_index_read_unlock(mrg, partition);
MRG_STATS_SEARCH_HIT(mrg);
return metric;
}
-static bool metric_del(MRG *mrg, METRIC *metric) {
+static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
size_t partition = uuid_partition(mrg, &metric->uuid);
size_t mem_before_judyl, mem_after_judyl;
mrg_index_write_lock(mrg, partition);
+ if(!metric_release_and_can_be_deleted(mrg, metric)) {
+ mrg_index_write_unlock(mrg, partition);
+ __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED);
+ return false;
+ }
+
Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
mrg_index_write_unlock(mrg, partition);
@@ -208,11 +296,10 @@ static bool metric_del(MRG *mrg, METRIC *metric) {
mrg_stats_size_judyhs_removed_uuid(mrg);
}
- // arrayalloc is running lockless here
- arrayalloc_freez(mrg->index[partition].aral, metric);
-
mrg_index_write_unlock(mrg, partition);
+ aral_freez(mrg->aral[partition], metric);
+
MRG_STATS_DELETED_METRIC(mrg, partition);
return true;
@@ -223,11 +310,22 @@ static bool metric_del(MRG *mrg, METRIC *metric) {
MRG *mrg_create(void) {
MRG *mrg = callocz(1, sizeof(MRG));
+
for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
+ char buf[ARAL_MAX_NAME + 1];
+ snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
netdata_rwlock_init(&mrg->index[i].rwlock);
- mrg->index[i].aral = arrayalloc_create(sizeof(METRIC), 32768 / sizeof(METRIC), NULL, NULL, false, true);
+
+ mrg->aral[i] = aral_create("mrg",
+ sizeof(METRIC),
+ 0,
+ 512,
+ NULL, NULL, false,
+ false);
}
+
mrg->stats.size = sizeof(MRG);
+
return mrg;
}
@@ -242,32 +340,27 @@ void mrg_destroy(MRG *mrg __maybe_unused) {
}
METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
- // FIXME - support refcount
-
// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
- return metric_add(mrg, &entry, ret);
+ return metric_add_and_acquire(mrg, &entry, ret);
}
METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
- // FIXME - support refcount
- return metric_get(mrg, uuid, section);
+ return metric_get_and_acquire(mrg, uuid, section);
}
bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
- // FIXME - support refcount
- return metric_del(mrg, metric);
+ return acquired_metric_del(mrg, metric);
}
-METRIC *mrg_metric_dup(MRG *mrg __maybe_unused, METRIC *metric) {
- // FIXME - duplicate refcount
+METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+ metric_acquire(mrg, metric, false);
return metric;
}
-void mrg_metric_release(MRG *mrg __maybe_unused, METRIC *metric __maybe_unused) {
- // FIXME - release refcount
-
+bool mrg_metric_release(MRG *mrg, METRIC *metric) {
+ return metric_release_and_can_be_deleted(mrg, metric);
}
Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -285,6 +378,7 @@ Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
netdata_spinlock_lock(&metric->spinlock);
metric->first_time_s = first_time_s;
+ metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return true;
@@ -311,6 +405,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
else if(unlikely(!metric->latest_update_every_s && update_every_s))
metric->latest_update_every_s = update_every_s;
+ metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
}
@@ -322,6 +417,7 @@ bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metr
metric->first_time_s = first_time_s;
ret = true;
}
+ metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return ret;
@@ -382,10 +478,63 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric,
// if(unlikely(metric->first_time_s > latest_time_s))
// metric->first_time_s = latest_time_s;
+ metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return true;
}
+// returns true when metric still has retention
+bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+ Word_t section = mrg_metric_section(mrg, metric);
+ bool do_again = false;
+ size_t countdown = 5;
+ bool ret = true;
+
+ do {
+ time_t min_first_time_s = LONG_MAX;
+ time_t max_end_time_s = 0;
+ PGC_PAGE *page;
+ PGC_SEARCH method = PGC_SEARCH_FIRST;
+ time_t page_first_time_s = 0;
+ time_t page_end_time_s = 0;
+ while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
+ method = PGC_SEARCH_NEXT;
+
+ bool is_hot = pgc_is_page_hot(page);
+ bool is_dirty = pgc_is_page_dirty(page);
+ page_first_time_s = pgc_page_start_time_s(page);
+ page_end_time_s = pgc_page_end_time_s(page);
+
+ if ((is_hot || is_dirty) && page_first_time_s < min_first_time_s)
+ min_first_time_s = page_first_time_s;
+
+ if (is_dirty && page_end_time_s > max_end_time_s)
+ max_end_time_s = page_end_time_s;
+
+ pgc_page_release(main_cache, page);
+ }
+
+ if (min_first_time_s == LONG_MAX)
+ min_first_time_s = 0;
+
+ netdata_spinlock_lock(&metric->spinlock);
+ if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
+ do_again = true;
+ else {
+ internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
+
+ do_again = false;
+ metric->first_time_s = min_first_time_s;
+ metric->latest_time_s_clean = max_end_time_s;
+
+ ret = metric_has_retention_unsafe(mrg, metric);
+ }
+ netdata_spinlock_unlock(&metric->spinlock);
+ } while(do_again);
+
+ return ret;
+}
+
bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
@@ -399,6 +548,7 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t
// if(unlikely(metric->first_time_s > latest_time_s))
// metric->first_time_s = latest_time_s;
+ metric_has_retention_unsafe(mrg, metric);
netdata_spinlock_unlock(&metric->spinlock);
return true;
}
@@ -444,7 +594,7 @@ time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
return update_every_s;
}
-bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric) {
+bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
bool done = false;
netdata_spinlock_lock(&metric->spinlock);
if(!metric->writer) {
@@ -452,11 +602,13 @@ bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric) {
__atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
+ else
+ __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED);
netdata_spinlock_unlock(&metric->spinlock);
return done;
}
-bool mrg_metric_writer_release(MRG *mrg, METRIC *metric) {
+bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
bool done = false;
netdata_spinlock_lock(&metric->spinlock);
if(metric->writer) {
@@ -584,73 +736,80 @@ static void *mrg_stress_test_thread3(void *ptr) {
int mrg_unittest(void) {
MRG *mrg = mrg_create();
- METRIC *metric1, *metric2;
+ METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
+ METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
bool ret;
MRG_ENTRY entry = {
- .section = 1,
+ .section = 0,
.first_time_s = 2,
.last_time_s = 3,
.latest_update_every_s = 4,
};
uuid_generate(entry.uuid);
- metric1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
if(!ret)
fatal("DBENGINE METRIC: failed to add metric");
// add the same metric again
- if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric1)
+ m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m2_t0 != m1_t0)
fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
if(ret)
fatal("DBENGINE METRIC: managed to add the same metric twice");
- if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric1)
+ m3_t0 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+ if(m3_t0 != m1_t0)
fatal("DBENGINE METRIC: cannot find the metric added");
// add the same metric again
- if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric1)
+ m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m4_t0 != m1_t0)
fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
if(ret)
fatal("DBENGINE METRIC: managed to add the same metric twice");
// add the same metric in another section
- entry.section = 0;
- metric2 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ entry.section = 1;
+ m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
if(!ret)
- fatal("DBENGINE METRIC: failed to add metric in different section");
+ fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
// add the same metric again
- if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric2)
- fatal("DBENGINE METRIC: adding the same metric twice (section 0), does not return the same pointer");
+ m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m2_t1 != m1_t1)
+ fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section);
if(ret)
fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)");
- if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2)
- fatal("DBENGINE METRIC: cannot find the metric added (section 0)");
+ m3_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+ if(m3_t1 != m1_t1)
+ fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
// delete the first metric
- if(!mrg_metric_release_and_delete(mrg, metric1))
+ mrg_metric_release(mrg, m2_t0);
+ mrg_metric_release(mrg, m3_t0);
+ mrg_metric_release(mrg, m4_t0);
+ mrg_metric_set_first_time_s(mrg, m1_t0, 0);
+ mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
+ mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
+ if(!mrg_metric_release_and_delete(mrg, m1_t0))
fatal("DBENGINE METRIC: cannot delete the first metric");
- if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2)
- fatal("DBENGINE METRIC: cannot find the metric added (section 0), after deleting the first one");
-
- // delete the first metric again - metric1 pointer is invalid now
- if(mrg_metric_release_and_delete(mrg, metric1))
- fatal("DBENGINE METRIC: deleted again an already deleted metric");
-
- // find the section 0 metric again
- if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2)
- fatal("DBENGINE METRIC: cannot find the metric added (section 0), after deleting the first one twice");
+ m4_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+ if(m4_t1 != m1_t1)
+ fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
// delete the second metric
- if(!mrg_metric_release_and_delete(mrg, metric2))
+ mrg_metric_release(mrg, m2_t1);
+ mrg_metric_release(mrg, m3_t1);
+ mrg_metric_release(mrg, m4_t1);
+ mrg_metric_set_first_time_s(mrg, m1_t1, 0);
+ mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
+ mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
+ if(!mrg_metric_release_and_delete(mrg, m1_t1))
fatal("DBENGINE METRIC: cannot delete the second metric");
- // delete the second metric again
- if(mrg_metric_release_and_delete(mrg, metric2))
- fatal("DBENGINE METRIC: managed to delete an already deleted metric");
-
if(mrg->stats.entries != 0)
fatal("DBENGINE METRIC: invalid entries counter");