// SPDX-License-Identifier: GPL-3.0-or-later
#include "sqlite_functions.h"
#include "sqlite_aclk.h"
#include "sqlite_aclk_chart.h"
#include "sqlite_aclk_node.h"
#ifdef ENABLE_ACLK
#include "../../aclk/aclk.h"
#endif
/*
 * Compile-time configuration guard; performs no work at run time.
 *
 * The build is expected to stop when ACLK_MAX_ENUMERATIONS_DEFINED is
 * larger than WORKER_UTILIZATION_MAX_JOB_TYPES.
 */
void sanity_check(void)
{
    BUILD_BUG_ON(ACLK_MAX_ENUMERATIONS_DEFINED > WORKER_UTILIZATION_MAX_JOB_TYPES);
}
// NULL-terminated list of SQL statements that set up and maintain the
// dimension_delete bookkeeping table.
// NOTE(review): presumably executed in array order by the database setup
// code -- confirm against the caller.
const char *aclk_sync_config[] = {
// 1. Table that records dimensions after they have been deleted.
"CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, "
"dim_id blob, chart_id blob, host_id blob, date_created);",
// 2. Index on host_id for dimension_delete.
"CREATE INDEX IF NOT EXISTS ind_h1 ON dimension_delete (host_id);",
// 3. Trigger: after a row is deleted from `dimension`, copy its id, name,
//    dim_id and chart_id into dimension_delete, together with the owning
//    chart's "<type>.<id>" string, the chart's host_id and the current
//    unix time.
// NOTE(review): "." is written as a double-quoted string, which SQLite only
//    accepts through its legacy fallback (it breaks if double-quoted string
//    literals are disabled), and unixepoch() needs a recent SQLite release
//    -- confirm both against the bundled SQLite build.
"CREATE TRIGGER IF NOT EXISTS tr_dim_del AFTER DELETE ON dimension BEGIN INSERT INTO dimension_delete "
"(dimension_id, dimension_name, chart_type_id, dim_id, chart_id, host_id, date_created)"
" select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, unixepoch() FROM"
" chart c WHERE c.chart_id = old.chart_id; END;",
// 4. Cleanup: drop entries whose host is no longer present in `host`, or
//    that are older than 604800 seconds (7 days).
"DELETE FROM dimension_delete WHERE host_id NOT IN"
" (SELECT host_id FROM host) OR unixepoch() - date_created > 604800;",
// End-of-list sentinel.
NULL,
};
// Mutex held by the functions in this file whenever they read or modify
// aclk_thread_head or retention_running.
// NOTE(review): its uv_mutex_init() call is not in this part of the file.
uv_mutex_t aclk_async_lock;
// Head of the singly linked list (chained through ->next) of worker
// configurations registered with aclk_add_worker_thread().
struct aclk_database_worker_config *aclk_thread_head = NULL;
// Set to 1 by request_retention_run() and reset to 0 by
// stop_retention_run(); both helpers exist only when ENABLE_ACLK is defined.
int retention_running = 0;
#ifdef ENABLE_ACLK
/*
 * Mark the retention run as finished.
 *
 * Resets retention_running to 0 while holding aclk_async_lock, so that a
 * later request_retention_run() call can claim the flag again.
 */
static void stop_retention_run()
{
    uv_mutex_lock(&aclk_async_lock);
    retention_running = 0;
    uv_mutex_unlock(&aclk_async_lock);
}
/*
 * Try to claim the retention_running flag.
 *
 * The check and the update happen under aclk_async_lock, so only one caller
 * at a time can obtain the flag.
 *
 * Returns 0 when the flag was free (it is now set to 1), or 1 when it was
 * already set (the flag is left untouched).
 */
static int request_retention_run()
{
    uv_mutex_lock(&aclk_async_lock);

    int already_running = unlikely(retention_running) ? 1 : 0;
    if (!already_running)
        retention_running = 1;

    uv_mutex_unlock(&aclk_async_lock);

    return already_running;
}
#endif
/*
 * Report whether localhost currently has a claimed id.
 *
 * The pointer is inspected between rrdhost_aclk_state_lock() and
 * rrdhost_aclk_state_unlock() on localhost.
 *
 * Returns 1 when localhost->aclk_state.claimed_id is non-NULL, 0 otherwise.
 */
int claimed()
{
    rrdhost_aclk_state_lock(localhost);
    int has_claimed_id = localhost->aclk_state.claimed_id ? 1 : 0;
    rrdhost_aclk_state_unlock(localhost);

    return has_claimed_id;
}
void aclk_add_worker_thread(struct aclk_database_worker_config *wc)
{
if (unlikely(!wc))
return;
uv_mutex_lock(&aclk_async_lock);
if (unlikely(!wc->host)) {
wc->next = aclk_thread_head;
aclk_thread_head = wc;
}
uv_mutex_unlock(&aclk_async_lock);
return;
}
void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
{
if (unlikely(!wc))
return;
uv_mutex_lock(&aclk_async_lock);
struct aclk_database_worker_config **tmp = &aclk_thread_head;
while (*tmp && (*tmp) != wc)
tmp = &(*tmp)->next;
if (*tmp)
*tmp = wc->next;
uv_mutex_unlock(&aclk_async_lock);
return;
}
/*
 * Check whether a worker with the given GUID is in the global worker list.
 *
 * guid: NUL-terminated string compared against each entry's uuid_str with
 *       strcmp(); it is not checked for NULL here.
 *
 * The search runs under aclk_async_lock and stops at the first match.
 *
 * Returns 1 when a matching entry exists, 0 otherwise.
 */
int aclk_worker_thread_exists(char *guid)
{
    int found = 0;

    uv_mutex_lock(&aclk_async_lock);

    struct aclk_database_worker_config *node;
    for (node = aclk_thread_head; node != NULL; node = node->next) {
        if (strcmp(node->uuid_str, guid) == 0) {
            found = 1;
            break;
        }
    }

    uv_mutex_unlock(&aclk_async_lock);

    return found;
}
/*
 * Prepare the command queue of a worker configuration for use.
 *
 * wc: worker configuration whose queue is initialized; dereferenced without
 *     a NULL check.
 *
 * Resets the queue's head and tail indexes and its size counter to zero,
 * then initializes the queue's condition variable and mutex. A non-zero
 * result from either libuv init call trips fatal_assert().
 */
void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
{
    /* empty queue: both indexes at zero, nothing enqueued */
    wc->cmd_queue.tail = 0;
    wc->cmd_queue.head = 0;
    wc->queue_size = 0;

    /* synchronization primitives for the queue */
    fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
    fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
{
unsigned queue_size;
/* wait for free space in queue */
uv_mutex_lock(&wc->cmd_mutex);
if ((queue_size