diff options
Diffstat (limited to 'database/engine/metric.c')
-rw-r--r-- | database/engine/metric.c | 265 |
1 files changed, 212 insertions, 53 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c index d16bc063d9..bad15b50c2 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -3,6 +3,10 @@ typedef int32_t REFCOUNT; #define REFCOUNT_DELETING (-100) +typedef enum __attribute__ ((__packed__)) { + METRIC_FLAG_HAS_RETENTION = (1 << 0), +} METRIC_FLAGS; + struct metric { uuid_t uuid; // never changes Word_t section; // never changes @@ -12,6 +16,8 @@ struct metric { time_t latest_time_s_hot; // latest time of the currently collected page uint32_t latest_update_every_s; // pid_t writer; + METRIC_FLAGS flags; + REFCOUNT refcount; SPINLOCK spinlock; // protects all variable members // THIS IS allocated with malloc() @@ -19,8 +25,9 @@ struct metric { }; struct mrg { + ARAL *aral[MRG_PARTITIONS]; + struct pgc_index { - ARAL *aral; netdata_rwlock_t rwlock; Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers) } index[MRG_PARTITIONS]; @@ -95,9 +102,75 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { return u[UUID_SZ - 1] % MRG_PARTITIONS; } -static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) { +static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) { + bool has_retention = (metric->first_time_s || metric->latest_time_s_clean || metric->latest_time_s_hot); + + if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) { + metric->flags |= METRIC_FLAG_HAS_RETENTION; + __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + } + else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) { + metric->flags &= ~METRIC_FLAG_HAS_RETENTION; + __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + } + + return has_retention; +} + +static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) { + REFCOUNT refcount; + + if(!having_spinlock) + netdata_spinlock_lock(&metric->spinlock); + + if(unlikely(metric->refcount < 0)) + fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); + + refcount = ++metric->refcount; + + // update its retention flags + metric_has_retention_unsafe(mrg, metric); + + if(!having_spinlock) + netdata_spinlock_unlock(&metric->spinlock); + + if(refcount == 1) + __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + + __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + + return refcount; +} + +static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { + bool ret = true; + REFCOUNT refcount; + + netdata_spinlock_lock(&metric->spinlock); + + if(unlikely(metric->refcount <= 0)) + fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); + + refcount = --metric->refcount; + + if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0)) + ret = false; + + netdata_spinlock_unlock(&metric->spinlock); + + if(unlikely(!refcount)) + __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + + __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + + return ret; +} + +static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { size_t partition = uuid_partition(mrg, &entry->uuid); + METRIC *allocation = aral_mallocz(mrg->aral[partition]); + mrg_index_write_lock(mrg, partition); size_t mem_before_judyl, mem_after_judyl; @@ -117,18 +190,22 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) { if(unlikely(!PValue || PValue == PJERR)) fatal("DBENGINE METRIC: corrupted section JudyL array"); - if(*PValue != NULL) { + if(unlikely(*PValue != NULL)) { METRIC *metric = *PValue; + + metric_acquire(mrg, metric, false); mrg_index_write_unlock(mrg, partition); if(ret) *ret = false; + aral_freez(mrg->aral[partition], allocation); + MRG_STATS_DUPLICATE_ADD(mrg); return metric; } - METRIC *metric = arrayalloc_mallocz(mrg->index[partition].aral); + METRIC *metric = allocation; uuid_copy(metric->uuid, entry->uuid); metric->section = entry->section; metric->first_time_s = entry->first_time_s; @@ -136,7 +213,10 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) { metric->latest_time_s_hot = 0; metric->latest_update_every_s = entry->latest_update_every_s; metric->writer = 0; + metric->refcount = 0; + metric->flags = 0; netdata_spinlock_init(&metric->spinlock); + metric_acquire(mrg, metric, true); // no spinlock use required here *PValue = metric; mrg_index_write_unlock(mrg, partition); @@ -149,7 +229,7 @@ static METRIC *metric_add(MRG *mrg, MRG_ENTRY *entry, bool *ret) { return metric; } -static METRIC *metric_get(MRG *mrg, uuid_t *uuid, Word_t section) { +static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { size_t partition = uuid_partition(mrg, uuid); mrg_index_read_lock(mrg, partition); @@ -170,19 +250,27 @@ static METRIC *metric_get(MRG *mrg, uuid_t *uuid, Word_t section) { METRIC *metric = *PValue; + metric_acquire(mrg, metric, false); + mrg_index_read_unlock(mrg, partition); MRG_STATS_SEARCH_HIT(mrg); return metric; } -static bool metric_del(MRG *mrg, METRIC *metric) { +static bool acquired_metric_del(MRG *mrg, METRIC *metric) { size_t partition = uuid_partition(mrg, &metric->uuid); size_t mem_before_judyl, mem_after_judyl; mrg_index_write_lock(mrg, partition); + if(!metric_release_and_can_be_deleted(mrg, metric)) { + mrg_index_write_unlock(mrg, partition); + __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED); + return false; + } + Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { mrg_index_write_unlock(mrg, partition); @@ -208,11 +296,10 @@ static bool metric_del(MRG *mrg, METRIC *metric) { mrg_stats_size_judyhs_removed_uuid(mrg); } - // arrayalloc is running lockless here - arrayalloc_freez(mrg->index[partition].aral, metric); - mrg_index_write_unlock(mrg, partition); + aral_freez(mrg->aral[partition], metric); + MRG_STATS_DELETED_METRIC(mrg, partition); return true; @@ -223,11 +310,22 @@ static bool metric_del(MRG *mrg, METRIC *metric) { MRG *mrg_create(void) { MRG *mrg = callocz(1, sizeof(MRG)); + for(size_t i = 0; i < MRG_PARTITIONS ; i++) { + char buf[ARAL_MAX_NAME + 1]; + snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i); netdata_rwlock_init(&mrg->index[i].rwlock); - mrg->index[i].aral = arrayalloc_create(sizeof(METRIC), 32768 / sizeof(METRIC), NULL, NULL, false, true); + + mrg->aral[i] = aral_create("mrg", + sizeof(METRIC), + 0, + 512, + NULL, NULL, false, + false); } + mrg->stats.size = sizeof(MRG); + return mrg; } @@ -242,32 +340,27 @@ void mrg_destroy(MRG *mrg __maybe_unused) { } METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { - // FIXME - support refcount - // internal_fatal(entry.latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); - return metric_add(mrg, &entry, ret); + return metric_add_and_acquire(mrg, &entry, ret); } METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { - // FIXME - support refcount - return metric_get(mrg, uuid, section); + return metric_get_and_acquire(mrg, uuid, section); } bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { - // FIXME - support refcount - return metric_del(mrg, metric); + return acquired_metric_del(mrg, metric); } -METRIC *mrg_metric_dup(MRG *mrg __maybe_unused, METRIC *metric) { - // FIXME - duplicate refcount +METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { + metric_acquire(mrg, metric, false); return metric; } -void mrg_metric_release(MRG *mrg __maybe_unused, METRIC *metric __maybe_unused) { - // FIXME - release refcount - +bool mrg_metric_release(MRG *mrg, METRIC *metric) { + return metric_release_and_can_be_deleted(mrg, metric); } Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { @@ -285,6 +378,7 @@ Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { netdata_spinlock_lock(&metric->spinlock); metric->first_time_s = first_time_s; + metric_has_retention_unsafe(mrg, metric); netdata_spinlock_unlock(&metric->spinlock); return true; @@ -311,6 +405,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t else if(unlikely(!metric->latest_update_every_s && update_every_s)) metric->latest_update_every_s = update_every_s; + metric_has_retention_unsafe(mrg, metric); netdata_spinlock_unlock(&metric->spinlock); } @@ -322,6 +417,7 @@ bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metr metric->first_time_s = first_time_s; ret = true; } + metric_has_retention_unsafe(mrg, metric); netdata_spinlock_unlock(&metric->spinlock); return ret; @@ -382,10 +478,63 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, // if(unlikely(metric->first_time_s > latest_time_s)) // metric->first_time_s = latest_time_s; + metric_has_retention_unsafe(mrg, metric); netdata_spinlock_unlock(&metric->spinlock); return true; } +// returns true when metric still has retention +bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { + Word_t section = mrg_metric_section(mrg, metric); + bool do_again = false; + size_t countdown = 5; + bool ret = true; + + do { + time_t min_first_time_s = LONG_MAX; + time_t max_end_time_s = 0; + PGC_PAGE *page; + PGC_SEARCH method = PGC_SEARCH_FIRST; + time_t page_first_time_s = 0; + time_t page_end_time_s = 0; + while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) { + method = PGC_SEARCH_NEXT; + + bool is_hot = pgc_is_page_hot(page); + bool is_dirty = pgc_is_page_dirty(page); + page_first_time_s = pgc_page_start_time_s(page); + page_end_time_s = pgc_page_end_time_s(page); + + if ((is_hot || is_dirty) && page_first_time_s < min_first_time_s) + min_first_time_s = page_first_time_s; + + if (is_dirty && page_end_time_s > max_end_time_s) + max_end_time_s = page_end_time_s; + + pgc_page_release(main_cache, page); + } + + if (min_first_time_s == LONG_MAX) + min_first_time_s = 0; + + netdata_spinlock_lock(&metric->spinlock); + if (--countdown && !min_first_time_s && metric->latest_time_s_hot) + do_again = true; + else { + internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention"); + + do_again = false; + metric->first_time_s = min_first_time_s; + metric->latest_time_s_clean = max_end_time_s; + + ret = metric_has_retention_unsafe(mrg, metric); + } + netdata_spinlock_unlock(&metric->spinlock); + } while(do_again); + + return ret; +} + bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); @@ -399,6 +548,7 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t // if(unlikely(metric->first_time_s > latest_time_s)) // metric->first_time_s = latest_time_s; + metric_has_retention_unsafe(mrg, metric); netdata_spinlock_unlock(&metric->spinlock); return true; } @@ -444,7 +594,7 @@ time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { return update_every_s; } -bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric) { +bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { bool done = false; netdata_spinlock_lock(&metric->spinlock); if(!metric->writer) { @@ -452,11 +602,13 @@ bool mrg_metric_writer_acquire(MRG *mrg, METRIC *metric) { __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED); done = true; } + else + __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED); netdata_spinlock_unlock(&metric->spinlock); return done; } -bool mrg_metric_writer_release(MRG *mrg, METRIC *metric) { +bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { bool done = false; netdata_spinlock_lock(&metric->spinlock); if(metric->writer) { @@ -584,73 +736,80 @@ static void *mrg_stress_test_thread3(void *ptr) { int mrg_unittest(void) { MRG *mrg = mrg_create(); - METRIC *metric1, *metric2; + METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0; + METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1; bool ret; MRG_ENTRY entry = { - .section = 1, + .section = 0, .first_time_s = 2, .last_time_s = 3, .latest_update_every_s = 4, }; uuid_generate(entry.uuid); - metric1 = mrg_metric_add_and_acquire(mrg, entry, &ret); + m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); if(!ret) fatal("DBENGINE METRIC: failed to add metric"); // add the same metric again - if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric1) + m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); + if(m2_t0 != m1_t0) fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer"); if(ret) fatal("DBENGINE METRIC: managed to add the same metric twice"); - if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric1) + m3_t0 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section); + if(m3_t0 != m1_t0) fatal("DBENGINE METRIC: cannot find the metric added"); // add the same metric again - if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric1) + m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); + if(m4_t0 != m1_t0) fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer"); if(ret) fatal("DBENGINE METRIC: managed to add the same metric twice"); // add the same metric in another section - entry.section = 0; - metric2 = mrg_metric_add_and_acquire(mrg, entry, &ret); + entry.section = 1; + m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret); if(!ret) - fatal("DBENGINE METRIC: failed to add metric in different section"); + fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section); // add the same metric again - if(mrg_metric_add_and_acquire(mrg, entry, &ret) != metric2) - fatal("DBENGINE METRIC: adding the same metric twice (section 0), does not return the same pointer"); + m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret); + if(m2_t1 != m1_t1) + fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section); if(ret) fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)"); - if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2) - fatal("DBENGINE METRIC: cannot find the metric added (section 0)"); + m3_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section); + if(m3_t1 != m1_t1) + fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section); // delete the first metric - if(!mrg_metric_release_and_delete(mrg, metric1)) + mrg_metric_release(mrg, m2_t0); + mrg_metric_release(mrg, m3_t0); + mrg_metric_release(mrg, m4_t0); + mrg_metric_set_first_time_s(mrg, m1_t0, 0); + mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0); + mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0); + if(!mrg_metric_release_and_delete(mrg, m1_t0)) fatal("DBENGINE METRIC: cannot delete the first metric"); - if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2) - fatal("DBENGINE METRIC: cannot find the metric added (section 0), after deleting the first one"); - - // delete the first metric again - metric1 pointer is invalid now - if(mrg_metric_release_and_delete(mrg, metric1)) - fatal("DBENGINE METRIC: deleted again an already deleted metric"); - - // find the section 0 metric again - if(mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section) != metric2) - fatal("DBENGINE METRIC: cannot find the metric added (section 0), after deleting the first one twice"); + m4_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section); + if(m4_t1 != m1_t1) + fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section); // delete the second metric - if(!mrg_metric_release_and_delete(mrg, metric2)) + mrg_metric_release(mrg, m2_t1); + mrg_metric_release(mrg, m3_t1); + mrg_metric_release(mrg, m4_t1); + mrg_metric_set_first_time_s(mrg, m1_t1, 0); + mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0); + mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0); + if(!mrg_metric_release_and_delete(mrg, m1_t1)) fatal("DBENGINE METRIC: cannot delete the second metric"); - // delete the second metric again - if(mrg_metric_release_and_delete(mrg, metric2)) - fatal("DBENGINE METRIC: managed to delete an already deleted metric"); - if(mrg->stats.entries != 0) fatal("DBENGINE METRIC: invalid entries counter"); |