summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarel Ben-Attia <harel@coralogix.com>2021-05-22 22:42:12 +0300
committerHarel Ben-Attia <harel@coralogix.com>2021-05-22 22:42:12 +0300
commit832b82bed47f0e2c8dc75ab4b7748a535f3609b6 (patch)
tree6d1af5ec6ac9fe7bf6e6fc3898e2964b728f836b
parent9bb6e1f0890cd40743f82f13c09f1799d381641e (diff)
split data stream and delimited file handling. tests passing (except direct_use which is not supported yet) delimited-file-and-data-stream-split
-rwxr-xr-xbin/q.py184
-rw-r--r--qsqlite.md2
2 files changed, 115 insertions, 71 deletions
diff --git a/bin/q.py b/bin/q.py
index d706f21..137471d 100755
--- a/bin/q.py
+++ b/bin/q.py
@@ -667,15 +667,19 @@ class ContentSignatureNotFoundException(Exception):
class StrictModeColumnCountMismatchException(Exception):
- def __init__(self,expected_col_count,actual_col_count):
+ def __init__(self,atomic_fn, expected_col_count,actual_col_count,lines_read):
+ self.atomic_fn = atomic_fn
self.expected_col_count = expected_col_count
self.actual_col_count = actual_col_count
+ self.lines_read = lines_read
class FluffyModeColumnCountMismatchException(Exception):
- def __init__(self,expected_col_count,actual_col_count):
+ def __init__(self,atomic_fn, expected_col_count,actual_col_count,lines_read):
+ self.atomic_fn = atomic_fn
self.expected_col_count = expected_col_count
self.actual_col_count = actual_col_count
+ self.lines_read = lines_read
class ContentSignatureDiffersException(Exception):
@@ -1161,43 +1165,23 @@ class TableSourceType(object):
QSQL_FILE = 'qsql-file'
DATA_STREAM = 'data-stream'
-class MaterializedFileState(object):
- def __init__(self, qtable_name, atomic_fn, input_params, dialect, data_stream=None):
- xprint("Creating new MFS: %s %s" % (id(self),qtable_name))
- self.qtable_name = qtable_name
+class DelimitedFileReader(object):
+ def __init__(self,atomic_fn, input_params, dialect, f = None):
self.atomic_fn = atomic_fn
self.input_params = input_params
self.dialect = dialect
- self.data_stream = data_stream
+ self.f = f
self.lines_read = 0
self.skipped_bom = False
- self.f = None
- self.is_open = False
-
- def __str__(self):
- return "MaterializedFileState<qtable_name=%s,atomic_fn=%s,lines_read=%s,input_params=%s,f=%s,data_stream=%s,is_open=%s" % (
- self.qtable_name,
- self.atomic_fn,
- self.lines_read,
- self.input_params,
- self.f,
- self.data_stream,
- self.is_open)
- __repr__ = __str__
+ self.is_open = f is not None
def open_file(self):
- # TODO Support universal newlines for gzipped and stdin data as well
-
- # If it's a data stream
- if self.data_stream is not None:
- # then just use it
- self.f = self.data_stream.stream
- self.is_open = True
- return
+ if self.f is not None or self.is_open:
+ raise Exception('File is already open %s' % self.f)
- # Otherwise, it's a file, open the file
+ # TODO Support universal newlines for gzipped and stdin data as well
if self.input_params.gzipped_input or self.atomic_fn.endswith('.gz'):
f = codecs.iterdecode(gzip.GzipFile(fileobj=io.open(self.atomic_fn,'rb')),encoding=self.input_params.input_encoding)
@@ -1219,12 +1203,19 @@ class MaterializedFileState(object):
xprint("Actually opened file %s" % self.f)
return f
-
- def read_file_using_csv(self):
+ def close_file(self):
if not self.is_open:
- raise Exception('bug - not open: %s' % self.atomic_fn)
+ raise Exception("Bug - file should already be open: %s" % self.atomic_fn)
+
+ self.f.close()
+
+ def generate_rows(self):
+ # TODO RLRL Move up the stack
+ # if not self.is_open:
+ # raise Exception('bug - not open: %s' % self.atomic_fn)
+
# This is a hack for utf-8 with BOM encoding in order to skip the BOM. python's csv module
- # has a bug which prevents fixing it using the proper encoding, and it has been encountered by
+ # has a bug which prevents fixing it using the proper encoding, and it has been encountered by
# multiple people.
if self.input_params.input_encoding == 'utf-8-sig' and self.lines_read == 0 and not self.skipped_bom:
try:
@@ -1249,12 +1240,60 @@ class MaterializedFileState(object):
# No need to translate the exception, but we want it to be explicitly defined here for clarity
raise UniversalNewlinesExistException()
- def close_file(self):
- if not self.is_open:
- raise Exception("Bug - file should already be open: %s" % self.atomic_fn)
+class MaterializedState(object):
+ def __init__(self,qtable_name):
+ self.qtable_name = qtable_name
- if not self.data_stream:
- self.f.close()
+class MaterialiedDataStreamState(object):
+ def __init__(self,qtable_name,atomic_fn, input_params, dialect, data_stream = None):
+ xprint("Creating new MDSS: %s %s" % (id(self), qtable_name))
+ self.qtable_name = qtable_name
+ self.atomic_fn = atomic_fn
+ self.input_params = input_params
+ self.dialect = dialect
+
+ self.data_stream = data_stream
+
+ def initialize(self):
+ if self.input_params.gzipped_input:
+ raise CannotUnzipDataStreamException()
+
+ self.delimited_file_reader = DelimitedFileReader(self.atomic_fn, self.input_params, self.dialect,f = self.data_stream.stream)
+
+ def read_file_using_csv(self):
+ for x in self.delimited_file_reader.generate_rows():
+ yield x
+
+ def finalize(self):
+ pass
+
+ def get_lines_read(self):
+ return self.delimited_file_reader.lines_read
+
+
+class MaterializedDelimitedFileState(object):
+ def __init__(self, qtable_name, atomic_fn, input_params, dialect):
+ xprint("Creating new MDFS: %s %s" % (id(self),qtable_name))
+ self.qtable_name = qtable_name
+ self.atomic_fn = atomic_fn
+ self.input_params = input_params
+ self.dialect = dialect
+
+ self.delimited_file_reader = None
+
+ def initialize(self):
+ self.delimited_file_reader = DelimitedFileReader(self.atomic_fn,self.input_params,self.dialect)
+ return self.delimited_file_reader.open_file()
+
+ def read_file_using_csv(self):
+ for x in self.delimited_file_reader.generate_rows():
+ yield x
+
+ def finalize(self):
+ self.delimited_file_reader.close_file()
+
+ def get_lines_read(self):
+ return self.delimited_file_reader.lines_read
class TableCreator(object):
def __str__(self):
@@ -1264,6 +1303,9 @@ class TableCreator(object):
def __init__(self, mfs, input_params,sqlite_db=None,target_sqlite_table_name=None):
xprint("Initializing table creator for %s" % mfs)
+ # TODO RLRL - Change to TableCreatorFromDelimitedStream or something
+ assert isinstance(mfs, MaterializedDelimitedFileState) or isinstance (mfs, MaterialiedDataStreamState)
+
self.mfs = mfs
self.sqlite_db = sqlite_db
@@ -1277,7 +1319,6 @@ class TableCreator(object):
self.mode = input_params.parsing_mode
self.expected_column_count = input_params.expected_column_count
self.input_delimiter = input_params.delimiter
- self.data_stream = mfs.data_stream
self.with_universal_newlines = input_params.with_universal_newlines
self.column_inferer = TableColumnInferer(input_params)
@@ -1308,7 +1349,7 @@ class TableCreator(object):
# (side by side naming). If the source file doesn't exist, then it's possible to search for the cache
# file and just use it (without comparing signatures to the source file). If the source file exists
# and the cache doesn't, then it's possible to create it after reading the source file.
- if self.data_stream is None:
+ if not isinstance(self.mfs,MaterialiedDataStreamState):
size = os.stat(self.mfs.atomic_fn).st_size
last_modification_time = os.stat(self.mfs.atomic_fn).st_mtime_ns
else:
@@ -1369,18 +1410,18 @@ class TableCreator(object):
self._insert_row(self.mfs.atomic_fn, col_vals)
if stop_after_analysis and self.column_inferer.inferred:
return
- if self.mfs.lines_read == 0 and self.skip_header:
+ if self.mfs.get_lines_read() == 0 and self.skip_header:
raise MissingHeaderException("Header line is expected but missing in file %s" % self.mfs.atomic_fn)
- total_data_lines_read += self.mfs.lines_read - (1 if self.skip_header else 0)
+ total_data_lines_read += self.mfs.delimited_file_reader.lines_read - (1 if self.skip_header else 0)
except StrictModeColumnCountMismatchException as e:
raise ColumnCountMismatchException(
'Strict mode - Expected %s columns instead of %s columns in file %s row %s. Either use relaxed/fluffy modes or check your delimiter' % (
- e.expected_col_count, e.actual_col_count, normalized_filename(self.mfs.atomic_fn), self.mfs.lines_read))
+ e.expected_col_count, e.actual_col_count, normalized_filename(e.atomic_fn), e.lines_read))
except FluffyModeColumnCountMismatchException as e:
raise ColumnCountMismatchException(
'Deprecated fluffy mode - Too many columns in file %s row %s (%s fields instead of %s fields). Consider moving to either relaxed or strict mode' % (
- normalized_filename(self.mfs.atomic_fn), self.mfs.lines_read, e.actual_col_count, e.expected_col_count))
+ normalized_filename(e.atomic_fn), e.lines_read, e.actual_col_count, e.expected_col_count))
finally:
self._flush_inserts()
@@ -1395,7 +1436,7 @@ class TableCreator(object):
def get_absolute_filenames_str(self):
return self.mfs.atomic_fn
- def perform_analyze(self, dialect,data_stream):
+ def perform_analyze(self, dialect):
xprint("Analyzing... %s" % dialect)
if self.state == TableCreatorState.INITIALIZED:
self._populate(dialect,stop_after_analysis=True)
@@ -1407,7 +1448,7 @@ class TableCreator(object):
now = datetime.datetime.utcnow().isoformat()
- if data_stream is not None:
+ if isinstance(self.mfs,MaterialiedDataStreamState):
source_type = 'data-stream'
source = self.mfs.data_stream.stream_id
else:
@@ -1543,7 +1584,7 @@ class TableCreator(object):
actual_col_count = len(col_vals)
if self.mode == 'strict':
if actual_col_count != expected_col_count:
- raise StrictModeColumnCountMismatchException(expected_col_count,actual_col_count)
+ raise StrictModeColumnCountMismatchException(self.mfs.atomic_fn, expected_col_count,actual_col_count,self.mfs.get_lines_read())
return col_vals
# in all non strict mode, we add dummy data to missing columns
@@ -1564,7 +1605,7 @@ class TableCreator(object):
if self.mode == 'fluffy':
if actual_col_count > expected_col_count:
- raise FluffyModeColumnCountMismatchException(expected_col_count,actual_col_count)
+ raise FluffyModeColumnCountMismatchException(self.mfs.atomic_fn,expected_col_count,actual_col_count,self.mfs.get_lines_read())
return col_vals
raise Exception("Unidentified parsing mode %s" % self.mode)
@@ -1932,23 +1973,20 @@ class QTextAsData(object):
data_stream = self.data_streams.get_for_filename(qtable_name)
if data_stream is not None:
- if input_params.gzipped_input:
- raise CannotUnzipDataStreamException()
-
- mfs = MaterializedFileState(qtable_name, None, input_params, dialect, data_stream)
- materialized_file_dict[qtable_name] = [mfs]
+ ms = MaterialiedDataStreamState(qtable_name,None,input_params,dialect,data_stream)
+ materialized_file_dict[qtable_name] = [ms]
else:
materialized_file_list = self.materialize_file_list(qtable_name)
# For each match
for atomic_fn in materialized_file_list:
- mfs = MaterializedFileState(qtable_name,atomic_fn,input_params,dialect,None)
+ ms = MaterializedDelimitedFileState(qtable_name,atomic_fn,input_params,dialect)
# TODO RLRL Perhaps a test that shows the contract around using data streams along with concatenated files
if atomic_fn not in materialized_file_dict:
- materialized_file_dict[atomic_fn] = [mfs]
+ materialized_file_dict[atomic_fn] = [ms]
else:
- materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [mfs]
+ materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [ms]
- xprint("MFS dict: %s" % str(materialized_file_dict))
+ xprint("MS dict: %s" % str(materialized_file_dict))
return materialized_file_dict
def is_sqlite_file(self,filename):
@@ -1957,14 +1995,14 @@ class QTextAsData(object):
f.close()
return magic == six.b("SQLite format 3\x00")
- def detect_table_type(self,filename,data_stream):
- if data_stream is None:
+ def detect_table_type(self,filename,mfs):
+ if isinstance(mfs,MaterialiedDataStreamState):
+ return TableSourceType.DATA_STREAM
+ else:
if self.is_sqlite_file(filename):
return TableSourceType.QSQL_FILE
else:
return TableSourceType.DELIMITED_FILE
- else:
- return TableSourceType.DATA_STREAM
def _load_mfs_as_table_creator(self,mfs,atomic_fn,input_params,dialect_id,stop_after_analysis):
xprint("Loading MFS:", mfs)
@@ -1975,9 +2013,9 @@ class QTextAsData(object):
xprint("Atomic Filename %s not found - loading" % atomic_fn)
- table_type = self.detect_table_type(atomic_fn,mfs.data_stream)
+ table_type = self.detect_table_type(atomic_fn,mfs)
if table_type == TableSourceType.DELIMITED_FILE or table_type == TableSourceType.DATA_STREAM:
- mfs.open_file()
+ mfs.initialize()
source,source_type, db_id, db_to_use = self.choose_db_to_use(mfs,atomic_fn)
@@ -1985,7 +2023,7 @@ class QTextAsData(object):
should_read_from_cache = self.get_should_read_from_cache(mfs, input_params, atomic_fn, disk_db_filename)
xprint("should read from cache %s" % should_read_from_cache)
- if not should_read_from_cache and mfs.data_stream is None:
+ if not should_read_from_cache and not isinstance(mfs,MaterialiedDataStreamState):
self.add_db_to_database_list(db_id, db_to_use, needs_closing=True)
xprint("db %s (%s) has been added to the database list" % (db_id, db_to_use))
else:
@@ -2010,7 +2048,7 @@ class QTextAsData(object):
self.attach_to_adhoc_db(atomic_fn, table_creator, db_to_use)
- mfs.close_file()
+ mfs.finalize()
xprint("MFS Loaded")
elif table_type == TableSourceType.QSQL_FILE:
@@ -2020,7 +2058,7 @@ class QTextAsData(object):
db_id = '%s' % self._generate_db_name(atomic_fn)
# TODO RLRL Table creator should directly use the qsql file and not just after read_table_from_cache()
# is called. This is a must for -A to work properly, since the table must be analyzed without and analyze()
- # phase.
+ # phase. ?
db_to_use = None
table_creator = TableCreator(mfs, input_params, sqlite_db=db_to_use,target_sqlite_table_name=target_sqlite_table_name)
@@ -2040,11 +2078,11 @@ class QTextAsData(object):
# Create the matching database table and populate it
table_creator = TableCreator(mfs, input_params, sqlite_db=db_to_use,
target_sqlite_table_name=target_sqlite_table_name)
- table_creator.perform_analyze(dialect_id, table_creator.data_stream)
+ table_creator.perform_analyze(dialect_id)
return source, source_type, table_creator
def save_cache_to_disk_if_needed(self, atomic_fn, disk_db_filename, input_params, mfs, table_creator):
- effective_write_caching = (mfs.data_stream is None) and input_params.write_caching
+ effective_write_caching = (not isinstance(mfs,MaterialiedDataStreamState)) and input_params.write_caching
if effective_write_caching:
xprint("Going to write file cache for %s. Disk filename is %s" % (atomic_fn, disk_db_filename))
self.store_qsql(table_creator.sqlite_db, disk_db_filename)
@@ -2063,7 +2101,7 @@ class QTextAsData(object):
def get_should_read_from_cache(self, mfs, input_params, atomic_fn, disk_db_filename):
disk_db_file_exists = os.path.exists(disk_db_filename)
- if mfs.data_stream is not None:
+ if isinstance(mfs,MaterialiedDataStreamState):
should_read_from_cache = False
else:
should_read_from_cache = input_params.read_caching and disk_db_file_exists
@@ -2072,7 +2110,7 @@ class QTextAsData(object):
def choose_db_to_use(self, mfs, atomic_fn):
db_id = None
- if mfs.data_stream is not None:
+ if isinstance(mfs,MaterialiedDataStreamState):
db_to_use = self.adhoc_db
source_type = 'data-stream'
source = mfs.data_stream.stream_id
@@ -2118,7 +2156,11 @@ class QTextAsData(object):
if table_creator is not None:
self.table_creators[atomic_fn] = table_creator
- dls += [QDataLoad(atomic_fn, start_time, time.time(), data_stream=mfs.data_stream, source_type=source_type,source=source)]
+ if isinstance(mfs,MaterialiedDataStreamState):
+ ds = mfs.data_stream
+ else:
+ ds = None
+ dls += [QDataLoad(atomic_fn, start_time, time.time(), data_stream=ds, source_type=source_type,source=source)]
return dls
diff --git a/qsqlite.md b/qsqlite.md
index f4d1e2a..157d24e 100644
--- a/qsqlite.md
+++ b/qsqlite.md
@@ -41,3 +41,5 @@ Insights
* had to improve the reproducability of e2e test bugs in order to actually be effective
+Deprecated earlier - needs to be removed:
+* fluffy mode