diff options
author | koljonen <koljonen@outlook.com> | 2016-06-10 03:26:42 +0200 |
---|---|---|
committer | koljonen <koljonen@outlook.com> | 2016-06-10 03:26:42 +0200 |
commit | 582852adb690866e6d149ab1bcaddf88cbfb8c69 (patch) | |
tree | ff7519c714195a8850618f3ac95ab3a442fd73ba | |
parent | 9b6a72e9e00bae4c1247153205e90ba549e2f1ea (diff) |
Various changes after review
-rw-r--r-- | pgcli/packages/sqlcompletion.py | 5 | ||||
-rw-r--r-- | pgcli/pgcompleter.py | 7 | ||||
-rw-r--r-- | tests/test_parseutils.py | 5 | ||||
-rw-r--r-- | tests/test_smart_completion_multiple_schemata.py | 47 | ||||
-rw-r--r-- | tests/test_smart_completion_public_schema_only.py | 75 | ||||
-rw-r--r-- | tests/test_sqlcompletion.py | 19 |
6 files changed, 136 insertions, 22 deletions
diff --git a/pgcli/packages/sqlcompletion.py b/pgcli/packages/sqlcompletion.py index 063bed3d..81ab32cb 100644 --- a/pgcli/packages/sqlcompletion.py +++ b/pgcli/packages/sqlcompletion.py @@ -21,7 +21,9 @@ Special = namedtuple('Special', []) Database = namedtuple('Database', []) Schema = namedtuple('Schema', []) Table = namedtuple('Table', ['schema']) +# JoinConditions are suggested after ON, e.g. 'foo.barid = bar.barid' JoinCondition = namedtuple('JoinCondition', ['tables', 'parent']) +# Joins are suggested after JOIN, e.g. 'foo ON foo.barid = bar.barid' Join = namedtuple('Join', ['tables', 'schema']) Function = namedtuple('Function', ['schema', 'filter']) @@ -431,4 +433,5 @@ def _allow_join_suggestion(statement): return False last_tok = statement.token_prev(len(statement.tokens)) - return last_tok.value.lower().endswith('join') + return (last_tok.value.lower().endswith('join') + and last_tok.value.lower() != 'cross join') diff --git a/pgcli/pgcompleter.py b/pgcli/pgcompleter.py index b20279e4..7b2a25a1 100644 --- a/pgcli/pgcompleter.py +++ b/pgcli/pgcompleter.py @@ -392,10 +392,9 @@ class PGCompleter(Completer): tblprio = dict((t.ref, n) for n, t in enumerate(suggestion.tables)) joins, prios = [], [] # Iterate over FKs in existing tables to find potential joins - for fk, rtbl, rcol in ((fk, rtbl, rcol) - for rtbl, rcols in scoped_cols.items() - for rcol in rcols - for fk in rcol.foreignkeys): + fks = ((fk, rtbl, rcol) for rtbl, rcols in scoped_cols.items() + for rcol in rcols for fk in rcol.foreignkeys) + for fk, rtbl, rcol in fks: if (fk.childschema, fk.childtable, fk.childcolumn) == ( rtbl.schema, rtbl.name, rcol.name): lsch = fk.parentschema diff --git a/tests/test_parseutils.py b/tests/test_parseutils.py index 87a6d32b..a2a2fd15 100644 --- a/tests/test_parseutils.py +++ b/tests/test_parseutils.py @@ -149,13 +149,16 @@ def test_subselect_tables(): tables = extract_tables(sql) assert tables == ((None, 'abc', None, False),) +@pytest.mark.parametrize('text', ['SELECT * FROM foo.', 'SELECT 123 AS foo']) +def test_extract_no_tables(text): + tables = extract_tables(text) + assert tables == tuple() @pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3']) def test_simple_function_as_table(arg_list): tables = extract_tables('SELECT * FROM foo({0})'.format(arg_list)) assert tables == ((None, 'foo', None, True),) - @pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3']) def test_simple_schema_qualified_function_as_table(arg_list): tables = extract_tables('SELECT * FROM foo.bar({0})'.format(arg_list)) diff --git a/tests/test_smart_completion_multiple_schemata.py b/tests/test_smart_completion_multiple_schemata.py index c107a710..381657de 100644 --- a/tests/test_smart_completion_multiple_schemata.py +++ b/tests/test_smart_completion_multiple_schemata.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import pytest from prompt_toolkit.completion import Completion from prompt_toolkit.document import Document -from pgcli.packages.function_metadata import FunctionMetadata +from pgcli.packages.function_metadata import FunctionMetadata, ForeignKey metadata = { 'tables': { @@ -37,6 +37,9 @@ metadata = { 'public': ['typ1', 'typ2'], 'custom': ['typ3', 'typ4'], }, + 'foreignkeys': [ + ('public', 'users', 'id', 'custom', 'shipments', 'user_id') + ], } @pytest.fixture @@ -63,11 +66,14 @@ def completer(): for schema, datatypes in metadata['datatypes'].items() for datatype in datatypes] + foreignkeys = [ForeignKey(*fk) for fk in metadata['foreignkeys']] + comp.extend_schemata(schemata) comp.extend_relations(tables, kind='tables') comp.extend_columns(columns, kind='tables') comp.extend_functions(functions) comp.extend_datatypes(datatypes) + comp.extend_foreignkeys(foreignkeys) comp.set_search_path(['public']) return comp @@ -135,6 +141,45 @@ def test_suggested_column_names_from_qualified_shadowed_table(completer, complet list(map(lambda x: Completion(x, display_meta='keyword'), completer.keywords)) ) +@pytest.mark.parametrize('text', [ + 'SELECT * FROM users JOIN custom.shipments ON ', + '''SELECT * + FROM public.users + JOIN custom.shipments ON ''' +]) +def test_suggested_join_conditions(completer, complete_event, text): + position = len(text) + result = set(completer.get_completions( + Document(text=text, cursor_position=position), + complete_event)) + assert set(result) == set([ + Completion(text='users', start_position=0, display_meta='table alias'), + Completion(text='shipments', start_position=0, display_meta='table alias'), + Completion(text='shipments.id = users.id', start_position=0, display_meta='name join'), + Completion(text='shipments.user_id = users.id', start_position=0, display_meta='fk join')]) + +@pytest.mark.parametrize('text', [ + 'SELECT * FROM public.users RIGHT OUTER JOIN ', + '''SELECT * + FROM users + JOIN ''' +]) +def test_suggested_joins(completer, complete_event, text): + position = len(text) + result = set(completer.get_completions( + Document(text=text, cursor_position=position), + complete_event)) + assert set(result) == set([ + Completion(text='custom.shipments ON shipments.user_id = users.id', start_position=0, display_meta='join'), + Completion(text='public', start_position=0, display_meta='schema'), + Completion(text='custom', start_position=0, display_meta='schema'), + Completion(text='"Custom"', start_position=0, display_meta='schema'), + Completion(text='orders', start_position=0, display_meta='table'), + Completion(text='users', start_position=0, display_meta='table'), + Completion(text='"select"', start_position=0, display_meta='table'), + Completion(text='func1', start_position=0, display_meta='function'), + Completion(text='func2', start_position=0, display_meta='function')]) + def test_suggested_column_names_from_schema_qualifed_table(completer, complete_event): """ Suggest column and function names when selecting from a qualified-table diff --git a/tests/test_smart_completion_public_schema_only.py b/tests/test_smart_completion_public_schema_only.py index 4e2048c7..1709dc46 100644 --- a/tests/test_smart_completion_public_schema_only.py +++ b/tests/test_smart_completion_public_schema_only.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import pytest from prompt_toolkit.completion import Completion from prompt_toolkit.document import Document -from pgcli.packages.function_metadata import FunctionMetadata +from pgcli.packages.function_metadata import FunctionMetadata, ForeignKey metadata = { 'tables': { @@ -20,6 +20,9 @@ metadata = { ['set_returning_func', ['x', 'y'], ['integer', 'integer'], ['o', 'o'], '', False, False, True]], 'datatypes': ['custom_type1', 'custom_type2'], + 'foreignkeys': [ + ('public', 'users', 'id', 'public', 'Users', 'userid') + ], } @pytest.fixture @@ -59,6 +62,10 @@ def completer(): datatypes = [('public', typ) for typ in metadata['datatypes']] comp.extend_datatypes(datatypes) + # fks + foreignkeys = [ForeignKey(*fk) for fk in metadata['foreignkeys']] + comp.extend_foreignkeys(foreignkeys) + comp.set_search_path(['public']) return comp @@ -304,6 +311,52 @@ def test_suggest_columns_after_three_way_join(completer, complete_event): set(result)) @pytest.mark.parametrize('text', [ + 'SELECT * FROM users u JOIN "Users" u2 ON ', + 'SELECT * FROM users u INNER join "Users" u2 ON ', + 'SELECT * FROM USERS u right JOIN "Users" u2 ON ', + 'SELECT * FROM users u LEFT JOIN "Users" u2 ON ', + 'SELECT * FROM Users u FULL JOIN "Users" u2 ON ', + 'SELECT * FROM users u right outer join "Users" u2 ON ', + 'SELECT * FROM Users u LEFT OUTER JOIN "Users" u2 ON ', + 'SELECT * FROM users u FULL OUTER JOIN "Users" u2 ON ', + '''SELECT * + FROM users u + FULL OUTER JOIN "Users" u2 ON ''' +]) +def test_suggested_join_conditions(completer, complete_event, text): + position = len(text) + result = set(completer.get_completions( + Document(text=text, cursor_position=position), + complete_event)) + assert set(result) == set([ + Completion(text='u', start_position=0, display_meta='table alias'), + Completion(text='u2', start_position=0, display_meta='table alias'), + Completion(text='u2.userid = u.id', start_position=0, display_meta='fk join')]) + +@pytest.mark.parametrize('text', [ + 'SELECT * FROM users JOIN ', + '''SELECT * + FROM users + INNER JOIN ''' +]) +def test_suggested_joins(completer, complete_event, text): + position = len(text) + result = set(completer.get_completions( + Document(text=text, cursor_position=position), + complete_event)) + assert set(result) == set([ + Completion(text='"Users" ON "Users".userid = users.id', start_position=0, display_meta='join'), + Completion(text='public', start_position=0, display_meta='schema'), + Completion(text='"Users"', start_position=0, display_meta='table'), + Completion(text='"select"', start_position=0, display_meta='table'), + Completion(text='orders', start_position=0, display_meta='table'), + Completion(text='users', start_position=0, display_meta='table'), + Completion(text='user_emails', start_position=0, display_meta='view'), + Completion(text='custom_func2', start_position=0, display_meta='function'), + Completion(text='set_returning_func', start_position=0, display_meta='function'), + Completion(text='custom_func1', start_position=0, display_meta='function')]) + +@pytest.mark.parametrize('text', [ 'SELECT u.name, o.id FROM users u JOIN orders o ON ', 'SELECT u.name, o.id FROM users u JOIN orders o ON JOIN orders o2 ON' ]) @@ -400,12 +453,15 @@ def test_join_using_suggests_columns_after_first_column(completer, complete_even Completion(text='email', start_position=0, display_meta='column'), ]) -def test_table_names_after_from(completer, complete_event): - text = 'SELECT * FROM ' - position = len('SELECT * FROM ') - result = set(completer.get_completions( +@pytest.mark.parametrize('text', [ + 'SELECT * FROM ', + 'SELECT * FROM users CROSS JOIN ' +]) +def test_table_names_after_from(completer, complete_event, text): + position = len(text) + result = completer.get_completions( Document(text=text, cursor_position=position), - complete_event)) + complete_event) assert set(result) == set([ Completion(text='public', start_position=0, display_meta='schema'), Completion(text='users', start_position=0, display_meta='table'), @@ -417,13 +473,6 @@ def test_table_names_after_from(completer, complete_event): Completion(text='custom_func2', start_position=0, display_meta='function'), Completion(text='set_returning_func', start_position=0, display_meta='function') ]) - -def test_table_names_after_from_are_lexical_ordered_by_text(completer, complete_event): - text = 'SELECT * FROM ' - position = len('SELECT * FROM ') - result = completer.get_completions( - Document(text=text, cursor_position=position), - complete_event) assert [c.text for c in result] == [ '"Users"', 'custom_func1', diff --git a/tests/test_sqlcompletion.py b/tests/test_sqlcompletion.py index bce9fe7a..de95936b 100644 --- a/tests/test_sqlcompletion.py +++ b/tests/test_sqlcompletion.py @@ -111,9 +111,24 @@ def test_suggest_tables_views_schemas_and_functions(expression): @pytest.mark.parametrize('expression', [ - 'SELECT * FROM foo JOIN ', + 'SELECT * FROM foo JOIN bar on bar.barid = foo.barid JOIN ', + 'SELECT * FROM foo JOIN bar USING (barid) JOIN ', ]) -def test_suggest_tables_views_schemas_functions_and_joins(expression): +def test_suggest_after_join_with_two_tables(expression): + suggestions = suggest_type(expression, expression) + assert set(suggestions) == set([ + Table(schema=None), + View(schema=None), + Function(schema=None, filter='for_from_clause'), + Join(((None, 'foo', None, False), (None, 'bar', None, False)), None), + Schema(), + ]) + + +@pytest.mark.parametrize('expression', [ + 'SELECT * FROM foo JOIN ' +]) +def test_suggest_after_join_with_one_table(expression): suggestions = suggest_type(expression, expression) assert set(suggestions) == set([ Table(schema=None), |