summaryrefslogtreecommitdiffstats
path: root/tests/parseutils
diff options
context:
space:
mode:
authorDarik Gamble <darik.gamble.spam@gmail.com>2016-07-22 07:24:21 -0400
committerDarik Gamble <darik.gamble.spam@gmail.com>2016-07-27 15:33:58 -0400
commita52ee78fcb41b4ab1fc17d0dd05f1a8cb6da4b92 (patch)
treeb7bc6c44931afcafead8f63f59ff7c34be266d29 /tests/parseutils
parent4d84c55df3652f53c01f1789a18575dc5c54767c (diff)
Move parseutils tests into test subfolder
Diffstat (limited to 'tests/parseutils')
-rw-r--r--tests/parseutils/test_function_metadata.py16
-rw-r--r--tests/parseutils/test_parseutils.py242
2 files changed, 258 insertions, 0 deletions
diff --git a/tests/parseutils/test_function_metadata.py b/tests/parseutils/test_function_metadata.py
new file mode 100644
index 00000000..097ce62d
--- /dev/null
+++ b/tests/parseutils/test_function_metadata.py
@@ -0,0 +1,16 @@
+from pgcli.packages.parseutils.meta import FunctionMetadata
+
+
+def test_function_metadata_eq():
+ f1 = FunctionMetadata('s', 'f', ['x'], ['integer'], [], 'int', False,
+ False, False)
+ f2 = FunctionMetadata('s', 'f', ['x'], ['integer'], [], 'int', False,
+ False, False)
+ f3 = FunctionMetadata('s', 'g', ['x'], ['integer'], [], 'int', False,
+ False, False)
+ assert f1 == f2
+ assert f1 != f3
+ assert not (f1 != f2)
+ assert not (f1 == f3)
+ assert hash(f1) == hash(f2)
+ assert hash(f1) != hash(f3)
diff --git a/tests/parseutils/test_parseutils.py b/tests/parseutils/test_parseutils.py
new file mode 100644
index 00000000..6c2994f8
--- /dev/null
+++ b/tests/parseutils/test_parseutils.py
@@ -0,0 +1,242 @@
+import pytest
+from pgcli.packages.parseutils.tables import extract_tables
+from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote
+
+def test_empty_string():
+ tables = extract_tables('')
+ assert tables == ()
+
+def test_simple_select_single_table():
+ tables = extract_tables('select * from abc')
+ assert tables == ((None, 'abc', None, False),)
+
+
+@pytest.mark.parametrize('sql', [
+ 'select * from "abc"."def"',
+ 'select * from abc."def"',
+])
+def test_simple_select_single_table_schema_qualified_quoted_table(sql):
+ tables = extract_tables(sql)
+ assert tables == (('abc', 'def', '"def"', False),)
+
+
+@pytest.mark.parametrize('sql', [
+ 'select * from abc.def',
+ 'select * from "abc".def',
+])
+def test_simple_select_single_table_schema_qualified(sql):
+ tables = extract_tables(sql)
+ assert tables == (('abc', 'def', None, False),)
+
+
+def test_simple_select_single_table_double_quoted():
+ tables = extract_tables('select * from "Abc"')
+ assert tables == ((None, 'Abc', None, False),)
+
+def test_simple_select_multiple_tables():
+ tables = extract_tables('select * from abc, def')
+ assert set(tables) == set([(None, 'abc', None, False),
+ (None, 'def', None, False)])
+
+def test_simple_select_multiple_tables_double_quoted():
+ tables = extract_tables('select * from "Abc", "Def"')
+ assert set(tables) == set([(None, 'Abc', None, False),
+ (None, 'Def', None, False)])
+
+def test_simple_select_single_table_deouble_quoted_aliased():
+ tables = extract_tables('select * from "Abc" a')
+ assert tables == ((None, 'Abc', 'a', False),)
+
+def test_simple_select_multiple_tables_deouble_quoted_aliased():
+ tables = extract_tables('select * from "Abc" a, "Def" d')
+ assert set(tables) == set([(None, 'Abc', 'a', False),
+ (None, 'Def', 'd', False)])
+
+def test_simple_select_multiple_tables_schema_qualified():
+ tables = extract_tables('select * from abc.def, ghi.jkl')
+ assert set(tables) == set([('abc', 'def', None, False),
+ ('ghi', 'jkl', None, False)])
+
+def test_simple_select_with_cols_single_table():
+ tables = extract_tables('select a,b from abc')
+ assert tables == ((None, 'abc', None, False),)
+
+def test_simple_select_with_cols_single_table_schema_qualified():
+ tables = extract_tables('select a,b from abc.def')
+ assert tables == (('abc', 'def', None, False),)
+
+def test_simple_select_with_cols_multiple_tables():
+ tables = extract_tables('select a,b from abc, def')
+ assert set(tables) == set([(None, 'abc', None, False),
+ (None, 'def', None, False)])
+
+def test_simple_select_with_cols_multiple_qualified_tables():
+ tables = extract_tables('select a,b from abc.def, def.ghi')
+ assert set(tables) == set([('abc', 'def', None, False),
+ ('def', 'ghi', None, False)])
+
+def test_select_with_hanging_comma_single_table():
+ tables = extract_tables('select a, from abc')
+ assert tables == ((None, 'abc', None, False),)
+
+def test_select_with_hanging_comma_multiple_tables():
+ tables = extract_tables('select a, from abc, def')
+ assert set(tables) == set([(None, 'abc', None, False),
+ (None, 'def', None, False)])
+
+def test_select_with_hanging_period_multiple_tables():
+ tables = extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2')
+ assert set(tables) == set([(None, 'tabl1', 't1', False),
+ (None, 'tabl2', 't2', False)])
+
+def test_simple_insert_single_table():
+ tables = extract_tables('insert into abc (id, name) values (1, "def")')
+
+ # sqlparse mistakenly assigns an alias to the table
+ # AND mistakenly identifies the field list as
+ # assert tables == ((None, 'abc', 'abc', False),)
+
+ assert tables == ((None, 'abc', 'abc', False),)
+
+@pytest.mark.xfail
+def test_simple_insert_single_table_schema_qualified():
+ tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
+ assert tables == (('abc', 'def', None, False),)
+
+def test_simple_update_table_no_schema():
+ tables = extract_tables('update abc set id = 1')
+ assert tables == ((None, 'abc', None, False),)
+
+def test_simple_update_table_with_schema():
+ tables = extract_tables('update abc.def set id = 1')
+ assert tables == (('abc', 'def', None, False),)
+
+@pytest.mark.parametrize('join_type', ['', 'INNER', 'LEFT', 'RIGHT OUTER'])
+def test_join_table(join_type):
+ sql = 'SELECT * FROM abc a {0} JOIN def d ON a.id = d.num'.format(join_type)
+ tables = extract_tables(sql)
+ assert set(tables) == set([(None, 'abc', 'a', False),
+ (None, 'def', 'd', False)])
+
+def test_join_table_schema_qualified():
+ tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num')
+ assert set(tables) == set([('abc', 'def', 'x', False),
+ ('ghi', 'jkl', 'y', False)])
+
+
+def test_incomplete_join_clause():
+ sql = '''select a.x, b.y
+ from abc a join bcd b
+ on a.id = '''
+ tables = extract_tables(sql)
+ assert tables == ((None, 'abc', 'a', False),
+ (None, 'bcd', 'b', False))
+
+
+def test_join_as_table():
+ tables = extract_tables('SELECT * FROM my_table AS m WHERE m.a > 5')
+ assert tables == ((None, 'my_table', 'm', False),)
+
+
+def test_multiple_joins():
+ sql = '''select * from t1
+ inner join t2 ON
+ t1.id = t2.t1_id
+ inner join t3 ON
+ t2.id = t3.'''
+ tables = extract_tables(sql)
+ assert tables == (
+ (None, 't1', None, False),
+ (None, 't2', None, False),
+ (None, 't3', None, False))
+
+
+def test_subselect_tables():
+ sql = 'SELECT * FROM (SELECT FROM abc'
+ tables = extract_tables(sql)
+ assert tables == ((None, 'abc', None, False),)
+
+@pytest.mark.parametrize('text', ['SELECT * FROM foo.', 'SELECT 123 AS foo'])
+def test_extract_no_tables(text):
+ tables = extract_tables(text)
+ assert tables == tuple()
+
+@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+def test_simple_function_as_table(arg_list):
+ tables = extract_tables('SELECT * FROM foo({0})'.format(arg_list))
+ assert tables == ((None, 'foo', None, True),)
+
+@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+def test_simple_schema_qualified_function_as_table(arg_list):
+ tables = extract_tables('SELECT * FROM foo.bar({0})'.format(arg_list))
+ assert tables == (('foo', 'bar', None, True),)
+
+@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+def test_simple_aliased_function_as_table(arg_list):
+ tables = extract_tables('SELECT * FROM foo({0}) bar'.format(arg_list))
+ assert tables == ((None, 'foo', 'bar', True),)
+
+
+def test_simple_table_and_function():
+ tables = extract_tables('SELECT * FROM foo JOIN bar()')
+ assert set(tables) == set([(None, 'foo', None, False),
+ (None, 'bar', None, True)])
+
+def test_complex_table_and_function():
+ tables = extract_tables('''SELECT * FROM foo.bar baz
+ JOIN bar.qux(x, y, z) quux''')
+ assert set(tables) == set([('foo', 'bar', 'baz', False),
+ ('bar', 'qux', 'quux', True)])
+
+
+
+def test_find_prev_keyword_using():
+ q = 'select * from tbl1 inner join tbl2 using (col1, '
+ kw, q2 = find_prev_keyword(q)
+ assert kw.value == '(' and q2 == 'select * from tbl1 inner join tbl2 using ('
+
+@pytest.mark.parametrize('sql', [
+ 'select * from foo where bar',
+ 'select * from foo where bar = 1 and baz or ',
+ 'select * from foo where bar = 1 and baz between qux and ',
+])
+def test_find_prev_keyword_where(sql):
+ kw, stripped = find_prev_keyword(sql)
+ assert kw.value == 'where' and stripped == 'select * from foo where'
+
+
+@pytest.mark.parametrize('sql', [
+ 'create table foo (bar int, baz ',
+ 'select * from foo() as bar (baz '
+])
+def test_find_prev_keyword_open_parens(sql):
+ kw, _ = find_prev_keyword(sql)
+ assert kw.value == '('
+
+
+@pytest.mark.parametrize('sql', [
+ '',
+ '$$ foo $$',
+ "$$ 'foo' $$",
+ '$$ "foo" $$',
+ '$$ $a$ $$',
+ '$a$ $$ $a$',
+ 'foo bar $$ baz $$',
+])
+def test_is_open_quote__closed(sql):
+ assert not is_open_quote(sql)
+
+
+@pytest.mark.parametrize('sql', [
+ '$$',
+ ';;;$$',
+ 'foo $$ bar $$; foo $$',
+ '$$ foo $a$',
+ "foo 'bar baz",
+ "$a$ foo ",
+ '$$ "foo" ',
+ '$$ $a$ ',
+ 'foo bar $$ baz',
+])
+def test_is_open_quote__open(sql):
+ assert is_open_quote(sql)