summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2020-08-23 22:42:08 +0300
committerStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2020-08-23 22:42:08 +0300
commit4886eee8f5e5c0ef37768e7cd6332cd4591a9539 (patch)
tree8b75c91cbda967f53eb4e71a57b73333b57afd49
parent8ebc1831d325685349df8709fbbe49eaf527d755 (diff)
Add code to store datafile info
Add code to record page location (disabled for now) Add code to migrate page to SQLite (disabled) Define WAL and sync mode normal
-rw-r--r--database/engine/datafile.c1
-rw-r--r--database/engine/journalfile.c4
-rw-r--r--database/engine/rrdengine.c2
-rw-r--r--database/sqlite/sqlite_functions.c211
-rw-r--r--database/sqlite/sqlite_functions.h4
5 files changed, 210 insertions, 12 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 01f6863d55..160ed3ae8d 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -232,6 +232,7 @@ static int load_data_file(struct rrdengine_datafile *datafile)
datafile->file = file;
datafile->pos = file_size;
+ sql_store_datafile_info(path, file, file_size);
info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
return 0;
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 0704c830f7..66a19994a6 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -289,6 +289,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
page_index = *PValue;
}
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ char dim_str[37];
+ uuid_unparse_lower(temp_id, dim_str);
if (NULL == PValue) {
/* First time we see the UUID */
uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
@@ -304,6 +306,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
descr->page_length = jf_metric_data->descr[i].page_length;
descr->start_time = jf_metric_data->descr[i].start_time;
descr->end_time = jf_metric_data->descr[i].end_time;
+ // info("JOURNAL: Page %d metric %s OFFSET(%llu, size=%d) (%llu - %llu) file %d", valid_pages, dim_str, my_offset, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file);
+ sql_store_page_info(temp_id, valid_pages, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file, extent->offset, extent->size);
descr->id = &page_index->id;
descr->extent = extent;
extent->pages[valid_pages++] = descr;
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index abd97952a8..f2d88e2128 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -125,6 +125,8 @@ after_crc_check:
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
+ // TODO: Migrate page to the SQLite
+ //sql_add_metric_page_from_extent(descr);
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
rrdeng_page_descr_mutex_unlock(ctx, descr);
diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c
index 7293f75a68..9676073cf9 100644
--- a/database/sqlite/sqlite_functions.c
+++ b/database/sqlite/sqlite_functions.c
@@ -70,7 +70,7 @@ int sql_init_database()
int rc = sqlite3_open("/tmp/database", &db);
info("SQLite Database initialized (rc = %d)", rc);
- char *sql = "PRAGMA synchronous=0 ; CREATE TABLE IF NOT EXISTS dimension(dim_uuid blob PRIMARY KEY, chart_uuid blob, id text, name text, multiplier int, divisor int , algorithm int, archived int, options text);";
+ char *sql = "PRAGMA synchronous=1 ; PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS dimension(dim_uuid blob PRIMARY KEY, chart_uuid blob, id text, name text, multiplier int, divisor int , algorithm int, archived int, options text);";
rc = sqlite3_exec(db, sql, 0, 0, &err_msg);
@@ -87,6 +87,30 @@ int sql_init_database()
sqlite3_free(err_msg);
}
+ rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS datafile (fileno integer primary key, path text, file_size int); delete from datafile;", 0, 0, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ sqlite3_close(db);
+ }
+
+ rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS page (dim_uuid blob, page int , page_size int, start_date int, end_date int, fileno int, offset int, size int); delete from page;", 0, 0, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ sqlite3_close(db);
+ }
+
+ rc = sqlite3_exec(db, "create unique index if not exists ind_page on page (dim_uuid, start_date);", 0, 0, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+
+
// EXAMPLE -- Creating RAM database and attaching
// rc = sqlite3_exec(db, "ATTACH ':memory:' as ram;", 0, 0, &err_msg);
// if (rc != SQLITE_OK) {
@@ -114,7 +138,27 @@ int sql_init_database()
sqlite3_free(err_msg);
}
- rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_page(key_id integer primary key, dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg);
+ rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_page(dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg);
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+
+ rc = sqlite3_exec(db, "create unique index if not exists ind_metric_page on metric_page (dim_uuid, start_date);", 0, 0, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+
+ rc = sqlite3_exec(db, "CREATE TABLE IF NOT EXISTS metric_migrate(dim_uuid blob, entries int, start_date int, end_date int, metric blob);", 0, 0, &err_msg);
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+
+ rc = sqlite3_exec(db, "create unique index if not exists ind_metric_migrate on metric_migrate (dim_uuid, start_date);", 0, 0, &err_msg);
+
if (rc != SQLITE_OK) {
error("SQL error: %s", err_msg);
sqlite3_free(err_msg);
@@ -152,7 +196,7 @@ void sql_backup_database()
char sql[512];
- sprintf(sql,"VACUUM into '/tmp/database.%u'", time(NULL));
+ sprintf(sql,"VACUUM into '/tmp/database.%lu'", time(NULL));
int rc = sqlite3_exec(db, sql, 0, 0, &err_msg);
@@ -270,7 +314,7 @@ int sql_dimension_options(uuid_t *dim_uuid, char *options)
int sql_create_dimension(char *dim_str, RRDSET *st)
{
- char sql[1024];
+ //char sql[1024];
uuid_t dim_uuid;
sqlite3_stmt *res;
int rc;
@@ -608,12 +652,10 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr)
uint32_t entries = descr->page_length / sizeof(storage_number);
uint32_t *metric = descr->pg_cache_descr->page;
//uint32_t dt = 0;
- time_t start_time = descr->start_time/ USEC_PER_SEC;
- //if (entries > 1)
- // dt = ((descr->end_time - descr->start_time) / USEC_PER_SEC) / (entries - 1);
+ //time_t start_time = descr->start_time/ USEC_PER_SEC;
+
if (!res) {
- //snprintf(sql, 256, "insert into metric (dim_uuid, date_created, value) values (@dim_uuid, @date, @value);");
rc = sqlite3_prepare_v2(db, "insert or replace into metric_update (dim_uuid, date_created) values (@dim_uuid, @date);", -1, &res, 0);
if (rc != SQLITE_OK) {
info("SQLITE: Failed to prepare statement");
@@ -621,7 +663,6 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr)
}
dim_id = sqlite3_bind_parameter_index(res, "@dim_uuid");
date_id = sqlite3_bind_parameter_index(res, "@date");
- //value_id = sqlite3_bind_parameter_index(res, "@value");
}
if (!res_page) {
@@ -636,6 +677,65 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr)
rc = sqlite3_bind_blob(res, dim_id, dim_uuid, 16, SQLITE_TRANSIENT);
rc = sqlite3_bind_int(res, date_id, descr->end_time / USEC_PER_SEC);
+ void *compressed_buf = NULL;
+ int max_compressed_size = LZ4_compressBound(descr->page_length);
+ compressed_buf = mallocz(max_compressed_size);
+
+ int compressed_size = LZ4_compress_default(metric, compressed_buf, descr->page_length, max_compressed_size);
+
+ rc = sqlite3_bind_int(res_page, 1, entries);
+ rc = sqlite3_bind_blob(res_page, 2, dim_uuid, 16, SQLITE_TRANSIENT);
+ rc = sqlite3_bind_int64(res_page, 3, descr->start_time);
+ rc = sqlite3_bind_int64(res_page, 4, descr->end_time);
+ rc = sqlite3_bind_blob(res_page, metric_id, compressed_buf, compressed_size, SQLITE_TRANSIENT);
+
+ unsigned long long start = now_realtime_usec();
+ sqlite3_step(res);
+ sqlite3_reset(res);
+ sqlite3_step(res_page);
+ sqlite3_reset(res_page);
+ unsigned long long end = now_realtime_usec();
+ info("SQLITE: PAGE in %llu usec (%d -> %d bytes) entries=%d", end-start, descr->page_length, compressed_size, entries);
+
+ freez(compressed_buf);
+ return;
+}
+
+void sql_add_metric_page_from_extent(struct rrdeng_page_descr *descr)
+{
+ char *err_msg = NULL;
+ char dim_str[37];
+ int rc;
+ static sqlite3_stmt *res = NULL;
+ static sqlite3_stmt *res_page = NULL;
+ static int dim_id, date_id;
+ static int metric_id;
+ static int level = 0;
+
+ if (!descr->page_length) {
+ info("SQLITE: Empty page");
+ return;
+ }
+ level++;
+
+ uuid_unparse_lower(descr->id, dim_str);
+ uint32_t entries = descr->page_length / sizeof(storage_number);
+ uint32_t *metric = descr->pg_cache_descr->page;
+
+ if (!res_page) {
+ rc = sqlite3_prepare_v2(
+ db,
+ "insert or replace into metric_migrate (entries, dim_uuid, start_date, end_date, metric) values (@entries, @dim, @start_date, @end_date, @page);",
+ -1, &res_page, 0);
+ if (rc != SQLITE_OK) {
+ info("SQLITE: Failed to prepare statement for metric page");
+ return;
+ }
+ metric_id = sqlite3_bind_parameter_index(res_page, "@page");
+ }
+
+ rc = sqlite3_bind_blob(res, dim_id, descr->id, 16, SQLITE_TRANSIENT);
+ rc = sqlite3_bind_int(res, date_id, descr->end_time / USEC_PER_SEC);
void *compressed_buf = NULL;
int max_compressed_size = LZ4_compressBound(descr->page_length);
@@ -647,7 +747,7 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr)
rc = sqlite3_bind_int64(res_page, 3, descr->start_time);
rc = sqlite3_bind_int64(res_page, 4, descr->end_time);
rc = sqlite3_bind_blob(res_page, metric_id, compressed_buf, compressed_size, SQLITE_TRANSIENT);
- rc = sqlite3_bind_blob(res_page, 2 , dim_uuid, 16, SQLITE_TRANSIENT);
+ rc = sqlite3_bind_blob(res_page, 2, descr->id, 16, SQLITE_TRANSIENT);
freez(compressed_buf);
@@ -657,6 +757,93 @@ void sql_add_metric_page(uuid_t *dim_uuid, struct rrdeng_page_descr *descr)
sqlite3_step(res_page);
sqlite3_reset(res_page);
unsigned long long end = now_realtime_usec();
- info("SQLITE: PAGE in %llu usec (%d -> %d bytes) (max computed %d) entries=%d", end-start, descr->page_length, compressed_size, max_compressed_size, entries);
+ info(
+ "SQLITE: PAGE in %llu usec (%d -> %d bytes) (max computed %d) entries=%d (level - %d)", end - start,
+ descr->page_length, compressed_size, max_compressed_size, entries, level);
+ level--;
+ return;
+}
+
+void sql_store_datafile_info(char *path, int fileno, size_t file_size)
+{
+ char sql[512];
+ char *err_msg = NULL;
+ sprintf(sql, "INSERT OR REPLACE into datafile (fileno, path , file_size ) values (%u, '%s', %u);",
+ fileno, path, file_size);
+ int rc = sqlite3_exec(db, sql, 0, 0, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error("SQL error: %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+ return;
+}
+
+void sql_store_page_info(uuid_t dim_uuid, int valid_page, int page_length, usec_t start_time, usec_t end_time, int fileno, size_t offset, size_t size)
+{
+ char sql[512];
+ char *err_msg = NULL;
+ static sqlite3_stmt *res = NULL;
+ static int last_fileno = 0;
+ static int last_offset = 0;
+ static char *buf = NULL;
+ static char *uncompressed_buf = NULL;
+ static void *compressed_buf = NULL;
+ static int max_compressed_size = 0;
+
return;
-} \ No newline at end of file
+
+ if (!res) {
+ int rc = sqlite3_prepare_v2(
+ db,
+ "INSERT OR REPLACE into page (dim_uuid, page , page_size, start_date, end_date, fileno, offset, size) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);",
+ -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ info("SQLITE: Failed to prepare statement for metric page");
+ return;
+ }
+ }
+
+// if (last_fileno != fileno || last_offset != offset) {
+// freez(buf);
+// freez(uncompressed_buf);
+// freez(compressed_buf);
+// //buf = malloc(size);
+// size_t old_pos = lseek(fileno, 0, SEEK_CUR);
+// int new_pos = lseek(fileno, offset, SEEK_SET);
+// posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size));
+// int rc = read(fileno, buf, size);
+// if (rc < 0)
+// error("Cant ready the extent");
+// lseek(fileno, old_pos, SEEK_SET);
+// uncompressed_buf = malloc(64 * page_length + 128);
+// int ret = LZ4_decompress_safe((char *) buf, (char *) uncompressed_buf, size, 64 * page_length);
+// info("Read %d bytes -- Uncompressed extent, new size = %d (old file pos %llu , new file pos %llu)", rc, ret, old_pos, new_pos);
+// max_compressed_size = LZ4_compressBound(page_length);
+// compressed_buf = mallocz(max_compressed_size);
+// last_fileno = fileno;
+// last_offset = offset;
+// }
+// // Uncompress it
+//
+// int compressed_size = LZ4_compress_default(uncompressed_buf+valid_page * page_length, compressed_buf, page_length, max_compressed_size);
+// info("Compressed size for page %d = %d", valid_page, compressed_size);
+
+ int rc = sqlite3_bind_blob(res, 1 , dim_uuid, 16, SQLITE_TRANSIENT);
+ rc = sqlite3_bind_int(res, 2, valid_page);
+ rc = sqlite3_bind_int(res, 3, page_length);
+ rc = sqlite3_bind_int64(res, 4, start_time);
+ rc = sqlite3_bind_int64(res, 5, end_time);
+ rc = sqlite3_bind_int(res, 6, fileno);
+ rc = sqlite3_bind_int64(res, 7, offset);
+ rc = sqlite3_bind_int64(res, 8, size);
+ //rc = sqlite3_bind_blob(res, 9, compressed_buf, compressed_size, SQLITE_TRANSIENT);
+
+ //free(compressed_buf);
+
+ sqlite3_step(res);
+ sqlite3_reset(res);
+ return;
+}
+//
+// sql_store_page_info(temp_id, valid_page, descr->page_length, descr->start_time, descr->end_time, extent->datafile->file, extent->offset); \ No newline at end of file
diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h
index 6f09fbaf89..39f831010a 100644
--- a/database/sqlite/sqlite_functions.h
+++ b/database/sqlite/sqlite_functions.h
@@ -41,6 +41,10 @@ extern char *sql_find_dim_uuid(RRDSET *st, char *id, char *name);
extern void sql_sync_ram_db();
extern void sql_backup_database();
extern void sql_compact_database();
+extern void sql_store_datafile_info(char *path, int fileno, size_t file_size);
+extern void sql_store_page_info(uuid_t temp_id, int valid_page, int page_length, usec_t start_time, usec_t end_time, int , size_t offset, size_t size);
+extern void sql_add_metric_page_from_extent(struct rrdeng_page_descr *descr);
+
#endif //NETDATA_SQLITE_FUNCTIONS_H