author     Costa Tsaousis <costa@netdata.cloud>  2023-06-28 23:18:01 +0300
committer  GitHub <noreply@github.com>           2023-06-28 23:18:01 +0300
commit     b52a989497f68cddeeb0282f5fd650c4e373e477 (patch)
tree       592c33c5b51b16c18df65f2b316efef13a3cbac3 /database
parent     5be9be74854d00879e6d52d6432ae12b5e8558cd (diff)
Optimizations Part 2 (#15267)
* make all pluginsd functions inline, instead of function pointers
* dynamic MRG partitions based on the number of CPUs
* report the right size of the MRG
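
The change that matters for the database layer is the second bullet: the metrics registry (MRG) drops its compile-time partition count (MRG_PARTITIONS, previously fixed at 10) and instead sizes its partition array at allocation time from the number of CPUs, using a C99 flexible array member. Below is a minimal standalone sketch of that allocation pattern; registry/partition are hypothetical names, and sysconf() stands in for Netdata's get_netdata_cpus():

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    struct partition {
        unsigned long entries;     /* the real struct carries an ARAL, a rw-spinlock and statistics */
    };

    struct registry {
        size_t partitions;
        struct partition index[];  /* flexible array member: sized when the struct is allocated */
    };

    static struct registry *registry_create(size_t partitions) {
        if (partitions < 1)        /* 0 means "auto": one partition per CPU */
            partitions = (size_t)sysconf(_SC_NPROCESSORS_ONLN);

        struct registry *r = calloc(1, sizeof(*r) + sizeof(struct partition) * partitions);
        r->partitions = partitions;
        return r;
    }

    int main(void) {
        struct registry *r = registry_create(0);
        printf("%zu partitions\n", r->partitions);
        free(r);
        return 0;
    }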
Diffstat (limited to 'database')
-rw-r--r--  database/engine/metric.c        | 147
-rw-r--r--  database/engine/metric.h        |  14
-rw-r--r--  database/engine/pagecache.c     |   2
-rwxr-xr-x  database/engine/rrdengineapi.c  |   4
4 files changed, 71 insertions(+), 96 deletions(-)
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 331d881527..1e6f9b95bc 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -28,27 +28,16 @@ struct metric {
static struct aral_statistics mrg_aral_statistics;
struct mrg {
- ARAL *aral[MRG_PARTITIONS];
+ size_t partitions;
- struct pgc_index {
- MRG_CACHE_LINE_PADDING(0);
+ struct mrg_partition {
+ ARAL *aral; // not protected by our spinlock - it has its own
RW_SPINLOCK rw_spinlock;
-
- MRG_CACHE_LINE_PADDING(1);
-
- Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
-
- MRG_CACHE_LINE_PADDING(2);
+ Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers)
struct mrg_statistics stats;
-
- MRG_CACHE_LINE_PADDING(3);
- } index[MRG_PARTITIONS];
-
-#ifdef NETDATA_INTERNAL_CHECKS
- size_t entries_per_partition[MRG_PARTITIONS];
-#endif
+ } index[];
};
static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
@@ -59,20 +48,12 @@ static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
mrg->index[partition].stats.entries++;
mrg->index[partition].stats.additions++;
mrg->index[partition].stats.size += sizeof(METRIC);
-
-#ifdef NETDATA_INTERNAL_CHECKS
- __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
-#endif
}
static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
mrg->index[partition].stats.entries--;
mrg->index[partition].stats.size -= sizeof(METRIC);
mrg->index[partition].stats.deletions++;
-
-#ifdef NETDATA_INTERNAL_CHECKS
- __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
-#endif
}
static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
@@ -87,18 +68,13 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
mrg->index[partition].stats.delete_misses++;
}
-static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
- rw_spinlock_read_lock(&mrg->index[partition].rw_spinlock);
-}
-static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
- rw_spinlock_read_unlock(&mrg->index[partition].rw_spinlock);
-}
-static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
- rw_spinlock_write_lock(&mrg->index[partition].rw_spinlock);
-}
-static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
- rw_spinlock_write_unlock(&mrg->index[partition].rw_spinlock);
-}
+#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
+
+#define metric_lock(metric) spinlock_lock(&(metric)->spinlock)
+#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock)
static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
if(mem_after_judyl > mem_before_judyl)
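
The hunk above replaces static inline wrapper functions with macros. Once the optimizer inlines, the two are equivalent; the macros simply guarantee zero call overhead even in unoptimized debug builds. The same pattern, sketched with pthread rwlocks standing in for Netdata's RW_SPINLOCK and hypothetical reg/part names:

    #include <pthread.h>

    struct part { pthread_rwlock_t rw; };
    struct reg  { size_t partitions; struct part index[]; };

    /* expands at the call site; (r) is parenthesized so any expression is safe */
    #define reg_read_lock(r, p)     pthread_rwlock_rdlock(&(r)->index[p].rw)
    #define reg_read_unlock(r, p)   pthread_rwlock_unlock(&(r)->index[p].rw)
    #define reg_write_lock(r, p)    pthread_rwlock_wrlock(&(r)->index[p].rw)
    #define reg_write_unlock(r, p)  pthread_rwlock_unlock(&(r)->index[p].rw)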
@@ -117,7 +93,8 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- return u[UUID_SZ - 1] % MRG_PARTITIONS;
+ size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
+ return *n % mrg->partitions;
}
static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
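
uuid_partition() used to take just the UUID's last byte modulo the fixed partition count; it now reads the trailing sizeof(size_t) bytes as a single integer, which distributes metrics evenly over any runtime partition count (the tail bytes of a random UUID are effectively uniform). A sketch of the same idea using memcpy instead of the pointer cast, which sidesteps alignment and strict-aliasing questions; uuid_partition_sketch is a hypothetical name:

    #include <stdint.h>
    #include <string.h>

    #define UUID_SZ 16

    static inline size_t uuid_partition_sketch(const uint8_t uuid[UUID_SZ], size_t partitions) {
        size_t n;
        memcpy(&n, &uuid[UUID_SZ - sizeof(n)], sizeof(n));  /* read the UUID's tail as one integer */
        return n % partitions;
    }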
@@ -142,7 +119,7 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
REFCOUNT refcount;
if(!having_spinlock)
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount < 0))
fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
@@ -153,7 +130,7 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
metric_has_retention_unsafe(mrg, metric);
if(!having_spinlock)
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(refcount == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
@@ -168,7 +145,7 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
size_t partition = metric->partition;
REFCOUNT refcount;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount <= 0))
fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
@@ -178,7 +155,7 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
ret = false;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(unlikely(!refcount))
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
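
The hunks through the rest of metric.c are mechanical: every spinlock_lock()/spinlock_unlock() pair on a metric becomes metric_lock()/metric_unlock(). The pattern they wrap is a refcount guarded by a per-object spinlock; in miniature, with a hypothetical obj type and pthread spinlocks standing in for Netdata's:

    #include <pthread.h>

    struct obj {
        pthread_spinlock_t lock;
        int refcount;
    };

    /* returns non-zero when the caller was the last holder and may delete the object */
    static int obj_release_and_can_be_deleted(struct obj *o) {
        pthread_spin_lock(&o->lock);
        int rc = --o->refcount;
        pthread_spin_unlock(&o->lock);
        return rc == 0;
    }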
@@ -191,7 +168,7 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, &entry->uuid);
- METRIC *allocation = aral_mallocz(mrg->aral[partition]);
+ METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
mrg_index_write_lock(mrg, partition);
@@ -224,7 +201,7 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
if(ret)
*ret = false;
- aral_freez(mrg->aral[partition], allocation);
+ aral_freez(mrg->index[partition].aral, allocation);
return metric;
}
@@ -326,7 +303,7 @@ static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
mrg_index_write_unlock(mrg, partition);
- aral_freez(mrg->aral[partition], metric);
+ aral_freez(mrg->index[partition].aral, metric);
return true;
}
@@ -334,22 +311,20 @@ static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
// ----------------------------------------------------------------------------
// public API
-inline MRG *mrg_create(void) {
- MRG *mrg = callocz(1, sizeof(MRG));
+inline MRG *mrg_create(size_t partitions) {
+ if(partitions < 1)
+ partitions = get_netdata_cpus();
+
+ MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
+ mrg->partitions = partitions;
- for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
+ for(size_t i = 0; i < mrg->partitions ; i++) {
rw_spinlock_init(&mrg->index[i].rw_spinlock);
char buf[ARAL_MAX_NAME + 1];
snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
- mrg->aral[i] = aral_create(buf,
- sizeof(METRIC),
- 0,
- 16384,
- &mrg_aral_statistics,
- NULL, NULL, false,
- false);
+ mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
}
return mrg;
@@ -415,10 +390,10 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
if(unlikely(first_time_s < 0))
return false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->first_time_s = first_time_s;
metric_has_retention_unsafe(mrg, metric);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
@@ -443,7 +418,7 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric,
if(unlikely(!first_time_s && !last_time_s && !update_every_s))
return;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
metric->first_time_s = first_time_s;
@@ -458,7 +433,7 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric,
metric->latest_update_every_s = (uint32_t) update_every_s;
metric_has_retention_unsafe(mrg, metric);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
@@ -466,13 +441,13 @@ inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRI
bool ret = false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(first_time_s > metric->first_time_s) {
metric->first_time_s = first_time_s;
ret = true;
}
metric_has_retention_unsafe(mrg, metric);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return ret;
}
@@ -480,7 +455,7 @@ inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRI
inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t first_time_s;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -492,13 +467,13 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri
first_time_s = metric->first_time_s;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return first_time_s;
}
inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -512,7 +487,7 @@ inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, ti
*last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
*update_every_s = metric->latest_update_every_s;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -521,7 +496,7 @@ inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *
if(unlikely(latest_time_s < 0))
return false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
@@ -535,7 +510,7 @@ inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
@@ -573,7 +548,7 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
if (min_first_time_s == LONG_MAX)
min_first_time_s = 0;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
do_again = true;
else {
@@ -585,7 +560,7 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
ret = metric_has_retention_unsafe(mrg, metric);
}
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
} while(do_again);
return ret;
@@ -600,22 +575,22 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
if(unlikely(latest_time_s < 0))
return false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_time_s_hot = latest_time_s;
if(unlikely(!metric->first_time_s))
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t max;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return max;
}
@@ -625,9 +600,9 @@ inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric,
if(update_every_s <= 0)
return false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_update_every_s = (uint32_t) update_every_s;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
@@ -638,10 +613,10 @@ inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRI
if(update_every_s <= 0)
return false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->latest_update_every_s)
metric->latest_update_every_s = (uint32_t) update_every_s;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
@@ -649,16 +624,16 @@ inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRI
inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t update_every_s;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
update_every_s = metric->latest_update_every_s;
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return update_every_s;
}
inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->writer) {
metric->writer = gettid();
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
@@ -666,19 +641,19 @@ inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
}
else
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return done;
}
inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(metric->writer) {
metric->writer = 0;
__atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
- spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return done;
}
@@ -734,7 +709,7 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
memset(s, 0, sizeof(struct mrg_statistics));
- for(int i = 0; i < MRG_PARTITIONS ;i++) {
+ for(size_t i = 0; i < mrg->partitions ;i++) {
s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
@@ -751,7 +726,7 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
}
- s->size += sizeof(MRG);
+ s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions;
}
// ----------------------------------------------------------------------------
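
mrg_get_statistics() sums the per-partition counters with relaxed atomic loads, and its loop index changes from int to size_t to match the runtime mrg->partitions. The reporting pattern in miniature, with C11 stdatomic standing in for the GCC __atomic builtins:

    #include <stdatomic.h>
    #include <stddef.h>

    struct part_stats { _Atomic size_t entries; };

    /* relaxed loads: each counter is updated atomically, so every value read is
       consistent in itself, even if the total is slightly stale across partitions */
    static size_t total_entries(struct part_stats *stats, size_t partitions) {
        size_t total = 0;
        for (size_t i = 0; i < partitions; i++)
            total += atomic_load_explicit(&stats[i].entries, memory_order_relaxed);
        return total;
    }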
@@ -808,7 +783,7 @@ static void *mrg_stress(void *ptr) {
}
int mrg_unittest(void) {
- MRG *mrg = mrg_create();
+ MRG *mrg = mrg_create(0);
METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
bool ret;
@@ -889,7 +864,7 @@ int mrg_unittest(void) {
fatal("DBENGINE METRIC: invalid entries counter");
size_t entries = 1000000;
- size_t threads = MRG_PARTITIONS / 3 + 1;
+ size_t threads = mrg->partitions / 3 + 1;
size_t tiers = 3;
size_t run_for_secs = 5;
info("preparing stress test of %zu entries...", entries);
diff --git a/database/engine/metric.h b/database/engine/metric.h
index 7ed9d7984b..871b6923ce 100644
--- a/database/engine/metric.h
+++ b/database/engine/metric.h
@@ -3,8 +3,6 @@
#include "../rrd.h"
-#define MRG_PARTITIONS 10
-
#define MRG_CACHE_LINE_PADDING(x) uint8_t padding##x[64]
typedef struct metric METRIC;
@@ -19,9 +17,10 @@ typedef struct mrg_entry {
} MRG_ENTRY;
struct mrg_statistics {
- // non-atomic - under a write lock
+ // --- non-atomic --- under a write lock
+
size_t entries;
- size_t size; // total memory used, with indexing
+ size_t size; // total memory used, with indexing
size_t additions;
size_t additions_duplicate;
@@ -30,9 +29,10 @@ struct mrg_statistics {
size_t delete_having_retention_or_referenced;
size_t delete_misses;
- // atomic - multiple readers / writers
-
MRG_CACHE_LINE_PADDING(0);
+
+ // --- atomic --- multiple readers / writers
+
size_t entries_referenced;
MRG_CACHE_LINE_PADDING(1);
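
This hunk is cosmetic: the group comments in struct mrg_statistics are restyled so that MRG_CACHE_LINE_PADDING(0) visibly marks the boundary between the counters mutated under the write lock and the atomically updated ones. The padding itself prevents false sharing: threads bumping the atomics would otherwise keep invalidating the cache line the lock holder is writing. The shape of the technique, with a hypothetical struct and 64 bytes assumed as the cache line size, as in the macro itself:

    #define CACHE_LINE_PADDING(x) char padding##x[64]

    struct stats_sketch {
        /* non-atomic: mutated only while holding the partition's write lock */
        unsigned long entries;
        unsigned long additions;

        CACHE_LINE_PADDING(0);       /* push the atomics onto their own cache line */

        /* atomic: updated concurrently by many threads */
        unsigned long entries_referenced;
    };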
@@ -50,7 +50,7 @@ struct mrg_statistics {
size_t writers_conflicts;
};
-MRG *mrg_create(void);
+MRG *mrg_create(size_t partitions);
void mrg_destroy(MRG *mrg);
METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric);
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 85c3a2ce98..c608c32700 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -1083,7 +1083,7 @@ size_t dynamic_extent_cache_size(void) {
void pgc_and_mrg_initialize(void)
{
- main_mrg = mrg_create();
+ main_mrg = mrg_create(0);
size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL;
size_t main_cache_size = (target_cache_size / 100) * 95;
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index d0ce059d40..a960cbb466 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -1079,8 +1079,8 @@ static void rrdeng_populate_mrg(struct rrdengine_instance *ctx) {
if(cpus > (size_t)libuv_worker_threads)
cpus = (size_t)libuv_worker_threads;
- if(cpus >= MRG_PARTITIONS / 2)
- cpus = MRG_PARTITIONS / 2 - 1;
+ if(cpus >= (size_t)get_netdata_cpus() / 2)
+ cpus = get_netdata_cpus()/ 2 - 1;
if(cpus < 1)
cpus = 1;
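
With MRG_PARTITIONS gone, the cap on MRG-populating threads in rrdeng_populate_mrg() is likewise derived from the CPU count. A standalone restatement of the clamping logic; clamp_populate_threads is a hypothetical helper, and a guard is added for the one-CPU case, where total_cpus / 2 - 1 would underflow an unsigned type:

    #include <stddef.h>

    static size_t clamp_populate_threads(size_t cpus, size_t libuv_workers, size_t total_cpus) {
        if (cpus > libuv_workers)               /* never exceed the libuv worker pool */
            cpus = libuv_workers;

        if (total_cpus / 2 >= 1 && cpus >= total_cpus / 2)
            cpus = total_cpus / 2 - 1;          /* leave at least half the CPUs alone */

        return cpus < 1 ? 1 : cpus;             /* always run at least one thread */
    }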