summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Harel Ben-Attia <harel@coralogix.com> 2021-06-11 11:18:46 +0300
committer Harel Ben-Attia <harel@coralogix.com> 2021-06-11 11:18:46 +0300
commit f2866ed4b7e954030657694ba9939ebe46757f1c (patch)
tree d9c8f004cc9fd8668af478372a45e5e6920eea44
parent b93af44b564c7a07ba5e23a6924f436697641e25 (diff)
all tests pass except basic module tests because of data-loads removal (data-load-removal-tests-failing)
-rwxr-xr-x bin/q.py 211
-rwxr-xr-x test/test_suite.py 8
2 files changed, 90 insertions, 129 deletions
diff --git a/bin/q.py b/bin/q.py
index 5dae98a..7e27a5a 100755
--- a/bin/q.py
+++ b/bin/q.py
@@ -1642,7 +1642,7 @@ class MaterializedQsqlState(object):
mdfs = MaterializedDelimitedFileState(self.qtable_name,self.atomic_fn,self.input_params,self.dialect_id,self.engine_id)
mdfs.initialize()
_,_, _, _ = mdfs.choose_db_to_use()
- _,_,_ = mdfs.make_data_available(stop_after_analysis=True)
+ _,_ = mdfs.make_data_available(stop_after_analysis=True)
# TODO RLRL PXPX original mdfs content is actually correct (with the 50... line). actual_content_signature from qsql file is
# now identical, not supposed to contain the 50... line
@@ -2337,58 +2337,69 @@ class QTextAsData(object):
def get_dialect_id(self,filename):
return 'q_dialect_%s' % filename
- def _open_files_and_get_mfss(self,qtable_name,input_params,dialect):
+ def _open_files_and_get_mfss(self,full_qtable_name,input_params,dialect):
materialized_file_dict = OrderedDict()
- data_stream = self.data_streams.get_for_filename(qtable_name)
- xprint("Found data stream %s" % data_stream)
+ qtable_name_parts = full_qtable_name.split('+')
- if data_stream is not None:
- ms = MaterialiedDataStreamState(qtable_name,None,input_params,dialect,self.engine_id,data_stream,stream_target_db=self.adhoc_db)
- # TODO RLRL Implement concatenated data streams logic
- materialized_file_dict[data_stream.stream_id] = [ms]
- else:
- qsql_filename, table_name = self.try_qsql_table_reference(qtable_name)
- if qsql_filename is not None:
- ms = MaterializedQsqlState(qtable_name,qsql_filename,qsql_filename=qsql_filename,table_name=table_name,
- engine_id=self.engine_id,input_params=input_params,dialect_id=dialect)
- materialized_file_dict['%s:::%s' % (qsql_filename,table_name)] = [ms]
+ for qtable_name in qtable_name_parts:
+ data_stream = self.data_streams.get_for_filename(qtable_name)
+ xprint("Found data stream %s" % data_stream)
+
+ if data_stream is not None:
+ ms = MaterialiedDataStreamState(qtable_name,None,input_params,dialect,self.engine_id,data_stream,stream_target_db=self.adhoc_db)
+ materialized_file_dict[data_stream.stream_id] = [ms]
else:
- materialized_file_list = self.materialize_file_list(qtable_name)
- # For each match
- for atomic_fn in materialized_file_list:
- # TODO RLRL Add test on not supporting select from qsql:::table+qsql:::table - should say "not supported"
- ms = MaterializedDelimitedFileState(qtable_name,atomic_fn,input_params,dialect,self.engine_id)
- # TODO RLRL Perhaps a test that shows the contract around using data streams along with concatenated files
- if atomic_fn not in materialized_file_dict:
- materialized_file_dict[atomic_fn] = [ms]
- else:
- materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [ms]
+ qsql_filename, table_name, original_filename = self.try_qsql_table_reference(qtable_name)
+ if qsql_filename is not None:
+ ms = MaterializedQsqlState(qtable_name,original_filename,qsql_filename=qsql_filename,table_name=table_name,
+ engine_id=self.engine_id,input_params=input_params,dialect_id=dialect)
+ materialized_file_dict['%s:::%s' % (qsql_filename,table_name)] = [ms]
+ else:
+ materialized_file_list = self.materialize_file_list(qtable_name)
+ # For each match
+ for atomic_fn in materialized_file_list:
+ # TODO RLRL Add test on not supporting select from qsql:::table+qsql:::table - should say "not supported"
+ ms = MaterializedDelimitedFileState(qtable_name,atomic_fn,input_params,dialect,self.engine_id)
+ # TODO RLRL Perhaps a test that shows the contract around using data streams along with concatenated files
+ if atomic_fn not in materialized_file_dict:
+ materialized_file_dict[atomic_fn] = [ms]
+ else:
+ materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [ms]
xprint("MS dict: %s" % str(materialized_file_dict))
return list([item for sublist in materialized_file_dict.values() for item in sublist])
def try_qsql_table_reference(self,filename):
+ # return contract is (qsql_filename, relevant_table_name, real_original_filename)
+
if ':::' in filename:
qsql_filename,table_name = filename.split(":::",1)
if os.path.exists(qsql_filename):
if not self.is_sqlite_file(qsql_filename):
# TODO RLRL Specific Exception and error handling.
raise Exception('Invalid sqlite file %s in table reference %s' % (qsql_filename,filename))
- return qsql_filename,table_name
+ return qsql_filename,table_name,qsql_filename
else:
raise Exception('Bug - Unhandled missing file logic')
else:
- if os.path.exists(filename) and self.is_sqlite_file(filename):
- return filename, 'temp_table_10001'
+ attempted_qsql_filename = '%s.qsql' % filename
+ if os.path.exists(filename) and self.is_sqlite_file(attempted_qsql_filename):
+ return attempted_qsql_filename, 'temp_table_10001', filename
else:
- attempted_qsql_filename = '%s.qsql' % filename
- if os.path.exists(attempted_qsql_filename) and self.is_sqlite_file(attempted_qsql_filename):
- return attempted_qsql_filename,'temp_table_10001'
- return None, None
+ if self.is_sqlite_file(filename):
+ return filename, 'temp_table_10001', filename
+ else:
+ if self.is_sqlite_file(attempted_qsql_filename):
+ return attempted_qsql_filename, 'temp_table_10001', filename
+
+ return None, None, None
def is_sqlite_file(self,filename):
+ if not os.path.exists(filename):
+ return False
+
f = open(filename,'rb')
magic = f.read(16)
f.close()
@@ -2548,25 +2559,18 @@ class QTextAsData(object):
def materialize_file_list(self,qtable_name):
materialized_file_list = []
- # Get the list of filenames
- filenames = qtable_name.split("+")
-
- # TODO RLRL Add test for this with concatenated and non concatenated files
-
unfound_files = []
- # for each filename (or pattern)
- for fileglob in filenames:
- # First check if the file exists without globbing. This will ensure that we don't support non-existent files
- if os.path.exists(fileglob):
- # If it exists, then just use it
- found_files = [fileglob]
- else:
- # If not, then try with globs
- found_files = list(sorted(glob.glob(fileglob)))
- # If no files
- if len(found_files) == 0:
- unfound_files += [fileglob]
- materialized_file_list += found_files
+ # First check if the file exists without globbing. This will ensure that we don't support non-existent files
+ if os.path.exists(qtable_name):
+ # If it exists, then just use it
+ found_files = [qtable_name]
+ else:
+ # If not, then try with globs
+ found_files = list(sorted(glob.glob(qtable_name)))
+ # If no files
+ if len(found_files) == 0:
+ unfound_files += [qtable_name]
+ materialized_file_list += found_files
# If there are no files to go over,
if len(unfound_files) == 1:
@@ -2656,33 +2660,37 @@ class QTextAsData(object):
return table_name_mapping
- # def validate_query(self,sql_object,table_structures):
- # relevant_table_structures = []
- # for qtable_name in sql_object.qtable_names:
- # relevant_table_structures += [table_structures[qtable_name]]
- #
- # column_names = None
- # column_types = None
- # for ts in relevant_table_structures:
- # names = ts.column_names
- # types = ts.column_types
- # xprint("Comparing column names: %s with %s" % (column_names,names))
- # if column_names is None:
- # column_names = names
- # else:
- # if column_names != names:
- # raise BadHeaderException("bad header column names %s vs %s" % (column_names,names))
- #
- # xprint("Comparing column types: %s with %s" % (column_types,types))
- # if column_types is None:
- # column_types = types
- # else:
- # if column_types != types:
- # raise BadHeaderException("bad header column types %s vs %s" % (column_types,types))
- #
- # xprint("All column names match for qtable name %s: column names: %s column types: %s" % (ts.qtable_name,column_names,column_types))
- #
- # xprint("Query validated")
+ def validate_query(self,sql_object,table_structures):
+
+ for qtable_name in sql_object.qtable_names:
+ relevant_table_structures = []
+ for ts in table_structures[qtable_name]:
+ relevant_table_structures += [ts]
+
+ column_names = None
+ column_types = None
+ for ts in relevant_table_structures:
+ names = ts.column_names
+ types = ts.column_types
+ xprint("Comparing column names: %s with %s" % (column_names,names))
+ if column_names is None:
+ column_names = names
+ else:
+ if column_names != names:
+ raise BadHeaderException("Column names differ for table %s: %s vs %s" % (
+ qtable_name, ",".join(column_names), ",".join(names)))
+
+ xprint("Comparing column types: %s with %s" % (column_types,types))
+ if column_types is None:
+ column_types = types
+ else:
+ if column_types != types:
+ raise BadHeaderException("Column types differ for table %s: %s vs %s" % (
+ qtable_name, ",".join(column_types), ",".join(types)))
+
+ xprint("All column names match for qtable name %s: column names: %s column types: %s" % (ts.qtable_name,column_names,column_types))
+
+ xprint("Query validated")
def _execute(self,query_str,input_params=None,data_streams=None,stop_after_analysis=False,save_db_to_disk_filename=None):
warnings = []
@@ -2710,6 +2718,8 @@ class QTextAsData(object):
new_table_structures = self._ensure_data_is_loaded_for_sql(sql_object,effective_input_params,data_streams,stop_after_analysis=stop_after_analysis)
xprint("ensured data is loaded. loaded tables: %s" % self.loaded_table_structures_dict)
+ self.validate_query(sql_object,self.loaded_table_structures_dict)
+
sql_object.materialize_using(self.loaded_table_structures_dict,self.data_streams)
if save_db_to_disk_filename is not None:
@@ -2817,55 +2827,6 @@ class QTextAsData(object):
m[filename] = QMaterializedFile(filename,mfs.data_stream)
return m
- def _create_table_structures_list(self):
- xprint("Loaded Table Structure List: %s" % self.loaded_table_structures_dict)
- table_creators_by_qtable_name = OrderedDict()
- for atomic_fn,table_creator in six.iteritems(self.table_creators):
- xprint("Iterating atomic filename %s" % atomic_fn)
- column_names = table_creator.get_column_names()
- column_types = [Sqlite3DB.PYTHON_TO_SQLITE_TYPE_NAMES[table_creator.get_column_dict()[k]].lower() for k in column_names]
-
- qtable_name = table_creator.qtable_name
-
- # TODO RLRL - Some order here
-
- if qtable_name not in table_creators_by_qtable_name:
- if qtable_name in data_loads_dict:
- xprint("Relevant Data loads are %s" % data_loads_dict[qtable_name])
- data_loads_for_qtable_name = data_loads_dict[qtable_name]
- else:
- xprint("No data load was needed for qtable name %s" % qtable_name)
- data_loads_for_qtable_name = []
-
- if table_creator.atomic_fn:
- x = [table_creator.atomic_fn]
- else:
- x = []
-
- table_creators_by_qtable_name[qtable_name] = QTableStructure(qtable_name,x,column_names,column_types,
- data_loads_for_qtable_name,[])
- else:
- current = table_creators_by_qtable_name[qtable_name]
- xprint("Merging Data load is %s" % current.data_loads)
-
- if current.column_names != column_names:
- raise BadHeaderException("Column names differ for table %s: %s vs %s" % (qtable_name,",".join(current.column_names),",".join(column_names)))
- if current.column_types != column_types:
- raise BadHeaderException("Column types differ for table %s: %s vs %s" % (qtable_name,",".join(current.column_types),",".join(column_types)))
-
- if table_creator.atomic_fn:
- x = [table_creator.atomic_fn]
- else:
- x = []
-
- table_creators_by_qtable_name[qtable_name] = QTableStructure(qtable_name, current.materialized_files + x,
- column_names,
- column_types,
- current.data_loads,[])
-
- xprint("table creators by qtable name: %s" % table_creators_by_qtable_name)
- return table_creators_by_qtable_name
-
def analyze(self,query_str,input_params=None,data_streams=None):
q_output = self._execute(query_str,input_params,data_streams=data_streams,stop_after_analysis=True)
diff --git a/test/test_suite.py b/test/test_suite.py
index b939d56..39d7998 100755
--- a/test/test_suite.py
+++ b/test/test_suite.py
@@ -2327,7 +2327,7 @@ class QsqlUsageTests(AbstractQTestCase):
qsql_with_multiple_tables = self.generate_tmpfile_name(suffix='.qsql')
- cmd = '%s -t "select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s small_file left join %s large_file on (large_file.aa == small_file.bb)" -S %s' % \
+ cmd = '%s -t "select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s large_file left join %s small_file on (large_file.aa == small_file.bb)" -S %s' % \
(Q_EXECUTABLE,qsql_filename1,qsql_filename2,qsql_with_multiple_tables)
retcode, o, e = run_command(cmd)
@@ -2336,9 +2336,9 @@ class QsqlUsageTests(AbstractQTestCase):
self.assertEqual(len(e), 4)
self.assertEqual(e[0], six.b('Going to save data into a disk database: %s' % qsql_with_multiple_tables))
self.assertTrue(e[1].startswith(six.b('Data has been saved into %s . Saving has taken' % qsql_with_multiple_tables)))
- self.assertEqual(e[2],six.b('Query to run on the database: select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s small_file left join %s large_file on (large_file.aa == small_file.bb);' % \
+ self.assertEqual(e[2],six.b('Query to run on the database: select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s large_file left join %s small_file on (large_file.aa == small_file.bb);' % \
(expected_stored_table_name1,expected_stored_table_name2)))
- self.assertEqual(e[3],six.b('You can run the query directly from the command line using the following command: echo "select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s small_file left join %s large_file on (large_file.aa == small_file.bb)" | sqlite3 %s' % \
+ self.assertEqual(e[3],six.b('You can run the query directly from the command line using the following command: echo "select sum(large_file.aa),sum(large_file.bb),sum(large_file.cc) from %s large_file left join %s small_file on (large_file.aa == small_file.bb)" | sqlite3 %s' % \
(expected_stored_table_name1,expected_stored_table_name2,qsql_with_multiple_tables)))
cmd = '%s -d , "select count(*) cnt,sum(aa),sum(bb),sum(cc) from %s:::%s"' % (Q_EXECUTABLE,qsql_with_multiple_tables,expected_stored_table_name1)
@@ -2347,7 +2347,7 @@ class QsqlUsageTests(AbstractQTestCase):
self.assertEqual(r,0)
self.assertEqual(len(o),1)
self.assertEqual(len(e),0)
- self.assertEqual(o[0],six.b('10000,50000050,50000050,5000050'))
+ self.assertEqual(o[0],six.b('10000,50005000,50005000,50005000'))
# def test_direct_use_of_sqlite_db_with_one_table(self):
# tmpfile = self.create_file_with_data(six.b(''),suffix='.sqlite')