| author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-10 19:59:21 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-01-10 19:59:21 +0200 |
| commit | 368a26cfee6887ca0cb2301d93138f63b75e353a (patch) | |
| tree | b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /aclk | |
| parent | b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff) | |
DBENGINE v2 (#14125)
* count open cache pages referring to datafile
* eliminate wasted flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* don't acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove unneeded calls to the metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leak hunting
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* don't count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we don't need to pdc
* time in range re-work to provide info about past and future matches
* finer control on the pages selected for processing; accounting of page-related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers minus the reserved ones (3)
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finer gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
* cancel threads first, then stop services
* revert default thread stack size
* don't execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* don't lock clean while evicting pages during flushing
* Revert "don't lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* don't cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, don't lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove not needed variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migration to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, works dispatched, works executing
* cleanup event loop memory every 10 minutes
* don't dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset
* also check that the expanded 'before' is not after the chart's last updated time
* also check that the expanded 'before' is not after the wall-clock time at which the query started
* Remove unused variable
* replication to queue less queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wakeup the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered
* opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs a ML for a child, the relevant-ML charts
should be created on the child host. These charts should use
the parent's hostname to differentiate multiple parents that might
run ML for a child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, ie. we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* don't use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed by in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* don't initialize health if the health service is not running - prevent crash on shutdown while children get connected
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to calculate page entries by time and size
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'aclk')

| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | aclk/aclk.c | 71 |
| -rw-r--r-- | aclk/aclk_query.c | 9 |
| -rw-r--r-- | aclk/aclk_query_queue.c | 4 |
| -rw-r--r-- | aclk/aclk_rx_msgs.c | 4 |
| -rw-r--r-- | aclk/aclk_stats.c | 4 |

