diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2023-03-16 17:27:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-16 17:27:17 +0200 |
commit | 4c6a13e5bd09afd7405d0c309a76b88961e08630 (patch) | |
tree | f985eda5b4948a65161aa7c4fa1a4971857a7a58 | |
parent | 4e8be0f23d72ef1fd99bab8bce2aaccbea2fe64c (diff) |
Use one thread for ACLK synchonization (#14281)
* Remove aclk sync threads
* Disable functions if compiled with --disable-cloud
* Allocate and reuse buffer when scanning hosts
Tune transactions when writing metadata
Error checking when executing db_execute (it is already within a loop with retries)
* Schedule host context load in parallel
Child connection will be delayed if context load is not complete
Event loop cleanup
* Delay retention check if context is not loaded
Remove context load check from regular metadata host scan
* Improve checks to check finished threads
* Cleanup warnings when compiling with --disable-cloud
* Clean chart labels that were created before our current maximum retention
* Fix sql statement
* Remove structures members that of no use
Remove buffer allocations when not needed
* Fix compilation error
* Don't check for service running when not from a worker
* Code cleanup if agent is compiled with --disable-cloud
Setup ACLK tables in the database if needed
Submit node status update messages to the cloud
* Fix compilation warning when --disable-cloud is specified
* Address codacy issues
* Remove empty file -- has already been moved under contexts
* Use enum instead of numbers
* Use UUID_STR_LEN
* Add newline at the end of file
* Release node_id to prevent memory leak under certain cases
* Add queries in defines
* Ignore rc from transaction start -- if there is an active transaction, we will use it (same with commit) should further improve in a future PR
* Remove commented out code
* If host is null (it should not be) do not allocate config (coverity reports Resource leak)
* Do garbage collection when contexts is initialized
* Handle the case when config is not yet available for a host
-rw-r--r-- | aclk/aclk.c | 2 | ||||
-rw-r--r-- | daemon/event_loop.c | 1 | ||||
-rw-r--r-- | daemon/event_loop.h | 1 | ||||
-rw-r--r-- | daemon/main.c | 6 | ||||
-rw-r--r-- | daemon/main.h | 1 | ||||
-rw-r--r-- | database/contexts/worker.c | 7 | ||||
-rw-r--r-- | database/rrd.h | 44 | ||||
-rw-r--r-- | database/rrdhost.c | 77 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 917 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 102 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 464 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 15 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 98 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.h | 3 | ||||
-rw-r--r-- | database/sqlite/sqlite_context.c | 4 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 73 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.c | 163 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 519 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.h | 1 | ||||
-rw-r--r-- | streaming/receiver.c | 6 |
21 files changed, 1188 insertions, 1318 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 9e03e04c8c..29a49a438d 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -1222,7 +1222,7 @@ void add_aclk_host_labels(void) { } void aclk_queue_node_info(RRDHOST *host) { - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; if (likely(wc)) { wc->node_info_send = 1; } diff --git a/daemon/event_loop.c b/daemon/event_loop.c index 6f09cd654a..5fd02377eb 100644 --- a/daemon/event_loop.c +++ b/daemon/event_loop.c @@ -49,6 +49,7 @@ void register_libuv_worker_jobs() { worker_register_job_name(UV_EVENT_DBENGINE_SHUTDOWN, "dbengine shutdown"); // metadata + worker_register_job_name(UV_EVENT_HOST_CONTEXT_LOAD, "metadata load host context"); worker_register_job_name(UV_EVENT_METADATA_STORE, "metadata store host"); worker_register_job_name(UV_EVENT_METADATA_CLEANUP, "metadata cleanup"); diff --git a/daemon/event_loop.h b/daemon/event_loop.h index 0d3cc0d07c..1ff1c2c1cb 100644 --- a/daemon/event_loop.h +++ b/daemon/event_loop.h @@ -41,6 +41,7 @@ enum event_loop_job { UV_EVENT_DBENGINE_SHUTDOWN, // metadata + UV_EVENT_HOST_CONTEXT_LOAD, UV_EVENT_METADATA_STORE, UV_EVENT_METADATA_CLEANUP, diff --git a/daemon/main.c b/daemon/main.c index 5afc100e31..85d2996ef9 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -348,6 +348,7 @@ void netdata_cleanup_and_exit(int ret) { | ABILITY_WEB_REQUESTS | ABILITY_STREAMING_CONNECTIONS | SERVICE_ACLK + | SERVICE_ACLKSYNC ); delta_shutdown_time("stop replication, exporters, ML training, health and web servers threads"); @@ -388,11 +389,6 @@ void netdata_cleanup_and_exit(int ret) { metadata_sync_shutdown_prepare(); -#ifdef ENABLE_ACLK - delta_shutdown_time("signal aclk sync to stop"); - aclk_sync_exit_all(); -#endif - delta_shutdown_time("stop aclk threads"); timeout = !service_wait_exit( diff --git a/daemon/main.h b/daemon/main.h index 8704d60977..7e659e939a 100644 --- a/daemon/main.h +++ b/daemon/main.h @@ -43,6 +43,7 @@ typedef enum { SERVICE_CONTEXT = (1 << 12), SERVICE_ANALYTICS = (1 << 13), SERVICE_EXPORTERS = (1 << 14), + SERVICE_ACLKSYNC = (1 << 15) } SERVICE_TYPE; typedef enum { diff --git a/database/contexts/worker.c b/database/contexts/worker.c index fe9d8b3193..22e28b2ad7 100644 --- a/database/contexts/worker.c +++ b/database/contexts/worker.c @@ -327,7 +327,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo RRDCONTEXT *rc; dfe_start_reentrant(host->rrdctx.contexts, rc) { - if(unlikely(!service_running(SERVICE_CONTEXT))) break; + if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break; if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP); @@ -335,7 +335,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo RRDINSTANCE *ri; dfe_start_reentrant(rc->rrdinstances, ri) { - if(unlikely(!service_running(SERVICE_CONTEXT))) break; + if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break; RRDMETRIC *rm; dfe_start_write(ri->rrdmetrics, rm) { @@ -1080,7 +1080,8 @@ void *rrdcontext_main(void *ptr) { dictionary_garbage_collect(host->rrdctx.hub_queue); } - dictionary_garbage_collect(host->rrdctx.contexts); + if (host->rrdctx.contexts) + dictionary_garbage_collect(host->rrdctx.contexts); } dfe_done(host); diff --git a/database/rrd.h b/database/rrd.h index c3c2cc03fe..784db2217d 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -773,35 +773,41 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY_MODE m // and may lead to missing information. typedef enum __attribute__ ((__packed__)) rrdhost_flags { + + // Careful not to overlap with rrdhost_options to avoid bugs if + // rrdhost_flags_xxx is used instead of rrdhost_option_xxx or vice-versa // Orphan, Archived and Obsolete flags - RRDHOST_FLAG_ORPHAN = (1 << 10), // this host is orphan (not receiving data) - RRDHOST_FLAG_ARCHIVED = (1 << 11), // The host is archived, no collected charts yet - RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 12), // the host has pending chart obsoletions - RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 13), // the host has pending dimension obsoletions + RRDHOST_FLAG_ORPHAN = (1 << 8), // this host is orphan (not receiving data) + RRDHOST_FLAG_ARCHIVED = (1 << 9), // The host is archived, no collected charts yet + RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 10), // the host has pending chart obsoletions + RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 11), // the host has pending dimension obsoletions // Streaming sender - RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 14), // the host has initialized rrdpush structures - RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 15), // When set, the sender thread is running - RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 16), // When set, the host is connected to a parent - RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 17), // when set, rrdset_done() should push metrics to parent - RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 18), // when set, we have logged the status of metrics streaming + RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 12), // the host has initialized rrdpush structures + RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 13), // When set, the sender thread is running + RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 14), // When set, the host is connected to a parent + RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 15), // when set, rrdset_done() should push metrics to parent + RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 16), // when set, we have logged the status of metrics streaming // Health - RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 20), // contains charts and dims with uninitialized variables - RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 21), // the host has initialized health structures + RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 17), // contains charts and dims with uninitialized variables + RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 18), // the host has initialized health structures // Exporting - RRDHOST_FLAG_EXPORTING_SEND = (1 << 22), // send it to external databases - RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 23), // don't send it to external databases + RRDHOST_FLAG_EXPORTING_SEND = (1 << 19), // send it to external databases + RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 20), // don't send it to external databases // ACLK - RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 24), // when set, we should send ACLK stream context updates + RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates + RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected // Metadata - RRDHOST_FLAG_METADATA_UPDATE = (1 << 25), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_LABELS = (1 << 26), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_INFO = (1 << 27), // metadata needs to be stored in the database - RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database + RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database + RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database + RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected } RRDHOST_FLAGS; @@ -1028,7 +1034,7 @@ struct rrdhost { struct sender_state *sender; netdata_thread_t rrdpush_sender_thread; // the sender thread size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent - void *dbsync_worker; + void *aclk_sync_host_config; // ------------------------------------------------------------------------ // streaming of data from remote hosts - rrdpush receiver diff --git a/database/rrdhost.c b/database/rrdhost.c index 709ffb4d44..2917b0977b 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -519,15 +519,14 @@ int is_legacy = 1; , string2str(host->health.health_default_recipient) ); - if(!archived) + if(!archived) { metaqueue_host_update_info(host); - - rrdhost_load_rrdcontext_data(host); - if (!archived) { + rrdhost_load_rrdcontext_data(host); +// rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); ml_host_new(host); ml_host_start_training_thread(host); } else - rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN); + rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN); return host; } @@ -719,31 +718,30 @@ RRDHOST *rrdhost_find_or_create( ); } else { - - rrdhost_update(host - , hostname - , registry_hostname - , guid - , os - , timezone - , abbrev_timezone - , utc_offset - , tags - , program_name - , program_version - , update_every - , history - , mode - , health_enabled - , rrdpush_enabled - , rrdpush_destination - , rrdpush_api_key - , rrdpush_send_charts_matching - , rrdpush_enable_replication - , rrdpush_seconds_to_replicate - , rrdpush_replication_step - , system_info); - + if (likely(!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) + rrdhost_update(host + , hostname + , registry_hostname + , guid + , os + , timezone + , abbrev_timezone + , utc_offset + , tags + , program_name + , program_version + , update_every + , history + , mode + , health_enabled + , rrdpush_enabled + , rrdpush_destination + , rrdpush_api_key + , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step + , system_info); } return host; @@ -1167,21 +1165,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { return; } -#ifdef ENABLE_ACLK - struct aclk_database_worker_config *wc = host->dbsync_worker; - if (wc && !netdata_exit) { - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_ORPHAN_HOST; - struct aclk_completion compl ; - init_aclk_completion(&compl ); - cmd.completion = &compl ; - aclk_database_enq_cmd(wc, &cmd); - wait_for_aclk_completion(&compl ); - destroy_aclk_completion(&compl ); - } -#endif - // ------------------------------------------------------------------------ // free it @@ -1218,10 +1201,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { string_freez(host->hostname); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(RRDHOST), __ATOMIC_RELAXED); freez(host); -#ifdef ENABLE_ACLK - if (wc) - wc->is_orphan = 0; -#endif } void rrdhost_free_all(void) { diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index d084397a0e..8d049e107e 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -5,76 +5,174 @@ #include "sqlite_aclk_node.h" +struct aclk_sync_config_s { + uv_thread_t thread; + uv_loop_t loop; + uv_timer_t timer_req; + time_t cleanup_after; // Start a cleanup after this timestamp + uv_async_t async; + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + bool initialized; + volatile unsigned queue_size; + struct aclk_database_cmdqueue cmd_queue; +} aclk_sync_config = { 0 }; + + void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } -void schedule_node_info_update(RRDHOST *host) + +int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) { - if (unlikely(!host)) - return; + unsigned queue_size; - struct aclk_database_worker_config *wc = host->dbsync_worker; + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return 1; + } - if (unlikely(!wc)) - return; + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + return 0; +} - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_NODE_STATE; - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); +static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); + } + fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); + /* enqueue command */ + aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; + aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.tail + 1 : 0; + aclk_sync_config.queue_size = queue_size + 1; + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); + + /* wake up event loop */ + int rc = uv_async_send(&aclk_sync_config.async); + if (unlikely(rc)) + debug(D_ACLK_SYNC, "Failed to wake up event loop"); } -#ifdef ENABLE_ACLK -static int sql_check_aclk_table(void *data, int argc, char **argv, char **column) +enum { + IDX_HOST_ID, + IDX_HOSTNAME, + IDX_REGISTRY, + IDX_UPDATE_EVERY, + IDX_OS, + IDX_TIMEZONE, + IDX_TAGS, + IDX_HOPS, + IDX_MEMORY_MODE, + IDX_ABBREV_TIMEZONE, + IDX_UTC_OFFSET, + IDX_PROGRAM_NAME, + IDX_PROGRAM_VERSION, + IDX_ENTRIES, + IDX_HEALTH_ENABLED, +}; + +static int create_host_callback(void *data, int argc, char **argv, char **column) { - struct aclk_database_worker_config *wc = data; + UNUSED(data); UNUSED(argc); UNUSED(column); - debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DELETE_HOST; - cmd.data = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(wc, &cmd); - return 0; -} + char guid[UUID_STR_LEN]; + uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); -#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ - "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" + struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); + __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); -static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) -{ - char *err_msg = NULL; - debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); - if (rc != SQLITE_OK) { - error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); - sqlite3_free(err_msg); - } + system_info->hops = str2i((const char *) argv[IDX_HOPS]); + + sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); + + RRDHOST *host = rrdhost_find_or_create( + (const char *) argv[IDX_HOSTNAME] + , (const char *) argv[IDX_REGISTRY] + , guid + , (const char *) argv[IDX_OS] + , (const char *) argv[IDX_TIMEZONE] + , (const char *) argv[IDX_ABBREV_TIMEZONE] + , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0) + , (const char *) argv[IDX_TAGS] + , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") + , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") + , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 + , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0 + , default_rrd_memory_mode + , 0 // health + , 0 // rrdpush enabled + , NULL //destination + , NULL // api key + , NULL // send charts matching + , false // rrdpush_enable_replication + , 0 // rrdpush_seconds_to_replicate + , 0 // rrdpush_replication_step + , system_info + , 1 + ); + if (likely(host)) + host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + +#ifdef NETDATA_INTERNAL_CHECKS + char node_str[UUID_STR_LEN] = "<none>"; + if (likely(host->node_id)) + uuid_unparse_lower(*host->node_id, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); +#endif + return 0; } -static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +#ifdef ENABLE_ACLK +static struct aclk_database_cmd aclk_database_deq_cmd(void) { - UNUSED(cmd); - - debug(D_ACLK, "Checking database for %s", wc->host_guid); + struct aclk_database_cmd ret; + unsigned queue_size; - BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite); + uv_mutex_lock(&aclk_sync_config.cmd_mutex); + queue_size = aclk_sync_config.queue_size; + if (queue_size == 0) { + memset(&ret, 0, sizeof(ret)); + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); - db_execute(buffer_tostring(sql)); + } else { + /* dequeue command */ + ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; + if (queue_size == 1) { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; + } else { + aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? + aclk_sync_config.cmd_queue.head + 1 : 0; + } + aclk_sync_config.queue_size = queue_size - 1; + /* wake up producers */ + uv_cond_signal(&aclk_sync_config.cmd_cond); + } + uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - buffer_free(sql); + return ret; } - #define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" - static int is_host_available(uuid_t *host_id) { sqlite3_stmt *res = NULL; @@ -94,7 +192,7 @@ static int is_host_available(uuid_t *host_id) rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to select node instance information"); + error_report("Failed to bind host_id parameter to check host existence"); goto failed; } rc = sqlite3_step_monitored(res); @@ -107,15 +205,13 @@ failed: } // OPCODE: ACLK_DATABASE_DELETE_HOST -void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +static void sql_delete_aclk_table_list(char *host_guid) { - UNUSED(wc); - char uuid_str[GUID_LEN + 1]; - char host_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_str[UUID_STR_LEN]; int rc; uuid_t host_uuid; - char *host_guid = (char *)cmd.data; if (unlikely(!host_guid)) return; @@ -157,274 +253,67 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc); - db_execute(buffer_tostring(sql)); + rc = db_execute(buffer_tostring(sql)); + if (unlikely(rc)) + error("Failed to drop unused ACLK tables"); fail: buffer_free(sql); } -#endif - -uv_mutex_t aclk_async_lock; -struct aclk_database_worker_config *aclk_thread_head = NULL; - -int claimed() -{ - int rc; - rrdhost_aclk_state_lock(localhost); - rc = (localhost->aclk_state.claimed_id != NULL); - rrdhost_aclk_state_unlock(localhost); - return rc; -} -void aclk_add_worker_thread(struct aclk_database_worker_config *wc) +static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused) { - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - if (unlikely(!wc->host)) { - wc->next = aclk_thread_head; - aclk_thread_head = wc; - } - uv_mutex_unlock(&aclk_async_lock); + debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]); + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DELETE_HOST; + cmd.param[0] = strdupz((char *) argv[0]); + aclk_database_enq_cmd_noblock(&cmd); + return 0; } -void aclk_del_worker_thread(struct aclk_database_worker_config *wc) -{ - if (unlikely(!wc)) - return; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config **tmp = &aclk_thread_head; - while (*tmp && (*tmp) != wc) - tmp = &(*tmp)->next; - if (*tmp) - *tmp = wc->next; - uv_mutex_unlock(&aclk_async_lock); -} +#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \ + "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');" -int aclk_worker_thread_exists(char *guid) +static void sql_check_aclk_table_list(void) { - int rc = 0; - uv_mutex_lock(&aclk_async_lock); - - struct aclk_database_worker_config *tmp = aclk_thread_head; - - while (tmp && !rc) { - rc = strcmp(tmp->uuid_str, guid) == 0; - tmp = tmp->next; + char *err_msg = NULL; + debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); + sqlite3_free(err_msg); } - uv_mutex_unlock(&aclk_async_lock); - return rc; } -void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc) -{ - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); -} +#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;" -int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) +static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return 1; - } - - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); + char sql[512]; + snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL); + if (unlikely(db_execute(sql))) + error_report("Failed to clean stale ACLK alert entries"); return 0; } -void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) -{ - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if (wc->is_shutting_down) { - uv_mutex_unlock(&wc->cmd_mutex); - return; - } - while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); - - /* wake up event loop */ - int rc = uv_async_send(&wc->async); - if (unlikely(rc)) - debug(D_ACLK_SYNC, "Failed to wake up event loop"); -} - -struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - - uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0 || wc->is_shutting_down) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - if (wc->is_shutting_down) - uv_cond_signal(&wc->cmd_cond); - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); - } - uv_mutex_unlock(&wc->cmd_mutex); - return ret; -} +#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" -struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id) |