diff options
author | Harel Ben-Attia <harelba@gmail.com> | 2017-11-22 20:37:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-22 20:37:13 +0200 |
commit | ea99c0133844596c2ba4ad531a58a10a376330fb (patch) | |
tree | c0c3c24f734364da65fe3d99312beeb2e81236fe | |
parent | ab509c7bcba8e2aa20cb9b3234124fabd39b7d30 (diff) | |
parent | db470503e39cab20e949ac6bd43d0d44a89ed7be (diff) |
Merge pull request #159 from harelba/save-db-to-disk
-rw-r--r-- | .gitignore | 1 | ||||
-rwxr-xr-x | bin/q | 120 | ||||
-rwxr-xr-x | test/test-suite | 53 |
3 files changed, 151 insertions, 23 deletions
@@ -11,3 +11,4 @@ win_build packages .idea/ dist/windows/ +dist/ @@ -27,7 +27,7 @@ # # Run with --help for command line details # -q_version = "1.6.3" +q_version = "1.6.3" __all__ = [ 'QTextAsData' ] @@ -67,7 +67,7 @@ def sha1(data): if not isinstance(data,str) and not isinstance(data,unicode): return hashlib.sha1(str(data)).hexdigest() return hashlib.sha1(data).hexdigest() - + def regexp(regular_expression, data): if data is not None: if not isinstance(data, str) and not isinstance(data, unicode): @@ -103,7 +103,7 @@ class StrictPercentile(object): def finalize(self): if len(self.values) == 0 or (self.p < 0 or self.p > 1): - return None + return None else: return percentile(sorted(self.values),self.p) @@ -119,6 +119,43 @@ class Sqlite3DB(object): self.numeric_column_types = set([int, long, float]) self.add_user_functions() + def done(self): + self.conn.commit() + + def store_db_to_disk_standard(self,sqlite_db_filename,table_names_mapping): + new_db = sqlite3.connect(sqlite_db_filename,isolation_level=None) + c = new_db.cursor() + for s in self.conn.iterdump(): + c.execute(s) + results = c.fetchall() + #print "executed %s results %s " % (s,results) + for source_filename_str,tn in table_names_mapping.iteritems(): + c.execute('alter table `%s` rename to `%s`' % (tn, source_filename_str)) + new_db.close() + + def store_db_to_disk_fast(self,sqlite_db_filename,table_names_mapping): + try: + import sqlitebck + except ImportError, e: + msg = "sqlitebck python module cannot be found - fast store to disk cannot be performed" + print >>sys.stderr,msg + raise ValueError(msg) + + new_db = sqlite3.connect(sqlite_db_filename) + sqlitebck.copy(self.conn,new_db) + c = new_db.cursor() + for source_filename_str,tn in table_names_mapping.iteritems(): + c.execute('alter table `%s` rename to `%s`' % (tn, source_filename_str)) + new_db.close() + + def store_db_to_disk(self,sqlite_db_filename,table_names_mapping,method='standard'): + if method == 'standard': + self.store_db_to_disk_standard(sqlite_db_filename,table_names_mapping) + elif method == 'fast': + self.store_db_to_disk_fast(sqlite_db_filename,table_names_mapping) + else: + raise ValueError('Unknown store-db-to-disk method %s' % method) + def add_user_functions(self): self.conn.create_function("regexp", 2, regexp) self.conn.create_function("sha1", 1, sha1) @@ -192,7 +229,8 @@ class Sqlite3DB(object): def generate_temp_table_name(self): self.last_temp_table_id += 1 - return "temp_table_%s" % self.last_temp_table_id + tn = "temp_table_%s" % self.last_temp_table_id + return tn def generate_drop_table(self, table_name): return "DROP TABLE %s" % table_name @@ -370,7 +408,7 @@ class Sql(object): self.qtable_name_effective_table_names[ qtable_name] = effective_table_name - def get_effective_sql(self): + def get_effective_sql(self,original_names=False): if len(filter(lambda x: x is None, self.qtable_name_effective_table_names)) != 0: raise Exception('There are qtables without effective tables') @@ -378,11 +416,17 @@ class Sql(object): for qtable_name, positions in self.qtable_name_positions.iteritems(): for pos in positions: - effective_sql[pos] = self.qtable_name_effective_table_names[ - qtable_name] + if not original_names: + effective_sql[pos] = self.qtable_name_effective_table_names[ + qtable_name] + else: + effective_sql[pos] = "`%s`" % qtable_name return " ".join(effective_sql) + def get_qtable_name_effective_table_names(self): + return self.qtable_name_effective_table_names + def execute_and_fetch(self, db): db_results_obj = db.execute_and_fetch(self.get_effective_sql()) return db_results_obj @@ -840,13 +884,13 @@ class TableCreator(object): self.column_inferer.force_analysis() self._do_create_table() - + if total_data_lines_read == 0: raise EmptyDataException() def populate(self,dialect,stop_after_analysis=False): if self.state == TableCreatorState.NEW: - self._pre_populate(dialect) + self._pre_populate(dialect) self.state = TableCreatorState.INITIALIZED if self.state == TableCreatorState.INITIALIZED: @@ -926,7 +970,7 @@ class TableCreator(object): if self.mode == 'relaxed': if actual_col_count > expected_col_count: xxx = col_vals[:expected_col_count - 1] + \ - [self.input_delimiter.join([v if v is not None else '' for v in + [self.input_delimiter.join([v if v is not None else '' for v in col_vals[expected_col_count - 1:]])] return xxx else: @@ -989,7 +1033,7 @@ class TableCreator(object): # Guard against empty tables (instead of preventing the creation, just create with a dummy column) if len(column_dict) == 0: - column_dict = { 'dummy_column_for_empty_tables' : str } + column_dict = { 'dummy_column_for_empty_tables' : str } ordered_column_names = [ 'dummy_column_for_empty_tables' ] else: ordered_column_names = self.column_inferer.get_column_names() @@ -1157,7 +1201,7 @@ class QTextAsData(object): input_quoting_modes = { 'minimal' : csv.QUOTE_MINIMAL, 'all' : csv.QUOTE_ALL, - # nonnumeric is not supported for input quoting modes, since we determine the data types + # nonnumeric is not supported for input quoting modes, since we determine the data types # ourselves instead of letting the csv module try to identify the types 'none' : csv.QUOTE_NONE } @@ -1203,7 +1247,7 @@ class QTextAsData(object): # Create the matching database table and populate it table_creator = TableCreator( self.db, filename, line_splitter, input_params.skip_header, input_params.gzipped_input, input_params.with_universal_newlines,input_params.input_encoding, - mode=input_params.parsing_mode, expected_column_count=input_params.expected_column_count, + mode=input_params.parsing_mode, expected_column_count=input_params.expected_column_count, input_delimiter=input_params.delimiter,disable_column_type_detection=input_params.disable_column_type_detection, stdin_file = stdin_file,stdin_filename = stdin_filename) @@ -1239,7 +1283,7 @@ class QTextAsData(object): for filename in sql_object.qtable_names: sql_object.set_effective_table_name(filename,self.table_creators[filename].table_name) - def _execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',stop_after_analysis=False): + def _execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',stop_after_analysis=False,save_db_to_disk_filename=None,save_db_to_disk_method=None): warnings = [] error = None data_loads = [] @@ -1254,24 +1298,37 @@ class QTextAsData(object): # Hueristic attempt to auto convert the query to unicode before failing query_str = query_str.decode('utf-8') except: - error = QError(EncodedQueryException(),"Query should be in unicode. Please make sure to provide a unicode literal string or decode it using proper the character encoding.",91) + error = QError(EncodedQueryException(''),"Query should be in unicode. Please make sure to provide a unicode literal string or decode it using proper the character encoding.",91) return QOutput(error = error) - # Create SQL statment + # Create SQL statement sql_object = Sql('%s' % query_str) try: + load_start_time = time.time() data_loads += self._ensure_data_is_loaded(sql_object,effective_input_params,stdin_file=stdin_file,stdin_filename=stdin_filename,stop_after_analysis=stop_after_analysis) table_structures = self._create_table_structures_list() self.materialize_sql_object(sql_object) + if save_db_to_disk_filename is not None: + self.db.done() + dump_start_time = time.time() + print >>sys.stderr,"Data has been loaded in %4.3f seconds" % (dump_start_time - load_start_time) + print >>sys.stderr,"Saving data to db file %s" % save_db_to_disk_filename + self.db.store_db_to_disk(save_db_to_disk_filename,sql_object.get_qtable_name_effective_table_names(),save_db_to_disk_method) + print >>sys.stderr,"Data has been saved into %s . Saving has taken %4.3f seconds" % (save_db_to_disk_filename,time.time()-dump_start_time) + print >>sys.stderr,"Query to run on the database: %s;" % sql_object.get_effective_sql(True) + # TODO Propagate dump results using a different output class instead of an empty one + + return QOutput() + # Execute the query and fetch the data db_results_obj = sql_object.execute_and_fetch(self.db) return QOutput( - data = db_results_obj.results, + data = db_results_obj.results, metadata = QMetadata( table_structures=table_structures, output_column_name_list=db_results_obj.query_column_names, @@ -1315,8 +1372,8 @@ class QTextAsData(object): return QOutput(warnings = warnings,error = error , metadata=QMetadata(table_structures=table_structures,data_loads = data_loads)) - def execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-'): - return self._execute(query_str,input_params,stdin_file,stdin_filename,stop_after_analysis=False) + def execute(self,query_str,input_params=None,stdin_file=None,stdin_filename='-',save_db_to_disk_filename=None,save_db_to_disk_method=None): + return self._execute(query_str,input_params,stdin_file,stdin_filename,stop_after_analysis=False,save_db_to_disk_filename=save_db_to_disk_filename,save_db_to_disk_method=save_db_to_disk_method) def unload(self): @@ -1579,6 +1636,10 @@ def run_standalone(): help="Print version") parser.add_option("-V", "--verbose", dest="verbose", default=False, action="store_true", help="Print debug info in case of problems") + parser.add_option("-S", "--save-db-to-disk", dest="save_db_to_disk_filename", default=None, + help="Save database to an sqlite database file") + parser.add_option("", "--save-db-to-disk-method", dest="save_db_to_disk_method", default='standard', + help="Method to use to save db to disk. 'standard' does not require any deps, 'fast' currenty requires manually running `pip install sqlitebck` on your python installation. Once packing issues are solved, the fast method will be the default.") #----------------------------------------------- input_data_option_group = OptionGroup(parser,"Input Data Options") input_data_option_group.add_option("-H", "--skip-header", dest="skip_header", default=default_skip_header, action="store_true", @@ -1613,7 +1674,7 @@ def run_standalone(): help="Expect universal newlines in the data. Limitation: -U works only with regular files for now, stdin or .gz files are not supported yet.") parser.add_option_group(input_data_option_group) #----------------------------------------------- - output_data_option_group = OptionGroup(parser,"Output Options") + output_data_option_group = OptionGroup(parser,"Output Options") output_data_option_group.add_option("-D", "--output-delimiter", dest="output_delimiter", default=default_output_delimiter, help="Field delimiter for output. If none specified, then the -d delimiter is used if present, or space if no delimiter is specified") output_data_option_group.add_option("-T", "--tab-delimited-output", dest="tab_delimited_output", default=False, action="store_true", @@ -1629,7 +1690,7 @@ def run_standalone(): help="Output quoting mode. Possible values are all, minimal, nonnumeric and none. Note the slightly misleading parameter name, and see the matching -w parameter for input quoting.") parser.add_option_group(output_data_option_group) #----------------------------------------------- - query_option_group = OptionGroup(parser,"Query Related Options") + query_option_group = OptionGroup(parser,"Query Related Options") query_option_group.add_option("-q", "--query-filename", dest="query_filename", default=None, help="Read query from the provided filename instead of the command line, possibly using the provided query encoding (using -Q).") query_option_group.add_option("-Q", "--query-encoding", dest="query_encoding", default=default_query_encoding, @@ -1742,6 +1803,21 @@ def run_standalone(): print >> sys.stderr, "Max column length limit must be a positive integer (%s)" % max_column_length_limit sys.exit(31) + if options.save_db_to_disk_filename is not None: + if options.analyze_only: + print >>sys.stderr,"Cannot save database to disk when running with -A (analyze-only) option." + sys.exit(119) + + print >>sys.stderr,"Going to save data into a disk database: %s" % options.save_db_to_disk_filename + if os.path.exists(options.save_db_to_disk_filename): + print >> sys.stderr, "Disk database file %s already exists." % options.save_db_to_disk_filename + sys.exit(77) + + if options.save_db_to_disk_method is not None: + if options.save_db_to_disk_method not in ['standard','fast']: + print >>sys.stderr,"save-db-to-disk method should be either standard or fast (%s)" % options.save_db_to_disk_method + sys.exit(78) + default_input_params = QInputParams(skip_header=options.skip_header, delimiter=options.delimiter, input_encoding=options.encoding, @@ -1770,7 +1846,7 @@ def run_standalone(): q_output = q_engine.analyze(query_str,stdin_file=sys.stdin) q_output_printer.print_analysis(STDOUT,sys.stderr,q_output) else: - q_output = q_engine.execute(query_str,stdin_file=sys.stdin) + q_output = q_engine.execute(query_str,stdin_file=sys.stdin,save_db_to_disk_filename=options.save_db_to_disk_filename,save_db_to_disk_method=options.save_db_to_disk_method) q_output_printer.print_output(STDOUT,sys.stderr,q_output) if q_output.status == 'error': diff --git a/test/test-suite b/test/test-suite index 38dc0ac..b5c0595 100755 --- a/test/test-suite +++ b/test/test-suite @@ -10,6 +10,7 @@ # import unittest +import random import json from json import JSONEncoder from subprocess import PIPE, Popen, STDOUT @@ -133,6 +134,55 @@ class AbstractQTestCase(unittest.TestCase): def cleanup(self, tmpfile): os.remove(tmpfile.name) + def random_tmp_filename(self,prefix,postfix): + # TODO Use more robust method for this + path = '/var/tmp' + return '%s/%s-%s.%s' % (path,prefix,random.randint(0,1000000000),postfix) + +class SaveDbToDiskTests(AbstractQTestCase): + + def test_store_to_disk(self): + db_filename = self.random_tmp_filename('store-to-disk','db') + self.assertFalse(os.path.exists(db_filename)) + + retcode, o, e = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename) + + self.assertTrue(retcode == 0) + self.assertTrue(len(o) == 0) + self.assertTrue(len(e) == 5) + self.assertTrue(e[0].startswith('Going to save data')) + self.assertTrue(db_filename in e[0]) + self.assertTrue(e[1].startswith('Data has been loaded in')) + self.assertTrue(e[2].startswith('Saving data to db file')) + self.assertTrue(e[3].startswith('Data has been saved into')) + self.assertTrue(e[4] == 'Query to run on the database: select count(*) from `-`;') + + self.assertTrue(os.path.exists(db_filename)) + + sqlite_command = """echo 'select * from `-`;' | sqlite3 %s""" % db_filename + sqlite_retcode,sqlite_o,sqlite_e = run_command(sqlite_command) + self.assertTrue(sqlite_retcode == 0) + self.assertTrue(len(sqlite_o) == 1000) + self.assertTrue(len(sqlite_e) == 0) + + os.remove(db_filename) + + def test_preventing_db_overwrite(self): + db_filename = self.random_tmp_filename('store-to-disk', 'db') + self.assertFalse(os.path.exists(db_filename)) + + retcode, o, e = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename) + + self.assertTrue(retcode == 0) + self.assertTrue(os.path.exists(db_filename)) + + retcode2, o2, e2 = run_command('seq 1 1000 | ../bin/q "select count(*) from -" -c 1 -S %s' % db_filename) + self.assertTrue(retcode2 != 0) + self.assertTrue(e2[0].startswith('Going to save data into a disk database')) + self.assertTrue(e2[1] == 'Disk database file %s already exists.' % db_filename) + + os.remove(db_filename) + class BasicTests(AbstractQTestCase): @@ -2235,7 +2285,8 @@ def suite(): sql = tl.loadTestsFromTestCase(SqlTests) formatting = tl.loadTestsFromTestCase(FormattingTests) basic_module_stuff = tl.loadTestsFromTestCase(BasicModuleTests) - return unittest.TestSuite([basic_module_stuff, basic_stuff, parsing_mode, sql, formatting]) + save_db_to_disk_tests = tl.loadTestsFromTestCase(SaveDbToDiskTests) + return unittest.TestSuite([basic_module_stuff, basic_stuff, parsing_mode, sql, formatting,save_db_to_disk_tests]) if __name__ == '__main__': test_runner = unittest.TextTestRunner(verbosity=2) |