summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r--database/sqlite/sqlite_aclk.c144
1 files changed, 68 insertions, 76 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index db4b4008e6..78c42678ec 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -3,17 +3,17 @@
#include "sqlite_functions.h"
#include "sqlite_aclk.h"
-// TODO: To be added
#include "sqlite_aclk_chart.h"
-//#include "sqlite_aclk_alert.h"
#include "sqlite_aclk_node.h"
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#include "../../aclk/aclk.h"
+#endif
+
const char *aclk_sync_config[] = {
NULL,
};
-int aclk_architecture = 0;
-
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
@@ -78,24 +78,6 @@ void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
-void aclk_database_enq_cmd_nowake(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
-{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
-}
-
int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
{
unsigned queue_size;
@@ -207,7 +189,7 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
void sql_aclk_sync_init(void)
{
-#ifdef ACLK_NEWARCH_DEVMODE
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
char *err_msg = NULL;
int rc;
@@ -251,35 +233,49 @@ static void async_cb(uv_async_t *handle)
static void timer_cb(uv_timer_t* handle)
{
- struct aclk_database_worker_config *wc = handle->data;
uv_stop(handle->loop);
uv_update_time(handle->loop);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ struct aclk_database_worker_config *wc = handle->data;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_TIMER;
aclk_database_enq_cmd_noblock(wc, &cmd);
- if (wc->cleanup_after && wc->cleanup_after < now_realtime_sec()) {
+ time_t now = now_realtime_sec();
+
+ if (wc->cleanup_after && wc->cleanup_after < now) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
- if (wc->chart_updates && !wc->chart_pending) {
- cmd.opcode = ACLK_DATABASE_PUSH_CHART;
- cmd.count = ACLK_MAX_CHART_BATCH;
- cmd.completion = NULL;
- cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT;
- if (!aclk_database_enq_cmd_noblock(wc, &cmd))
- wc->chart_pending = 1;
- }
+ if (aclk_use_new_cloud_arch && aclk_connected) {
+ if (wc->rotation_after && wc->rotation_after < now) {
+ cmd.opcode = ACLK_DATABASE_NODE_INFO;
+ aclk_database_enq_cmd_noblock(wc, &cmd);
- if (wc->alert_updates) {
- cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- cmd.count = ACLK_MAX_ALERT_UPDATES;
- aclk_database_enq_cmd_noblock(wc, &cmd);
+ cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
+ if (!aclk_database_enq_cmd_noblock(wc, &cmd))
+ wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
+ }
+
+ if (wc->chart_updates && !wc->chart_pending) {
+ cmd.opcode = ACLK_DATABASE_PUSH_CHART;
+ cmd.count = ACLK_MAX_CHART_BATCH;
+ cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT;
+ if (!aclk_database_enq_cmd_noblock(wc, &cmd))
+ wc->chart_pending = 1;
+ }
+
+ if (wc->alert_updates) {
+ cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
+ cmd.count = ACLK_MAX_ALERT_UPDATES;
+ aclk_database_enq_cmd_noblock(wc, &cmd);
+ }
}
+#endif
}
#define MAX_CMD_BATCH_SIZE (256)
@@ -334,8 +330,14 @@ void aclk_database_worker(void *arg)
wc->node_info_send = (wc->host && !localhost);
aclk_add_worker_thread(wc);
info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc));
-// TODO: To be added
-// sql_get_last_chart_sequence(wc, cmd);
+
+ memset(&cmd, 0, sizeof(cmd));
+ sql_get_last_chart_sequence(wc, cmd);
+ wc->chart_updates = 0;
+ wc->alert_updates = 0;
+ wc->startup_time = now_realtime_sec();
+ wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
+ wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
while (likely(shutdown == 0)) {
uv_run(loop, UV_RUN_DEFAULT);
@@ -345,11 +347,8 @@ void aclk_database_worker(void *arg)
/* wait for commands */
cmd_batch_size = 0;
do {
- if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) {
- info("DEBUG: %s Processed %u commands, current queue about %u",
- wc->uuid_str, cmd_batch_size, wc->queue_size);
+ if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
break;
- }
cmd = aclk_database_deq_cmd(wc);
opcode = cmd.opcode;
++cmd_batch_size;
@@ -365,14 +364,6 @@ void aclk_database_worker(void *arg)
if (wc->host == localhost)
sql_check_aclk_table_list(wc);
break;
- case ACLK_DATABASE_CHECK:
- debug(D_ACLK_SYNC, "Checking database dimensions for %s", wc->host_guid);
-// sql_check_dimension_state(wc, cmd);
- break;
- case ACLK_DATABASE_CHECK_ROTATION:
- debug(D_ACLK_SYNC, "Checking database for rotation %s", wc->host_guid);
-// sql_check_rotation_state(wc, cmd);
- break;
case ACLK_DATABASE_DELETE_HOST:
debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data);
sql_delete_aclk_table_list(wc, cmd);
@@ -407,19 +398,19 @@ void aclk_database_worker(void *arg)
// ALERTS
case ACLK_DATABASE_ADD_ALERT:
debug(D_ACLK_SYNC,"Adding alert event for %s", wc->host_guid);
-// aclk_add_alert_event(wc, cmd);
+ aclk_add_alert_event(wc, cmd);
break;
case ACLK_DATABASE_PUSH_ALERT_CONFIG:
debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid);
-// aclk_push_alert_config_event(wc, cmd);
+ aclk_push_alert_config_event(wc, cmd);
break;
case ACLK_DATABASE_PUSH_ALERT:
debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid);
-// aclk_push_alert_event(wc, cmd);
+ aclk_push_alert_event(wc, cmd);
break;
case ACLK_DATABASE_ALARM_HEALTH_LOG:
debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
-// aclk_push_alarm_health_log(wc, cmd);
+ aclk_push_alarm_health_log(wc, cmd);
break;
// NODE OPERATIONS
@@ -427,8 +418,9 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
sql_build_node_info(wc, cmd);
break;
- case ACLK_DATABASE_UPD_STATS:
-// sql_update_metric_statistics(wc, cmd);
+ case ACLK_DATABASE_UPD_RETENTION:
+ debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
+ aclk_update_retention(wc, cmd);
break;
// NODE_INSTANCE DETECTION
@@ -446,7 +438,7 @@ void aclk_database_worker(void *arg)
}
}
}
- if (wc->node_info_send && wc->host && localhost && claimed()) {
+ if (wc->node_info_send && wc->host && localhost && claimed() && aclk_connected) {
cmd.opcode = ACLK_DATABASE_NODE_INFO;
cmd.completion = NULL;
wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
@@ -471,7 +463,7 @@ void aclk_database_worker(void *arg)
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
+ * it is however undocumented behaviour we need to be aware if this becomes
* an issue in the future.
*/
uv_close((uv_handle_t *)&wc->async, NULL);
@@ -509,13 +501,9 @@ error_after_loop_init:
// -------------------------------------------------------------
-void aclk_set_architecture(int mode)
-{
- aclk_architecture = mode;
-}
-
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
{
+#ifdef ENABLE_ACLK
char uuid_str[GUID_LEN + 1];
char host_guid[GUID_LEN + 1];
@@ -568,15 +556,17 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
if (likely(host))
host->dbsync_worker = (void *) wc;
wc->host = host;
- wc->chart_updates = 0;
- wc->alert_updates = 0;
- wc->startup_time = now_realtime_sec();
- wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
strcpy(wc->uuid_str, uuid_str);
strcpy(wc->host_guid, host_guid);
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc));
+#else
+ UNUSED(host);
+ UNUSED(host_uuid);
+ UNUSED(node_id);
+#endif
+ return;
}
void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
@@ -722,19 +712,20 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
return;
}
-void aclk_data_rotated(RRDHOST *host)
+void aclk_data_rotated(void)
{
- UNUSED(host);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- debug(D_ACLK_SYNC,"Processing data base rotation event");
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_UPD_STATS;
+ if (!aclk_use_new_cloud_arch || !aclk_connected)
+ return;
+ time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY;
rrd_wrlock();
RRDHOST *this_host = localhost;
while (this_host) {
- aclk_database_enq_cmd((struct aclk_database_worker_config *)this_host->dbsync_worker, &cmd);
+ struct aclk_database_worker_config *wc = this_host->dbsync_worker;
+ if (wc)
+ wc->rotation_after = next_rotation_time;
this_host = this_host->next;
}
rrd_unlock();
@@ -743,9 +734,10 @@ void aclk_data_rotated(RRDHOST *host)
uv_mutex_lock(&aclk_async_lock);
while (tmp) {
- aclk_database_enq_cmd(tmp, &cmd);
+ tmp->rotation_after = next_rotation_time;
tmp = tmp->next;
}
uv_mutex_unlock(&aclk_async_lock);
+#endif
return;
}