diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-10 19:59:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-10 19:59:21 +0200 |
commit | 368a26cfee6887ca0cb2301d93138f63b75e353a (patch) | |
tree | b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /database/sqlite/sqlite_aclk.c | |
parent | b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff) |
DBENGINE v2 (#14125)
* count open cache pages refering to datafile
* eliminate waste flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* dont acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove not needed calls to metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leaking hunding
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* dont count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we dont need to pdc
* time in range re-work to provide info about past and future matches
* finner control on the pages selected for processing; accounting of page related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3)
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finner gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
* cancel threads first, then stop services
* revert default thread stack size
* dont execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* dont lock clean while evicting pages during flushing
* Revert "dont lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* dont cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, dont lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove not needed variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migration to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, works dispatched, works executing
* cleanup event loop memory every 10 minutes
* dont dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset
* also check if the expanded before is not the chart later updated time
* also check if the expanded before is not after the wall clock time of when the query started
* Remove unused variable
* replication to queue less queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wakeup the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered
* opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs a ML for a child, the relevant-ML charts
should be created on the child host. These charts should use
the parent's hostname to differentiate multiple parents that might
run ML for a child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, ie. we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* dont use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed by in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* dont initialize health if the health service is not running - prevent crash on shutdown while children get connected
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to caclulate page entries by time and size
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 331 |
1 files changed, 150 insertions, 181 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 7e3a9b2eb9..5213556e08 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -10,10 +10,140 @@ void sanity_check(void) { BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } -const char *aclk_sync_config[] = { +static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) +{ + struct aclk_database_worker_config *wc = data; + UNUSED(argc); + UNUSED(column); - NULL, -}; + debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DELETE_HOST; + cmd.data = strdupz((char *) argv[0]); + aclk_database_enq_cmd_noblock(wc, &cmd); + return 0; +} + +#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" + +static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) +{ + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); + } +} + +static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + UNUSED(cmd); + + debug(D_ACLK, "Checking database for %s", wc->host_guid); + + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); + + buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " + "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); + db_execute(buffer_tostring(sql)); + + buffer_free(sql); +} + + +#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" + +static int is_host_available(uuid_t *host_id) +{ + sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) + error_report("Database has not been initialized"); + return 1; + } + + rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to select node instance information for a node"); + return 1; + } + + rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to select node instance information"); + goto failed; + } + rc = sqlite3_step_monitored(res); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when checking host existence"); + + return (rc == SQLITE_ROW); +} + +// OPCODE: ACLK_DATABASE_DELETE_HOST +void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + UNUSED(wc); + char uuid_str[GUID_LEN + 1]; + char host_str[GUID_LEN + 1]; + + int rc; + uuid_t host_uuid; + char *host_guid = (char *)cmd.data; + + if (unlikely(!host_guid)) + return; + + rc = uuid_parse(host_guid, host_uuid); + freez(host_guid); + if (rc) + return; + + uuid_unparse_lower(host_uuid, host_str); + uuid_unparse_lower_fix(&host_uuid, uuid_str); + + debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str); + + if (is_host_available(&host_uuid)) { + debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str); + return; + } + + debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str); + + sqlite3_stmt *res = NULL; + BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); + + buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str); + + rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to clean up aclk tables"); + goto fail; + } + buffer_flush(sql); + + while (sqlite3_step_monitored(res) == SQLITE_ROW) + buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); + + db_execute(buffer_tostring(sql)); + +fail: + buffer_free(sql); +} uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; @@ -38,7 +168,6 @@ void aclk_add_worker_thread(struct aclk_database_worker_config *wc) aclk_thread_head = wc; } uv_mutex_unlock(&aclk_async_lock); - return; } void aclk_del_worker_thread(struct aclk_database_worker_config *wc) @@ -53,7 +182,6 @@ void aclk_del_worker_thread(struct aclk_database_worker_config *wc) if (*tmp) *tmp = wc->next; uv_mutex_unlock(&aclk_async_lock); - return; } int aclk_worker_thread_exists(char *guid) @@ -199,7 +327,6 @@ void aclk_sync_exit_all() uv_mutex_unlock(&aclk_async_lock); } -#ifdef ENABLE_ACLK enum { IDX_HOST_ID, IDX_HOSTNAME, @@ -268,9 +395,9 @@ static int create_host_callback(void *data, int argc, char **argv, char **column #endif return 0; } -#endif -int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) +#ifdef ENABLE_ACLK +static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) { char uuid_str[GUID_LEN + 1]; UNUSED(data); @@ -286,10 +413,9 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]); return 0; } - +#endif void sql_aclk_sync_init(void) { -#ifdef ENABLE_ACLK char *err_msg = NULL; int rc; @@ -301,21 +427,7 @@ void sql_aclk_sync_init(void) return; } - info("SQLite aclk sync initialization"); - - for (int i = 0; aclk_sync_config[i]; i++) { - debug(D_ACLK_SYNC, "Executing %s", aclk_sync_config[i]); - rc = sqlite3_exec_monitored(db_meta, aclk_sync_config[i], 0, 0, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error aclk sync initialization setup, rc = %d (%s)", rc, err_msg); - error_report("SQLite failed statement %s", aclk_sync_config[i]); - sqlite3_free(err_msg); - return; - } - } - info("SQLite aclk sync initialization completed"); - fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); - + info("Creating archived hosts"); rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " "program_version, entries, health_enabled FROM host WHERE hops >0;", @@ -325,14 +437,16 @@ void sql_aclk_sync_init(void) sqlite3_free(err_msg); } +#ifdef ENABLE_ACLK + fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); sqlite3_free(err_msg); } + info("ACLK sync initialization completed"); #endif - return; } static void async_cb(uv_async_t *handle) @@ -374,9 +488,7 @@ static void timer_cb(uv_timer_t* handle) #endif } -#define MAX_CMD_BATCH_SIZE (256) - -void aclk_database_worker(void *arg) +static void aclk_database_worker(void *arg) { worker_register("ACLKSYNC"); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); @@ -398,15 +510,12 @@ void aclk_database_worker(void *arg) enum aclk_database_opcode opcode; uv_timer_t timer_req; struct aclk_database_cmd cmd; - unsigned cmd_batch_size; - - //aclk_database_init_cmd_queue(wc); char threadname[NETDATA_THREAD_NAME_MAX+1]; if (wc->host) - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); else { - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->uuid_str); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str); threadname[11] = '\0'; } uv_thread_set_name_np(wc->thread, threadname); @@ -449,17 +558,13 @@ void aclk_database_worker(void *arg) uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ - cmd_batch_size = 0; do { - if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) - break; cmd = aclk_database_deq_cmd(wc); if (netdata_exit) break; opcode = cmd.opcode; - ++cmd_batch_size; if(likely(opcode != ACLK_DATABASE_NOOP)) worker_is_busy(opcode); @@ -535,7 +640,7 @@ void aclk_database_worker(void *arg) wc->host = rrdhost_find_by_guid(wc->host_guid); if (wc->host) { info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid); - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host)); uv_thread_set_name_np(wc->thread, threadname); wc->host->dbsync_worker = wc; if (unlikely(!wc->hostname)) @@ -584,10 +689,8 @@ void aclk_database_worker(void *arg) info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); -/* uv_mutex_destroy(&wc->cmd_mutex); */ - //fatal_assert(0 == uv_loop_close(loop)); - int rc; + int rc; do { rc = uv_loop_close(loop); } while (rc != UV_EBUSY); @@ -648,6 +751,10 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) if (likely(host)) { host->dbsync_worker = (void *)wc; wc->hostname = strdupz(rrdhost_hostname(host)); + if (node_id && !host->node_id) { + host->node_id = mallocz(sizeof(*host->node_id)); + uuid_copy(*host->node_id, *node_id); + } } else wc->hostname = get_hostname_by_node_id(wc->node_id); @@ -663,142 +770,4 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) UNUSED(host_uuid); UNUSED(node_id); #endif - return; -} - -void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(cmd); - - debug(D_ACLK, "Checking database for %s", wc->host_guid); - - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); - db_execute(buffer_tostring(sql)); - - buffer_free(sql); - return; -} - -#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" - -static int is_host_available(uuid_t *host_id) -{ - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return 1; - } - - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to select node instance information for a node"); - return 1; - } - - rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); - goto failed; - } - rc = sqlite3_step_monitored(res); - - failed: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when checking host existence"); - - return (rc == SQLITE_ROW); -} - -// OPCODE: ACLK_DATABASE_DELETE_HOST -void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(wc); - char uuid_str[GUID_LEN + 1]; - char host_str[GUID_LEN + 1]; - - int rc; - uuid_t host_uuid; - char *host_guid = (char *)cmd.data; - - if (unlikely(!host_guid)) - return; - - rc = uuid_parse(host_guid, host_uuid); - freez(host_guid); - if (rc) - return; - - uuid_unparse_lower(host_uuid, host_str); - uuid_unparse_lower_fix(&host_uuid, uuid_str); - - debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str); - - if (is_host_available(&host_uuid)) { - debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str); - return; - } - - debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str); - - sqlite3_stmt *res = NULL; - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - - buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str); - - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to clean up aclk tables"); - goto fail; - } - buffer_flush(sql); - - while (sqlite3_step_monitored(res) == SQLITE_ROW) - buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); - - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); - - db_execute(buffer_tostring(sql)); - -fail: - buffer_free(sql); - return; -} - -static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) -{ - struct aclk_database_worker_config *wc = data; - UNUSED(argc); - UNUSED(column); - - debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.data = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(wc, &cmd); - return 0; -} - -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" - -void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) -{ - char *err_msg = NULL; - debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } - return; -} +}
\ No newline at end of file |