From 4886eee8f5e5c0ef37768e7cd6332cd4591a9539 Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Sun, 23 Aug 2020 22:42:08 +0300 Subject: Add code to store datafile info Add code to record page location (disabled for now) Add code to migrate page to SQLite (disabled) Define WAL and sync mode normal --- database/engine/datafile.c | 1 + database/engine/journalfile.c | 4 + database/engine/rrdengine.c | 2 + database/sqlite/sqlite_functions.c | 211 ++++++++++++++++++++++++++++++++++--- database/sqlite/sqlite_functions.h | 4 + 5 files changed, 210 insertions(+), 12 deletions(-) diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 01f6863d55..160ed3ae8d 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -232,6 +232,7 @@ static int load_data_file(struct rrdengine_datafile *datafile) datafile->file = file; datafile->pos = file_size; + sql_store_datafile_info(path, file, file_size); info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); return 0; diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 0704c830f7..66a19994a6 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -289,6 +289,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden page_index = *PValue; } uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + char dim_str[37]; + uuid_unparse_lower(temp_id, dim_str); if (NULL == PValue) { /* First time we see the UUID */ uv_rwlock_wrlock(&pg_cache->metrics_index.lock); @@ -304,6 +306,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden descr->page_length = jf_metric_data->descr[i].page_length; descr->start_time = jf_metric_data->descr[i].start_time; descr->end_time = jf_metric_data->descr[i].end_time; + // info("JOURNAL: Page %d metric %s OFFSET(%llu, size=%d) (%llu - %llu) file %d", valid_pages, dim_str, my_offset, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file); + sql_store_page_info(temp_id, valid_pages, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file, extent->offset, extent->size); descr->id = &page_index->id; descr->extent = extent; extent->pages[valid_pages++] = descr; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index abd97952a8..f2d88e2128 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -125,6 +125,8 @@ after_crc_check: rrdeng_page_descr_mutex_lock(ctx, descr); pg_cache_descr = descr->pg_cache_descr; pg_cache_descr->page = page; + // TODO: Migrate page to the SQLite + //sql_add_metric_page_from_extent(descr); pg_cache_descr->flags |= RRD_PAGE_POPULATED; pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; rrdeng_page_descr_mutex_unlock(ctx, descr); diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index 7293f75a68..9676073cf9 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -70,7 +70,7 @@ int sql_init_database() int rc = sqlite3_open("/tmp/database", &db); info("SQLite Database initialized (rc = %d)", rc); - char *sql = "PRAGMA synchronous=0 ; CREATE TABLE IF NOT EXISTS dimension(dim_uuid blob PRIMARY KEY, chart_uuid blob, id text, name text, multiplier int, divisor int , algorithm int, archived int, options text);"; + char *sql = "PRAGMA synchronous=1 ; PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS dimension(dim_uuid blob PRIMARY KEY, chart_uuid blob, id text, name text, multiplier int, divisor int , algorithm int, archived int, options text);"; rc = sqlite3_exec(db, sql, 0, 0, &err_msg); @@ -87,6 +87,30 @@ int sql_init_database() sqlite3_free(err_msg); } + rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS datafile (fileno integer primary key, path text, file_size int); delete from datafile;", 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + sqlite3_close(db); + } + + rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS page (dim_uuid blob, page int , page_size int, start_date int, end_date int, fileno int, offset int, size int); delete from page;", 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + sqlite3_close(db); + } + + rc = sqlite3_exec(db, "create unique index if not exists ind_page on page (dim_uuid, start_date);", 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + } + + // EXAMPLE -- Creating RAM database and attaching // rc = sqlite3_exec(db, "ATTACH ':memory:' as ram;", 0, 0, &err_msg); // if (rc != SQLITE_OK) { @@ -114,7 +138,27 @@ int sql_init_database() sqlite3_free(err_msg); } - rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_page(key_id integer primary key, dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg); + rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_page(dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + } + + rc = sqlite3_exec(db, "create unique index if not exists ind_metric_page on metric_page (dim_uuid, start_date);", 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + } + + rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_migrate(dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + } + + rc = sqlite3_exec(db, "create unique index if not exists ind_metric_migrate on metric_migrate (dim_uuid, start_date);", 0, 0, &err_msg); + if (rc != SQLITE_OK) { error("SQL error: %s", err_msg); sqlite3_free(err_msg); @@ -152,7 +196,7 @@ void sql_backup_database() char sql[512]; - sprintf(sql,"VACUUM into '/tmp/database.%u'", time(NULL)); + sprintf(sql,"VACUUM into '/tmp/database.%lu'", time(NULL)); int rc = sqlite3_exec(db, sql, 0, 0, &err_msg); @@ -270,7 +314,7 @@ int sql_dimension_options(uuid_t *dim_uuid, char *options) int sql_create_dimension(char *dim_str, RRDSET *st) { - char sql[1024]; + //char sql[1024]; uuid_t dim_uuid; sqlite3_stmt *res; int rc; @@ -608,12 +652,10 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr) uint32_t entries = descr->page_length / sizeof(storage_number); uint32_t *metric = descr->pg_cache_descr->page; //uint32_t dt = 0; - time_t start_time = descr->start_time/ USEC_PER_SEC; - //if (entries > 1) - // dt = ((descr->end_time - descr->start_time) / USEC_PER_SEC) / (entries - 1); + //time_t start_time = descr->start_time/ USEC_PER_SEC; + if (!res) { - //snprintf(sql, 256, "insert into metric (dim_uuid, date_created, value) values (@dim_uuid, @date, @value);"); rc = sqlite3_prepare_v2(db, "insert or replace into metric_update (dim_uuid, date_created) values (@dim_uuid, @date);", -1, &res, 0); if (rc != SQLITE_OK) { info("SQLITE: Failed to prepare statement"); @@ -621,7 +663,6 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr) } dim_id = sqlite3_bind_parameter_index(res, "@dim_uuid"); date_id = sqlite3_bind_parameter_index(res, "@date"); - //value_id = sqlite3_bind_parameter_index(res, "@value"); } if (!res_page) { @@ -636,6 +677,65 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr) rc = sqlite3_bind_blob(res, dim_id, dim_uuid, 16, SQLITE_TRANSIENT); rc = sqlite3_bind_int(res, date_id, descr->end_time / USEC_PER_SEC); + void *compressed_buf = NULL; + int max_compressed_size = LZ4_compressBound(descr->page_length); + compressed_buf = mallocz(max_compressed_size); + + int compressed_size = LZ4_compress_default(metric, compressed_buf, descr->page_length, max_compressed_size); + + rc = sqlite3_bind_int(res_page, 1, entries); + rc = sqlite3_bind_blob(res_page, 2, dim_uuid, 16, SQLITE_TRANSIENT); + rc = sqlite3_bind_int64(res_page, 3, descr->start_time); + rc = sqlite3_bind_int64(res_page, 4, descr->end_time); + rc = sqlite3_bind_blob(res_page, metric_id, compressed_buf, compressed_size, SQLITE_TRANSIENT); + + unsigned long long start = now_realtime_usec(); + sqlite3_step(res); + sqlite3_reset(res); + sqlite3_step(res_page); + sqlite3_reset(res_page); + unsigned long long end = now_realtime_usec(); + info("SQLITE: PAGE in %llu usec (%d -> %d bytes) entries=%d", end-start, descr->page_length, compressed_size, entries); + + freez(compressed_buf); + return; +} + +void sql_add_metric_page_from_extent(struct rrdeng_page_descr *descr) +{ + char *err_msg = NULL; + char dim_str[37]; + int rc; + static sqlite3_stmt *res = NULL; + static sqlite3_stmt *res_page = NULL; + static int dim_id, date_id; + static int metric_id; + static int level = 0; + + if (!descr->page_length) { + info("SQLITE: Empty page"); + return; + } + level++; + + uuid_unparse_lower(descr->id, dim_str); + uint32_t entries = descr->page_length / sizeof(storage_number); + uint32_t *metric = descr->pg_cache_descr->page; + + if (!res_page) { + rc = sqlite3_prepare_v2( + db, + "insert or replace into metric_migrate (entries, dim_uuid, start_date, end_date, metric) values (@entries, @dim, @start_date, @end_date, @page);", + -1, &res_page, 0); + if (rc != SQLITE_OK) { + info("SQLITE: Failed to prepare statement for metric page"); + return; + } + metric_id = sqlite3_bind_parameter_index(res_page, "@page"); + } + + rc = sqlite3_bind_blob(res, dim_id, descr->id, 16, SQLITE_TRANSIENT); + rc = sqlite3_bind_int(res, date_id, descr->end_time / USEC_PER_SEC); void *compressed_buf = NULL; int max_compressed_size = LZ4_compressBound(descr->page_length); @@ -647,7 +747,7 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr) rc = sqlite3_bind_int64(res_page, 3, descr->start_time); rc = sqlite3_bind_int64(res_page, 4, descr->end_time); rc = sqlite3_bind_blob(res_page, metric_id, compressed_buf, compressed_size, SQLITE_TRANSIENT); - rc = sqlite3_bind_blob(res_page, 2 , dim_uuid, 16, SQLITE_TRANSIENT); + rc = sqlite3_bind_blob(res_page, 2, descr->id, 16, SQLITE_TRANSIENT); freez(compressed_buf); @@ -657,6 +757,93 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr) sqlite3_step(res_page); sqlite3_reset(res_page); unsigned long long end = now_realtime_usec(); - info("SQLITE: PAGE in %llu usec (%d -> %d bytes) (max computed %d) entries=%d", end-start, descr->page_length, compressed_size, max_compressed_size, entries); + info( + "SQLITE: PAGE in %llu usec (%d -> %d bytes) (max computed %d) entries=%d (level - %d)", end - start, + descr->page_length, compressed_size, max_compressed_size, entries, level); + level--; + return; +} + +void sql_store_datafile_info(char *path, int fileno, size_t file_size) +{ + char sql[512]; + char *err_msg = NULL; + sprintf(sql, "INSERT OR REPLACE into datafile (fileno, path , file_size ) values (%u, '%s', %u);", + fileno, path, file_size); + int rc = sqlite3_exec(db, sql, 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + error("SQL error: %s", err_msg); + sqlite3_free(err_msg); + } + return; +} + +void sql_store_page_info(uuid_t dim_uuid, int valid_page, int page_length, usec_t start_time, usec_t end_time, int fileno, size_t offset, size_t size) +{ + char sql[512]; + char *err_msg = NULL; + static sqlite3_stmt *res = NULL; + static int last_fileno = 0; + static int last_offset = 0; + static char *buf = NULL; + static char *uncompressed_buf = NULL; + static void *compressed_buf = NULL; + static int max_compressed_size = 0; + return; -} \ No newline at end of file + + if (!res) { + int rc = sqlite3_prepare_v2( + db, + "INSERT OR REPLACE into page (dim_uuid, page , page_size, start_date, end_date, fileno, offset, size) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);", + -1, &res, 0); + if (rc != SQLITE_OK) { + info("SQLITE: Failed to prepare statement for metric page"); + return; + } + } + +// if (last_fileno != fileno || last_offset != offset) { +// freez(buf); +// freez(uncompressed_buf); +// freez(compressed_buf); +// //buf = malloc(size); +// size_t old_pos = lseek(fileno, 0, SEEK_CUR); +// int new_pos = lseek(fileno, offset, SEEK_SET); +// posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size)); +// int rc = read(fileno, buf, size); +// if (rc < 0) +// error("Cant ready the extent"); +// lseek(fileno, old_pos, SEEK_SET); +// uncompressed_buf = malloc(64 * page_length + 128); +// int ret = LZ4_decompress_safe((char *) buf, (char *) uncompressed_buf, size, 64 * page_length); +// info("Read %d bytes -- Uncompressed extent, new size = %d (old file pos %llu , new file pos %llu)", rc, ret, old_pos, new_pos); +// max_compressed_size = LZ4_compressBound(page_length); +// compressed_buf = mallocz(max_compressed_size); +// last_fileno = fileno; +// last_offset = offset; +// } +// // Uncompress it +// +// int compressed_size = LZ4_compress_default(uncompressed_buf+valid_page * page_length, compressed_buf, page_length, max_compressed_size); +// info("Compressed size for page %d = %d", valid_page, compressed_size); + + int rc = sqlite3_bind_blob(res, 1 , dim_uuid, 16, SQLITE_TRANSIENT); + rc = sqlite3_bind_int(res, 2, valid_page); + rc = sqlite3_bind_int(res, 3, page_length); + rc = sqlite3_bind_int64(res, 4, start_time); + rc = sqlite3_bind_int64(res, 5, end_time); + rc = sqlite3_bind_int(res, 6, fileno); + rc = sqlite3_bind_int64(res, 7, offset); + rc = sqlite3_bind_int64(res, 8, size); + //rc = sqlite3_bind_blob(res, 9, compressed_buf, compressed_size, SQLITE_TRANSIENT); + + //free(compressed_buf); + + sqlite3_step(res); + sqlite3_reset(res); + return; +} +// +// sql_store_page_info(temp_id, valid_page, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file, extent->offset); \ No newline at end of file diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 6f09fbaf89..39f831010a 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -41,6 +41,10 @@ extern char *sql_find_dim_uuid(RRDSET *st, char *id, char *name); extern void sql_sync_ram_db(); extern void sql_backup_database(); extern void sql_compact_database(); +extern void sql_store_datafile_info(char *path, int fileno, size_t file_size); +extern void sql_store_page_info(uuid_t temp_id, int valid_page, int page_length, usec_t start_time, usec_t end_time, int , size_t offset, size_t size); +extern void sql_add_metric_page_from_extent(struct rrdeng_page_descr *descr); + #endif //NETDATA_SQLITE_FUNCTIONS_H -- cgit v1.2.3