diff options
| Field | Value | Date |
|---|---|---|
| author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2024-01-15 18:25:24 +0200 |
| committer | GitHub <noreply@github.com> | 2024-01-15 18:25:24 +0200 |
| commit | 10721be4ec3feaf72ff45856a7c447fe95a4b435 (patch) | |
| tree | b573ebbccd23225bbed289a0dd8b3f48d538bc49 | |
| parent | d5b2ddcfcd9b033fc23d6df4bf6035fc3ddbea05 (diff) | |
Improve context load (#16659)
* Improve single thread load. Handle thread creation failure as well
Remove RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS
Improve chart label cleanup
* Init thread index
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | database/rrd.h | 11 |
| -rw-r--r-- | database/rrdhost.c | 2 |
| -rw-r--r-- | database/sqlite/sqlite_metadata.c | 160 |

3 files changed, 102 insertions, 71 deletions
```diff
diff --git a/database/rrd.h b/database/rrd.h
index 1a98e5eef2..5bd9be18d8 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -975,19 +975,18 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags {
 
     // ACLK
     RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates
-    RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected
+    RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // Host should stream alerts
 
     // Metadata
     RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database
     RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database
     RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database
-    RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database
-    RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database
+    RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // Context needs to be loaded
 
-    RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
-    RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected
+    RRDHOST_FLAG_METADATA_CLAIMID = (1 << 27), // metadata needs to be stored in the database
+    RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 28), // set when the receiver part is disconnected
 
-    RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 30), // set when the host has updated global functions
+    RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 29), // set when the host has updated global functions
 } RRDHOST_FLAGS;
 
 #define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag))
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 4af110b5ae..52e14afcb9 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -1739,7 +1739,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
     s->db.instances = host->rrdctx.instances;
     s->db.contexts = dictionary_entries(host->rrdctx.contexts);
     if(!s->db.first_time_s || !s->db.last_time_s || !s->db.metrics || !s->db.instances || !s->db.contexts ||
-        (flags & (RRDHOST_FLAG_PENDING_CONTEXT_LOAD|RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS)))
+        (flags & (RRDHOST_FLAG_PENDING_CONTEXT_LOAD)))
         s->db.status = RRDHOST_DB_STATUS_INITIALIZING;
     else
         s->db.status = RRDHOST_DB_STATUS_QUERYABLE;
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 636f519665..02361a0a61 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -873,7 +873,7 @@ static void check_dimension_metadata(struct metadata_wc *wc)
     uint32_t total_checked = 0;
     uint32_t total_deleted = 0;
 
-    internal_error(true, "METADATA: Checking dimensions starting after row %"PRIu64, last_row_id);
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking dimensions starting after row %" PRIu64, last_row_id);
 
     bool more_to_do = run_cleanup_loop(
         res,
@@ -896,8 +896,10 @@ static void check_dimension_metadata(struct metadata_wc *wc)
         next_execution_t = now + METADATA_DIM_CHECK_INTERVAL;
     }
 
-    internal_error(true,
-        "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
+    nd_log(
+        NDLS_DAEMON,
+        NDLP_DEBUG,
+        "Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
         total_checked,
         total_deleted,
         last_row_id ? "resume" : "restart",
@@ -932,7 +934,7 @@ static void check_chart_metadata(struct metadata_wc *wc)
     uint32_t total_checked = 0;
     uint32_t total_deleted = 0;
 
-    internal_error(true, "METADATA: Checking charts starting after row %"PRIu64, last_row_id);
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking charts starting after row %" PRIu64, last_row_id);
 
     sqlite3_stmt *check_res = NULL;
     sqlite3_stmt *action_res = NULL;
@@ -963,8 +965,10 @@ static void check_chart_metadata(struct metadata_wc *wc)
         next_execution_t = now + METADATA_CHART_CHECK_INTERVAL;
     }
 
-    internal_error(true,
-        "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds",
+    nd_log(
+        NDLS_DAEMON,
+        NDLP_DEBUG,
+        "Charts checked %u, deleted %u. Checks will %s in %lld seconds",
         total_checked,
         total_deleted,
         last_row_id ? "resume" : "restart",
@@ -1000,7 +1004,7 @@ static void check_label_metadata(struct metadata_wc *wc)
     uint32_t total_checked = 0;
     uint32_t total_deleted = 0;
 
-    internal_error(true,"METADATA: Checking charts labels starting after row %"PRIu64, last_row_id);
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking charts labels starting after row %" PRIu64, last_row_id);
 
     sqlite3_stmt *check_res = NULL;
     sqlite3_stmt *action_res = NULL;
@@ -1032,8 +1036,10 @@ static void check_label_metadata(struct metadata_wc *wc)
         next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL;
     }
 
-    internal_error(true,
-        "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
+    nd_log(
+        NDLS_DAEMON,
+        NDLP_DEBUG,
+        "Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
         total_checked,
         total_deleted,
         last_row_id ? "resume" : "restart",
@@ -1223,13 +1229,17 @@ static void restore_host_context(void *arg)
     rrdhost_load_rrdcontext_data(host);
     usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
 
-    rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+    rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD);
 
 #ifdef ENABLE_ACLK
     aclk_queue_node_info(host, false);
 #endif
 
-    internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
+    nd_log(
+        NDLS_DAEMON,
+        NDLP_DEBUG,
+        "Contexts for host %s loaded in %0.2f ms",
+        rrdhost_hostname(host),
         (double)(ended_ut - started_ut) / USEC_PER_MS);
 
     __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
@@ -1246,12 +1256,15 @@ static void after_start_host_load_context(uv_work_t *req, int status __maybe_unu
 
 static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
 {
+    if (!hclt)
+        return;
+
     for (size_t index = 0; index < max_thread_slots; index++) {
         if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
             || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
             int rc = uv_thread_join(&(hclt[index].thread));
             if (rc)
-                netdata_log_error("Failed to join thread, rc = %d",rc);
+                nd_log(NDLS_DAEMON, NDLP_WARNING, "Failed to join thread, rc = %d", rc);
             __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
             __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
         }
@@ -1290,40 +1303,50 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
     size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
     if (max_threads < 1)
         max_threads = 1;
-    nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads);
-    struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
-    size_t thread_index;
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Using %zu threads for context loading", max_threads);
+    struct host_context_load_thread *hclt = max_threads > 1 ? callocz(max_threads, sizeof(*hclt)) : NULL;
+
+    size_t thread_index = 0;
     dfe_start_reentrant(rrdhost_root_index, host) {
-        if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
-            !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+        if (!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
             continue;
 
-        rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
-        internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
+        nd_log(NDLS_DAEMON, NDLP_DEBUG, "Loading context for host %s", rrdhost_hostname(host));
 
-        bool found_slot = false;
-        do {
-            if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
-                break;
+        int rc = 0;
+        if (hclt) {
+            bool found_slot = false;
+            do {
+                if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+                    break;
+
+                cleanup_finished_threads(hclt, max_threads, false);
+                found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+            } while (!found_slot);
 
-            cleanup_finished_threads(hclt, max_threads, false);
-            found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
-        } while (!found_slot);
+            if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+                break;
 
-        if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
-            break;
+            __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+            hclt[thread_index].host = host;
+            rc = uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]);
+        }
+        // if single thread or thread creation failed
+        if (rc || !hclt) {
+            struct host_context_load_thread hclt_sync = {.host = host};
+            restore_host_context(&hclt_sync);
 
-        __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
-        hclt[thread_index].host = host;
-        fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
+            if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+                break;
+        }
     }
     dfe_done(host);
 
     cleanup_finished_threads(hclt, max_threads, true);
 
     freez(hclt);
     usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
-    nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
 
     worker_is_idle();
 }
@@ -1335,11 +1358,10 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
     struct metadata_wc *wc = data->wc;
 
     metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
-    internal_error(true, "METADATA: scanning hosts complete");
-    if (unlikely(wc->scan_complete)) {
+
+    if (unlikely(wc->scan_complete))
         completion_mark_complete(wc->scan_complete);
-        internal_error(true, "METADATA: Sending completion done");
-    }
+
     freez(data);
 }
 
@@ -1448,6 +1470,30 @@ struct host_chart_label_cleanup {
     Word_t count;
 };
 
+static void do_chart_label_cleanup(struct host_chart_label_cleanup *cl_cleanup_data)
+{
+    if (!cl_cleanup_data)
+        return;
+
+    Word_t Index = 0;
+    bool first = true;
+    Pvoid_t *PValue;
+    while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
+        char *machine_guid = *PValue;
+
+        RRDHOST *host = rrdhost_find_by_guid(machine_guid);
+        if (likely(!host)) {
+            uuid_t host_uuid;
+            if (!uuid_parse(machine_guid, host_uuid))
+                delete_host_chart_labels(&host_uuid);
+        }
+
+        freez(machine_guid);
+    }
+    JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
+    freez(cl_cleanup_data);
+}
+
 // Worker thread to scan hosts for pending metadata to store
 static void start_metadata_hosts(uv_work_t *req __maybe_unused)
 {
@@ -1461,30 +1507,10 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
     BUFFER *work_buffer = data->work_buffer;
     usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
 
-    internal_error(true, "METADATA: checking all hosts...");
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Checking all hosts started");
     usec_t started_ut = now_monotonic_usec(); (void)started_ut;
 
-    struct host_chart_label_cleanup *cl_cleanup_data = data->data;
-
-    if (cl_cleanup_data) {
-        Word_t Index = 0;
-        bool first = true;
-        Pvoid_t *PValue;
-        while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
-            char *machine_guid = *PValue;
-
-            host = rrdhost_find_by_guid(machine_guid);
-            if (likely(!host)) {
-                uuid_t host_uuid;
-                if (!uuid_parse(machine_guid, host_uuid))
-                    delete_host_chart_labels(&host_uuid);
-            }
-
-            freez(machine_guid);
-        }
-        JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
-        freez(cl_cleanup_data);
-    }
+    do_chart_label_cleanup((struct host_chart_label_cleanup *) data->data);
 
     bool run_again = false;
     worker_is_busy(UV_EVENT_METADATA_STORE);
@@ -1550,12 +1576,15 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
             if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
                 run_again = true;
                 rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
-                internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
             }
             usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
-            internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
-                           rrdhost_hostname(host), query_counter,
-                           (double)(ended_ut - started_ut) / USEC_PER_MS);
+            nd_log(
+                NDLS_DAEMON,
+                NDLP_DEBUG,
+                "Host %s saved metadata with %zu SQL statements, in %0.2f ms",
+                rrdhost_hostname(host),
+                query_counter,
+                (double)(ended_ut - started_ut) / USEC_PER_MS);
         }
         dfe_done(host);
 
@@ -1563,8 +1592,11 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
         transaction_started = db_execute(db_meta, "COMMIT TRANSACTION");
 
     usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
-    internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
-                   (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
+    nd_log(
+        NDLS_DAEMON,
+        NDLP_DEBUG,
+        "Checking all hosts completed in %0.2f ms",
+        (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
 
     if (unlikely(run_again))
         wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
```