from __future__ import unicode_literals, print_function
from metadata import (MetaData, alias, name_join, fk_join, join, keyword,
schema, table, view, function, column, wildcard_expansion,
get_result, result_set, qual, no_qual, parametrize)
from prompt_toolkit.completion import Completion
# Raw fixture for a single schema, grouped by kind of database object.
metadata = {
    'tables': {
        'users': ['id', 'parentid', 'email', 'first_name', 'last_name'],
        'Users': ['userid', 'username'],
        'orders': ['id', 'ordered_date', 'status', 'email'],
        # Names that collide with SQL keywords are included on purpose.
        'select': ['id', 'insert', 'ABC'],
    },
    'views': {
        'user_emails': ['id', 'email'],
        'functions': ['function'],
    },
    'functions': [
        ['custom_fun', [], [], [], '', False, False, False],
        ['_custom_fun', [], [], [], '', False, False, False],
        ['custom_func1', [], [], [], '', False, False, False],
        ['custom_func2', [], [], [], '', False, False, False],
        [
            'set_returning_func', ['x', 'y'], ['integer', 'integer'],
            ['b', 'b'], '', False, False, True,
        ],
    ],
    'datatypes': ['custom_type1', 'custom_type2'],
    'foreignkeys': [
        ('public', 'users', 'id', 'public', 'users', 'parentid'),
        ('public', 'users', 'id', 'public', 'Users', 'userid'),
    ],
}
# Nest each kind's entries under the 'public' schema name.
metadata = {kind: {'public': entries} for kind, entries in metadata.items()}
# Wrap the schema-qualified fixture in the MetaData helper imported from the
# local `metadata` module; the completers and the keyword/special lists used
# by the tests below are obtained from this object.
testdata = MetaData(metadata)
# Preferred spellings for identifiers; where a spelling differs from the
# fixture it differs only in letter case.
cased_users_col_names = [
    'ID',
    'PARENTID',
    'Email',
    'First_Name',
    'last_name',
]
cased_users2_col_names = ['UserID', 'UserName']
cased_func_names = [
    'Custom_Fun',
    '_custom_fun',
    'Custom_Func1',
    'custom_func2',
    'set_returning_func',
]
cased_tbls = ['Users', 'Orders']
cased_views = ['User_Emails', 'Functions']

# Flat list of every cased name, in the order: keyword/schema, functions,
# tables, views, then the columns of both user tables.
casing = ['SELECT', 'PUBLIC']
casing += cased_func_names
casing += cased_tbls + cased_views
casing += cased_users_col_names + cased_users2_col_names
# Lists for use in assertions

# Function entries with cased names; the set-returning function is appended
# last and carries an explicit display string.
cased_funcs = [
    function(signature)
    for signature in (
        'Custom_Fun()', '_custom_fun()', 'Custom_Func1()', 'custom_func2()'
    )
]
cased_funcs.append(
    function(
        'set_returning_func(x := , y := )',
        display='set_returning_func(x, y)'
    )
)

# NOTE: rebinds cased_tbls from a list of plain names to a list of table()
# entries, adding the two tables whose names are written quoted.
cased_tbls = [table(rel) for rel in cased_tbls + ['"Users"', '"select"']]

# Views first, then functions, then tables.
cased_rels = [view(rel) for rel in cased_views]
cased_rels += cased_funcs
cased_rels += cased_tbls

cased_users_cols = [column(col) for col in cased_users_col_names]

# Relations followed by an alias, uncased variant.
aliased_rels = [
    table(rel)
    for rel in ('users u', '"Users" U', 'orders o', '"select" s')
]
aliased_rels += [view('user_emails ue'), view('functions f')]
aliased_rels += [
    function(signature)
    for signature in (
        '_custom_fun() cf',
        'custom_fun() cf',
        'custom_func1() cf',
        'custom_func2() cf',
    )
]
aliased_rels.append(
    function(
        'set_returning_func(x := , y := ) srf',
        display='set_returning_func(x, y) srf'
    )
)

# Relations followed by an alias, cased variant.
cased_aliased_rels = [
    table(rel)
    for rel in ('Users U', '"Users" U', 'Orders O', '"select" s')
]
cased_aliased_rels += [view('User_Emails UE'), view('Functions F')]
cased_aliased_rels += [
    function(signature)
    for signature in (
        '_custom_fun() cf',
        'Custom_Fun() CF',
        'Custom_Func1() CF',
        'custom_func2() cf',
    )
]
cased_aliased_rels.append(
    function(
        'set_returning_func(x := , y := ) srf',
        display='set_returning_func(x, y) srf'
    )
)

# Callable used by the parametrize decorators below to obtain completers.
completers = testdata.get_completers(casing)
# Just to make sure that this doesn't crash
@parametrize('completer', completers())
def test_function_column_name(completer):
    """Expect no suggestions at any cursor position inside the ':text' suffix.

    The query is truncated at every length from just after 'function:' up to
    the full string, and each truncation must yield an empty result.
    """
    query = 'SELECT * FROM Functions WHERE function:text'
    first_cut = len('SELECT * FROM Functions WHERE function:')
    for cut in range(first_cut, len(query) + 1):
        assert [] == get_result(completer, query[:cut])
@parametrize('action', ['ALTER', 'DROP', 'CREATE', 'CREATE OR REPLACE'])
@parametrize('completer', completers())
def test_drop_alter_function(completer, action):
    """After '<action> FUNCTION set_ret', expect the typed function signature.

    The single expected entry replaces the partially typed name, hence the
    negative start position equal to its length.
    """
    typed = 'set_ret'
    suggestions = get_result(completer, action + ' FUNCTION ' + typed)
    expected = [
        function('set_returning_func(x integer, y integer)', -len(typed))
    ]
    assert suggestions == expected
@parametrize('completer', completers())
def test_empty_string_completion(completer):
    """Empty input must yield exactly the keywords plus the specials."""
    suggestions = result_set(completer, '')
    expected = set(testdata.keywords() + testdata.specials())
    assert expected == suggestions
@parametrize('completer', completers())
def test_select_keyword_completion(completer):
    """Typing 'SEL' must suggest only the SELECT keyword."""
    suggestions = result_set(completer, 'SEL')
    # -3: the suggestion replaces the three characters already typed.
    assert suggestions == {keyword('SELECT', -3)}
@parametrize('completer', completers())
def test_builtin_function_name_completion(completer):
    """'SELECT MA' must suggest MAX plus the two keywords starting with MA."""
    suggestions = result_set(completer, 'SELECT MA')
    expected = {
        function('MAX', -2),
        keyword('MAXEXTENTS', -2),
        keyword('MATERIALIZED VIEW', -2),
    }
    assert suggestions == expected
@parametrize('completer', completers())
def test_builtin_function_matches_only_at_start(completer):
    """'IN' occurs inside 'MIN' but not at its start, so MIN is not offered."""
    suggestions = get_result(completer, 'SELECT IN')
    assert not any(entry.text == 'MIN' for entry in suggestions)
@parametrize('completer', completers(casing=False, aliasing=False))
def test_user_function_name_completion(completer):
result = result_set(completer, 'SELECT cu')
assert result == set([
function('custom_fun()', -2),
function