summaryrefslogtreecommitdiffstats
path: root/tests/parseutils
diff options
context:
space:
mode:
authorIrina Truong <i.chernyavska@gmail.com>2019-05-25 13:08:56 -0700
committerGitHub <noreply@github.com>2019-05-25 13:08:56 -0700
commit8cb7009bcd0f0062942932c853706a36178f566c (patch)
treecb5bb42674ccce90b173e7a2f09bf72157ae4a86 /tests/parseutils
parenta5e607b6fc889afd3f8960ca3903ae16b641c304 (diff)
black all the things. (#1049)
* added black to develop guide * no need for pep8radius. * changelog. * Add pre-commit checkbox. * Add pre-commit to dev reqs. * Add pyproject.toml for black. * Pre-commit config. * Add black to travis and dev reqs. * Install and run black in travis. * Remove black from dev reqs. * Lower black target version. * Re-format with black.
Diffstat (limited to 'tests/parseutils')
-rw-r--r--tests/parseutils/test_ctes.py128
-rw-r--r--tests/parseutils/test_function_metadata.py9
-rw-r--r--tests/parseutils/test_parseutils.py244
3 files changed, 199 insertions, 182 deletions
diff --git a/tests/parseutils/test_ctes.py b/tests/parseutils/test_ctes.py
index 4d2d050f..3e89ccaf 100644
--- a/tests/parseutils/test_ctes.py
+++ b/tests/parseutils/test_ctes.py
@@ -1,8 +1,10 @@
import pytest
from sqlparse import parse
from pgcli.packages.parseutils.ctes import (
- token_start_pos, extract_ctes,
- extract_column_names as _extract_column_names)
+ token_start_pos,
+ extract_ctes,
+ extract_column_names as _extract_column_names,
+)
def extract_column_names(sql):
@@ -11,109 +13,125 @@ def extract_column_names(sql):
def test_token_str_pos():
- sql = 'SELECT * FROM xxx'
+ sql = "SELECT * FROM xxx"
p = parse(sql)[0]
idx = p.token_index(p.tokens[-1])
- assert token_start_pos(p.tokens, idx) == len('SELECT * FROM ')
+ assert token_start_pos(p.tokens, idx) == len("SELECT * FROM ")
- sql = 'SELECT * FROM \nxxx'
+ sql = "SELECT * FROM \nxxx"
p = parse(sql)[0]
idx = p.token_index(p.tokens[-1])
- assert token_start_pos(p.tokens, idx) == len('SELECT * FROM \n')
+ assert token_start_pos(p.tokens, idx) == len("SELECT * FROM \n")
def test_single_column_name_extraction():
- sql = 'SELECT abc FROM xxx'
- assert extract_column_names(sql) == ('abc',)
+ sql = "SELECT abc FROM xxx"
+ assert extract_column_names(sql) == ("abc",)
def test_aliased_single_column_name_extraction():
- sql = 'SELECT abc def FROM xxx'
- assert extract_column_names(sql) == ('def',)
+ sql = "SELECT abc def FROM xxx"
+ assert extract_column_names(sql) == ("def",)
def test_aliased_expression_name_extraction():
- sql = 'SELECT 99 abc FROM xxx'
- assert extract_column_names(sql) == ('abc',)
+ sql = "SELECT 99 abc FROM xxx"
+ assert extract_column_names(sql) == ("abc",)
def test_multiple_column_name_extraction():
- sql = 'SELECT abc, def FROM xxx'
- assert extract_column_names(sql) == ('abc', 'def')
+ sql = "SELECT abc, def FROM xxx"
+ assert extract_column_names(sql) == ("abc", "def")
def test_missing_column_name_handled_gracefully():
- sql = 'SELECT abc, 99 FROM xxx'
- assert extract_column_names(sql) == ('abc',)
+ sql = "SELECT abc, 99 FROM xxx"
+ assert extract_column_names(sql) == ("abc",)
- sql = 'SELECT abc, 99, def FROM xxx'
- assert extract_column_names(sql) == ('abc', 'def')
+ sql = "SELECT abc, 99, def FROM xxx"
+ assert extract_column_names(sql) == ("abc", "def")
def test_aliased_multiple_column_name_extraction():
- sql = 'SELECT abc def, ghi jkl FROM xxx'
- assert extract_column_names(sql) == ('def', 'jkl')
+ sql = "SELECT abc def, ghi jkl FROM xxx"
+ assert extract_column_names(sql) == ("def", "jkl")
def test_table_qualified_column_name_extraction():
- sql = 'SELECT abc.def, ghi.jkl FROM xxx'
- assert extract_column_names(sql) == ('def', 'jkl')
-
-
-@pytest.mark.parametrize('sql', [
- 'INSERT INTO foo (x, y, z) VALUES (5, 6, 7) RETURNING x, y',
- 'DELETE FROM foo WHERE x > y RETURNING x, y',
- 'UPDATE foo SET x = 9 RETURNING x, y',
-])
+ sql = "SELECT abc.def, ghi.jkl FROM xxx"
+ assert extract_column_names(sql) == ("def", "jkl")
+
+
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "INSERT INTO foo (x, y, z) VALUES (5, 6, 7) RETURNING x, y",
+ "DELETE FROM foo WHERE x > y RETURNING x, y",
+ "UPDATE foo SET x = 9 RETURNING x, y",
+ ],
+)
def test_extract_column_names_from_returning_clause(sql):
- assert extract_column_names(sql) == ('x', 'y')
+ assert extract_column_names(sql) == ("x", "y")
def test_simple_cte_extraction():
- sql = 'WITH a AS (SELECT abc FROM xxx) SELECT * FROM a'
- start_pos = len('WITH a AS ')
- stop_pos = len('WITH a AS (SELECT abc FROM xxx)')
+ sql = "WITH a AS (SELECT abc FROM xxx) SELECT * FROM a"
+ start_pos = len("WITH a AS ")
+ stop_pos = len("WITH a AS (SELECT abc FROM xxx)")
ctes, remainder = extract_ctes(sql)
- assert tuple(ctes) == (('a', ('abc',), start_pos, stop_pos),)
- assert remainder.strip() == 'SELECT * FROM a'
+ assert tuple(ctes) == (("a", ("abc",), start_pos, stop_pos),)
+ assert remainder.strip() == "SELECT * FROM a"
def test_cte_extraction_around_comments():
- sql = '''--blah blah blah
+ sql = """--blah blah blah
WITH a AS (SELECT abc def FROM x)
- SELECT * FROM a'''
- start_pos = len('''--blah blah blah
- WITH a AS ''')
- stop_pos = len('''--blah blah blah
- WITH a AS (SELECT abc def FROM x)''')
+ SELECT * FROM a"""
+ start_pos = len(
+ """--blah blah blah
+ WITH a AS """
+ )
+ stop_pos = len(
+ """--blah blah blah
+ WITH a AS (SELECT abc def FROM x)"""
+ )
ctes, remainder = extract_ctes(sql)
- assert tuple(ctes) == (('a', ('def',), start_pos, stop_pos),)
- assert remainder.strip() == 'SELECT * FROM a'
+ assert tuple(ctes) == (("a", ("def",), start_pos, stop_pos),)
+ assert remainder.strip() == "SELECT * FROM a"
def test_multiple_cte_extraction():
- sql = '''WITH
+ sql = """WITH
x AS (SELECT abc, def FROM x),
y AS (SELECT ghi, jkl FROM y)
- SELECT * FROM a, b'''
+ SELECT * FROM a, b"""
- start1 = len('''WITH
- x AS ''')
+ start1 = len(
+ """WITH
+ x AS """
+ )
- stop1 = len('''WITH
- x AS (SELECT abc, def FROM x)''')
+ stop1 = len(
+ """WITH
+ x AS (SELECT abc, def FROM x)"""
+ )
- start2 = len('''WITH
+ start2 = len(
+ """WITH
x AS (SELECT abc, def FROM x),
- y AS ''')
+ y AS """
+ )
- stop2 = len('''WITH
+ stop2 = len(
+ """WITH
x AS (SELECT abc, def FROM x),
- y AS (SELECT ghi, jkl FROM y)''')
+ y AS (SELECT ghi, jkl FROM y)"""
+ )
ctes, remainder = extract_ctes(sql)
assert tuple(ctes) == (
- ('x', ('abc', 'def'), start1, stop1),
- ('y', ('ghi', 'jkl'), start2, stop2))
+ ("x", ("abc", "def"), start1, stop1),
+ ("y", ("ghi", "jkl"), start2, stop2),
+ )
diff --git a/tests/parseutils/test_function_metadata.py b/tests/parseutils/test_function_metadata.py
index 1d0b72b7..0350e2a1 100644
--- a/tests/parseutils/test_function_metadata.py
+++ b/tests/parseutils/test_function_metadata.py
@@ -3,16 +3,13 @@ from pgcli.packages.parseutils.meta import FunctionMetadata
def test_function_metadata_eq():
f1 = FunctionMetadata(
- 's', 'f', ['x'], ['integer'], [
- ], 'int', False, False, False, False, None
+ "s", "f", ["x"], ["integer"], [], "int", False, False, False, False, None
)
f2 = FunctionMetadata(
- 's', 'f', ['x'], ['integer'], [
- ], 'int', False, False, False, False, None
+ "s", "f", ["x"], ["integer"], [], "int", False, False, False, False, None
)
f3 = FunctionMetadata(
- 's', 'g', ['x'], ['integer'], [
- ], 'int', False, False, False, False, None
+ "s", "g", ["x"], ["integer"], [], "int", False, False, False, False, None
)
assert f1 == f2
assert f1 != f3
diff --git a/tests/parseutils/test_parseutils.py b/tests/parseutils/test_parseutils.py
index 4ffc7ee3..50bc8896 100644
--- a/tests/parseutils/test_parseutils.py
+++ b/tests/parseutils/test_parseutils.py
@@ -4,104 +4,98 @@ from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote
def test_empty_string():
- tables = extract_tables('')
+ tables = extract_tables("")
assert tables == ()
def test_simple_select_single_table():
- tables = extract_tables('select * from abc')
- assert tables == ((None, 'abc', None, False),)
+ tables = extract_tables("select * from abc")
+ assert tables == ((None, "abc", None, False),)
-@pytest.mark.parametrize('sql', [
- 'select * from "abc"."def"',
- 'select * from abc."def"',
-])
+@pytest.mark.parametrize(
+ "sql", ['select * from "abc"."def"', 'select * from abc."def"']
+)
def test_simple_select_single_table_schema_qualified_quoted_table(sql):
tables = extract_tables(sql)
- assert tables == (('abc', 'def', '"def"', False),)
+ assert tables == (("abc", "def", '"def"', False),)
-@pytest.mark.parametrize('sql', [
- 'select * from abc.def',
- 'select * from "abc".def',
-])
+@pytest.mark.parametrize("sql", ["select * from abc.def", 'select * from "abc".def'])
def test_simple_select_single_table_schema_qualified(sql):
tables = extract_tables(sql)
- assert tables == (('abc', 'def', None, False),)
+ assert tables == (("abc", "def", None, False),)
def test_simple_select_single_table_double_quoted():
tables = extract_tables('select * from "Abc"')
- assert tables == ((None, 'Abc', None, False),)
+ assert tables == ((None, "Abc", None, False),)
def test_simple_select_multiple_tables():
- tables = extract_tables('select * from abc, def')
- assert set(tables) == set([(None, 'abc', None, False),
- (None, 'def', None, False)])
+ tables = extract_tables("select * from abc, def")
+ assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)])
def test_simple_select_multiple_tables_double_quoted():
tables = extract_tables('select * from "Abc", "Def"')
- assert set(tables) == set([(None, 'Abc', None, False),
- (None, 'Def', None, False)])
+ assert set(tables) == set([(None, "Abc", None, False), (None, "Def", None, False)])
def test_simple_select_single_table_deouble_quoted_aliased():
tables = extract_tables('select * from "Abc" a')
- assert tables == ((None, 'Abc', 'a', False),)
+ assert tables == ((None, "Abc", "a", False),)
def test_simple_select_multiple_tables_deouble_quoted_aliased():
tables = extract_tables('select * from "Abc" a, "Def" d')
- assert set(tables) == set([(None, 'Abc', 'a', False),
- (None, 'Def', 'd', False)])
+ assert set(tables) == set([(None, "Abc", "a", False), (None, "Def", "d", False)])
def test_simple_select_multiple_tables_schema_qualified():
- tables = extract_tables('select * from abc.def, ghi.jkl')
- assert set(tables) == set([('abc', 'def', None, False),
- ('ghi', 'jkl', None, False)])
+ tables = extract_tables("select * from abc.def, ghi.jkl")
+ assert set(tables) == set(
+ [("abc", "def", None, False), ("ghi", "jkl", None, False)]
+ )
def test_simple_select_with_cols_single_table():
- tables = extract_tables('select a,b from abc')
- assert tables == ((None, 'abc', None, False),)
+ tables = extract_tables("select a,b from abc")
+ assert tables == ((None, "abc", None, False),)
def test_simple_select_with_cols_single_table_schema_qualified():
- tables = extract_tables('select a,b from abc.def')
- assert tables == (('abc', 'def', None, False),)
+ tables = extract_tables("select a,b from abc.def")
+ assert tables == (("abc", "def", None, False),)
def test_simple_select_with_cols_multiple_tables():
- tables = extract_tables('select a,b from abc, def')
- assert set(tables) == set([(None, 'abc', None, False),
- (None, 'def', None, False)])
+ tables = extract_tables("select a,b from abc, def")
+ assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)])
def test_simple_select_with_cols_multiple_qualified_tables():
- tables = extract_tables('select a,b from abc.def, def.ghi')
- assert set(tables) == set([('abc', 'def', None, False),
- ('def', 'ghi', None, False)])
+ tables = extract_tables("select a,b from abc.def, def.ghi")
+ assert set(tables) == set(
+ [("abc", "def", None, False), ("def", "ghi", None, False)]
+ )
def test_select_with_hanging_comma_single_table():
- tables = extract_tables('select a, from abc')
- assert tables == ((None, 'abc', None, False),)
+ tables = extract_tables("select a, from abc")
+ assert tables == ((None, "abc", None, False),)
def test_select_with_hanging_comma_multiple_tables():
- tables = extract_tables('select a, from abc, def')
- assert set(tables) == set([(None, 'abc', None, False),
- (None, 'def', None, False)])
+ tables = extract_tables("select a, from abc, def")
+ assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)])
def test_select_with_hanging_period_multiple_tables():
- tables = extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2')
- assert set(tables) == set([(None, 'tabl1', 't1', False),
- (None, 'tabl2', 't2', False)])
+ tables = extract_tables("SELECT t1. FROM tabl1 t1, tabl2 t2")
+ assert set(tables) == set(
+ [(None, "tabl1", "t1", False), (None, "tabl2", "t2", False)]
+ )
def test_simple_insert_single_table():
@@ -111,157 +105,165 @@ def test_simple_insert_single_table():
# AND mistakenly identifies the field list as
# assert tables == ((None, 'abc', 'abc', False),)
- assert tables == ((None, 'abc', 'abc', False),)
+ assert tables == ((None, "abc", "abc", False),)
@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified():
tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
- assert tables == (('abc', 'def', None, False),)
+ assert tables == (("abc", "def", None, False),)
def test_simple_update_table_no_schema():
- tables = extract_tables('update abc set id = 1')
- assert tables == ((None, 'abc', None, False),)
+ tables = extract_tables("update abc set id = 1")
+ assert tables == ((None, "abc", None, False),)
def test_simple_update_table_with_schema():
- tables = extract_tables('update abc.def set id = 1')
- assert tables == (('abc', 'def', None, False),)
+ tables = extract_tables("update abc.def set id = 1")
+ assert tables == (("abc", "def", None, False),)
-@pytest.mark.parametrize('join_type', ['', 'INNER', 'LEFT', 'RIGHT OUTER'])
+@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"])
def test_join_table(join_type):
- sql = 'SELECT * FROM abc a {0} JOIN def d ON a.id = d.num'.format(join_type)
+ sql = "SELECT * FROM abc a {0} JOIN def d ON a.id = d.num".format(join_type)
tables = extract_tables(sql)
- assert set(tables) == set([(None, 'abc', 'a', False),
- (None, 'def', 'd', False)])
+ assert set(tables) == set([(None, "abc", "a", False), (None, "def", "d", False)])
def test_join_table_schema_qualified():
- tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num')
- assert set(tables) == set([('abc', 'def', 'x', False),
- ('ghi', 'jkl', 'y', False)])
+ tables = extract_tables("SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num")
+ assert set(tables) == set([("abc", "def", "x", False), ("ghi", "jkl", "y", False)])
def test_incomplete_join_clause():
- sql = '''select a.x, b.y
+ sql = """select a.x, b.y
from abc a join bcd b
- on a.id = '''
+ on a.id = """
tables = extract_tables(sql)
- assert tables == ((None, 'abc', 'a', False),
- (None, 'bcd', 'b', False))
+ assert tables == ((None, "abc", "a", False), (None, "bcd", "b", False))
def test_join_as_table():
- tables = extract_tables('SELECT * FROM my_table AS m WHERE m.a > 5')
- assert tables == ((None, 'my_table', 'm', False),)
+ tables = extract_tables("SELECT * FROM my_table AS m WHERE m.a > 5")
+ assert tables == ((None, "my_table", "m", False),)
def test_multiple_joins():
- sql = '''select * from t1
+ sql = """select * from t1
inner join t2 ON
t1.id = t2.t1_id
inner join t3 ON
- t2.id = t3.'''
+ t2.id = t3."""
tables = extract_tables(sql)
assert tables == (
- (None, 't1', None, False),
- (None, 't2', None, False),
- (None, 't3', None, False))
+ (None, "t1", None, False),
+ (None, "t2", None, False),
+ (None, "t3", None, False),
+ )
def test_subselect_tables():
- sql = 'SELECT * FROM (SELECT FROM abc'
+ sql = "SELECT * FROM (SELECT FROM abc"
tables = extract_tables(sql)
- assert tables == ((None, 'abc', None, False),)
+ assert tables == ((None, "abc", None, False),)
-@pytest.mark.parametrize('text', ['SELECT * FROM foo.', 'SELECT 123 AS foo'])
+@pytest.mark.parametrize("text", ["SELECT * FROM foo.", "SELECT 123 AS foo"])
def test_extract_no_tables(text):
tables = extract_tables(text)
assert tables == tuple()
-@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_function_as_table(arg_list):
- tables = extract_tables('SELECT * FROM foo({0})'.format(arg_list))
- assert tables == ((None, 'foo', None, True),)
+ tables = extract_tables("SELECT * FROM foo({0})".format(arg_list))
+ assert tables == ((None, "foo", None, True),)
-@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_schema_qualified_function_as_table(arg_list):
- tables = extract_tables('SELECT * FROM foo.bar({0})'.format(arg_list))
- assert tables == (('foo', 'bar', None, True),)
+ tables = extract_tables("SELECT * FROM foo.bar({0})".format(arg_list))
+ assert tables == (("foo", "bar", None, True),)
-@pytest.mark.parametrize('arg_list', ['', 'arg1', 'arg1, arg2, arg3'])
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_aliased_function_as_table(arg_list):
- tables = extract_tables('SELECT * FROM foo({0}) bar'.format(arg_list))
- assert tables == ((None, 'foo', 'bar', True),)
+ tables = extract_tables("SELECT * FROM foo({0}) bar".format(arg_list))
+ assert tables == ((None, "foo", "bar", True),)
def test_simple_table_and_function():
- tables = extract_tables('SELECT * FROM foo JOIN bar()')
- assert set(tables) == set([(None, 'foo', None, False),
- (None, 'bar', None, True)])
+ tables = extract_tables("SELECT * FROM foo JOIN bar()")
+ assert set(tables) == set([(None, "foo", None, False), (None, "bar", None, True)])
def test_complex_table_and_function():
- tables = extract_tables('''SELECT * FROM foo.bar baz
- JOIN bar.qux(x, y, z) quux''')
- assert set(tables) == set([('foo', 'bar', 'baz', False),
- ('bar', 'qux', 'quux', True)])
+ tables = extract_tables(
+ """SELECT * FROM foo.bar baz
+ JOIN bar.qux(x, y, z) quux"""
+ )
+ assert set(tables) == set(
+ [("foo", "bar", "baz", False), ("bar", "qux", "quux", True)]
+ )
def test_find_prev_keyword_using():
- q = 'select * from tbl1 inner join tbl2 using (col1, '
+ q = "select * from tbl1 inner join tbl2 using (col1, "
kw, q2 = find_prev_keyword(q)
- assert kw.value == '(' and q2 == 'select * from tbl1 inner join tbl2 using ('
+ assert kw.value == "(" and q2 == "select * from tbl1 inner join tbl2 using ("
-@pytest.mark.parametrize('sql', [
- 'select * from foo where bar',
- 'select * from foo where bar = 1 and baz or ',
- 'select * from foo where bar = 1 and baz between qux and ',
-])
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "select * from foo where bar",
+ "select * from foo where bar = 1 and baz or ",
+ "select * from foo where bar = 1 and baz between qux and ",
+ ],
+)
def test_find_prev_keyword_where(sql):
kw, stripped = find_prev_keyword(sql)
- assert kw.value == 'where' and stripped == 'select * from foo where'
+ assert kw.value == "where" and stripped == "select * from foo where"
-@pytest.mark.parametrize('sql', [
- 'create table foo (bar int, baz ',
- 'select * from foo() as bar (baz '
-])
+@pytest.mark.parametrize(
+ "sql", ["create table foo (bar int, baz ", "select * from foo() as bar (baz "]
+)
def test_find_prev_keyword_open_parens(sql):
kw, _ = find_prev_keyword(sql)
- assert kw.value == '('
-
-
-@pytest.mark.parametrize('sql', [
- '',
- '$$ foo $$',
- "$$ 'foo' $$",
- '$$ "foo" $$',
- '$$ $a$ $$',
- '$a$ $$ $a$',
- 'foo bar $$ baz $$',
-])
+ assert kw.value == "("
+
+
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "",
+ "$$ foo $$",
+ "$$ 'foo' $$",
+ '$$ "foo" $$',
+ "$$ $a$ $$",
+ "$a$ $$ $a$",
+ "foo bar $$ baz $$",
+ ],
+)
def test_is_open_quote__closed(sql):
assert not is_open_quote(sql)
-@pytest.mark.parametrize('sql', [
- '$$',
- ';;;$$',
- 'foo $$ bar $$; foo $$',
- '$$ foo $a$',
- "foo 'bar baz",
- "$a$ foo ",
- '$$ "foo" ',
- '$$ $a$ ',
- 'foo bar $$ baz',
-])
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "$$",
+ ";;;$$",
+ "foo $$ bar $$; foo $$",
+ "$$ foo $a$",
+ "foo 'bar baz",
+ "$a$ foo ",
+ '$$ "foo" ',
+ "$$ $a$ ",
+ "foo bar $$ baz",
+ ],
+)
def test_is_open_quote__open(sql):
assert is_open_quote(sql)