diff options
author | Harel Ben-Attia <harel@coralogix.com> | 2021-05-22 22:42:12 +0300 |
---|---|---|
committer | Harel Ben-Attia <harel@coralogix.com> | 2021-05-22 22:42:12 +0300 |
commit | 832b82bed47f0e2c8dc75ab4b7748a535f3609b6 (patch) | |
tree | 6d1af5ec6ac9fe7bf6e6fc3898e2964b728f836b | |
parent | 9bb6e1f0890cd40743f82f13c09f1799d381641e (diff) |
split data stream and delimited file handling. tests passing (except direct_use which is not supported yet) [delimited-file-and-data-stream-split]
-rwxr-xr-x | bin/q.py | 184 | ||||
-rw-r--r-- | qsqlite.md | 2 |
2 files changed, 115 insertions, 71 deletions
@@ -667,15 +667,19 @@ class ContentSignatureNotFoundException(Exception): class StrictModeColumnCountMismatchException(Exception): - def __init__(self,expected_col_count,actual_col_count): + def __init__(self,atomic_fn, expected_col_count,actual_col_count,lines_read): + self.atomic_fn = atomic_fn self.expected_col_count = expected_col_count self.actual_col_count = actual_col_count + self.lines_read = lines_read class FluffyModeColumnCountMismatchException(Exception): - def __init__(self,expected_col_count,actual_col_count): + def __init__(self,atomic_fn, expected_col_count,actual_col_count,lines_read): + self.atomic_fn = atomic_fn self.expected_col_count = expected_col_count self.actual_col_count = actual_col_count + self.lines_read = lines_read class ContentSignatureDiffersException(Exception): @@ -1161,43 +1165,23 @@ class TableSourceType(object): QSQL_FILE = 'qsql-file' DATA_STREAM = 'data-stream' -class MaterializedFileState(object): - def __init__(self, qtable_name, atomic_fn, input_params, dialect, data_stream=None): - xprint("Creating new MFS: %s %s" % (id(self),qtable_name)) - self.qtable_name = qtable_name +class DelimitedFileReader(object): + def __init__(self,atomic_fn, input_params, dialect, f = None): self.atomic_fn = atomic_fn self.input_params = input_params self.dialect = dialect - self.data_stream = data_stream + self.f = f self.lines_read = 0 self.skipped_bom = False - self.f = None - self.is_open = False - - def __str__(self): - return "MaterializedFileState<qtable_name=%s,atomic_fn=%s,lines_read=%s,input_params=%s,f=%s,data_stream=%s,is_open=%s" % ( - self.qtable_name, - self.atomic_fn, - self.lines_read, - self.input_params, - self.f, - self.data_stream, - self.is_open) - __repr__ = __str__ + self.is_open = f is not None def open_file(self): - # TODO Support universal newlines for gzipped and stdin data as well - - # If it's a data stream - if self.data_stream is not None: - # then just use it - self.f = self.data_stream.stream - self.is_open = 
True - return + if self.f is not None or self.is_open: + raise Exception('File is already open %s' % self.f) - # Otherwise, it's a file, open the file + # TODO Support universal newlines for gzipped and stdin data as well if self.input_params.gzipped_input or self.atomic_fn.endswith('.gz'): f = codecs.iterdecode(gzip.GzipFile(fileobj=io.open(self.atomic_fn,'rb')),encoding=self.input_params.input_encoding) @@ -1219,12 +1203,19 @@ class MaterializedFileState(object): xprint("Actually opened file %s" % self.f) return f - - def read_file_using_csv(self): + def close_file(self): if not self.is_open: - raise Exception('bug - not open: %s' % self.atomic_fn) + raise Exception("Bug - file should already be open: %s" % self.atomic_fn) + + self.f.close() + + def generate_rows(self): + # TODO RLRL Move up the stack + # if not self.is_open: + # raise Exception('bug - not open: %s' % self.atomic_fn) + # This is a hack for utf-8 with BOM encoding in order to skip the BOM. python's csv module - # has a bug which prevents fixing it using the proper encoding, and it has been encountered by + # has a bug which prevents fixing it using the proper encoding, and it has been encountered by # multiple people. 
if self.input_params.input_encoding == 'utf-8-sig' and self.lines_read == 0 and not self.skipped_bom: try: @@ -1249,12 +1240,60 @@ class MaterializedFileState(object): # No need to translate the exception, but we want it to be explicitly defined here for clarity raise UniversalNewlinesExistException() - def close_file(self): - if not self.is_open: - raise Exception("Bug - file should already be open: %s" % self.atomic_fn) +class MaterializedState(object): + def __init__(self,qtable_name): + self.qtable_name = qtable_name - if not self.data_stream: - self.f.close() +class MaterialiedDataStreamState(object): + def __init__(self,qtable_name,atomic_fn, input_params, dialect, data_stream = None): + xprint("Creating new MDSS: %s %s" % (id(self), qtable_name)) + self.qtable_name = qtable_name + self.atomic_fn = atomic_fn + self.input_params = input_params + self.dialect = dialect + + self.data_stream = data_stream + + def initialize(self): + if self.input_params.gzipped_input: + raise CannotUnzipDataStreamException() + + self.delimited_file_reader = DelimitedFileReader(self.atomic_fn, self.input_params, self.dialect,f = self.data_stream.stream) + + def read_file_using_csv(self): + for x in self.delimited_file_reader.generate_rows(): + yield x + + def finalize(self): + pass + + def get_lines_read(self): + return self.delimited_file_reader.lines_read + + +class MaterializedDelimitedFileState(object): + def __init__(self, qtable_name, atomic_fn, input_params, dialect): + xprint("Creating new MDFS: %s %s" % (id(self),qtable_name)) + self.qtable_name = qtable_name + self.atomic_fn = atomic_fn + self.input_params = input_params + self.dialect = dialect + + self.delimited_file_reader = None + + def initialize(self): + self.delimited_file_reader = DelimitedFileReader(self.atomic_fn,self.input_params,self.dialect) + return self.delimited_file_reader.open_file() + + def read_file_using_csv(self): + for x in self.delimited_file_reader.generate_rows(): + yield x + + def 
finalize(self): + self.delimited_file_reader.close_file() + + def get_lines_read(self): + return self.delimited_file_reader.lines_read class TableCreator(object): def __str__(self): @@ -1264,6 +1303,9 @@ class TableCreator(object): def __init__(self, mfs, input_params,sqlite_db=None,target_sqlite_table_name=None): xprint("Initializing table creator for %s" % mfs) + # TODO RLRL - Change to TableCreatorFromDelimitedStream or something + assert isinstance(mfs, MaterializedDelimitedFileState) or isinstance (mfs, MaterialiedDataStreamState) + self.mfs = mfs self.sqlite_db = sqlite_db @@ -1277,7 +1319,6 @@ class TableCreator(object): self.mode = input_params.parsing_mode self.expected_column_count = input_params.expected_column_count self.input_delimiter = input_params.delimiter - self.data_stream = mfs.data_stream self.with_universal_newlines = input_params.with_universal_newlines self.column_inferer = TableColumnInferer(input_params) @@ -1308,7 +1349,7 @@ class TableCreator(object): # (side by side naming). If the source file doesn't exist, then it's possible to search for the cache # file and just use it (without comparing signatures to the source file). If the source file exists # and the cache doesn't, then it's possible to create it after reading the source file. 
- if self.data_stream is None: + if not isinstance(self.mfs,MaterialiedDataStreamState): size = os.stat(self.mfs.atomic_fn).st_size last_modification_time = os.stat(self.mfs.atomic_fn).st_mtime_ns else: @@ -1369,18 +1410,18 @@ class TableCreator(object): self._insert_row(self.mfs.atomic_fn, col_vals) if stop_after_analysis and self.column_inferer.inferred: return - if self.mfs.lines_read == 0 and self.skip_header: + if self.mfs.get_lines_read() == 0 and self.skip_header: raise MissingHeaderException("Header line is expected but missing in file %s" % self.mfs.atomic_fn) - total_data_lines_read += self.mfs.lines_read - (1 if self.skip_header else 0) + total_data_lines_read += self.mfs.delimited_file_reader.lines_read - (1 if self.skip_header else 0) except StrictModeColumnCountMismatchException as e: raise ColumnCountMismatchException( 'Strict mode - Expected %s columns instead of %s columns in file %s row %s. Either use relaxed/fluffy modes or check your delimiter' % ( - e.expected_col_count, e.actual_col_count, normalized_filename(self.mfs.atomic_fn), self.mfs.lines_read)) + e.expected_col_count, e.actual_col_count, normalized_filename(e.atomic_fn), e.lines_read)) except FluffyModeColumnCountMismatchException as e: raise ColumnCountMismatchException( 'Deprecated fluffy mode - Too many columns in file %s row %s (%s fields instead of %s fields). Consider moving to either relaxed or strict mode' % ( - normalized_filename(self.mfs.atomic_fn), self.mfs.lines_read, e.actual_col_count, e.expected_col_count)) + normalized_filename(e.atomic_fn), e.lines_read, e.actual_col_count, e.expected_col_count)) finally: self._flush_inserts() @@ -1395,7 +1436,7 @@ class TableCreator(object): def get_absolute_filenames_str(self): return self.mfs.atomic_fn - def perform_analyze(self, dialect,data_stream): + def perform_analyze(self, dialect): xprint("Analyzing... 
%s" % dialect) if self.state == TableCreatorState.INITIALIZED: self._populate(dialect,stop_after_analysis=True) @@ -1407,7 +1448,7 @@ class TableCreator(object): now = datetime.datetime.utcnow().isoformat() - if data_stream is not None: + if isinstance(self.mfs,MaterialiedDataStreamState): source_type = 'data-stream' source = self.mfs.data_stream.stream_id else: @@ -1543,7 +1584,7 @@ class TableCreator(object): actual_col_count = len(col_vals) if self.mode == 'strict': if actual_col_count != expected_col_count: - raise StrictModeColumnCountMismatchException(expected_col_count,actual_col_count) + raise StrictModeColumnCountMismatchException(self.mfs.atomic_fn, expected_col_count,actual_col_count,self.mfs.get_lines_read()) return col_vals # in all non strict mode, we add dummy data to missing columns @@ -1564,7 +1605,7 @@ class TableCreator(object): if self.mode == 'fluffy': if actual_col_count > expected_col_count: - raise FluffyModeColumnCountMismatchException(expected_col_count,actual_col_count) + raise FluffyModeColumnCountMismatchException(self.mfs.atomic_fn,expected_col_count,actual_col_count,self.mfs.get_lines_read()) return col_vals raise Exception("Unidentified parsing mode %s" % self.mode) @@ -1932,23 +1973,20 @@ class QTextAsData(object): data_stream = self.data_streams.get_for_filename(qtable_name) if data_stream is not None: - if input_params.gzipped_input: - raise CannotUnzipDataStreamException() - - mfs = MaterializedFileState(qtable_name, None, input_params, dialect, data_stream) - materialized_file_dict[qtable_name] = [mfs] + ms = MaterialiedDataStreamState(qtable_name,None,input_params,dialect,data_stream) + materialized_file_dict[qtable_name] = [ms] else: materialized_file_list = self.materialize_file_list(qtable_name) # For each match for atomic_fn in materialized_file_list: - mfs = MaterializedFileState(qtable_name,atomic_fn,input_params,dialect,None) + ms = MaterializedDelimitedFileState(qtable_name,atomic_fn,input_params,dialect) # TODO RLRL 
Perhaps a test that shows the contract around using data streams along with concatenated files if atomic_fn not in materialized_file_dict: - materialized_file_dict[atomic_fn] = [mfs] + materialized_file_dict[atomic_fn] = [ms] else: - materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [mfs] + materialized_file_dict[atomic_fn] = materialized_file_dict[atomic_fn] + [ms] - xprint("MFS dict: %s" % str(materialized_file_dict)) + xprint("MS dict: %s" % str(materialized_file_dict)) return materialized_file_dict def is_sqlite_file(self,filename): @@ -1957,14 +1995,14 @@ class QTextAsData(object): f.close() return magic == six.b("SQLite format 3\x00") - def detect_table_type(self,filename,data_stream): - if data_stream is None: + def detect_table_type(self,filename,mfs): + if isinstance(mfs,MaterialiedDataStreamState): + return TableSourceType.DATA_STREAM + else: if self.is_sqlite_file(filename): return TableSourceType.QSQL_FILE else: return TableSourceType.DELIMITED_FILE - else: - return TableSourceType.DATA_STREAM def _load_mfs_as_table_creator(self,mfs,atomic_fn,input_params,dialect_id,stop_after_analysis): xprint("Loading MFS:", mfs) @@ -1975,9 +2013,9 @@ class QTextAsData(object): xprint("Atomic Filename %s not found - loading" % atomic_fn) - table_type = self.detect_table_type(atomic_fn,mfs.data_stream) + table_type = self.detect_table_type(atomic_fn,mfs) if table_type == TableSourceType.DELIMITED_FILE or table_type == TableSourceType.DATA_STREAM: - mfs.open_file() + mfs.initialize() source,source_type, db_id, db_to_use = self.choose_db_to_use(mfs,atomic_fn) @@ -1985,7 +2023,7 @@ class QTextAsData(object): should_read_from_cache = self.get_should_read_from_cache(mfs, input_params, atomic_fn, disk_db_filename) xprint("should read from cache %s" % should_read_from_cache) - if not should_read_from_cache and mfs.data_stream is None: + if not should_read_from_cache and not isinstance(mfs,MaterialiedDataStreamState): self.add_db_to_database_list(db_id, 
db_to_use, needs_closing=True) xprint("db %s (%s) has been added to the database list" % (db_id, db_to_use)) else: @@ -2010,7 +2048,7 @@ class QTextAsData(object): self.attach_to_adhoc_db(atomic_fn, table_creator, db_to_use) - mfs.close_file() + mfs.finalize() xprint("MFS Loaded") elif table_type == TableSourceType.QSQL_FILE: @@ -2020,7 +2058,7 @@ class QTextAsData(object): db_id = '%s' % self._generate_db_name(atomic_fn) # TODO RLRL Table creator should directly use the qsql file and not just after read_table_from_cache() # is called. This is a must for -A to work properly, since the table must be analyzed without and analyze() - # phase. + # phase. ? db_to_use = None table_creator = TableCreator(mfs, input_params, sqlite_db=db_to_use,target_sqlite_table_name=target_sqlite_table_name) @@ -2040,11 +2078,11 @@ class QTextAsData(object): # Create the matching database table and populate it table_creator = TableCreator(mfs, input_params, sqlite_db=db_to_use, target_sqlite_table_name=target_sqlite_table_name) - table_creator.perform_analyze(dialect_id, table_creator.data_stream) + table_creator.perform_analyze(dialect_id) return source, source_type, table_creator def save_cache_to_disk_if_needed(self, atomic_fn, disk_db_filename, input_params, mfs, table_creator): - effective_write_caching = (mfs.data_stream is None) and input_params.write_caching + effective_write_caching = (not isinstance(mfs,MaterialiedDataStreamState)) and input_params.write_caching if effective_write_caching: xprint("Going to write file cache for %s. 
Disk filename is %s" % (atomic_fn, disk_db_filename)) self.store_qsql(table_creator.sqlite_db, disk_db_filename) @@ -2063,7 +2101,7 @@ class QTextAsData(object): def get_should_read_from_cache(self, mfs, input_params, atomic_fn, disk_db_filename): disk_db_file_exists = os.path.exists(disk_db_filename) - if mfs.data_stream is not None: + if isinstance(mfs,MaterialiedDataStreamState): should_read_from_cache = False else: should_read_from_cache = input_params.read_caching and disk_db_file_exists @@ -2072,7 +2110,7 @@ class QTextAsData(object): def choose_db_to_use(self, mfs, atomic_fn): db_id = None - if mfs.data_stream is not None: + if isinstance(mfs,MaterialiedDataStreamState): db_to_use = self.adhoc_db source_type = 'data-stream' source = mfs.data_stream.stream_id @@ -2118,7 +2156,11 @@ class QTextAsData(object): if table_creator is not None: self.table_creators[atomic_fn] = table_creator - dls += [QDataLoad(atomic_fn, start_time, time.time(), data_stream=mfs.data_stream, source_type=source_type,source=source)] + if isinstance(mfs,MaterialiedDataStreamState): + ds = mfs.data_stream + else: + ds = None + dls += [QDataLoad(atomic_fn, start_time, time.time(), data_stream=ds, source_type=source_type,source=source)] return dls @@ -41,3 +41,5 @@ Insights * had to improve the reproducability of e2e test bugs in order to actually be effective +Deprecated earlier - needs to be removed +* fluffy mode |