summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarel Ben-Attia <harelba@gmail.com>2017-11-22 20:37:13 +0200
committerGitHub <noreply@github.com>2017-11-22 20:37:13 +0200
commitea99c0133844596c2ba4ad531a58a10a376330fb (patch)
treec0c3c24f734364da65fe3d99312beeb2e81236fe
parentab509c7bcba8e2aa20cb9b3234124fabd39b7d30 (diff)
parentdb470503e39cab20e949ac6bd43d0d44a89ed7be (diff)
Merge pull request #159 from harelba/save-db-to-disk
-rw-r--r--.gitignore1
-rwxr-xr-xbin/q120
-rwxr-xr-xtest/test-suite53
3 files changed, 151 insertions, 23 deletions
diff --git a/.gitignore b/.gitignore
index 8f68670..a1e0486 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@ win_build
packages
.idea/
dist/windows/
+dist/
diff --git a/bin/q b/bin/q
index c3b0182..dc95d16 100755
--- a/bin/q
+++ b/bin/q
@@ -27,7 +27,7 @@
#
# Run with --help for command line details
#
-q_version = "1.6.3"
+q_version = "1.6.3"
__all__ = [ 'QTextAsData' ]
@@ -67,7 +67,7 @@ def sha1(data):
if not isinstance(data,str) and not isinstance(data,unicode):
return hashlib.sha1(str(data)).hexdigest()
return hashlib.sha1(data).hexdigest()
-
+
def regexp(regular_expression, data):
if data is not None:
if not isinstance(data, str) and not isinstance(data, unicode):
@@ -103,7 +103,7 @@ class StrictPercentile(object):
def finalize(self):
if len(self.values) == 0 or (self.p < 0 or self.p > 1):
- return None
+ return None
else:
return percentile(sorted(self.values),self.p)
@@ -119,6 +119,43 @@ class Sqlite3DB(object):
self.numeric_column_types = set([int, long, float])
self.add_user_functions()
+ def done(self):
+ self.conn.commit()
+
+ def store_db_to_disk_standard(self,sqlite_db_filename,table_names_mapping):
+ new_db = sqlite3.connect(sqlite_db_filename,isolation_level=None)
+ c = new_db.cursor()
+ for s in self.conn.iterdump():
+ c.execute(s)
+ results = c.fetchall()
+ #print "executed %s results %s " % (s,results)
+ for source_filename_str,tn in table_names_mapping.iteritems():
+ c.execute('alter table `%s` rename to `%s`' % (tn, source_filename_str))
+ new_db.close()
+
+ def store_db_to_disk_fast(self,sqlite_db_filename,table_names_mapping):
+ try:
+ import sqlitebck
+ except ImportError, e:
+ msg = "sqlitebck python module cannot be found - fast store to disk cannot be performed"
+ print >>sys.stderr,msg
+ raise ValueError(msg)
+
+ new_db = sqlite3.connect(sqlite_db_filename)
+ sqlitebck.copy(self.conn,new_db)
+ c = new_db.cursor()
+ for source_filename_str,tn in table_names_mapping.iteritems():
+ c.execute('alter table `%s` rename to `%s`' % (tn, source_filename_str))
+ new_db.close()
+
+ def store_db_to_disk(self,sqlite_db_filename,table_names_mapping,method='standard'):
+ if method == 'standard':
+ self.store_db_to_disk_standard(sqlite_db_filename,table_names_mapping)
+ elif method == 'fast':
+ self.store_db_to_disk_fast(sqlite_db_filename,table_names_mapping)
+ else:
+ raise ValueError('Unknown store-db-to-disk method %s' % method)
+
def add_user_functions(self):
self.conn.create_function("regexp", 2, regexp)
self.conn.create_function("sha1", 1, sha1)
@@ -192,7 +229,8 @@ class Sqlite3DB(object):
def generate_temp_table_name(self):
self.last_temp_table_id += 1
- return "temp_table_%s" % self.last_temp_table_id
+ tn = "temp_table_%s" % self.last_temp_table_id
+ return tn
def generate_drop_table(self, table_name):
return "DROP TABLE %s" % table_name
@@ -370,7 +408,7 @@ class Sql(object):
self.qtable_name_effective_table_names[
qtable_name] = effective_table_name
- def get_effective_sql(self):
+ def get_effective_sql(self,original_names=False):
if len(filter(lambda x: x is None, self.qtable_name_effective_table_names)) != 0:
raise Exception('There are qtables without effective tables')
@@ -378,11 +416,17 @@ class Sql(object):
for qtable_name, positions in self.qtable_name_positions.iteritems():
for pos in positions:
- effective_sql[pos] = self.qtable_name_effective_table_names[
- qtable_name]
+ if not original_names:
+ effective_sql[pos] = self.qtable_name_effective_table_names[
+ qtable_name]
+ else:
+ effective_sql[pos] = "`%s`" % qtable_name
return " ".join(effective_sql)
+ def get_qtable_name_effective_table_names(self):
+ return self.qtable_name_effective_table_names
+
def execute_and_fetch(self, db):
db_results_obj = db.execute_and_fetch(self.get_effective_sql())
return db_results_obj
@@ -840,13 +884,13 @@ class TableCreator(object):
self.column_inferer.force_analysis()
self._do_create_table()
-
+
if total_data_lines_read == 0:
raise EmptyDataException()
def populate(self,dialect,stop_after_analysis=False):
if self.state == TableCreatorState.NEW:
- self._pre_populate(dialect)
+ self._pre_populate(dialect)
self.state = TableCreatorState.INITIALIZED
if self.state == TableCreatorState.INITIALIZED:
@@ -926,7 +970,7 @@ class TableCreator(object):
if self.mode == 'relaxed':
if actual_col_count > expected_col_count:
xxx = col_vals[:expected_col_count - 1] + \
- [self.input_delimiter.join([v if v is not None else '' for v in
+ [self.input_delimiter.join([v if v is not None else '' for v in
col_vals[expected_col_count - 1:]])]
return xxx
else:
@@ -989,7 +1033,7 @@ class TableCreator(object):
# Guard against empty tables (instead of preventing the creation, just create with a dummy column)
if len(column_dict) == 0:
- column_dict = { 'dummy_column_for_empty_tables' : str }
+ column_dict = { 'dummy_column_for_empty_tables' : str }
ordered_column_names = [ 'dummy_column_for_empty_tables' ]
else:
ordered_column_names = self.column_inferer.get_column_names()
@@ -1157,7 +1201,7 @@ class QTextAsData(object):
input_quoting_modes = { 'minimal' : csv.QUOTE_MINIMAL,
'all' : csv.QUOTE_ALL,
- # nonnumeric is not supported for input quoting modes, since we determine the data types
+ # nonnumeric is not supported for input quoting modes, since we determine the data types
# ourselves instead of letting the csv module try to identify the types
'none' : csv.QUOTE_NONE }
@@ -1203,7 +1247,7 @@ class QTextAsData(object):
# Create the matching database table and populate it
table_creator = TableCreator(
self.db, filename, line_splitter, input_params.skip_header, input_params.gzipped_input, input_params.with_universal_newlines,input_params.input_encoding,
- mode=input_params.parsing_mode, expected_column_count=input_params.expected_column_count,
+ mode=input_params.parsing_mode, expected_column_count=input_params.expected_column_count,
input_delimiter=input_params.delimiter,disable_column_type_detection=input_params.disable_column_type_detection,
stdin_file = stdin_file,stdin_filename = stdin_filename)
@@ -1239,7 +1283,7 @@ class QTextAsData(object):
for filename in sql_object.qtable_names:
sql_object.set_effective_table_name(filename,self.table_creators[filename].table_name)
- def _execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',stop_after_analysis=False):
+ def _execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',stop_after_analysis=False,save_db_to_disk_filename=None,save_db_to_disk_method=None):
warnings = []
error = None
data_loads = []
@@ -1254,24 +1298,37 @@ class QTextAsData(object):
# Hueristic attempt to auto convert the query to unicode before failing
query_str = query_str.decode('utf-8')
except:
- error = QError(EncodedQueryException(),"Query should be in unicode. Please make sure to provide a unicode literal string or decode it using proper the character encoding.",91)
+ error = QError(EncodedQueryException(''),"Query should be in unicode. Please make sure to provide a unicode literal string or decode it using proper the character encoding.",91)
return QOutput(error = error)
- # Create SQL statment
+ # Create SQL statement
sql_object = Sql('%s' % query_str)
try:
+ load_start_time = time.time()
data_loads += self._ensure_data_is_loaded(sql_object,effective_input_params,stdin_file=stdin_file,stdin_filename=stdin_filename,stop_after_analysis=stop_after_analysis)
table_structures = self._create_table_structures_list()
self.materialize_sql_object(sql_object)
+ if save_db_to_disk_filename is not None:
+ self.db.done()
+ dump_start_time = time.time()
+ print >>sys.stderr,"Data has been loaded in %4.3f seconds" % (dump_start_time - load_start_time)
+ print >>sys.stderr,"Saving data to db file %s" % save_db_to_disk_filename
+ self.db.store_db_to_disk(save_db_to_disk_filename,sql_object.get_qtable_name_effective_table_names(),save_db_to_disk_method)
+ print >>sys.stderr,"Data has been saved into %s . Saving has taken %4.3f seconds" % (save_db_to_disk_filename,time.time()-dump_start_time)
+ print >>sys.stderr,"Query to run on the database: %s;" % sql_object.get_effective_sql(True)
+ # TODO Propagate dump results using a different output class instead of an empty one
+
+ return QOutput()
+
# Execute the query and fetch the data
db_results_obj = sql_object.execute_and_fetch(self.db)
return QOutput(
- data = db_results_obj.results,
+ data = db_results_obj.results,
metadata = QMetadata(
table_structures=table_structures,
output_column_name_list=db_results_obj.query_column_names,
@@ -1315,8 +1372,8 @@ class QTextAsData(object):
return QOutput(warnings = warnings,error = error , metadata=QMetadata(table_structures=table_structures,data_loads = data_loads))
- def execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-'):
- return self._execute(query_str,input_params,stdin_file,stdin_filename,stop_after_analysis=False)
+ def execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',save_db_to_disk_filename=None,save_db_to_disk_method=None):
+ return self._execute(query_str,input_params,stdin_file,stdin_filename,stop_after_analysis=False,save_db_to_disk_filename=save_db_to_disk_filename,save_db_to_disk_method=save_db_to_disk_method)
def unload(self):
@@ -1579,6 +1636,10 @@ def run_standalone():
help="Print version")
parser.add_option("-V", "--verbose", dest="verbose", default=False, action="store_true",
help="Print debug info in case of problems")
+ parser.add_option("-S", "--save-db-to-disk", dest="save_db_to_disk_filename", default=None,
+ help="Save database to an sqlite database file")
+ parser.add_option("", "--save-db-to-disk-method", dest="save_db_to_disk_method", default='standard',
+ help="Method to use to save db to disk. 'standard' does not require any deps, 'fast' currenty requires manually running `pip install sqlitebck` on your python installation. Once packing issues are solved, the fast method will be the default.")
#-----------------------------------------------
input_data_option_group = OptionGroup(parser,"Input Data Options")
input_data_option_group.add_option("-H", "--skip-header", dest="skip_header", default=default_skip_header, action="store_true",
@@ -1613,7 +1674,7 @@ def run_standalone():
help="Expect universal newlines in the data. Limitation: -U works only with regular files for now, stdin or .gz files are not supported yet.")
parser.add_option_group(input_data_option_group)
#-----------------------------------------------
- output_data_option_group = OptionGroup(parser,"Output Options")
+ output_data_option_group = OptionGroup(parser,"Output Options")
output_data_option_group.add_option("-D", "--output-delimiter", dest="output_delimiter", default=default_output_delimiter,
help="Field delimiter for output. If none specified, then the -d delimiter is used if present, or space if no delimiter is specified")
output_data_option_group.add_option("-T", "--tab-delimited-output", dest="tab_delimited_output", default=False, action="store_true",
@@ -1629,7 +1690,7 @@ def run_standalone():
help="Output quoting mode. Possible values are all, minimal, nonnumeric and none. Note the slightly misleading parameter name, and see the matching -w parameter for input quoting.")
parser.add_option_group(output_data_option_group)
#-----------------------------------------------
- query_option_group = OptionGroup(parser,"Query Related Options")
+ query_option_group = OptionGroup(parser,"Query Related Options")
query_option_group.add_option("-q", "--query-filename", dest="query_filename", default=None,
help="Read query from the provided filename instead of the command line, possibly using the provided query encoding (using -Q).")
query_option_group.add_option("-Q", "--query-encoding", dest="query_encoding", default=default_query_encoding,
@@ -1742,6 +1803,21 @@ def run_standalone():
print >> sys.stderr, "Max column length limit must be a positive integer (%s)" % max_column_length_limit
sys.exit(31)
+ if options.save_db_to_disk_filename is not None:
+ if options.analyze_only:
+ print >>sys.stderr,"Cannot save database to disk when running with -A (analyze-only) option."
+ sys.exit(119)
+
+ print >>sys.stderr,"Going to save data into a disk database: %s" % options.save_db_to_disk_filename
+ if os.path.exists(options.save_db_to_disk_filename):
+ print >> sys.stderr, "Disk database file %s already exists." % options.save_db_to_disk_filename
+ sys.exit(77)
+
+ if options.save_db_to_disk_method is not None:
+ if options.save_db_to_disk_method not in ['standard','fast']:
+ print >>sys.stderr,"save-db-to-disk method should be either standard or fast (%s)" % options.save_db_to_disk_method
+ sys.exit(78)
+
default_input_params = QInputParams(skip_header=options.skip_header,
delimiter=options.delimiter,
input_encoding=options.encoding,
@@ -1770,7 +1846,7 @@ def run_standalone():
q_output = q_engine.analyze(query_str,stdin_file=sys.stdin)
q_output_printer.print_analysis(STDOUT,sys.stderr,q_output)
else:
- q_output = q_engine.execute(query_str,stdin_file=sys.stdin)
+ q_output = q_engine.execute(query_str,stdin_file=sys.stdin,save_db_to_disk_filename=options.save_db_to_disk_filename,save_db_to_disk_method=options.save_db_to_disk_method)
q_output_printer.print_output(STDOUT,sys.stderr,q_output)
if q_output.status == 'error':
diff --git a/test/test-suite b/test/test-suite
index 38dc0ac..b5c0595 100755
--- a/test/test-suite
+++ b/test/test-suite
@@ -10,6 +10,7 @@
#
import unittest
+import random
import json
from json import JSONEncoder
from subprocess import PIPE, Popen, STDOUT
@@ -133,6 +134,55 @@ class AbstractQTestCase(unittest.TestCase):
def cleanup(self, tmpfile):
os.remove(tmpfile.name)
+ def random_tmp_filename(self,prefix,postfix):
+ # TODO Use more robust method for this
+ path = '/var/tmp'
+ return '%s/%s-%s.%s' % (path,prefix,random.randint(0,1000000000),postfix)
+
+class SaveDbToDiskTests(AbstractQTestCase):
+
+ def test_store_to_disk(self):
+ db_filename = self.random_tmp_filename('store-to-disk','db')
+ self.assertFalse(os.path.exists(db_filename))
+
+ retcode, o, e = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename)
+
+ self.assertTrue(retcode == 0)
+ self.assertTrue(len(o) == 0)
+ self.assertTrue(len(e) == 5)
+ self.assertTrue(e[0].startswith('Going to save data'))
+ self.assertTrue(db_filename in e[0])
+ self.assertTrue(e[1].startswith('Data has been loaded in'))
+ self.assertTrue(e[2].startswith('Saving data to db file'))
+ self.assertTrue(e[3].startswith('Data has been saved into'))
+ self.assertTrue(e[4] == 'Query to run on the database: select count(*) from `-`;')
+
+ self.assertTrue(os.path.exists(db_filename))
+
+ sqlite_command = """echo 'select * from `-`;' | sqlite3 %s""" % db_filename
+ sqlite_retcode,sqlite_o,sqlite_e = run_command(sqlite_command)
+ self.assertTrue(sqlite_retcode == 0)
+ self.assertTrue(len(sqlite_o) == 1000)
+ self.assertTrue(len(sqlite_e) == 0)
+
+ os.remove(db_filename)
+
+ def test_preventing_db_overwrite(self):
+ db_filename = self.random_tmp_filename('store-to-disk', 'db')
+ self.assertFalse(os.path.exists(db_filename))
+
+ retcode, o, e = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename)
+
+ self.assertTrue(retcode == 0)
+ self.assertTrue(os.path.exists(db_filename))
+
+ retcode2, o2, e2 = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename)
+ self.assertTrue(retcode2 != 0)
+ self.assertTrue(e2[0].startswith('Going to save data into a disk database'))
+ self.assertTrue(e2[1] == 'Disk database file %s already exists.' % db_filename)
+
+ os.remove(db_filename)
+
class BasicTests(AbstractQTestCase):
@@ -2235,7 +2285,8 @@ def suite():
sql = tl.loadTestsFromTestCase(SqlTests)
formatting = tl.loadTestsFromTestCase(FormattingTests)
basic_module_stuff = tl.loadTestsFromTestCase(BasicModuleTests)
- return unittest.TestSuite([basic_module_stuff, basic_stuff, parsing_mode, sql, formatting])
+ save_db_to_disk_tests = tl.loadTestsFromTestCase(SaveDbToDiskTests)
+ return unittest.TestSuite([basic_module_stuff, basic_stuff, parsing_mode, sql, formatting,save_db_to_disk_tests])
if __name__ == '__main__':
test_runner = unittest.TextTestRunner(verbosity=2)