diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2021-09-21 22:37:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-21 22:37:12 +0300 |
commit | 2085a518c3fa415b9e4d08002c720465417c7c14 (patch) | |
tree | ce9a9f71d1c39ae3a9dec6c034d55404d4a6f18b /database | |
parent | 3ab85b94be68cf41184e5c3916feda34e544ddd4 (diff) |
Add chart message support for ACLK new architecture (#11447)
Diffstat (limited to 'database')
-rw-r--r-- | database/rrd.h | 7 | ||||
-rw-r--r-- | database/rrdhost.c | 5 | ||||
-rw-r--r-- | database/rrdset.c | 3 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 198 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 27 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 762 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 40 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 2 |
8 files changed, 879 insertions, 165 deletions
diff --git a/database/rrd.h b/database/rrd.h index ba6ef4d0b2..ba59eac16b 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -163,7 +163,8 @@ typedef enum rrddim_flags { RRDDIM_FLAG_OBSOLETE = (1 << 2), // this is marked by the collector/module as obsolete // No new values have been collected for this dimension since agent start or it was marked RRDDIM_FLAG_OBSOLETE at // least rrdset_free_obsolete_time seconds ago. - RRDDIM_FLAG_ARCHIVED = (1 << 3) + RRDDIM_FLAG_ARCHIVED = (1 << 3), + RRDDIM_FLAG_ACLK = (1 << 4) } RRDDIM_FLAGS; #ifdef HAVE_C___ATOMIC @@ -380,6 +381,9 @@ struct rrddim_volatile { uuid_t *rrdeng_uuid; // database engine metric UUID struct pg_cache_page_index *page_index; #endif +#ifdef ENABLE_ACLK + int aclk_live_status; +#endif uuid_t metric_uuid; // global UUID for this metric (unique_across hosts) union rrddim_collect_handle handle; // ------------------------------------------------------------------------ @@ -423,6 +427,7 @@ struct rrddim_volatile { struct rrdset_volatile { char *old_title; char *old_context; + uuid_t hash_id; struct label *new_labels; struct label_index labels; }; diff --git a/database/rrdhost.c b/database/rrdhost.c index 7922822b87..59e2c6d77c 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1443,6 +1443,11 @@ restart_after_removal: continue; } + if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)) { + last = rd; + rd = rd->next; + continue; + } rrddim_flag_set(rd, RRDDIM_FLAG_ARCHIVED); while (rd->variables) rrddimvar_free(rd->variables); diff --git a/database/rrdset.c b/database/rrdset.c index 6b64561b92..0af189f77c 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1861,7 +1861,8 @@ after_second_database_work: rrdset_wrlock(st); for( rd = st->dimensions, last = NULL ; likely(rd) ; ) { - if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) { + if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE) && !rrddim_flag_check(rd, RRDDIM_FLAG_ACLK) + && (rd->last_collected_time.tv_sec + rrdset_free_obsolete_time < now))) { info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id); if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || rd->rrd_memory_mode == RRD_MEMORY_MODE_MAP)) { diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 7dbc8e69ee..db4b4008e6 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -4,7 +4,7 @@ #include "sqlite_aclk.h" // TODO: To be added -//#include "sqlite_aclk_chart.h" +#include "sqlite_aclk_chart.h" //#include "sqlite_aclk_alert.h" #include "sqlite_aclk_node.h" @@ -170,6 +170,25 @@ struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_confi return ret; } +int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd) +{ + if (unlikely(!node_id || !cmd)) + return 0; + + uv_mutex_lock(&aclk_async_lock); + struct aclk_database_worker_config *wc = aclk_thread_head; + + while (wc) { + if (!strcmp(wc->node_id, node_id)) + break; + wc = wc->next; + } + if (wc) + aclk_database_enq_cmd(wc, cmd); + uv_mutex_unlock(&aclk_async_lock); + return (wc == NULL); +} + int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) { char uuid_str[GUID_LEN + 1]; @@ -247,11 +266,13 @@ static void timer_cb(uv_timer_t* handle) wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } - if (wc->chart_updates) { + if (wc->chart_updates && !wc->chart_pending) { cmd.opcode = ACLK_DATABASE_PUSH_CHART; cmd.count = ACLK_MAX_CHART_BATCH; + cmd.completion = NULL; cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT; - aclk_database_enq_cmd_noblock(wc, &cmd); + if (!aclk_database_enq_cmd_noblock(wc, &cmd)) + wc->chart_pending = 1; } if (wc->alert_updates) { @@ -310,9 +331,9 @@ void aclk_database_worker(void *arg) wc->error = 0; shutdown = 0; + wc->node_info_send = (wc->host && !localhost); aclk_add_worker_thread(wc); - - info("Starting ACLK sync event loop for host with GUID %s (Host is '%s')", wc->host_guid, wc->host ? "connected" : "not connected"); + info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc)); // TODO: To be added // sql_get_last_chart_sequence(wc, cmd); while (likely(shutdown == 0)) { @@ -324,12 +345,14 @@ void aclk_database_worker(void *arg) /* wait for commands */ cmd_batch_size = 0; do { - if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) + if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) { + info("DEBUG: %s Processed %u commands, current queue about %u", + wc->uuid_str, cmd_batch_size, wc->queue_size); break; + } cmd = aclk_database_deq_cmd(wc); opcode = cmd.opcode; ++cmd_batch_size; - db_lock(); switch (opcode) { case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ @@ -356,41 +379,29 @@ void aclk_database_worker(void *arg) break; // CHART / DIMENSION OPERATIONS + case ACLK_DATABASE_ADD_CHART: + debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid); + aclk_add_chart_event(wc, cmd); + break; + case ACLK_DATABASE_ADD_DIMENSION: + debug(D_ACLK_SYNC, "Adding dimension event for %s", wc->host_guid); + aclk_add_dimension_event(wc, cmd); + break; case ACLK_DATABASE_PUSH_CHART: debug(D_ACLK_SYNC, "Pushing chart info to the cloud for node %s", wc->host_guid); -// aclk_push_chart_event(wc, cmd); + aclk_send_chart_event(wc, cmd); break; case ACLK_DATABASE_PUSH_CHART_CONFIG: debug(D_ACLK_SYNC, "Pushing chart config info to the cloud for node %s", wc->host_guid); -// aclk_push_chart_config(wc, cmd); + aclk_send_chart_config(wc, cmd); break; case ACLK_DATABASE_CHART_ACK: debug(D_ACLK_SYNC, "ACK chart SEQ for %s to %"PRIu64, wc->uuid_str, (uint64_t) cmd.param1); -// sql_set_chart_ack(wc, cmd); + aclk_receive_chart_ack(wc, cmd); break; case ACLK_DATABASE_RESET_CHART: debug(D_ACLK_SYNC, "RESET chart SEQ for %s to %"PRIu64, wc->uuid_str, (uint64_t) cmd.param1); -// sql_reset_chart_event(wc, cmd); - break; - case ACLK_DATABASE_STATUS_CHART: - debug(D_ACLK_SYNC,"Requesting chart status for %s", wc->host_guid); -// aclk_status_chart_event(wc, cmd); - break; - case ACLK_DATABASE_ADD_CHART: - debug(D_ACLK_SYNC,"Adding chart event for %s", wc->host_guid); -// aclk_add_chart_event(wc, cmd); - break; - case ACLK_DATABASE_ADD_DIMENSION: - debug(D_ACLK_SYNC,"Adding dimension event for %s", wc->host_guid); -// aclk_add_dimension_event(wc, cmd); - break; - case ACLK_DATABASE_DEDUP_CHART: - debug(D_ACLK_SYNC,"Running chart deduplication for %s", wc->host_guid); -// sql_chart_deduplicate(wc, cmd); - break; - case ACLK_DATABASE_SYNC_CHART_SEQ: - debug(D_ACLK_SYNC,"Calculatting chart sequence for %s", wc->host_guid); -// sql_get_last_chart_sequence(wc, cmd); + aclk_receive_chart_reset(wc, cmd); break; // ALERTS @@ -431,14 +442,15 @@ void aclk_database_worker(void *arg) uv_thread_set_name_np(wc->thread, threadname); wc->host->dbsync_worker = wc; aclk_del_worker_thread(wc); - if (wc->host->node_id) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - } + wc->node_info_send = 1; } } } + if (wc->node_info_send && wc->host && localhost && claimed()) { + cmd.opcode = ACLK_DATABASE_NODE_INFO; + cmd.completion = NULL; + wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); + } break; case ACLK_DATABASE_SHUTDOWN: shutdown = 1; @@ -449,7 +461,6 @@ void aclk_database_worker(void *arg) debug(D_ACLK_SYNC, "%s: default.", __func__); break; } - db_unlock(); if (cmd.completion) aclk_complete(cmd.completion); } while (opcode != ACLK_DATABASE_NOOP); @@ -503,119 +514,6 @@ void aclk_set_architecture(int mode) aclk_architecture = mode; } -#define SELECT_HOST_DIMENSION_LIST "SELECT d.dim_id, c.update_every, c.type||'.'||c.id FROM chart c, dimension d, host h " \ - "WHERE d.chart_id = c.chart_id AND c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" - -#define SELECT_HOST_CHART_LIST "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \ - "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" -// -//void sql_update_metric_statistics(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -//{ -// UNUSED(cmd); -// -// int rc; -// -// char *claim_id = is_agent_claimed(); -// if (unlikely(!claim_id)) -// return; -// -// sqlite3_stmt *res = NULL; -// -// if (!wc->host || wc->host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) -// rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_DIMENSION_LIST, -1, &res, 0); -// else -// rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_CHART_LIST, -1, &res, 0); -// -// if (unlikely(rc != SQLITE_OK)) { -// error_report("Failed to prepare statement to fetch host dimensions"); -// return; -// } -// -// if (wc->host) -// rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid , sizeof(wc->host->host_uuid), SQLITE_STATIC); -// else { -// uuid_t host_uuid; -// rc = uuid_parse(wc->host_guid, host_uuid); -// if (unlikely(rc)) -// goto failed; -// rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC); -// } -// if (unlikely(rc != SQLITE_OK)) { -// error_report("Failed to bind host parameter to fetch host dimensions"); -// goto failed; -// } -// -// time_t start_time = LONG_MAX; -// time_t first_entry_t; -// uint32_t update_every = 0; -// -// struct retention_updated rotate_data; -// -// memset(&rotate_data, 0, sizeof(rotate_data)); -// -// int max_intervals = 32; -// -// rotate_data.interval_duration_count = 0; -// rotate_data.interval_durations = callocz(max_intervals, sizeof(*rotate_data.interval_durations)); -// -// now_realtime_timeval(&rotate_data.rotation_timestamp); -// rotate_data.memory_mode = wc->host ? wc->host->rrd_memory_mode : RRD_MEMORY_MODE_DBENGINE; -// rotate_data.claim_id = claim_id; -// rotate_data.node_id = strdupz(wc->node_id); -// -// while (sqlite3_step(res) == SQLITE_ROW) { -// if (!update_every || update_every != (uint32_t) sqlite3_column_int(res, 1)) { -// if (update_every) { -// debug(D_ACLK_SYNC,"Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); -// rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time; -// rotate_data.interval_duration_count++; -// } -// update_every = (uint32_t) sqlite3_column_int(res, 1); -// rotate_data.interval_durations[rotate_data.interval_duration_count].update_every = update_every; -// start_time = LONG_MAX; -// } -//#ifdef ENABLE_DBENGINE -// time_t last_entry_t; -// if (!wc->host || wc->host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) -// rc = rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t); -// else -//#endif -// { -// RRDSET *st = NULL; -// rc = (st = rrdset_find(wc->host, (const char *)sqlite3_column_text(res, 2))) ? 0 : 1; -// if (!rc) { -// first_entry_t = rrdset_first_entry_t(st); -//// info("DEBUG: Scanning SET = %s --> %ld", st->name, first_entry_t); -// } -// } -// -// if (likely(!rc && first_entry_t)) -// start_time = MIN(start_time, first_entry_t); -// } -// if (update_every) { -// debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); -// rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time; -// rotate_data.interval_duration_count++; -// } -// -// info("DEBUG: Scan update every for host"); -// for (int i = 0; i < rotate_data.interval_duration_count; ++i) { -// info("DEBUG: %d --> Update %s for %u Retention = %u", i, wc->host_guid, -// rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention); -// }; -// aclk_retention_updated(&rotate_data); -// freez(rotate_data.node_id); -// freez(rotate_data.claim_id); -// freez(rotate_data.interval_durations); -// -//failed: -// rc = sqlite3_finalize(res); -// if (unlikely(rc != SQLITE_OK)) -// error_report("Failed to finalize the prepared statement when reading host dimensions"); -// return; -//} - - void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) { char uuid_str[GUID_LEN + 1]; diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index e8ce926a3a..96031bf5b1 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -6,13 +6,13 @@ #include "sqlite3.h" // TODO: To be added -//#include "../../aclk/schema-wrappers/chart_stream.h" +#include "../../aclk/schema-wrappers/chart_stream.h" #ifndef ACLK_MAX_CHART_BATCH -#define ACLK_MAX_CHART_BATCH (20) +#define ACLK_MAX_CHART_BATCH (200) #endif #ifndef ACLK_MAX_CHART_BATCH_COUNT -#define ACLK_MAX_CHART_BATCH_COUNT (5) +#define ACLK_MAX_CHART_BATCH_COUNT (10) #endif #define ACLK_MAX_ALERT_UPDATES (5) #define ACLK_SYNC_RETRY_COUNT "10" @@ -84,7 +84,7 @@ static inline char *get_str_from_uuid(uuid_t *uuid) return strdupz(uuid_str); } -#define TABLE_ACLK_CHART "CREATE TABLE IF NOT EXISTS aclk_chart_%s (sequence_id INTEGER PRIMARY KEY AUTOINCREMENT, " \ +#define TABLE_ACLK_CHART "CREATE TABLE IF NOT EXISTS aclk_chart_%s (sequence_id INTEGER PRIMARY KEY, " \ "date_created, date_updated, date_submitted, status, uuid, type, unique_id, " \ "update_count default 1, unique(uuid, status));" @@ -123,7 +123,6 @@ enum aclk_database_opcode { ACLK_DATABASE_CHECK, ACLK_DATABASE_CHECK_ROTATION, ACLK_DATABASE_CLEANUP, - ACLK_DATABASE_DEDUP_CHART, ACLK_DATABASE_DELETE_HOST, ACLK_DATABASE_NODE_INFO, ACLK_DATABASE_PUSH_ALERT, @@ -133,8 +132,6 @@ enum aclk_database_opcode { ACLK_DATABASE_RESET_CHART, ACLK_DATABASE_RESET_NODE, ACLK_DATABASE_SHUTDOWN, - ACLK_DATABASE_STATUS_CHART, - ACLK_DATABASE_SYNC_CHART_SEQ, ACLK_DATABASE_TIMER, ACLK_DATABASE_UPD_STATS, ACLK_DATABASE_MAX_OPCODE @@ -157,7 +154,7 @@ struct aclk_database_cmd { struct aclk_completion *completion; }; -#define ACLK_DATABASE_CMD_Q_MAX_SIZE (2048) +#define ACLK_DATABASE_CMD_Q_MAX_SIZE (16384) struct aclk_database_cmdqueue { unsigned head, tail; @@ -189,6 +186,9 @@ struct aclk_database_worker_config { int chart_updates; int alert_updates; time_t batch_created; + int node_info_send; + int chart_pending; + int chart_reset_count; struct aclk_database_worker_config *next; }; @@ -212,14 +212,15 @@ static inline RRDHOST *find_host_by_node_id(char *node_id) extern sqlite3 *db_meta; -extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); +extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); extern void aclk_set_architecture(int mode); +extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); +int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd); +void aclk_data_rotated(RRDHOST *host); void sql_aclk_sync_init(void); -void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); +void sql_check_aclk_table_list(struct aclk_database_worker_config *wc); void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void sql_drop_host_aclk_table_list(uuid_t *host_uuid); -void sql_check_aclk_table_list(struct aclk_database_worker_config *wc); -void aclk_data_rotated(RRDHOST *host); +void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); #endif //NETDATA_SQLITE_ACLK_H diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c new file mode 100644 index 0000000000..132a4bf056 --- /dev/null +++ b/database/sqlite/sqlite_aclk_chart.c @@ -0,0 +1,762 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "sqlite_functions.h" +#include "sqlite_aclk_chart.h" + +#include "../../aclk/aclk_charts_api.h" + +#define CHECK_SQLITE_CONNECTION(db_meta) \ + if (unlikely(!db_meta)) { \ + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { \ + return 1; \ + } \ + error_report("Database has not been initialized"); \ + return 1; \ + } + +static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc, + void *data, enum aclk_database_opcode opcode) +{ + if (unlikely(!wc)) + return 1; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = opcode; + cmd.data = data; + aclk_database_enq_cmd(wc, &cmd); + return 0; +} + +static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) +{ + static __thread sqlite3_stmt *res = NULL; + int rc; + int send_status = 0; + + if (unlikely(!res)) { + BUFFER *sql = buffer_create(1024); + buffer_sprintf(sql,"SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " + "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", + uuid_str, uuid_str); + rc = prepare_statement(db_meta, (char *) buffer_tostring(sql), &res); + buffer_free(sql); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to check payload data"); + return 0; + } + } + + rc = sqlite3_bind_blob(res, 1, uuid , sizeof(*uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_blob(res, 2, payload , payload_size, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + while (sqlite3_step(res) == SQLITE_ROW) { + send_status = sqlite3_column_int(res, 0); + } + +bind_fail: + if (unlikely(sqlite3_reset(res) != SQLITE_OK)) + error_report("Failed to reset statement in check payload, rc = %d", rc); + return send_status; +} + +static int aclk_add_chart_payload(char *uuid_str, uuid_t *uuid, char *claim_id, ACLK_PAYLOAD_TYPE payload_type, + void *payload, size_t payload_size) +{ + static __thread sqlite3_stmt *res_chart = NULL; + int rc; + + rc = payload_sent(uuid_str, uuid, payload, payload_size); + if (rc == 1) + return 0; + + if (unlikely(!res_chart)) { + BUFFER *sql = buffer_create(1024); + + buffer_sprintf(sql,"INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ + "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", uuid_str); + + rc = prepare_statement(db_meta, (char *) buffer_tostring(sql), &res_chart); + buffer_free(sql); + + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to store chart payload data"); + return 1; + } + } + + uuid_t unique_uuid; + uuid_generate(unique_uuid); + + uuid_t claim_uuid; + uuid_parse(claim_id, claim_uuid); + + rc = sqlite3_bind_blob(res_chart, 1, &unique_uuid , sizeof(unique_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_blob(res_chart, 2, uuid , sizeof(*uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_blob(res_chart, 3, &claim_uuid , sizeof(claim_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res_chart, 4, payload_type); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_blob(res_chart, 5, payload, payload_size, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res_chart); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed store chart payload event, rc = %d", rc); + +bind_fail: + if (unlikely(sqlite3_reset(res_chart) != SQLITE_OK)) + error_report("Failed to reset statement in store chart payload, rc = %d", rc); + return (rc != SQLITE_DONE); +} + +int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + int rc = 0; + CHECK_SQLITE_CONNECTION(db_meta); + +#ifdef ACLK_NG + char *claim_id = is_agent_claimed(); + + RRDSET *st = cmd.data; + + if (likely(claim_id)) { + struct chart_instance_updated chart_payload; + memset(&chart_payload, 0, sizeof(chart_payload)); + chart_payload.config_hash = get_str_from_uuid(&st->state->hash_id); + chart_payload.update_every = st->update_every; + chart_payload.memory_mode = st->rrd_memory_mode; + chart_payload.name = strdupz((char *)st->name); + chart_payload.node_id = strdupz(wc->node_id); + chart_payload.claim_id = claim_id; + chart_payload.id = strdupz(st->id); + + struct label_index *labels = &st->state->labels; + netdata_rwlock_wrlock(&labels->labels_rwlock); + struct label *label_list = labels->head; + struct label *chart_label = NULL; + while (label_list) { + chart_label = add_label_to_list(chart_label, label_list->key, label_list->value, label_list->label_source); + label_list = label_list->next; + } + netdata_rwlock_unlock(&labels->labels_rwlock); + chart_payload.label_head = chart_label; + + size_t size; + char *payload = generate_chart_instance_updated(&size, &chart_payload); + if (likely(payload)) + rc = aclk_add_chart_payload(wc->uuid_str, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size); + freez(payload); + chart_instance_updated_destroy(&chart_payload); + } +#else + UNUSED(wc); + UNUSED(cmd); +#endif + return rc; +} + +int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + int rc = 0; + CHECK_SQLITE_CONNECTION(db_meta); + +#ifdef ACLK_NG + char *claim_id = is_agent_claimed(); + + RRDDIM *rd = cmd.data; + + if (likely(claim_id)) { + time_t now = now_realtime_sec(); + + time_t first_t = rd->state->query_ops.oldest_time(rd); + time_t last_t = rd->state->query_ops.latest_time(rd); + + int live = ((now - last_t) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every)); + + struct chart_dimension_updated dim_payload; + size_t size; + + memset(&dim_payload, 0, sizeof(dim_payload)); + dim_payload.node_id = strdupz(wc->node_id); + dim_payload.claim_id = claim_id; + dim_payload.name = strdupz(rd->name); + dim_payload.id = strdupz(rd->id); + + dim_payload.chart_id = strdupz(rd->rrdset->name); + dim_payload.created_at.tv_sec = first_t; + if (unlikely(!live)) + dim_payload.last_timestamp.tv_sec = last_t; + + char *payload = generate_chart_dimension_updated(&size, &dim_payload); + if (likely(payload)) + rc = aclk_add_chart_payload(wc->uuid_str, &rd->state->metric_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size); + freez((char *)dim_payload.node_id); + freez((char *)dim_payload.chart_id); + freez((char *)dim_payload.name); + freez((char *)dim_payload.id); + freez(payload); + freez(claim_id); + } + rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); +#else + UNUSED(wc); + UNUSED(cmd); +#endif + return rc; +} + +void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ +#ifdef ACLK_NG + int rc; + + wc->chart_pending = 0; + if (unlikely(!wc->chart_updates)) { + debug(D_ACLK_SYNC,"Ignoring chart push event, updates have been turned off for node %s", wc->node_id); + return; + } + + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + int limit = cmd.count > 0 ? cmd.count : 1; + + uint64_t first_sequence; + uint64_t last_sequence; + time_t last_timestamp; + + BUFFER *sql = buffer_create(1024); + + sqlite3_stmt *res = NULL; + + buffer_sprintf(sql, "SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ + "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ + "WHERE ac.date_submitted IS NULL AND ac.unique_id = acp.unique_id AND ac.update_count > 0 " \ + "AND acp.claim_id = @claim_id ORDER BY ac.sequence_id ASC LIMIT %d;", wc->uuid_str, wc->uuid_str, limit); + + rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to send a chart update via ACLK"); + buffer_free(sql); + freez(claim_id); + return; + } + + rc = sqlite3_bind_text(res, 1, claim_id , -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + char **payload_list = callocz(limit+1, sizeof(char *)); + size_t *payload_list_size = callocz(limit+1, sizeof(size_t)); + size_t *payload_list_max_size = callocz(limit+1, sizeof(size_t)); + struct aclk_message_position *position_list = callocz(limit+1, sizeof(*position_list)); + int *is_dim = callocz(limit+1, sizeof(*is_dim)); + + int loop = cmd.param1; + + while (loop > 0) { + uint64_t previous_sequence_id = wc->chart_sequence_id; + int count = 0; + first_sequence = 0; + last_sequence = 0; + while (count < limit && sqlite3_step(res) == SQLITE_ROW) { + size_t payload_size = sqlite3_column_bytes(res, 1); + if (payload_list_max_size[count] < payload_size) { + freez(payload_list[count]); + payload_list_max_size[count] = payload_size; + payload_list[count] = mallocz(payload_size); + } + payload_list_size[count] = payload_size; + memcpy(payload_list[count], sqlite3_column_blob(res, 1), payload_size); + position_list[count].sequence_id = (uint64_t)sqlite3_column_int64(res, 0); + position_list[count].previous_sequence_id = previous_sequence_id; + position_list[count].seq_id_creation_time.tv_sec = sqlite3_column_int64(res, 2); + position_list[count].seq_id_creation_time.tv_usec = 0; + if (!first_sequence) + first_sequence = position_list[count].sequence_id; + last_sequence = position_list[count].sequence_id; + last_timestamp = position_list[count].seq_id_creation_time.tv_sec; + previous_sequence_id = last_sequence; + is_dim[count] = sqlite3_column_int(res, 3) > 0; + count++; + } + freez(payload_list[count]); + payload_list_max_size[count] = 0; + payload_list[count] = NULL; + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when pushing chart events, rc = %d", rc); + + if (likely(first_sequence)) { + buffer_flush(sql); + + db_lock(); + buffer_sprintf(sql, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=strftime('%%s','now') " + "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", + wc->uuid_str, first_sequence, last_sequence); + db_execute(buffer_tostring(sql)); + + buffer_flush(sql); + buffer_sprintf(sql, "INSERT OR REPLACE INTO aclk_chart_latest_%s (uuid, unique_id, date_submitted) " + " SELECT uuid, unique_id, date_submitted FROM aclk_chart_%s s " + " WHERE date_submitted IS NOT NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 + " ;", + wc->uuid_str, wc->uuid_str, first_sequence, last_sequence); + db_execute(buffer_tostring(sql)); + db_unlock(); + + aclk_chart_inst_and_dim_update(payload_list, payload_list_size, is_dim, position_list, wc->batch_id); + wc->chart_sequence_id = last_sequence; + wc->chart_timestamp = last_timestamp; + } + --loop; + } + + for (int i = 0; i <= limit; ++i) + freez(payload_list[i]); + + freez(payload_list); + freez(payload_list_size); + freez(position_list); + freez(is_dim); + +bind_fail: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when pushing chart events, rc = %d", rc); + + buffer_free(sql); + freez(claim_id); +#else + UNUSED(wc); + UNUSED(cmd); +#endif + return; +} + + +// Push one chart config to the cloud +int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ + UNUSED(wc); +#ifdef ACLK_NG + + CHECK_SQLITE_CONNECTION(db_meta); + + sqlite3_stmt *res = NULL; + int rc = 0; + + char *hash_id = (char *) cmd.data_param; + + uuid_t hash_uuid; + rc = uuid_parse(hash_id, hash_uuid); + + if (unlikely(rc)) { + freez((char *) cmd.data_param); + return 1; + } + + BUFFER *sql = buffer_create(1024); + buffer_sprintf(sql, "SELECT type, family, context, title, priority, plugin, module, unit, chart_type " \ + "FROM chart_hash WHERE hash_id = @hash_id;"); + + rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to fetch a chart hash configuration"); + goto fail; + } + + rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + struct chart_config_updated chart_config; + chart_config.config_hash = NULL; + |