diff options
author | Amjith Ramanujam <amjith.r@gmail.com> | 2019-06-01 15:58:46 -0700 |
---|---|---|
committer | Amjith Ramanujam <amjith.r@gmail.com> | 2019-06-01 15:58:46 -0700 |
commit | 6be87faf31762124a561df4c74828462068456cb (patch) | |
tree | 3f2046bd9193429c451cca69ac09e794b52592d6 | |
parent | f2f0ac1e278e95c57a7c836eee122015dccaa11f (diff) |
Use the sqlite binding for favorite queries.
-rw-r--r-- | litecli/packages/special/iocommands.py | 12 | ||||
-rw-r--r-- | tests/test_sqlexecute.py | 67 |
2 files changed, 47 insertions, 32 deletions
diff --git a/litecli/packages/special/iocommands.py b/litecli/packages/special/iocommands.py index 2a0982e..a9036aa 100644 --- a/litecli/packages/special/iocommands.py +++ b/litecli/packages/special/iocommands.py @@ -183,12 +183,22 @@ def execute_favorite_query(cur, arg, **_): """Parse out favorite name and optional substitution parameters""" name, _, arg_str = arg.partition(" ") - args = shlex.split(arg_str, posix=False) + args = shlex.split(arg_str) query = favoritequeries.get(name) if query is None: message = "No favorite query: %s" % (name) yield (None, None, None, message) + elif "?" in query: + for sql in sqlparse.split(query): + sql = sql.rstrip(";") + title = "> %s" % (sql) + cur.execute(sql, args) + if cur.description: + headers = [x[0] for x in cur.description] + yield (title, cur, headers, None) + else: + yield (title, None, None, None) else: query, arg_error = subst_favorite_query_args(query, args) if arg_error: diff --git a/tests/test_sqlexecute.py b/tests/test_sqlexecute.py index 09f67c5..098b26e 100644 --- a/tests/test_sqlexecute.py +++ b/tests/test_sqlexecute.py @@ -5,7 +5,7 @@ import os import pytest from utils import run, dbtest, set_expanded_output, is_expanded_output -from sqlite3 import OperationalError +from sqlite3 import OperationalError, ProgrammingError def assert_result_equal( @@ -153,64 +153,69 @@ def test_favorite_query(executor): @dbtest -def test_parameterized_favorite_query(executor): +def test_bind_parameterized_favorite_query(executor): set_expanded_output(False) - run(executor, "create table test(a text, id integer)") - run(executor, "insert into test values('abc', 1)") + run(executor, "create table test(name text, id integer)") run(executor, "insert into test values('def', 2)") + run(executor, "insert into test values('two words', 3)") - results = run(executor, "\\fs sh_param select * from test where id=$1") - assert_result_equal(results, status="Saved.") - - results = run(executor, "\\fs q_param select * from test where id=?") + results = run(executor, "\\fs q_param select * from test where name=?") assert_result_equal(results, status="Saved.") - results = run(executor, "\\f sh_param 1") + results = run(executor, "\\f q_param def") assert_result_equal( results, - title="> select * from test where id=1", - headers=["a", "id"], - rows=[("abc", 1)], + title="> select * from test where name=?", + headers=["name", "id"], + rows=[("def", 2)], auto_status=False, ) - results = run(executor, "\\f sh_param") + results = run(executor, "\\f q_param 'two words'") assert_result_equal( results, - title=None, - headers=None, - rows=None, - status="missing substitution for $1 in query:\n select * from test where id=$1", + title="> select * from test where name=?", + headers=["name", "id"], + rows=[("two words", 3)], + auto_status=False, ) - results = run(executor, "\\f sh_param 1 2") - assert_result_equal( - results, - title=None, - headers=None, - rows=None, - status="Too many arguments.\nQuery does not have enough place holders to substitute.\nselect * from test where id=1", - ) + with pytest.raises(ProgrammingError): + results = run(executor, "\\f q_param") + + with pytest.raises(ProgrammingError): + results = run(executor, "\\f q_param 1 2") + + +@dbtest +def test_shell_parameterized_favorite_query(executor): + set_expanded_output(False) + run(executor, "create table test(a text, id integer)") + run(executor, "insert into test values('abc', 1)") + run(executor, "insert into test values('def', 2)") - results = run(executor, "\\f q_param 2") + results = run(executor, "\\fs sh_param select * from test where id=$1") + assert_result_equal(results, status="Saved.") + + results = run(executor, "\\f sh_param 1") assert_result_equal( results, - title="> select * from test where id=2", + title="> select * from test where id=1", headers=["a", "id"], - rows=[("def", 2)], + rows=[("abc", 1)], auto_status=False, ) - results = run(executor, "\\f q_param") + results = run(executor, "\\f sh_param") assert_result_equal( results, title=None, headers=None, rows=None, - status="missing substitution for ? in query:\n select * from test where id=?", + status="missing substitution for $1 in query:\n select * from test where id=$1", ) - results = run(executor, "\\f q_param 1 2") + results = run(executor, "\\f sh_param 1 2") assert_result_equal( results, title=None, |