From 78288101c20972b4749c04ab49bfba6dfd68717a Mon Sep 17 00:00:00 2001 From: Darik Gamble Date: Sun, 18 Jan 2015 14:32:30 -0500 Subject: Make autocomplete schema-aware --- tests/test_parseutils.py | 102 ++++++++++++++++---- tests/test_pgexecute.py | 8 +- tests/test_smart_completion.py | 85 ++++++++++++++--- tests/test_sqlcompletion.py | 206 ++++++++++++++++++++++++++++++----------- 4 files changed, 312 insertions(+), 89 deletions(-) (limited to 'tests') diff --git a/tests/test_parseutils.py b/tests/test_parseutils.py index e49aec68..87767fc9 100644 --- a/tests/test_parseutils.py +++ b/tests/test_parseutils.py @@ -3,52 +3,114 @@ from pgcli.packages.parseutils import extract_tables def test_empty_string(): tables = extract_tables('') - assert tables == [] + assert tables.to_dict('list') == {'schema': [], 'table': [], 'alias': []} def test_simple_select_single_table(): tables = extract_tables('select * from abc') - assert tables == ['abc'] + assert tables.to_dict('list') == \ + {'schema': [None], 'table': ['abc'], 'alias': [None]} + +def test_simple_select_single_table_schema_qualified(): + tables = extract_tables('select * from abc.def') + assert tables.to_dict('list') == \ + {'schema': ['abc'], 'table': ['def'], 'alias': [None]} def test_simple_select_multiple_tables(): tables = extract_tables('select * from abc, def') - assert tables == ['abc', 'def'] + assert tables.to_dict('list') == \ + {'schema': [None, None], + 'table': ['abc', 'def'], + 'alias': [None, None]} + +def test_simple_select_multiple_tables_schema_qualified(): + tables = extract_tables('select * from abc.def, ghi.jkl') + assert tables.to_dict('list') == \ + {'schema': ['abc', 'ghi'], + 'table': ['def', 'jkl'], + 'alias': [None, None]} def test_simple_select_with_cols_single_table(): tables = extract_tables('select a,b from abc') - assert tables == ['abc'] + assert tables.to_dict('list') == \ + {'schema': [None], 'table': ['abc'], 'alias': [None]} + +def test_simple_select_with_cols_single_table_schema_qualified(): + tables = extract_tables('select a,b from abc.def') + assert tables.to_dict('list') == \ + {'schema': ['abc'], 'table': ['def'], 'alias': [None]} def test_simple_select_with_cols_multiple_tables(): tables = extract_tables('select a,b from abc, def') - assert tables == ['abc', 'def'] + assert tables.to_dict('list') == \ + {'schema': [None, None], + 'table': ['abc', 'def'], + 'alias': [None, None]} + +def test_simple_select_with_cols_multiple_tables(): + tables = extract_tables('select a,b from abc.def, def.ghi') + assert tables.to_dict('list') == \ + {'schema': ['abc', 'def'], + 'table': ['def', 'ghi'], + 'alias': [None, None]} def test_select_with_hanging_comma_single_table(): tables = extract_tables('select a, from abc') - assert tables == ['abc'] + assert tables.to_dict('list') == \ + {'schema': [None], + 'table': ['abc'], + 'alias': [None]} def test_select_with_hanging_comma_multiple_tables(): tables = extract_tables('select a, from abc, def') - assert tables == ['abc', 'def'] + assert tables.to_dict('list') == \ + {'schema': [None, None], + 'table': ['abc', 'def'], + 'alias': [None, None]} + +def test_select_with_hanging_period_multiple_tables(): + tables = extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2') + assert tables.to_dict('list') == \ + {'schema': [None, None], + 'table': ['tabl1', 'tabl2'], + 'alias': ['t1', 't2']} def test_simple_insert_single_table(): tables = extract_tables('insert into abc (id, name) values (1, "def")') - assert tables == ['abc'] + assert tables.to_dict('list') == \ + {'schema': [None], 'table': ['abc'], 'alias': ['abc']} + +@pytest.mark.xfail +def test_simple_insert_single_table_schema_qualified(): + tables = extract_tables('insert into abc.def (id, name) values (1, "def")') + assert tables.to_dict('list') == \ + {'schema': ['abc'], 'table': ['def'], 'alias': [None]} def test_simple_update_table(): tables = extract_tables('update abc set id = 1') - assert tables == ['abc'] + assert tables.to_dict('list') == \ + {'schema': [None], 'table': ['abc'], 'alias': [None]} + +def test_simple_update_table(): + tables = extract_tables('update abc.def set id = 1') + assert tables.to_dict('list') == \ + {'schema': ['abc'], 'table': ['def'], 'alias': [None]} def test_join_table(): - expected = {'a': 'abc', 'd': 'def'} tables = extract_tables('SELECT * FROM abc a JOIN def d ON a.id = d.num') - tables_aliases = extract_tables( - 'SELECT * FROM abc a JOIN def d ON a.id = d.num', True) - assert tables == sorted(expected.values()) - assert tables_aliases == expected + assert tables.to_dict('list') == \ + {'schema': [None, None], + 'table': ['abc', 'def'], + 'alias': ['a', 'd']} + +def test_join_table_schema_qualified(): + tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num') + assert tables.to_dict('list') == \ + {'schema': ['abc', 'ghi'], + 'table': ['def', 'jkl'], + 'alias': ['x', 'y']} def test_join_as_table(): - expected = {'m': 'my_table'} - assert extract_tables( - 'SELECT * FROM my_table AS m WHERE m.a > 5') == \ - sorted(expected.values()) - assert extract_tables( - 'SELECT * FROM my_table AS m WHERE m.a > 5', True) == expected + tables = extract_tables('SELECT * FROM my_table AS m WHERE m.a > 5') + assert tables.to_dict('list') == \ + {'schema': [None], 'table': ['my_table'], 'alias': ['m']} + diff --git a/tests/test_pgexecute.py b/tests/test_pgexecute.py index f513a343..8b7db577 100644 --- a/tests/test_pgexecute.py +++ b/tests/test_pgexecute.py @@ -59,10 +59,10 @@ def test_table_and_columns_query(executor): run(executor, "create table a(x text, y text)") run(executor, "create table b(z text)") - tables, columns = executor.tables() - assert tables == ['a', 'b'] - assert columns['a'] == ['x', 'y'] - assert columns['b'] == ['z'] + tables, columns = executor.get_metadata() + assert set(tables['table']) == set(['a', 'b']) + assert set(columns['column'][columns['table']=='a']) == set(['x', 'y']) + assert set(columns['column'][columns['table']=='b']) == set(['z']) @dbtest def test_database_list(executor): diff --git a/tests/test_smart_completion.py b/tests/test_smart_completion.py index 5344d13d..8b938894 100644 --- a/tests/test_smart_completion.py +++ b/tests/test_smart_completion.py @@ -1,20 +1,45 @@ import pytest +from pandas import DataFrame from prompt_toolkit.completion import Completion from prompt_toolkit.document import Document -tables = { - 'users': ['id', 'email', 'first_name', 'last_name'], - 'orders': ['id', 'user_id', 'ordered_date', 'status'] -} +schemata = { + 'public': { + 'users': ['id', 'email', 'first_name', 'last_name'], + 'orders': ['id', 'ordered_date', 'status'] + }, + 'custom': { + 'products': ['id', 'product_name', 'price'], + 'shipments': ['id', 'address', 'user_id'] + } + } + @pytest.fixture def completer(): import pgcli.pgcompleter as pgcompleter comp = pgcompleter.PGCompleter(smart_completion=True) - comp.extend_table_names(tables.keys()) - for t in tables: - comp.extend_column_names(t, tables[t]) + + # Table metadata is a dataframe with columns [schema, table, is_visible] + tables = DataFrame.from_records( + ((schema, table, schema=='public') + for schema, tables in schemata.items() + for table, columns in tables.items()), + columns=['schema', 'table', 'is_visible']) + + # Column metadata is a dataframe with columns [schema, table, column] + columns = DataFrame.from_records( + ((schema, table, column) + for schema, tables in schemata.items() + for table, columns in tables.items() + for column in columns), + columns=['schema', 'table', 'column']) + + + comp.extend_tables(tables) + comp.extend_columns(columns) + return comp @pytest.fixture @@ -39,15 +64,26 @@ def test_select_keyword_completion(completer, complete_event): complete_event) assert set(result) == set([Completion(text='SELECT', start_position=-3)]) + +def test_schema_or_visible_table_completion(completer, complete_event): + text = 'SELECT * FROM ' + position = len(text) + result = completer.get_completions( + Document(text=text, cursor_position=position), complete_event) + assert set(result) == set([Completion(text='public', start_position=0), + Completion(text='custom', start_position=0), + Completion(text='users', start_position=0), + Completion(text='orders', start_position=0)]) + + def test_function_name_completion(completer, complete_event): text = 'SELECT MA' position = len('SELECT MA') result = completer.get_completions( - Document(text=text, cursor_position=position), - complete_event) + Document(text=text, cursor_position=position), complete_event) assert set(result) == set([Completion(text='MAX', start_position=-2)]) -def test_suggested_column_names(completer, complete_event): +def test_suggested_column_names_from_visible_table(completer, complete_event): """ Suggest column and function names when selecting from table :param completer: @@ -67,6 +103,24 @@ def test_suggested_column_names(completer, complete_event): Completion(text='last_name', start_position=0)] + list(map(Completion, completer.functions))) +def test_suggested_column_names_from_schema_qualifed_table(completer, complete_event): + """ + Suggest column and function names when selecting from a qualified-table + :param completer: + :param complete_event: + :return: + """ + text = 'SELECT from custom.products' + position = len('SELECT ') + result = set(completer.get_completions( + Document(text=text, cursor_position=position), complete_event)) + assert set(result) == set([ + Completion(text='*', start_position=0), + Completion(text='id', start_position=0), + Completion(text='product_name', start_position=0), + Completion(text='price', start_position=0)] + + list(map(Completion, completer.functions))) + def test_suggested_column_names_in_function(completer, complete_event): """ Suggest column and function names when selecting multiple @@ -87,7 +141,7 @@ def test_suggested_column_names_in_function(completer, complete_event): Completion(text='first_name', start_position=0), Completion(text='last_name', start_position=0)]) -def test_suggested_column_names_with_dot(completer, complete_event): +def test_suggested_column_names_with_table_dot(completer, complete_event): """ Suggest column names on table name and dot :param completer: @@ -106,6 +160,15 @@ def test_suggested_column_names_with_dot(completer, complete_event): Completion(text='first_name', start_position=0), Completion(text='last_name', start_position=0)]) +def test_suggested_table_names_with_schema_dot(completer, complete_event): + text = 'SELECT * FROM custom.' + position = len(text) + result = completer.get_completions( + Document(text=text, cursor_position=position), complete_event) + assert set(result) == set([ + Completion(text='products', start_position=0), + Completion(text='shipments', start_position=0)]) + def test_suggested_column_names_with_alias(completer, complete_event): """ Suggest column names on table alias and dot diff --git a/tests/test_sqlcompletion.py b/tests/test_sqlcompletion.py index 949321e7..765aac55 100644 --- a/tests/test_sqlcompletion.py +++ b/tests/test_sqlcompletion.py @@ -1,129 +1,227 @@ from pgcli.packages.sqlcompletion import suggest_type import pytest -def test_select_suggests_cols_with_table_scope(): - suggestion = suggest_type('SELECT FROM tabl', 'SELECT ') - assert suggestion == ('columns-and-functions', ['tabl']) +def assert_equals(suggestions, expected_suggestions): + """ Wrapper to convert dataframes to structs + """ + for suggestion in suggestions: + if 'tables' in suggestion: + suggestion['tables'] = suggestion['tables'].to_dict('list') + + assert sorted(suggestions) == sorted(expected_suggestions) + + + +def test_select_suggests_cols_with_visible_table_scope(): + suggestions = suggest_type('SELECT FROM tabl', 'SELECT ') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['tabl'], + 'alias': [None]}}, + {'type': 'function'}]) + +def test_select_suggests_cols_with_qualified_table_scope(): + suggestions = suggest_type('SELECT FROM sch.tabl', 'SELECT ') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': ['sch'], + 'table': ['tabl'], + 'alias': [None]}}, + {'type': 'function'}]) def test_where_suggests_columns_functions(): - suggestion = suggest_type('SELECT * FROM tabl WHERE ', + suggestions = suggest_type('SELECT * FROM tabl WHERE ', 'SELECT * FROM tabl WHERE ') - assert suggestion == ('columns-and-functions', ['tabl']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}}, + {'type': 'function'}]) def test_lparen_suggests_cols(): suggestion = suggest_type('SELECT MAX( FROM tbl', 'SELECT MAX(') - assert suggestion == ('columns', ['tbl']) + assert_equals(suggestion, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['tbl'], 'alias': [None]}}]) def test_select_suggests_cols_and_funcs(): - suggestion = suggest_type('SELECT ', 'SELECT ') - assert suggestion == ('columns-and-functions', []) + suggestions = suggest_type('SELECT ', 'SELECT ') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [], 'table': [], 'alias': []}}, + {'type': 'function'}]) -def test_from_suggests_tables(): +def test_from_suggests_tables_and_schemas(): suggestion = suggest_type('SELECT * FROM ', 'SELECT * FROM ') - assert suggestion == ('tables', []) + assert sorted(suggestion) == sorted([ + {'type': 'table', 'schema':[]}, + {'type': 'schema'}]) def test_distinct_suggests_cols(): - suggestion = suggest_type('SELECT DISTINCT ', 'SELECT DISTINCT ') - assert suggestion == ('columns', []) + suggestions = suggest_type('SELECT DISTINCT ', 'SELECT DISTINCT ') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [], 'table': [], 'alias': []}}]) def test_col_comma_suggests_cols(): suggestion = suggest_type('SELECT a, b, FROM tbl', 'SELECT a, b,') - assert suggestion == ('columns-and-functions', ['tbl']) - -def test_table_comma_suggests_tables(): + assert_equals(suggestion, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['tbl'], + 'alias': [None]}}, + {'type': 'function'}]) + +def test_table_comma_suggests_tables_and_schemas(): suggestion = suggest_type('SELECT a, b FROM tbl1, ', 'SELECT a, b FROM tbl1, ') - assert suggestion == ('tables', []) + assert sorted(suggestion) == sorted([ + {'type': 'table', 'schema':[]}, + {'type': 'schema'}]) -def test_into_suggests_tables(): +def test_into_suggests_tables_and_schemas(): suggestion = suggest_type('INSERT INTO ', 'INSERT INTO ') - assert suggestion == ('tables', []) + assert sorted(suggestion) == sorted([ + {'type': 'table', 'schema': []}, + {'type': 'schema'}]) def test_insert_into_lparen_suggests_cols(): - suggestion = suggest_type('INSERT INTO abc (', 'INSERT INTO abc (') - assert suggestion == ('columns', ['abc']) + suggestions = suggest_type('INSERT INTO abc (', 'INSERT INTO abc (') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['abc'], + 'alias': [None]}}]) def test_insert_into_lparen_partial_text_suggests_cols(): - suggestion = suggest_type('INSERT INTO abc (i', 'INSERT INTO abc (i') - assert suggestion == ('columns', ['abc']) + suggestions = suggest_type('INSERT INTO abc (i', 'INSERT INTO abc (i') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['abc'], + 'alias': [None]}}]) def test_insert_into_lparen_comma_suggests_cols(): - suggestion = suggest_type('INSERT INTO abc (id,', 'INSERT INTO abc (id,') - assert suggestion == ('columns', ['abc']) + suggestions = suggest_type('INSERT INTO abc (id,', 'INSERT INTO abc (id,') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['abc'], + 'alias': [None]}}]) def test_partially_typed_col_name_suggests_col_names(): suggestion = suggest_type('SELECT * FROM tabl WHERE col_n', 'SELECT * FROM tabl WHERE col_n') - assert suggestion == ('columns-and-functions', ['tabl']) - -def test_dot_suggests_cols_of_a_table(): - suggestion = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.') - assert suggestion == ('columns', ['tabl']) + assert_equals(suggestion, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['tabl'], + 'alias': [None]}}, + {'type': 'function'}]) + +def test_dot_suggests_cols_of_a_table_or_schema_qualified_table(): + suggestions = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}}, + {'type': 'table', 'schema': 'tabl'}]) + + suggestions = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.') + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}}, + {'type': 'table', 'schema': 'tabl'}]) def test_dot_suggests_cols_of_an_alias(): - suggestion = suggest_type('SELECT t1. FROM tabl1 t1, tabl2 t2', + suggestions = suggest_type('SELECT t1. FROM tabl1 t1, tabl2 t2', 'SELECT t1.') - assert suggestion == ('columns', ['tabl1']) + assert_equals(suggestions, + [{'type': 'table', 'schema': 't1'}, + {'type': 'column', + 'tables': {'schema': [None], 'table': ['tabl1'], 'alias': ['t1']}}]) -def test_dot_col_comma_suggests_cols(): - suggestion = suggest_type('SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2', +def test_dot_col_comma_suggests_cols_or_schema_qualified_table(): + suggestions = suggest_type('SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2', 'SELECT t1.a, t2.') - assert suggestion == ('columns', ['tabl2']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], + 'table': ['tabl2'], + 'alias': ['t2']}}, + {'type': 'table', 'schema': 't2'}]) def test_sub_select_suggests_keyword(): suggestion = suggest_type('SELECT * FROM (', 'SELECT * FROM (') - assert suggestion == ('keywords', []) + assert suggestion == [{'type': 'keyword'}] def test_sub_select_partial_text_suggests_keyword(): suggestion = suggest_type('SELECT * FROM (S', 'SELECT * FROM (S') - assert suggestion == ('keywords', []) + assert suggestion == [{'type': 'keyword'}] def test_sub_select_table_name_completion(): suggestion = suggest_type('SELECT * FROM (SELECT * FROM ', 'SELECT * FROM (SELECT * FROM ') - assert suggestion == ('tables', []) + assert sorted(suggestion) == sorted([ + {'type': 'table', 'schema': []}, {'type': 'schema'}]) def test_sub_select_col_name_completion(): - suggestion = suggest_type('SELECT * FROM (SELECT FROM abc', + suggestions = suggest_type('SELECT * FROM (SELECT FROM abc', 'SELECT * FROM (SELECT ') - assert suggestion == ('columns-and-functions', ['abc']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['abc'], 'alias': [None]}}, + {'type': 'function'}]) @pytest.mark.xfail def test_sub_select_multiple_col_name_completion(): - suggestion = suggest_type('SELECT * FROM (SELECT a, FROM abc', + suggestions = suggest_type('SELECT * FROM (SELECT a, FROM abc', 'SELECT * FROM (SELECT a, ') - assert suggestion == ('columns-and-functions', ['abc']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['abc'], 'alias': [None]}}, + {'type': 'function'}]) def test_sub_select_dot_col_name_completion(): - suggestion = suggest_type('SELECT * FROM (SELECT t. FROM tabl t', + suggestions = suggest_type('SELECT * FROM (SELECT t. FROM tabl t', 'SELECT * FROM (SELECT t.') - assert suggestion == ('columns', ['tabl']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['tabl'], 'alias': ['t']}}, + {'type': 'table', 'schema': 't'}]) -def test_join_suggests_tables(): +def test_join_suggests_tables_and_schemas(): suggestion = suggest_type('SELECT * FROM abc a JOIN ', 'SELECT * FROM abc a JOIN ') - assert suggestion == ('tables', []) + assert sorted(suggestion) == sorted([ + {'type': 'table', 'schema': []}, + {'type': 'schema'}]) def test_join_alias_dot_suggests_cols1(): - suggestion = suggest_type('SELECT * FROM abc a JOIN def d ON a.', + suggestions = suggest_type('SELECT * FROM abc a JOIN def d ON a.', 'SELECT * FROM abc a JOIN def d ON a.') - assert suggestion == ('columns', ['abc']) + assert_equals(suggestions, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['abc'], 'alias': ['a']}}, + {'type': 'table', 'schema': 'a'}]) def test_join_alias_dot_suggests_cols2(): suggestion = suggest_type('SELECT * FROM abc a JOIN def d ON a.', 'SELECT * FROM abc a JOIN def d ON a.id = d.') - assert suggestion == ('columns', ['def']) + assert_equals(suggestion, + [{'type': 'column', + 'tables': {'schema': [None], 'table': ['def'], 'alias': ['d']}}, + {'type': 'table', 'schema': 'd'}]) def test_on_suggests_aliases(): - category, scope = suggest_type( + suggestions = suggest_type( 'select a.x, b.y from abc a join bcd b on ', 'select a.x, b.y from abc a join bcd b on ') - assert category == 'tables-or-aliases' - assert set(scope) == set(['a', 'b']) + assert_equals(suggestions, + [{'type': 'alias', 'aliases': ['a', 'b']}]) def test_on_suggests_tables(): - category, scope = suggest_type( + suggestions = suggest_type( 'select abc.x, bcd.y from abc join bcd on ', 'select abc.x, bcd.y from abc join bcd on ') - assert category == 'tables-or-aliases' - assert set(scope) == set(['abc', 'bcd']) + assert_equals(suggestions, + [{'type': 'alias', 'aliases': ['abc', 'bcd']}]) \ No newline at end of file -- cgit v1.2.3