summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2023-03-16 17:27:17 +0200
committerGitHub <noreply@github.com>2023-03-16 17:27:17 +0200
commit4c6a13e5bd09afd7405d0c309a76b88961e08630 (patch)
treef985eda5b4948a65161aa7c4fa1a4971857a7a58
parent4e8be0f23d72ef1fd99bab8bce2aaccbea2fe64c (diff)
Use one thread for ACLK synchronization (#14281)
* Remove aclk sync threads * Disable functions if compiled with --disable-cloud * Allocate and reuse buffer when scanning hosts Tune transactions when writing metadata Error checking when executing db_execute (it is already within a loop with retries) * Schedule host context load in parallel Child connection will be delayed if context load is not complete Event loop cleanup * Delay retention check if context is not loaded Remove context load check from regular metadata host scan * Improve checks to check finished threads * Cleanup warnings when compiling with --disable-cloud * Clean chart labels that were created before our current maximum retention * Fix sql statement * Remove structures members that of no use Remove buffer allocations when not needed * Fix compilation error * Don't check for service running when not from a worker * Code cleanup if agent is compiled with --disable-cloud Setup ACLK tables in the database if needed Submit node status update messages to the cloud * Fix compilation warning when --disable-cloud is specified * Address codacy issues * Remove empty file -- has already been moved under contexts * Use enum instead of numbers * Use UUID_STR_LEN * Add newline at the end of file * Release node_id to prevent memory leak under certain cases * Add queries in defines * Ignore rc from transaction start -- if there is an active transaction, we will use it (same with commit) should further improve in a future PR * Remove commented out code * If host is null (it should not be) do not allocate config (coverity reports Resource leak) * Do garbage collection when contexts is initialized * Handle the case when config is not yet available for a host
-rw-r--r--aclk/aclk.c2
-rw-r--r--daemon/event_loop.c1
-rw-r--r--daemon/event_loop.h1
-rw-r--r--daemon/main.c6
-rw-r--r--daemon/main.h1
-rw-r--r--database/contexts/worker.c7
-rw-r--r--database/rrd.h44
-rw-r--r--database/rrdhost.c77
-rw-r--r--database/sqlite/sqlite_aclk.c917
-rw-r--r--database/sqlite/sqlite_aclk.h102
-rw-r--r--database/sqlite/sqlite_aclk_alert.c464
-rw-r--r--database/sqlite/sqlite_aclk_alert.h15
-rw-r--r--database/sqlite/sqlite_aclk_node.c98
-rw-r--r--database/sqlite/sqlite_aclk_node.h3
-rw-r--r--database/sqlite/sqlite_context.c4
-rw-r--r--database/sqlite/sqlite_functions.c73
-rw-r--r--database/sqlite/sqlite_functions.h2
-rw-r--r--database/sqlite/sqlite_health.c163
-rw-r--r--database/sqlite/sqlite_metadata.c519
-rw-r--r--database/sqlite/sqlite_metadata.h1
-rw-r--r--streaming/receiver.c6
21 files changed, 1188 insertions, 1318 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 9e03e04c8c..29a49a438d 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -1222,7 +1222,7 @@ void add_aclk_host_labels(void) {
}
void aclk_queue_node_info(RRDHOST *host) {
- struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
if (likely(wc)) {
wc->node_info_send = 1;
}
diff --git a/daemon/event_loop.c b/daemon/event_loop.c
index 6f09cd654a..5fd02377eb 100644
--- a/daemon/event_loop.c
+++ b/daemon/event_loop.c
@@ -49,6 +49,7 @@ void register_libuv_worker_jobs() {
worker_register_job_name(UV_EVENT_DBENGINE_SHUTDOWN, "dbengine shutdown");
// metadata
+ worker_register_job_name(UV_EVENT_HOST_CONTEXT_LOAD, "metadata load host context");
worker_register_job_name(UV_EVENT_METADATA_STORE, "metadata store host");
worker_register_job_name(UV_EVENT_METADATA_CLEANUP, "metadata cleanup");
diff --git a/daemon/event_loop.h b/daemon/event_loop.h
index 0d3cc0d07c..1ff1c2c1cb 100644
--- a/daemon/event_loop.h
+++ b/daemon/event_loop.h
@@ -41,6 +41,7 @@ enum event_loop_job {
UV_EVENT_DBENGINE_SHUTDOWN,
// metadata
+ UV_EVENT_HOST_CONTEXT_LOAD,
UV_EVENT_METADATA_STORE,
UV_EVENT_METADATA_CLEANUP,
diff --git a/daemon/main.c b/daemon/main.c
index 5afc100e31..85d2996ef9 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -348,6 +348,7 @@ void netdata_cleanup_and_exit(int ret) {
| ABILITY_WEB_REQUESTS
| ABILITY_STREAMING_CONNECTIONS
| SERVICE_ACLK
+ | SERVICE_ACLKSYNC
);
delta_shutdown_time("stop replication, exporters, ML training, health and web servers threads");
@@ -388,11 +389,6 @@ void netdata_cleanup_and_exit(int ret) {
metadata_sync_shutdown_prepare();
-#ifdef ENABLE_ACLK
- delta_shutdown_time("signal aclk sync to stop");
- aclk_sync_exit_all();
-#endif
-
delta_shutdown_time("stop aclk threads");
timeout = !service_wait_exit(
diff --git a/daemon/main.h b/daemon/main.h
index 8704d60977..7e659e939a 100644
--- a/daemon/main.h
+++ b/daemon/main.h
@@ -43,6 +43,7 @@ typedef enum {
SERVICE_CONTEXT = (1 << 12),
SERVICE_ANALYTICS = (1 << 13),
SERVICE_EXPORTERS = (1 << 14),
+ SERVICE_ACLKSYNC = (1 << 15)
} SERVICE_TYPE;
typedef enum {
diff --git a/database/contexts/worker.c b/database/contexts/worker.c
index fe9d8b3193..22e28b2ad7 100644
--- a/database/contexts/worker.c
+++ b/database/contexts/worker.c
@@ -327,7 +327,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
RRDCONTEXT *rc;
dfe_start_reentrant(host->rrdctx.contexts, rc) {
- if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+ if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break;
if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP);
@@ -335,7 +335,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
RRDINSTANCE *ri;
dfe_start_reentrant(rc->rrdinstances, ri) {
- if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+ if(unlikely(worker_jobs && !service_running(SERVICE_CONTEXT))) break;
RRDMETRIC *rm;
dfe_start_write(ri->rrdmetrics, rm) {
@@ -1080,7 +1080,8 @@ void *rrdcontext_main(void *ptr) {
dictionary_garbage_collect(host->rrdctx.hub_queue);
}
- dictionary_garbage_collect(host->rrdctx.contexts);
+ if (host->rrdctx.contexts)
+ dictionary_garbage_collect(host->rrdctx.contexts);
}
dfe_done(host);
diff --git a/database/rrd.h b/database/rrd.h
index c3c2cc03fe..784db2217d 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -773,35 +773,41 @@ bool rrdset_memory_load_or_create_map_save(RRDSET *st_on_file, RRD_MEMORY_MODE m
// and may lead to missing information.
typedef enum __attribute__ ((__packed__)) rrdhost_flags {
+
+ // Careful not to overlap with rrdhost_options to avoid bugs if
+ // rrdhost_flags_xxx is used instead of rrdhost_option_xxx or vice-versa
// Orphan, Archived and Obsolete flags
- RRDHOST_FLAG_ORPHAN = (1 << 10), // this host is orphan (not receiving data)
- RRDHOST_FLAG_ARCHIVED = (1 << 11), // The host is archived, no collected charts yet
- RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 12), // the host has pending chart obsoletions
- RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 13), // the host has pending dimension obsoletions
+ RRDHOST_FLAG_ORPHAN = (1 << 8), // this host is orphan (not receiving data)
+ RRDHOST_FLAG_ARCHIVED = (1 << 9), // The host is archived, no collected charts yet
+ RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS = (1 << 10), // the host has pending chart obsoletions
+ RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS = (1 << 11), // the host has pending dimension obsoletions
// Streaming sender
- RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 14), // the host has initialized rrdpush structures
- RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 15), // When set, the sender thread is running
- RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 16), // When set, the host is connected to a parent
- RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 17), // when set, rrdset_done() should push metrics to parent
- RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 18), // when set, we have logged the status of metrics streaming
+ RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED = (1 << 12), // the host has initialized rrdpush structures
+ RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN = (1 << 13), // When set, the sender thread is running
+ RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED = (1 << 14), // When set, the host is connected to a parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS = (1 << 15), // when set, rrdset_done() should push metrics to parent
+ RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS = (1 << 16), // when set, we have logged the status of metrics streaming
// Health
- RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 20), // contains charts and dims with uninitialized variables
- RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 21), // the host has initialized health structures
+ RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 17), // contains charts and dims with uninitialized variables
+ RRDHOST_FLAG_INITIALIZED_HEALTH = (1 << 18), // the host has initialized health structures
// Exporting
- RRDHOST_FLAG_EXPORTING_SEND = (1 << 22), // send it to external databases
- RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 23), // don't send it to external databases
+ RRDHOST_FLAG_EXPORTING_SEND = (1 << 19), // send it to external databases
+ RRDHOST_FLAG_EXPORTING_DONT_SEND = (1 << 20), // don't send it to external databases
// ACLK
- RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 24), // when set, we should send ACLK stream context updates
+ RRDHOST_FLAG_ACLK_STREAM_CONTEXTS = (1 << 21), // when set, we should send ACLK stream context updates
+ RRDHOST_FLAG_ACLK_STREAM_ALERTS = (1 << 22), // set when the receiver part is disconnected
// Metadata
- RRDHOST_FLAG_METADATA_UPDATE = (1 << 25), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_LABELS = (1 << 26), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_INFO = (1 << 27), // metadata needs to be stored in the database
- RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_UPDATE = (1 << 23), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_LABELS = (1 << 24), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_INFO = (1 << 25), // metadata needs to be stored in the database
+ RRDHOST_FLAG_PENDING_CONTEXT_LOAD = (1 << 26), // metadata needs to be stored in the database
+ RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS = (1 << 27), // metadata needs to be stored in the database
+ RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected
} RRDHOST_FLAGS;
@@ -1028,7 +1034,7 @@ struct rrdhost {
struct sender_state *sender;
netdata_thread_t rrdpush_sender_thread; // the sender thread
size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent
- void *dbsync_worker;
+ void *aclk_sync_host_config;
// ------------------------------------------------------------------------
// streaming of data from remote hosts - rrdpush receiver
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 709ffb4d44..2917b0977b 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -519,15 +519,14 @@ int is_legacy = 1;
, string2str(host->health.health_default_recipient)
);
- if(!archived)
+ if(!archived) {
metaqueue_host_update_info(host);
-
- rrdhost_load_rrdcontext_data(host);
- if (!archived) {
+ rrdhost_load_rrdcontext_data(host);
+// rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
ml_host_new(host);
ml_host_start_training_thread(host);
} else
- rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN);
+ rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN);
return host;
}
@@ -719,31 +718,30 @@ RRDHOST *rrdhost_find_or_create(
);
}
else {
-
- rrdhost_update(host
- , hostname
- , registry_hostname
- , guid
- , os
- , timezone
- , abbrev_timezone
- , utc_offset
- , tags
- , program_name
- , program_version
- , update_every
- , history
- , mode
- , health_enabled
- , rrdpush_enabled
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , rrdpush_enable_replication
- , rrdpush_seconds_to_replicate
- , rrdpush_replication_step
- , system_info);
-
+ if (likely(!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)))
+ rrdhost_update(host
+ , hostname
+ , registry_hostname
+ , guid
+ , os
+ , timezone
+ , abbrev_timezone
+ , utc_offset
+ , tags
+ , program_name
+ , program_version
+ , update_every
+ , history
+ , mode
+ , health_enabled
+ , rrdpush_enabled
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , rrdpush_enable_replication
+ , rrdpush_seconds_to_replicate
+ , rrdpush_replication_step
+ , system_info);
}
return host;
@@ -1167,21 +1165,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) {
return;
}
-#ifdef ENABLE_ACLK
- struct aclk_database_worker_config *wc = host->dbsync_worker;
- if (wc && !netdata_exit) {
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_ORPHAN_HOST;
- struct aclk_completion compl ;
- init_aclk_completion(&compl );
- cmd.completion = &compl ;
- aclk_database_enq_cmd(wc, &cmd);
- wait_for_aclk_completion(&compl );
- destroy_aclk_completion(&compl );
- }
-#endif
-
// ------------------------------------------------------------------------
// free it
@@ -1218,10 +1201,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) {
string_freez(host->hostname);
__atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(RRDHOST), __ATOMIC_RELAXED);
freez(host);
-#ifdef ENABLE_ACLK
- if (wc)
- wc->is_orphan = 0;
-#endif
}
void rrdhost_free_all(void) {
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index d084397a0e..8d049e107e 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -5,76 +5,174 @@
#include "sqlite_aclk_node.h"
+struct aclk_sync_config_s {
+ uv_thread_t thread;
+ uv_loop_t loop;
+ uv_timer_t timer_req;
+ time_t cleanup_after; // Start a cleanup after this timestamp
+ uv_async_t async;
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex;
+ uv_cond_t cmd_cond;
+ bool initialized;
+ volatile unsigned queue_size;
+ struct aclk_database_cmdqueue cmd_queue;
+} aclk_sync_config = { 0 };
+
+
void sanity_check(void) {
// make sure the compiler will stop on misconfigurations
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-void schedule_node_info_update(RRDHOST *host)
+
+int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd)
{
- if (unlikely(!host))
- return;
+ unsigned queue_size;
- struct aclk_database_worker_config *wc = host->dbsync_worker;
+ /* wait for free space in queue */
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+ return 1;
+ }
- if (unlikely(!wc))
- return;
+ fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
+ aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.tail + 1 : 0;
+ aclk_sync_config.queue_size = queue_size + 1;
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+ return 0;
+}
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_NODE_STATE;
- cmd.completion = NULL;
- aclk_database_enq_cmd(wc, &cmd);
+static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
+{
+ unsigned queue_size;
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
+ uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex);
+ }
+ fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
+ aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.tail + 1 : 0;
+ aclk_sync_config.queue_size = queue_size + 1;
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+
+ /* wake up event loop */
+ int rc = uv_async_send(&aclk_sync_config.async);
+ if (unlikely(rc))
+ debug(D_ACLK_SYNC, "Failed to wake up event loop");
}
-#ifdef ENABLE_ACLK
-static int sql_check_aclk_table(void *data, int argc, char **argv, char **column)
+enum {
+ IDX_HOST_ID,
+ IDX_HOSTNAME,
+ IDX_REGISTRY,
+ IDX_UPDATE_EVERY,
+ IDX_OS,
+ IDX_TIMEZONE,
+ IDX_TAGS,
+ IDX_HOPS,
+ IDX_MEMORY_MODE,
+ IDX_ABBREV_TIMEZONE,
+ IDX_UTC_OFFSET,
+ IDX_PROGRAM_NAME,
+ IDX_PROGRAM_VERSION,
+ IDX_ENTRIES,
+ IDX_HEALTH_ENABLED,
+};
+
+static int create_host_callback(void *data, int argc, char **argv, char **column)
{
- struct aclk_database_worker_config *wc = data;
+ UNUSED(data);
UNUSED(argc);
UNUSED(column);
- debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_DELETE_HOST;
- cmd.data = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(wc, &cmd);
- return 0;
-}
+ char guid[UUID_STR_LEN];
+ uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
-#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
+ struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
-static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
-{
- char *err_msg = NULL;
- debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
+ system_info->hops = str2i((const char *) argv[IDX_HOPS]);
+
+ sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
+
+ RRDHOST *host = rrdhost_find_or_create(
+ (const char *) argv[IDX_HOSTNAME]
+ , (const char *) argv[IDX_REGISTRY]
+ , guid
+ , (const char *) argv[IDX_OS]
+ , (const char *) argv[IDX_TIMEZONE]
+ , (const char *) argv[IDX_ABBREV_TIMEZONE]
+ , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0)
+ , (const char *) argv[IDX_TAGS]
+ , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
+ , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
+ , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
+ , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0
+ , default_rrd_memory_mode
+ , 0 // health
+ , 0 // rrdpush enabled
+ , NULL //destination
+ , NULL // api key
+ , NULL // send charts matching
+ , false // rrdpush_enable_replication
+ , 0 // rrdpush_seconds_to_replicate
+ , 0 // rrdpush_replication_step
+ , system_info
+ , 1
+ );
+ if (likely(host))
+ host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char node_str[UUID_STR_LEN] = "<none>";
+ if (likely(host->node_id))
+ uuid_unparse_lower(*host->node_id, node_str);
+ internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str);
+#endif
+ return 0;
}
-static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+#ifdef ENABLE_ACLK
+static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- UNUSED(cmd);
-
- debug(D_ACLK, "Checking database for %s", wc->host_guid);
+ struct aclk_database_cmd ret;
+ unsigned queue_size;
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ queue_size = aclk_sync_config.queue_size;
+ if (queue_size == 0) {
+ memset(&ret, 0, sizeof(ret));
+ ret.opcode = ACLK_DATABASE_NOOP;
+ ret.completion = NULL;
- buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
- "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
- db_execute(buffer_tostring(sql));
+ } else {
+ /* dequeue command */
+ ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head];
+ if (queue_size == 1) {
+ aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
+ } else {
+ aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.head + 1 : 0;
+ }
+ aclk_sync_config.queue_size = queue_size - 1;
+ /* wake up producers */
+ uv_cond_signal(&aclk_sync_config.cmd_cond);
+ }
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- buffer_free(sql);
+ return ret;
}
-
#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
-
static int is_host_available(uuid_t *host_id)
{
sqlite3_stmt *res = NULL;
@@ -94,7 +192,7 @@ static int is_host_available(uuid_t *host_id)
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to select node instance information");
+ error_report("Failed to bind host_id parameter to check host existence");
goto failed;
}
rc = sqlite3_step_monitored(res);
@@ -107,15 +205,13 @@ failed:
}
// OPCODE: ACLK_DATABASE_DELETE_HOST
-void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+static void sql_delete_aclk_table_list(char *host_guid)
{
- UNUSED(wc);
- char uuid_str[GUID_LEN + 1];
- char host_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
+ char host_str[UUID_STR_LEN];
int rc;
uuid_t host_uuid;
- char *host_guid = (char *)cmd.data;
if (unlikely(!host_guid))
return;
@@ -157,274 +253,67 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc);
- db_execute(buffer_tostring(sql));
+ rc = db_execute(buffer_tostring(sql));
+ if (unlikely(rc))
+ error("Failed to drop unused ACLK tables");
fail:
buffer_free(sql);
}
-#endif
-
-uv_mutex_t aclk_async_lock;
-struct aclk_database_worker_config *aclk_thread_head = NULL;
-
-int claimed()
-{
- int rc;
- rrdhost_aclk_state_lock(localhost);
- rc = (localhost->aclk_state.claimed_id != NULL);
- rrdhost_aclk_state_unlock(localhost);
- return rc;
-}
-void aclk_add_worker_thread(struct aclk_database_worker_config *wc)
+static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused)
{
- if (unlikely(!wc))
- return;
-
- uv_mutex_lock(&aclk_async_lock);
- if (unlikely(!wc->host)) {
- wc->next = aclk_thread_head;
- aclk_thread_head = wc;
- }
- uv_mutex_unlock(&aclk_async_lock);
+ debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DELETE_HOST;
+ cmd.param[0] = strdupz((char *) argv[0]);
+ aclk_database_enq_cmd_noblock(&cmd);
+ return 0;
}
-void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
-{
- if (unlikely(!wc))
- return;
-
- uv_mutex_lock(&aclk_async_lock);
- struct aclk_database_worker_config **tmp = &aclk_thread_head;
- while (*tmp && (*tmp) != wc)
- tmp = &(*tmp)->next;
- if (*tmp)
- *tmp = wc->next;
- uv_mutex_unlock(&aclk_async_lock);
-}
+#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
+ "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
-int aclk_worker_thread_exists(char *guid)
+static void sql_check_aclk_table_list(void)
{
- int rc = 0;
- uv_mutex_lock(&aclk_async_lock);
-
- struct aclk_database_worker_config *tmp = aclk_thread_head;
-
- while (tmp && !rc) {
- rc = strcmp(tmp->uuid_str, guid) == 0;
- tmp = tmp->next;
+ char *err_msg = NULL;
+ debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
+ int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
+ sqlite3_free(err_msg);
}
- uv_mutex_unlock(&aclk_async_lock);
- return rc;
}
-void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
-{
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- wc->queue_size = 0;
- fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
- fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
-}
+#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;"
-int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
+static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return 1;
- }
-
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
+ char sql[512];
+ snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ if (unlikely(db_execute(sql)))
+ error_report("Failed to clean stale ACLK alert entries");
return 0;
}
-void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
-{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if (wc->is_shutting_down) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return;
- }
- while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
-
- /* wake up event loop */
- int rc = uv_async_send(&wc->async);
- if (unlikely(rc))
- debug(D_ACLK_SYNC, "Failed to wake up event loop");
-}
-
-struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc)
-{
- struct aclk_database_cmd ret;
- unsigned queue_size;
-
- uv_mutex_lock(&wc->cmd_mutex);
- queue_size = wc->queue_size;
- if (queue_size == 0 || wc->is_shutting_down) {
- memset(&ret, 0, sizeof(ret));
- ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
- if (wc->is_shutting_down)
- uv_cond_signal(&wc->cmd_cond);
- } else {
- /* dequeue command */
- ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
- if (queue_size == 1) {
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- } else {
- wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.head + 1 : 0;
- }
- wc->queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&wc->cmd_cond);
- }
- uv_mutex_unlock(&wc->cmd_mutex);
- return ret;
-}
+#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');"
-struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id)