-rwxr-xr-x  pgcli/main.py                                        1
-rw-r--r--  pgcli/packages/sqlcompletion.py                     16
-rw-r--r--  pgcli/pgclirc                                        4
-rw-r--r--  pgcli/pgcompleter.py                                47
-rw-r--r--  tests/test_smart_completion_public_schema_only.py   80
-rw-r--r--  tests/test_sqlcompletion.py                         21
6 files changed, 132 insertions, 37 deletions
diff --git a/pgcli/main.py b/pgcli/main.py
index 07417b8b..cfe14318 100755
--- a/pgcli/main.py
+++ b/pgcli/main.py
@@ -151,6 +151,7 @@ class PGCli(object):
'generate_casing_file': c['main'].as_bool('generate_casing_file'),
'generate_aliases': c['main'].as_bool('generate_aliases'),
'asterisk_column_order': c['main']['asterisk_column_order'],
+ 'qualify_columns': c['main']['qualify_columns'],
'single_connection': single_connection,
'keyword_casing': keyword_casing,
}
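
Note: the new key is read from the [main] section as a plain string (the neighbouring boolean options go through as_bool) and handed to the completer via the settings dict. A minimal sketch of the completion-related subset of that dict, assuming the shipped defaults:

```python
# Illustrative only: the settings PGCompleter consults for column suggestions
# after this change (values shown are the shipped pgclirc defaults).
settings = {
    'asterisk_column_order': 'table_order',
    'qualify_columns': 'if_more_than_one_table',  # new in this change
}
```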
diff --git a/pgcli/packages/sqlcompletion.py b/pgcli/packages/sqlcompletion.py
index 1e537a22..0b944fd0 100644
--- a/pgcli/packages/sqlcompletion.py
+++ b/pgcli/packages/sqlcompletion.py
@@ -41,8 +41,10 @@ View.__new__.__defaults__ = (None, tuple())
FromClauseItem.__new__.__defaults__ = (None, tuple(), tuple())
Column = namedtuple(
- 'Column', ['table_refs', 'require_last_table', 'local_tables'])
-Column.__new__.__defaults__ = (None, None, tuple())
+ 'Column',
+ ['table_refs', 'require_last_table', 'local_tables', 'qualifiable']
+)
+Column.__new__.__defaults__ = (None, None, tuple(), False)
Keyword = namedtuple('Keyword', [])
NamedQuery = namedtuple('NamedQuery', [])
@@ -340,10 +342,13 @@ def suggest_based_on_last_token(token, stmt):
return (Column(table_refs=stmt.get_tables('insert')),)
# We're probably in a function argument list
return (Column(table_refs=extract_tables(stmt.full_text),
- local_tables=stmt.local_tables),)
- elif token_v in ('set', 'by', 'distinct'):
+ local_tables=stmt.local_tables, qualifiable=True),)
+ elif token_v == 'set':
return (Column(table_refs=stmt.get_tables(),
local_tables=stmt.local_tables),)
+ elif token_v in ('by', 'distinct'):
+ return (Column(table_refs=stmt.get_tables(),
+ local_tables=stmt.local_tables, qualifiable=True),)
elif token_v in ('select', 'where', 'having'):
# Check for a table alias or schema qualification
parent = (stmt.identifier and stmt.identifier.get_parent_name()) or []
@@ -355,7 +360,8 @@ def suggest_based_on_last_token(token, stmt):
View(schema=parent),
Function(schema=parent),)
else:
- return (Column(table_refs=tables, local_tables=stmt.local_tables),
+ return (Column(table_refs=tables, local_tables=stmt.local_tables,
+ qualifiable=True),
Function(schema=None),
Keyword(),)
elif token_v == 'as':
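
The new `qualifiable` field defaults to False, so suggestion sites that don't set it (the SET branch, INSERT column lists) keep producing bare column names. A minimal standalone sketch of the namedtuple-defaults pattern used above:

```python
from collections import namedtuple

# Same pattern as the hunk above: a fourth field with a False default keeps
# every existing Column(...) construction working unchanged.
Column = namedtuple(
    'Column', ['table_refs', 'require_last_table', 'local_tables', 'qualifiable'])
Column.__new__.__defaults__ = (None, None, tuple(), False)

Column(table_refs=())                     # qualifiable=False, e.g. UPDATE ... SET
Column(table_refs=(), qualifiable=True)   # set for SELECT/WHERE/HAVING, BY,
                                          # DISTINCT and function arguments
```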
diff --git a/pgcli/pgclirc b/pgcli/pgclirc
index 3ff831c4..cc658b66 100644
--- a/pgcli/pgclirc
+++ b/pgcli/pgclirc
@@ -58,6 +58,10 @@ log_level = INFO
# Possible values: "table_order" and "alphabetic"
asterisk_column_order = table_order
+# Whether to qualify with table alias/name when suggesting columns
+# Possible values: "always", "never" and "if_more_than_one_table"
+qualify_columns = if_more_than_one_table
+
# Default pager.
# By default 'PAGER' environment variable is used
# pager = less -SRXF
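
A hedged illustration of what each accepted value means for the inserted text, assuming a statement that references `users u` and one other table:

```python
# Purely illustrative: qualify_columns value -> text inserted for the "email"
# column when two tables are in scope. With a single table in scope,
# "if_more_than_one_table" behaves like "never".
examples = {
    'never': 'email',
    'always': 'u.email',
    'if_more_than_one_table': 'u.email',
}
```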
diff --git a/pgcli/pgcompleter.py b/pgcli/pgcompleter.py
index 8c985a8a..cfcebff3 100644
--- a/pgcli/pgcompleter.py
+++ b/pgcli/pgcompleter.py
@@ -1,7 +1,7 @@
from __future__ import print_function, unicode_literals
import logging
import re
-from itertools import count, repeat, chain
+from itertools import count, repeat
import operator
from collections import namedtuple, defaultdict
from pgspecial.namedqueries import NamedQueries
@@ -30,7 +30,10 @@ NamedQueries.instance = NamedQueries.from_config(
Match = namedtuple('Match', ['completion', 'priority'])
-Candidate = namedtuple('Candidate', ['completion', 'priority', 'meta'])
+
+_Candidate = namedtuple('Candidate', ['completion', 'priority', 'meta', 'synonyms'])
+def Candidate(completion, priority, meta, synonyms=None):
+ return _Candidate(completion, priority, meta, synonyms or [completion])
normalize_ref = lambda ref: ref if ref[0] == '"' else '"' + ref.lower() + '"'
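
The `Candidate` factory lets the inserted completion differ from the text that fuzzy matching runs against: a qualified completion such as `u.first_name` carries the bare column name in `synonyms`, so it still matches what the user actually typed. A minimal sketch using the same shapes as the hunk above:

```python
from collections import namedtuple

_Candidate = namedtuple('Candidate', ['completion', 'priority', 'meta', 'synonyms'])

def Candidate(completion, priority, meta, synonyms=None):
    # Plain candidates match on their own completion text; qualified column
    # candidates pass the bare name so typing "first_n" matches "u.first_name".
    return _Candidate(completion, priority, meta, synonyms or [completion])

Candidate('u.first_name', 0, 'column', ['first_name'])
```

find_matches (next hunk) then scores each candidate with max(_match(x) for x in synonyms) while still inserting and displaying the qualified completion text.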
@@ -58,6 +61,8 @@ class PGCompleter(Completer):
self.generate_aliases = settings.get('generate_aliases')
self.casing_file = settings.get('casing_file')
self.generate_casing_file = settings.get('generate_casing_file')
+ self.qualify_columns = settings.get(
+ 'qualify_columns', 'if_more_than_one_table')
self.asterisk_column_order = settings.get(
'asterisk_column_order', 'table_order')
@@ -304,15 +309,16 @@ class PGCompleter(Completer):
matches = []
for cand in collection:
- if isinstance(cand, Candidate):
- item, prio, met = cand
+ if isinstance(cand, _Candidate):
+ item, prio, display_meta, synonyms = cand
else:
- item, met, prio = cand, meta, 0
- sort_key = _match(item)
+ item, display_meta, prio, synonyms = cand, meta, 0, [cand]
+
+ sort_key = max(_match(x) for x in synonyms)
if sort_key:
- if meta and len(meta) > 50:
+ if display_meta and len(display_meta) > 50:
# Truncate meta-text to 50 characters, if necessary
- met = met[:47] + u'...'
+ display_meta = display_meta[:47] + u'...'
# Lexical order of items in the collection, used for
# tiebreaking items with the same match group length and start
@@ -331,7 +337,8 @@ class PGCompleter(Completer):
priority = sort_key, type_priority, prio, priority_func(item), lexical_priority
matches.append(Match(
- completion=Completion(item, -text_len, display_meta=met),
+ completion=Completion(item, -text_len,
+ display_meta=display_meta),
priority=priority))
return matches
@@ -373,12 +380,20 @@ class PGCompleter(Completer):
def get_column_matches(self, suggestion, word_before_cursor):
tables = suggestion.table_refs
+ do_qualify = suggestion.qualifiable and {'always': True, 'never': False,
+ 'if_more_than_one_table': len(tables) > 1}[self.qualify_columns]
+ qualify = lambda col, tbl: (
+ (tbl + '.' + self.case(col)) if do_qualify else self.case(col))
_logger.debug("Completion column scope: %r", tables)
scoped_cols = self.populate_scoped_cols(tables, suggestion.local_tables)
colit = scoped_cols.items
- flat_cols = list(chain(*((c.name for c in cols)
- for t, cols in colit())))
+ def make_cand(name, ref):
+ return Candidate(qualify(name, ref), 0, 'column', [name])
+ flat_cols = []
+ for t, cols in colit():
+ for c in cols:
+ flat_cols.append(make_cand(c.name, t.ref))
if suggestion.require_last_table:
# require_last_table is used for 'tb11 JOIN tbl2 USING (...' which should
# suggest only columns that appear in the last table and one more
@@ -397,14 +412,10 @@ class PGCompleter(Completer):
# User typed x.*; replicate "x." for all columns except the
# first, which gets the original (as we only replace the "*"")
sep = ', ' + word_before_cursor[:-1]
- collist = sep.join(self.case(c) for c in flat_cols)
- elif len(scoped_cols) > 1:
- # Multiple tables; qualify all columns
- collist = ', '.join(t.ref + '.' + self.case(c.name)
- for t, cs in colit() for c in cs)
+ collist = sep.join(self.case(c.completion) for c in flat_cols)
else:
- # Plain columns
- collist = ', '.join(self.case(c) for c in flat_cols)
+ collist = ', '.join(qualify(c.name, t.ref)
+ for t, cs in colit() for c in cs)
return [Match(completion=Completion(collist, -1,
display_meta='columns', display='*'), priority=(1,1,1))]
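
Restating the decision logic from get_column_matches outside the completer, for clarity (the helper name here is ours, not pgcli's):

```python
def should_qualify(qualify_columns, qualifiable, table_count):
    """Mirrors the do_qualify expression in the hunk above."""
    if not qualifiable:  # UPDATE ... SET and INSERT INTO tbl( stay unqualified
        return False
    return {'always': True,
            'never': False,
            'if_more_than_one_table': table_count > 1}[qualify_columns]

assert should_qualify('if_more_than_one_table', True, 1) is False
assert should_qualify('if_more_than_one_table', True, 2) is True
assert should_qualify('always', False, 2) is False
```

The same qualify() helper is reused when expanding `*` into a column list, replacing the old "qualify only when multiple tables are in scope" special case.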
diff --git a/tests/test_smart_completion_public_schema_only.py b/tests/test_smart_completion_public_schema_only.py
index dab64757..6c7c7b43 100644
--- a/tests/test_smart_completion_public_schema_only.py
+++ b/tests/test_smart_completion_public_schema_only.py
@@ -31,19 +31,19 @@ metadata = dict((k, {'public': v}) for k, v in metadata.items())
testdata = MetaData(metadata)
-cased_users_cols = ['ID', 'PARENTID', 'Email', 'First_Name', 'last_name']
-cased_users2_cols = ['UserID', 'UserName']
+cased_users_col_names = ['ID', 'PARENTID', 'Email', 'First_Name', 'last_name']
+cased_users2_col_names = ['UserID', 'UserName']
cased_funcs = ['Custom_Fun', '_custom_fun', 'Custom_Func1',
'custom_func2', 'set_returning_func']
cased_tbls = ['Users', 'Orders']
cased_views = ['User_Emails']
casing = (['SELECT', 'PUBLIC'] + cased_funcs + cased_tbls + cased_views
- + cased_users_cols + cased_users2_cols)
+ + cased_users_col_names + cased_users2_col_names)
# Lists for use in assertions
cased_funcs = [function(f + '()') for f in cased_funcs]
cased_tbls = [table(t) for t in (cased_tbls + ['"Users"', '"select"'])]
cased_rels = [view(t) for t in cased_views] + cased_funcs + cased_tbls
-cased_users_cols = [column(c) for c in cased_users_cols]
+cased_users_cols = [column(c) for c in cased_users_col_names]
aliased_rels = [table(t) for t in ('users u', '"Users" U', 'orders o',
'"select" s')] + [view('user_emails ue')] + [function(f) for f in (
'_custom_fun() cf', 'custom_fun() cf', 'custom_func1() cf',
@@ -67,10 +67,19 @@ def aliased_completer():
return testdata.get_completer({'generate_aliases': True})
@pytest.fixture
-def cased_aliased_completer(request):
+def cased_aliased_completer():
return testdata.get_completer({'generate_aliases': True}, casing)
@pytest.fixture
+def cased_always_qualifying_completer():
+ return testdata.get_completer({'qualify_columns': 'always'}, casing)
+
+@pytest.fixture
+def auto_qualifying_completer():
+ return testdata.get_completer({'qualify_columns': 'if_more_than_one_table'})
+
+
+@pytest.fixture
def complete_event():
from mock import Mock
return Mock()
@@ -175,6 +184,67 @@ def test_suggested_cased_column_names(cased_completer, complete_event):
+ testdata.builtin_functions() + testdata.keywords())
+@pytest.mark.parametrize('text', [
+ 'SELECT from users',
+ 'INSERT INTO Orders SELECT from users',
+])
+def test_suggested_auto_qualified_column_names(
+ text, auto_qualifying_completer, complete_event
+):
+ pos = text.index(' ') + 1
+ cols = [column(c.lower()) for c in cased_users_col_names]
+ result = set(auto_qualifying_completer.get_completions(
+ Document(text=text, cursor_position=pos),
+ complete_event))
+ assert set(result) == set(testdata.functions() + cols
+ + testdata.builtin_functions() + testdata.keywords())
+
+
+@pytest.mark.parametrize('text', [
+ 'SELECT from users U NATURAL JOIN "Users"',
+ 'INSERT INTO Orders SELECT from users U NATURAL JOIN "Users"',
+])
+def test_suggested_auto_qualified_column_names_two_tables(
+ text, auto_qualifying_completer, complete_event
+):
+ pos = text.index(' ') + 1
+ cols = [column('U.' + c.lower()) for c in cased_users_col_names]
+ cols += [column('"Users".' + c.lower()) for c in cased_users2_col_names]
+ result = set(auto_qualifying_completer.get_completions(
+ Document(text=text, cursor_position=pos),
+ complete_event))
+ assert set(result) == set(testdata.functions() + cols
+ + testdata.builtin_functions() + testdata.keywords())
+
+
+@pytest.mark.parametrize('text', [
+ 'UPDATE users SET ',
+ 'INSERT INTO users(',
+])
+def test_no_column_qualification(
+ text, cased_always_qualifying_completer, complete_event
+):
+ pos = len(text)
+ cols = [column(c) for c in cased_users_col_names]
+ result = set(cased_always_qualifying_completer.get_completions(
+ Document(text=text, cursor_position=pos),
+ complete_event))
+ assert set(result) == set(cols)
+
+
+def test_suggested_cased_always_qualified_column_names(
+ cased_always_qualifying_completer, complete_event
+):
+ text = 'SELECT from users'
+ position = len('SELECT ')
+ cols = [column('users.' + c) for c in cased_users_col_names]
+ result = set(cased_always_qualifying_completer.get_completions(
+ Document(text=text, cursor_position=position),
+ complete_event))
+ assert set(result) == set(cased_funcs + cols
+ + testdata.builtin_functions() + testdata.keywords())
+
+
def test_suggested_column_names_in_function(completer, complete_event):
"""
Suggest column and function names when selecting multiple
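
A hedged usage sketch in the same style as the fixtures above (testdata comes from the MetaData helper at the top of this module; the exact completion set depends on the sample schema):

```python
from prompt_toolkit.document import Document
from mock import Mock

completer = testdata.get_completer({'qualify_columns': 'always'})
text = 'SELECT from users'
doc = Document(text=text, cursor_position=len('SELECT '))
completions = completer.get_completions(doc, Mock())
# With 'always', column suggestions arrive pre-qualified, e.g. "users.email".
assert any(c.text.startswith('users.') for c in completions)
```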
diff --git a/tests/test_sqlcompletion.py b/tests/test_sqlcompletion.py
index 9faea204..a3559ade 100644
--- a/tests/test_sqlcompletion.py
+++ b/tests/test_sqlcompletion.py
@@ -1,12 +1,14 @@
from pgcli.packages.sqlcompletion import (
suggest_type, Special, Database, Schema, Table, Column, View, Keyword,
FromClauseItem, Function, Datatype, Alias, JoinCondition, Join)
+from pgcli.packages.parseutils.tables import TableReference
import pytest
# Returns the expected select-clause suggestions for a single-table select
def cols_etc(table, schema=None, alias=None, is_function=False, parent=None):
return set([
- Column(table_refs=((schema, table, alias, is_function),)),
+ Column(table_refs=(TableReference(schema, table, alias, is_function),),
+ qualifiable=True),
Function(schema=parent),
Keyword()])
@@ -79,13 +81,13 @@ def test_where_equals_any_suggests_columns_or_keywords():
def test_lparen_suggests_cols():
suggestion = suggest_type('SELECT MAX( FROM tbl', 'SELECT MAX(')
assert set(suggestion) == set([
- Column(table_refs=((None, 'tbl', None, False),))])
+ Column(table_refs=((None, 'tbl', None, False),), qualifiable=True)])
def test_select_suggests_cols_and_funcs():
suggestions = suggest_type('SELECT ', 'SELECT ')
assert set(suggestions) == set([
- Column(table_refs=()),
+ Column(table_refs=(), qualifiable=True),
Function(schema=None),
Keyword(),
])
@@ -211,13 +213,13 @@ def test_truncate_suggests_qualified_tables():
])
def test_distinct_suggests_cols(text):
suggestions = suggest_type(text, text)
- assert suggestions ==(Column(table_refs=()),)
+ assert suggestions ==(Column(table_refs=(), qualifiable=True),)
def test_col_comma_suggests_cols():
suggestions = suggest_type('SELECT a, b, FROM tbl', 'SELECT a, b,')
assert set(suggestions) == set([
- Column(table_refs=((None, 'tbl', None, False),)),
+ Column(table_refs=((None, 'tbl', None, False),), qualifiable=True),
Function(schema=None),
Keyword(),
])
@@ -377,7 +379,7 @@ def test_sub_select_col_name_completion():
suggestions = suggest_type('SELECT * FROM (SELECT FROM abc',
'SELECT * FROM (SELECT ')
assert set(suggestions) == set([
- Column(table_refs=((None, 'abc', None, False),)),
+ Column(table_refs=((None, 'abc', None, False),), qualifiable=True),
Function(schema=None),
Keyword(),
])
@@ -541,7 +543,7 @@ def test_2_statements_2nd_current():
suggestions = suggest_type('select * from a; select from b',
'select * from a; select ')
assert set(suggestions) == set([
- Column(table_refs=((None, 'b', None, False),)),
+ Column(table_refs=((None, 'b', None, False),), qualifiable=True),
Function(schema=None),
Keyword()
])
@@ -698,7 +700,7 @@ def test_suggest_where_keyword(text):
@pytest.mark.parametrize('text, before, expected', [
('\\ns abc SELECT ', 'SELECT ', [
- Column(table_refs=()),
+ Column(table_refs=(), qualifiable=True),
Function(schema=None),
Keyword()
]),
@@ -754,7 +756,8 @@ def test_column_keyword_suggests_columns(sql):
def test_handle_unrecognized_kw_generously():
sql = 'SELECT * FROM sessions WHERE session = 1 AND '
suggestions = suggest_type(sql, sql)
- expected = Column(table_refs=((None, 'sessions', None, False),))
+ expected = Column(table_refs=((None, 'sessions', None, False),),
+ qualifiable=True)
assert expected in set(suggestions)
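
For completeness, the new flag is observable directly through suggest_type, which is what the assertions above exercise. A minimal sketch:

```python
from pgcli.packages.sqlcompletion import suggest_type, Column

# SELECT-clause column suggestions are now marked qualifiable...
assert Column(table_refs=(), qualifiable=True) in set(suggest_type('SELECT ', 'SELECT '))

# ...while the default keeps older construction sites unqualified.
assert Column(table_refs=()).qualifiable is False
```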