summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /aclk
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk.c71
-rw-r--r--aclk/aclk_query.c9
-rw-r--r--aclk/aclk_query_queue.c4
-rw-r--r--aclk/aclk_rx_msgs.c4
-rw-r--r--aclk/aclk_stats.c4
5 files changed, 51 insertions, 41 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 5afbba274e..7e5b1f8f8b 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -157,7 +157,7 @@ static int wait_till_cloud_enabled()
info("Waiting for Cloud to be enabled");
while (!netdata_cloud_setting) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
}
return 0;
@@ -176,7 +176,7 @@ static int wait_till_agent_claimed(void)
char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
agent_id = get_agent_claimid();
}
@@ -196,7 +196,7 @@ static int wait_till_agent_claimed(void)
static int wait_till_agent_claim_ready()
{
url_t url;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
if (wait_till_agent_claimed())
return 1;
@@ -330,7 +330,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
static int handle_connection(mqtt_wss_client client)
{
time_t last_periodic_query_wakeup = now_monotonic_sec();
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
@@ -463,7 +463,7 @@ static int aclk_block_till_recon_allowed() {
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (recon_delay > NETDATA_EXIT_POLL_MS) {
sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
@@ -473,7 +473,7 @@ static int aclk_block_till_recon_allowed() {
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
- return netdata_exit;
+ return !service_running(SERVICE_ACLK);
}
#ifndef ACLK_DISABLE_CHALLENGE
@@ -516,7 +516,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
@@ -564,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
continue;
}
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (aclk_env->encoding != ACLK_ENC_PROTO) {
@@ -748,7 +748,7 @@ void *aclk_main(void *ptr)
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
- } while (!netdata_exit);
+ } while (service_running(SERVICE_ACLK));
aclk_graceful_disconnect(mqttwss_client);
@@ -783,35 +783,40 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
- int ret;
+ int ret = 0;
if (!aclk_connected)
return;
- ret = get_node_id(&host->host_uuid, &node_id);
- if (ret > 0) {
- // this means we were not able to check if node_id already present
- error("Unable to check for node_id. Ignoring the host state update.");
- return;
+ if (host->node_id && !uuid_is_null(*host->node_id)) {
+ uuid_copy(node_id, *host->node_id);
}
- if (ret < 0) {
- // node_id not found
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
- rrdhost_aclk_state_lock(localhost);
- node_instance_creation_t node_instance_creation = {
- .claim_id = localhost->aclk_state.claimed_id,
- .hops = host->system_info->hops,
- .hostname = rrdhost_hostname(host),
- .machine_guid = host->machine_guid
- };
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
- return;
+ else {
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid};
+ create_query->data.bin_payload.payload =
+ generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
+ }
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 249e2b5363..2d14badeef 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -327,6 +327,11 @@ static void worker_aclk_register(void) {
}
}
+static void aclk_query_request_cancel(void *data)
+{
+ pthread_cond_broadcast((pthread_cond_t *) data);
+}
+
/**
* Main query processing thread
*/
@@ -336,7 +341,9 @@ void *aclk_query_main_thread(void *ptr)
struct aclk_query_thread *query_thr = ptr;
- while (!netdata_exit) {
+ service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false);
+
+ while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) {
aclk_query_process_msgs(query_thr);
worker_is_idle();
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 9a450571e1..e7cad5ded6 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -26,7 +26,7 @@ static inline int _aclk_queue_query(aclk_query_t query)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
aclk_query_free(query);
return 1;
@@ -66,7 +66,7 @@ aclk_query_t aclk_queue_pop(void)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
return NULL;
}
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 83bc5508be..104fbcb3ec 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -283,9 +283,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
node_state_update.live = 1;
node_state_update.hops = 0;
} else {
- netdata_mutex_lock(&host->receiver_lock);
- node_state_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
+ node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
node_state_update.hops = host->system_info->hops;
}
}
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 511ba952dc..4b6c03ed52 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -314,13 +314,13 @@ void *aclk_stats_main_thread(void *ptr)
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
netdata_thread_testcancel();
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
- if (netdata_exit) break;
+ if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it