summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2024-01-15 18:25:24 +0200
committerGitHub <noreply@github.com>2024-01-15 18:25:24 +0200
commit10721be4ec3feaf72ff45856a7c447fe95a4b435 (patch)
treeb573ebbccd23225bbed289a0dd8b3f48d538bc49
parentd5b2ddcfcd9b033fc23d6df4bf6035fc3ddbea05 (diff)
Improve context load (#16659)
* Improve single thread load. Handle thread creation failure as well Remove RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS Improve chart label cleanup * Init thread index
-rw-r--r--database/rrd.h11
-rw-r--r--database/rrdhost.c2
-rw-r--r--database/sqlite/sqlite_metadata.c160
3 files changed, 102 insertions, 71 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 1a98e5eef2..5bd9be18d8 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -975,19 +975,18 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags {
// ACLK
RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates
- RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected
+ RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // Host should stream alerts
// Metadata
RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database
RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database
RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database
- RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database
- RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database
+ RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // Context needs to be loaded
- RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
- RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected
+ RRDHOST_FLAG_METADATA_CLAIMID = (1 << 27), // metadata needs to be stored in the database
+ RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 28), // set when the receiver part is disconnected
- RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 30), // set when the host has updated global functions
+ RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 29), // set when the host has updated global functions
} RRDHOST_FLAGS;
#define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag))
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 4af110b5ae..52e14afcb9 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -1739,7 +1739,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
s->db.instances = host->rrdctx.instances;
s->db.contexts = dictionary_entries(host->rrdctx.contexts);
if(!s->db.first_time_s || !s->db.last_time_s || !s->db.metrics || !s->db.instances || !s->db.contexts ||
- (flags & (RRDHOST_FLAG_PENDING_CONTEXT_LOAD|RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS)))
+ (flags & (RRDHOST_FLAG_PENDING_CONTEXT_LOAD)))
s->db.status = RRDHOST_DB_STATUS_INITIALIZING;
else
s->db.status = RRDHOST_DB_STATUS_QUERYABLE;
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 636f519665..02361a0a61 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -873,7 +873,7 @@ static void check_dimension_metadata(struct metadata_wc *wc)
uint32_t total_checked = 0;
uint32_t total_deleted = 0;
- internal_error(true, "METADATA: Checking dimensions starting after row %"PRIu64, last_row_id);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking dimensions starting after row %" PRIu64, last_row_id);
bool more_to_do = run_cleanup_loop(
res,
@@ -896,8 +896,10 @@ static void check_dimension_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_DIM_CHECK_INTERVAL;
}
- internal_error(true,
- "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
last_row_id ? "resume" : "restart",
@@ -932,7 +934,7 @@ static void check_chart_metadata(struct metadata_wc *wc)
uint32_t total_checked = 0;
uint32_t total_deleted = 0;
- internal_error(true, "METADATA: Checking charts starting after row %"PRIu64, last_row_id);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking charts starting after row %" PRIu64, last_row_id);
sqlite3_stmt *check_res = NULL;
sqlite3_stmt *action_res = NULL;
@@ -963,8 +965,10 @@ static void check_chart_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_CHART_CHECK_INTERVAL;
}
- internal_error(true,
- "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds",
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Charts checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
last_row_id ? "resume" : "restart",
@@ -1000,7 +1004,7 @@ static void check_label_metadata(struct metadata_wc *wc)
uint32_t total_checked = 0;
uint32_t total_deleted = 0;
- internal_error(true,"METADATA: Checking charts labels starting after row %"PRIu64, last_row_id);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking charts labels starting after row %" PRIu64, last_row_id);
sqlite3_stmt *check_res = NULL;
sqlite3_stmt *action_res = NULL;
@@ -1032,8 +1036,10 @@ static void check_label_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL;
}
- internal_error(true,
- "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
last_row_id ? "resume" : "restart",
@@ -1223,13 +1229,17 @@ static void restore_host_context(void *arg)
rrdhost_load_rrdcontext_data(host);
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
- rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD);
#ifdef ENABLE_ACLK
aclk_queue_node_info(host, false);
#endif
- internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Contexts for host %s loaded in %0.2f ms",
+ rrdhost_hostname(host),
(double)(ended_ut - started_ut) / USEC_PER_MS);
__atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
@@ -1246,12 +1256,15 @@ static void after_start_host_load_context(uv_work_t *req, int status __maybe_unu
static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
{
+ if (!hclt)
+ return;
+
for (size_t index = 0; index < max_thread_slots; index++) {
if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
|| (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
int rc = uv_thread_join(&(hclt[index].thread));
if (rc)
- netdata_log_error("Failed to join thread, rc = %d",rc);
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Failed to join thread, rc = %d", rc);
__atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
__atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
}
@@ -1290,40 +1303,50 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
if (max_threads < 1)
max_threads = 1;
- nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads);
- struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
- size_t thread_index;
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Using %zu threads for context loading", max_threads);
+ struct host_context_load_thread *hclt = max_threads > 1 ? callocz(max_threads, sizeof(*hclt)) : NULL;
+
+ size_t thread_index = 0;
dfe_start_reentrant(rrdhost_root_index, host) {
- if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
- !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+ if (!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
continue;
- rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
- internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Loading context for host %s", rrdhost_hostname(host));
- bool found_slot = false;
- do {
- if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
- break;
+ int rc = 0;
+ if (hclt) {
+ bool found_slot = false;
+ do {
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
+
+ cleanup_finished_threads(hclt, max_threads, false);
+ found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+ } while (!found_slot);
- cleanup_finished_threads(hclt, max_threads, false);
- found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
- } while (!found_slot);
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
- if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
- break;
+ __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+ hclt[thread_index].host = host;
+ rc = uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]);
+ }
+ // if single thread or thread creation failed
+ if (rc || !hclt) {
+ struct host_context_load_thread hclt_sync = {.host = host};
+ restore_host_context(&hclt_sync);
- __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
- hclt[thread_index].host = host;
- fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
+ }
}
dfe_done(host);
cleanup_finished_threads(hclt, max_threads, true);
freez(hclt);
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
- nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
worker_is_idle();
}
@@ -1335,11 +1358,10 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
struct metadata_wc *wc = data->wc;
metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
- internal_error(true, "METADATA: scanning hosts complete");
- if (unlikely(wc->scan_complete)) {
+
+ if (unlikely(wc->scan_complete))
completion_mark_complete(wc->scan_complete);
- internal_error(true, "METADATA: Sending completion done");
- }
+
freez(data);
}
@@ -1448,6 +1470,30 @@ struct host_chart_label_cleanup {
Word_t count;
};
+static void do_chart_label_cleanup(struct host_chart_label_cleanup *cl_cleanup_data)
+{
+ if (!cl_cleanup_data)
+ return;
+
+ Word_t Index = 0;
+ bool first = true;
+ Pvoid_t *PValue;
+ while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
+ char *machine_guid = *PValue;
+
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid);
+ if (likely(!host)) {
+ uuid_t host_uuid;
+ if (!uuid_parse(machine_guid, host_uuid))
+ delete_host_chart_labels(&host_uuid);
+ }
+
+ freez(machine_guid);
+ }
+ JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
+ freez(cl_cleanup_data);
+}
+
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
@@ -1461,30 +1507,10 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
BUFFER *work_buffer = data->work_buffer;
usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
- internal_error(true, "METADATA: checking all hosts...");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking all hosts started");
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
- struct host_chart_label_cleanup *cl_cleanup_data = data->data;
-
- if (cl_cleanup_data) {
- Word_t Index = 0;
- bool first = true;
- Pvoid_t *PValue;
- while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
- char *machine_guid = *PValue;
-
- host = rrdhost_find_by_guid(machine_guid);
- if (likely(!host)) {
- uuid_t host_uuid;
- if (!uuid_parse(machine_guid, host_uuid))
- delete_host_chart_labels(&host_uuid);
- }
-
- freez(machine_guid);
- }
- JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
- freez(cl_cleanup_data);
- }
+ do_chart_label_cleanup((struct host_chart_label_cleanup *) data->data);
bool run_again = false;
worker_is_busy(UV_EVENT_METADATA_STORE);
@@ -1550,12 +1576,15 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
run_again = true;
rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
- internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
}
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
- internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
- rrdhost_hostname(host), query_counter,
- (double)(ended_ut - started_ut) / USEC_PER_MS);
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Host %s saved metadata with %zu SQL statements, in %0.2f ms",
+ rrdhost_hostname(host),
+ query_counter,
+ (double)(ended_ut - started_ut) / USEC_PER_MS);
}
dfe_done(host);
@@ -1563,8 +1592,11 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
transaction_started = db_execute(db_meta, "COMMIT TRANSACTION");
usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
- internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
- (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
+ nd_log(
+ NDLS_DAEMON,
+ NDLP_DEBUG,
+ "Checking all hosts completed in %0.2f ms",
+ (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
if (unlikely(run_again))
wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;