summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkoljonen <koljonen@outlook.com>2016-06-10 03:26:42 +0200
committerkoljonen <koljonen@outlook.com>2016-06-10 03:26:42 +0200
commit582852adb690866e6d149ab1bcaddf88cbfb8c69 (patch)
treeff7519c714195a8850618f3ac95ab3a442fd73ba
parent9b6a72e9e00bae4c1247153205e90ba549e2f1ea (diff)
Various changes after review
-rw-r--r--pgcli/packages/sqlcompletion.py5
-rw-r--r--pgcli/pgcompleter.py7
-rw-r--r--tests/test_parseutils.py5
-rw-r--r--tests/test_smart_completion_multiple_schemata.py47
-rw-r--r--tests/test_smart_completion_public_schema_only.py75
-rw-r--r--tests/test_sqlcompletion.py19
6 files changed, 136 insertions, 22 deletions
diff --git a/pgcli/packages/sqlcompletion.py b/pgcli/packages/sqlcompletion.py
index 063bed3d..81ab32cb 100644
--- a/pgcli/packages/sqlcompletion.py
+++ b/pgcli/packages/sqlcompletion.py
@@ -21,7 +21,9 @@ Special = namedtuple('Special', [])
Database = namedtuple('Database', [])
Schema = namedtuple('Schema', [])
Table = namedtuple('Table', ['schema'])
+# JoinConditions are suggested after ON, e.g. 'foo.barid = bar.barid'
JoinCondition = namedtuple('JoinCondition', ['tables', 'parent'])
+# Joins are suggested after JOIN, e.g. 'foo ON foo.barid = bar.barid'
Join = namedtuple('Join', ['tables', 'schema'])
Function = namedtuple('Function', ['schema', 'filter'])
@@ -431,4 +433,5 @@ def _allow_join_suggestion(statement):
return False
last_tok = statement.token_prev(len(statement.tokens))
- return last_tok.value.lower().endswith('join')
+ return (last_tok.value.lower().endswith('join')
+ and last_tok.value.lower() != 'cross join')
diff --git a/pgcli/pgcompleter.py b/pgcli/pgcompleter.py
index b20279e4..7b2a25a1 100644
--- a/pgcli/pgcompleter.py
+++ b/pgcli/pgcompleter.py
@@ -392,10 +392,9 @@ class PGCompleter(Completer):
tblprio = dict((t.ref, n) for n, t in enumerate(suggestion.tables))
joins, prios = [], []
# Iterate over FKs in existing tables to find potential joins
- for fk, rtbl, rcol in ((fk, rtbl, rcol)
- for rtbl, rcols in scoped_cols.items()
- for rcol in rcols
- for fk in rcol.foreignkeys):
+ fks = ((fk, rtbl, rcol) for rtbl, rcols in scoped_cols.items()
+ for rcol in rcols for fk in rcol.foreignkeys)
+ for fk, rtbl, rcol in fks:
if (fk.childschema, fk.childtable, fk.childcolumn) == (
rtbl.schema, rtbl.name, rcol.name):
lsch = fk.parentschema
diff --git a/tests/test_parseutils.py b/tests/test_parseutils.py
index 87a6d32b..a2a2fd15 100644
--- a/tests/test_parseutils.py
+++ b/tests/test_parseutils.py
@@ -149,13 +149,16 @@ def test_subselect_tables():
tables = extract_tables(sql)
assert tables == ((None, 'abc', None, False),)
+@pytest.mark.parametrize('text', ['SELECT * FROM foo.', 'SELECT 123 AS foo'])
+def test_extract_no_tables(text):
+ tables = extract_tables(text)
+ assert tables == tuple()
@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
def test_simple_function_as_table(arg_list):
tables = extract_tables('SELECT * FROM foo({0})'.format(arg_list))
assert tables == ((None, 'foo', None, True),)
-
@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
def test_simple_schema_qualified_function_as_table(arg_list):
tables = extract_tables('SELECT * FROM foo.bar({0})'.format(arg_list))
diff --git a/tests/test_smart_completion_multiple_schemata.py b/tests/test_smart_completion_multiple_schemata.py
index c107a710..381657de 100644
--- a/tests/test_smart_completion_multiple_schemata.py
+++ b/tests/test_smart_completion_multiple_schemata.py
@@ -2,7 +2,7 @@ from __future__ import unicode_literals
import pytest
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
-from pgcli.packages.function_metadata import FunctionMetadata
+from pgcli.packages.function_metadata import FunctionMetadata, ForeignKey
metadata = {
'tables': {
@@ -37,6 +37,9 @@ metadata = {
'public': ['typ1', 'typ2'],
'custom': ['typ3', 'typ4'],
},
+ 'foreignkeys': [
+ ('public', 'users', 'id', 'custom', 'shipments', 'user_id')
+ ],
}
@pytest.fixture
@@ -63,11 +66,14 @@ def completer():
for schema, datatypes in metadata['datatypes'].items()
for datatype in datatypes]
+ foreignkeys = [ForeignKey(*fk) for fk in metadata['foreignkeys']]
+
comp.extend_schemata(schemata)
comp.extend_relations(tables, kind='tables')
comp.extend_columns(columns, kind='tables')
comp.extend_functions(functions)
comp.extend_datatypes(datatypes)
+ comp.extend_foreignkeys(foreignkeys)
comp.set_search_path(['public'])
return comp
@@ -135,6 +141,45 @@ def test_suggested_column_names_from_qualified_shadowed_table(completer, complet
list(map(lambda x: Completion(x, display_meta='keyword'), completer.keywords))
)
+@pytest.mark.parametrize('text', [
+ 'SELECT * FROM users JOIN custom.shipments ON ',
+ '''SELECT *
+ FROM public.users
+ JOIN custom.shipments ON '''
+])
+def test_suggested_join_conditions(completer, complete_event, text):
+ position = len(text)
+ result = set(completer.get_completions(
+ Document(text=text, cursor_position=position),
+ complete_event))
+ assert set(result) == set([
+ Completion(text='users', start_position=0, display_meta='table alias'),
+ Completion(text='shipments', start_position=0, display_meta='table alias'),
+ Completion(text='shipments.id = users.id', start_position=0, display_meta='name join'),
+ Completion(text='shipments.user_id = users.id', start_position=0, display_meta='fk join')])
+
+@pytest.mark.parametrize('text', [
+ 'SELECT * FROM public.users RIGHT OUTER JOIN ',
+ '''SELECT *
+ FROM users
+ JOIN '''
+])
+def test_suggested_joins(completer, complete_event, text):
+ position = len(text)
+ result = set(completer.get_completions(
+ Document(text=text, cursor_position=position),
+ complete_event))
+ assert set(result) == set([
+ Completion(text='custom.shipments ON shipments.user_id = users.id', start_position=0, display_meta='join'),
+ Completion(text='public', start_position=0, display_meta='schema'),
+ Completion(text='custom', start_position=0, display_meta='schema'),
+ Completion(text='"Custom"', start_position=0, display_meta='schema'),
+ Completion(text='orders', start_position=0, display_meta='table'),
+ Completion(text='users', start_position=0, display_meta='table'),
+ Completion(text='"select"', start_position=0, display_meta='table'),
+ Completion(text='func1', start_position=0, display_meta='function'),
+ Completion(text='func2', start_position=0, display_meta='function')])
+
def test_suggested_column_names_from_schema_qualifed_table(completer, complete_event):
"""
Suggest column and function names when selecting from a qualified-table
diff --git a/tests/test_smart_completion_public_schema_only.py b/tests/test_smart_completion_public_schema_only.py
index 4e2048c7..1709dc46 100644
--- a/tests/test_smart_completion_public_schema_only.py
+++ b/tests/test_smart_completion_public_schema_only.py
@@ -2,7 +2,7 @@ from __future__ import unicode_literals
import pytest
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
-from pgcli.packages.function_metadata import FunctionMetadata
+from pgcli.packages.function_metadata import FunctionMetadata, ForeignKey
metadata = {
'tables': {
@@ -20,6 +20,9 @@ metadata = {
['set_returning_func', ['x', 'y'], ['integer', 'integer'],
['o', 'o'], '', False, False, True]],
'datatypes': ['custom_type1', 'custom_type2'],
+ 'foreignkeys': [
+ ('public', 'users', 'id', 'public', 'Users', 'userid')
+ ],
}
@pytest.fixture
@@ -59,6 +62,10 @@ def completer():
datatypes = [('public', typ) for typ in metadata['datatypes']]
comp.extend_datatypes(datatypes)
+ # fks
+ foreignkeys = [ForeignKey(*fk) for fk in metadata['foreignkeys']]
+ comp.extend_foreignkeys(foreignkeys)
+
comp.set_search_path(['public'])
return comp
@@ -304,6 +311,52 @@ def test_suggest_columns_after_three_way_join(completer, complete_event):
set(result))
@pytest.mark.parametrize('text', [
+ 'SELECT * FROM users u JOIN "Users" u2 ON ',
+ 'SELECT * FROM users u INNER join "Users" u2 ON ',
+ 'SELECT * FROM USERS u right JOIN "Users" u2 ON ',
+ 'SELECT * FROM users u LEFT JOIN "Users" u2 ON ',
+ 'SELECT * FROM Users u FULL JOIN "Users" u2 ON ',
+ 'SELECT * FROM users u right outer join "Users" u2 ON ',
+ 'SELECT * FROM Users u LEFT OUTER JOIN "Users" u2 ON ',
+ 'SELECT * FROM users u FULL OUTER JOIN "Users" u2 ON ',
+ '''SELECT *
+ FROM users u
+ FULL OUTER JOIN "Users" u2 ON '''
+])
+def test_suggested_join_conditions(completer, complete_event, text):
+ position = len(text)
+ result = set(completer.get_completions(
+ Document(text=text, cursor_position=position),
+ complete_event))
+ assert set(result) == set([
+ Completion(text='u', start_position=0, display_meta='table alias'),
+ Completion(text='u2', start_position=0, display_meta='table alias'),
+ Completion(text='u2.userid = u.id', start_position=0, display_meta='fk join')])
+
+@pytest.mark.parametrize('text', [
+ 'SELECT * FROM users JOIN ',
+ '''SELECT *
+ FROM users
+ INNER JOIN '''
+])
+def test_suggested_joins(completer, complete_event, text):
+ position = len(text)
+ result = set(completer.get_completions(
+ Document(text=text, cursor_position=position),
+ complete_event))
+ assert set(result) == set([
+ Completion(text='"Users" ON "Users".userid = users.id', start_position=0, display_meta='join'),
+ Completion(text='public', start_position=0, display_meta='schema'),
+ Completion(text='"Users"', start_position=0, display_meta='table'),
+ Completion(text='"select"', start_position=0, display_meta='table'),
+ Completion(text='orders', start_position=0, display_meta='table'),
+ Completion(text='users', start_position=0, display_meta='table'),
+ Completion(text='user_emails', start_position=0, display_meta='view'),
+ Completion(text='custom_func2', start_position=0, display_meta='function'),
+ Completion(text='set_returning_func', start_position=0, display_meta='function'),
+ Completion(text='custom_func1', start_position=0, display_meta='function')])
+
+@pytest.mark.parametrize('text', [
'SELECT u.name, o.id FROM users u JOIN orders o ON ',
'SELECT u.name, o.id FROM users u JOIN orders o ON JOIN orders o2 ON'
])
@@ -400,12 +453,15 @@ def test_join_using_suggests_columns_after_first_column(completer, complete_even
Completion(text='email', start_position=0, display_meta='column'),
])
-def test_table_names_after_from(completer, complete_event):
- text = 'SELECT * FROM '
- position = len('SELECT * FROM ')
- result = set(completer.get_completions(
+@pytest.mark.parametrize('text', [
+ 'SELECT * FROM ',
+ 'SELECT * FROM users CROSS JOIN '
+])
+def test_table_names_after_from(completer, complete_event, text):
+ position = len(text)
+ result = completer.get_completions(
Document(text=text, cursor_position=position),
- complete_event))
+ complete_event)
assert set(result) == set([
Completion(text='public', start_position=0, display_meta='schema'),
Completion(text='users', start_position=0, display_meta='table'),
@@ -417,13 +473,6 @@ def test_table_names_after_from(completer, complete_event):
Completion(text='custom_func2', start_position=0, display_meta='function'),
Completion(text='set_returning_func', start_position=0, display_meta='function')
])
-
-def test_table_names_after_from_are_lexical_ordered_by_text(completer, complete_event):
- text = 'SELECT * FROM '
- position = len('SELECT * FROM ')
- result = completer.get_completions(
- Document(text=text, cursor_position=position),
- complete_event)
assert [c.text for c in result] == [
'"Users"',
'custom_func1',
diff --git a/tests/test_sqlcompletion.py b/tests/test_sqlcompletion.py
index bce9fe7a..de95936b 100644
--- a/tests/test_sqlcompletion.py
+++ b/tests/test_sqlcompletion.py
@@ -111,9 +111,24 @@ def test_suggest_tables_views_schemas_and_functions(expression):
@pytest.mark.parametrize('expression', [
- 'SELECT * FROM foo JOIN ',
+ 'SELECT * FROM foo JOIN bar on bar.barid = foo.barid JOIN ',
+ 'SELECT * FROM foo JOIN bar USING (barid) JOIN ',
])
-def test_suggest_tables_views_schemas_functions_and_joins(expression):
+def test_suggest_after_join_with_two_tables(expression):
+ suggestions = suggest_type(expression, expression)
+ assert set(suggestions) == set([
+ Table(schema=None),
+ View(schema=None),
+ Function(schema=None, filter='for_from_clause'),
+ Join(((None, 'foo', None, False), (None, 'bar', None, False)), None),
+ Schema(),
+ ])
+
+
+@pytest.mark.parametrize('expression', [
+ 'SELECT * FROM foo JOIN '
+])
+def test_suggest_after_join_with_one_table(expression):
suggestions = suggest_type(expression, expression)
assert set(suggestions) == set([
Table(schema=None),