5 files changed, 51 insertions, 41 deletions
```diff
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 5afbba274e..7e5b1f8f8b 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -157,7 +157,7 @@ static int wait_till_cloud_enabled()
     info("Waiting for Cloud to be enabled");
     while (!netdata_cloud_setting) {
         sleep_usec(USEC_PER_SEC * 1);
-        if (netdata_exit)
+        if (!service_running(SERVICE_ACLK))
             return 1;
     }
     return 0;
@@ -176,7 +176,7 @@ static int wait_till_agent_claimed(void)
     char *agent_id = get_agent_claimid();
     while (likely(!agent_id)) {
         sleep_usec(USEC_PER_SEC * 1);
-        if (netdata_exit)
+        if (!service_running(SERVICE_ACLK))
             return 1;
         agent_id = get_agent_claimid();
     }
@@ -196,7 +196,7 @@ static int wait_till_agent_claimed(void)
 static int wait_till_agent_claim_ready()
 {
     url_t url;
-    while (!netdata_exit) {
+    while (service_running(SERVICE_ACLK)) {
         if (wait_till_agent_claimed())
             return 1;
 
@@ -330,7 +330,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
 static int handle_connection(mqtt_wss_client client)
 {
     time_t last_periodic_query_wakeup = now_monotonic_sec();
-    while (!netdata_exit) {
+    while (service_running(SERVICE_ACLK)) {
         // timeout 1000 to check at least once a second
         // for netdata_exit
         if (mqtt_wss_service(client, 1000) < 0){
@@ -463,7 +463,7 @@ static int aclk_block_till_recon_allowed() {
     // we want to wake up from time to time to check netdata_exit
     while (recon_delay) {
-        if (netdata_exit)
+        if (!service_running(SERVICE_ACLK))
             return 1;
         if (recon_delay > NETDATA_EXIT_POLL_MS) {
             sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
@@ -473,7 +473,7 @@ static int aclk_block_till_recon_allowed() {
         sleep_usec(recon_delay * USEC_PER_MS);
         recon_delay = 0;
     }
-    return netdata_exit;
+    return !service_running(SERVICE_ACLK);
 }
 
 #ifndef ACLK_DISABLE_CHALLENGE
@@ -516,7 +516,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
     url_t mqtt_url;
 #endif
 
-    while (!netdata_exit) {
+    while (service_running(SERVICE_ACLK)) {
         char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
         if (cloud_base_url == NULL) {
             error_report("Do not move the cloud base url out of post_conf_load!!");
@@ -564,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
             continue;
         }
 
-        if (netdata_exit)
+        if (!service_running(SERVICE_ACLK))
             return 1;
 
         if (aclk_env->encoding != ACLK_ENC_PROTO) {
@@ -748,7 +748,7 @@ void *aclk_main(void *ptr)
             aclk_connected = 0;
             log_access("ACLK DISCONNECTED");
         }
-    } while (!netdata_exit);
+    } while (service_running(SERVICE_ACLK));
 
     aclk_graceful_disconnect(mqttwss_client);
 
@@ -783,35 +783,40 @@ exit:
 void aclk_host_state_update(RRDHOST *host, int cmd)
 {
     uuid_t node_id;
-    int ret;
+    int ret = 0;
 
     if (!aclk_connected)
         return;
 
-    ret = get_node_id(&host->host_uuid, &node_id);
-    if (ret > 0) {
-        // this means we were not able to check if node_id already present
-        error("Unable to check for node_id. Ignoring the host state update.");
-        return;
+    if (host->node_id && !uuid_is_null(*host->node_id)) {
+        uuid_copy(node_id, *host->node_id);
     }
-    if (ret < 0) {
-        // node_id not found
-        aclk_query_t create_query;
-        create_query = aclk_query_new(REGISTER_NODE);
-        rrdhost_aclk_state_lock(localhost);
-        node_instance_creation_t node_instance_creation = {
-            .claim_id = localhost->aclk_state.claimed_id,
-            .hops = host->system_info->hops,
-            .hostname = rrdhost_hostname(host),
-            .machine_guid = host->machine_guid
-        };
-        create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
-        rrdhost_aclk_state_unlock(localhost);
-        create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
-        create_query->data.bin_payload.msg_name = "CreateNodeInstance";
-        info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
-        aclk_queue_query(create_query);
-        return;
+    else {
+        ret = get_node_id(&host->host_uuid, &node_id);
+        if (ret > 0) {
+            // this means we were not able to check if node_id already present
+            error("Unable to check for node_id. Ignoring the host state update.");
+            return;
+        }
+        if (ret < 0) {
+            // node_id not found
+            aclk_query_t create_query;
+            create_query = aclk_query_new(REGISTER_NODE);
+            rrdhost_aclk_state_lock(localhost);
+            node_instance_creation_t node_instance_creation = {
+                .claim_id = localhost->aclk_state.claimed_id,
+                .hops = host->system_info->hops,
+                .hostname = rrdhost_hostname(host),
+                .machine_guid = host->machine_guid};
+            create_query->data.bin_payload.payload =
+                generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+            rrdhost_aclk_state_unlock(localhost);
+            create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+            create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+            info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+            aclk_queue_query(create_query);
+            return;
+        }
     }
 
     aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 249e2b5363..2d14badeef 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -327,6 +327,11 @@ static void worker_aclk_register(void) {
     }
 }
 
+static void aclk_query_request_cancel(void *data)
+{
+    pthread_cond_broadcast((pthread_cond_t *) data);
+}
+
 /**
  * Main query processing thread
  */
@@ -336,7 +341,9 @@ void *aclk_query_main_thread(void *ptr)
 
     struct aclk_query_thread *query_thr = ptr;
 
-    while (!netdata_exit) {
+    service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false);
+
+    while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) {
         aclk_query_process_msgs(query_thr);
 
         worker_is_idle();
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 9a450571e1..e7cad5ded6 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -26,7 +26,7 @@ static inline int _aclk_queue_query(aclk_query_t query)
     ACLK_QUEUE_LOCK;
     if (aclk_query_queue.block_push) {
         ACLK_QUEUE_UNLOCK;
-        if(!netdata_exit)
+        if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
             error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
         aclk_query_free(query);
         return 1;
@@ -66,7 +66,7 @@ aclk_query_t aclk_queue_pop(void)
     ACLK_QUEUE_LOCK;
     if (aclk_query_queue.block_push) {
         ACLK_QUEUE_UNLOCK;
-        if(!netdata_exit)
+        if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
             error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
         return NULL;
     }
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 83bc5508be..104fbcb3ec 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -283,9 +283,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
         node_state_update.live = 1;
         node_state_update.hops = 0;
     } else {
-        netdata_mutex_lock(&host->receiver_lock);
-        node_state_update.live = (host->receiver != NULL);
-        netdata_mutex_unlock(&host->receiver_lock);
+        node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
         node_state_update.hops = host->system_info->hops;
     }
 }
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 511ba952dc..4b6c03ed52 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -314,13 +314,13 @@ void *aclk_stats_main_thread(void *ptr)
     struct aclk_metrics_per_sample per_sample;
     struct aclk_metrics permanent;
 
-    while (!netdata_exit) {
+    while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
         netdata_thread_testcancel();
         // ------------------------------------------------------------------------
         // Wait for the next iteration point.
         heartbeat_next(&hb, step_ut);
-        if (netdata_exit) break;
+        if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
 
         ACLK_STATS_LOCK;
         // to not hold lock longer than necessary, especially not to hold it
```