diff options
author | Amjith Ramanujam <amjith.r@gmail.com> | 2017-06-07 21:45:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-07 21:45:02 -0700 |
commit | 8643eeae5fe7166839917ccb25a8e6f7f062fe36 (patch) | |
tree | a712a55dd167e92554ad6a3cce66301767c56bb5 | |
parent | 2e53ab3dbe2078424d402cf1c7a87244f059b53b (diff) | |
parent | 0170d3c9e7f59893eec869d91b7c171cdd2609d8 (diff) |
Merge pull request #457 from dbcli/tsroten/test_execute
Test sqlexecute.run return values, not formatted output.
-rw-r--r-- | test/test_sqlexecute.py | 281 | ||||
-rw-r--r-- | test/utils.py | 23 |
2 files changed, 103 insertions, 201 deletions
diff --git a/test/test_sqlexecute.py b/test/test_sqlexecute.py index aebfa1a..66a4876 100644 --- a/test/test_sqlexecute.py +++ b/test/test_sqlexecute.py @@ -1,64 +1,62 @@ # coding=UTF-8 +import os + import pytest import pymysql -import os -from textwrap import dedent -from utils import run, dbtest, set_expanded_output + +from utils import run, dbtest, set_expanded_output, is_expanded_output + + +def assert_result_equal(result, title=None, rows=None, headers=None, + status=None, auto_status=True, assert_contains=False): + """Assert that an sqlexecute.run() result matches the expected values.""" + if status is None and auto_status and rows: + status = '{} row{} in set'.format( + len(rows), 's' if len(rows) > 1 else '') + fields = {'title': title, 'rows': rows, 'headers': headers, + 'status': status} + + if assert_contains: + # Do a loose match on the results using the *in* operator. + for key, field in fields.items(): + if field: + assert field in result[0][key] + else: + # Do an exact match on the fields. + assert result == [fields] @dbtest def test_conn(executor): run(executor, '''create table test(a text)''') run(executor, '''insert into test values('abc')''') - results = run(executor, '''select * from test''', join=True) - assert results == dedent("""\ - +-----+ - | a | - +-----+ - | abc | - +-----+ - 1 row in set""") + results = run(executor, '''select * from test''') + + assert_result_equal(results, headers=['a'], rows=[('abc',)]) @dbtest def test_bools(executor): run(executor, '''create table test(a boolean)''') run(executor, '''insert into test values(True)''') - results = run(executor, '''select * from test''', join=True) - assert results == dedent("""\ - +---+ - | a | - +---+ - | 1 | - +---+ - 1 row in set""") + results = run(executor, '''select * from test''') + + assert_result_equal(results, headers=['a'], rows=[(1,)]) @dbtest def test_binary(executor): run(executor, '''create table bt(geom linestring NOT NULL)''') - run(executor, '''INSERT INTO bt VALUES (GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));''') - results = run(executor, '''select * from bt''', join=True) - assert results == dedent("""\ - +----------------------------------------------------------------------------------------------+ - | geom | - +----------------------------------------------------------------------------------------------+ - | 0x00000000010200000002000000397f130a11185d4034f44f70b1de43400000000000185d40423ee8d9acde4340 | - +----------------------------------------------------------------------------------------------+ - 1 row in set""") + run(executor, "INSERT INTO bt VALUES " + "(GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));") + results = run(executor, '''select * from bt''') + geom = (b'\x00\x00\x00\x00\x01\x02\x00\x00\x00\x02\x00\x00\x009\x7f\x13\n' + b'\x11\x18]@4\xf4Op\xb1\xdeC@\x00\x00\x00\x00\x00\x18]@B>\xe8\xd9' + b'\xac\xdeC@') -@dbtest -def test_binary_expanded(executor): - run(executor, '''create table bt(geom linestring NOT NULL)''') - run(executor, '''INSERT INTO bt VALUES (GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));''') - results = run(executor, '''select * from bt\G''', join=True) - assert results == dedent("""\ - ***************************[ 1. row ]*************************** - geom | 0x00000000010200000002000000397f130a11185d4034f44f70b1de43400000000000185d40423ee8d9acde4340 - - 1 row in set""") + assert_result_equal(results, headers=['geom'], rows=[(geom,)]) @dbtest @@ -97,38 +95,19 @@ def test_unicode_support_in_output(executor): run(executor, u"insert into unicodechars (t) values ('é')") # See issue #24, this raises an exception without proper handling - assert u'é' in run(executor, u"select * from unicodechars", join=True) - - -@dbtest -def test_expanded_output(executor): - run(executor, '''create table test(a text)''') - run(executor, '''insert into test values('abc')''') - results = run(executor, '''select * from test\G''', join=True) - - expected_results = set([ - dedent("""\ - -[ RECORD 0 ] - a | abc - - 1 row in set"""), - dedent("""\ - ***************************[ 1. row ]*************************** - a | abc - - 1 row in set"""), - ]) - - assert results in expected_results + results = run(executor, u"select * from unicodechars") + assert_result_equal(results, headers=['t'], rows=[(u'é',)]) @dbtest def test_multiple_queries_same_line(executor): - result = run(executor, "select 'foo'; select 'bar'") - # 2 for the results and 2 more for status messages. - assert len(result) == 4 - assert "foo" in result[0] - assert "bar" in result[2] + results = run(executor, "select 'foo'; select 'bar'") + + expected = [{'title': None, 'headers': ['foo'], 'rows': [('foo',)], + 'status': '1 row in set'}, + {'title': None, 'headers': ['bar'], 'rows': [('bar',)], + 'status': '1 row in set'}] + assert expected == results @dbtest @@ -146,19 +125,15 @@ def test_favorite_query(executor): run(executor, "insert into test values('def')") results = run(executor, "\\fs test-a select * from test where a like 'a%'") - assert results == ['Saved.'] + assert_result_equal(results, status='Saved.') - results = run(executor, "\\f test-a", join=True) - assert results == dedent("""\ - > select * from test where a like 'a%' - +-----+ - | a | - +-----+ - | abc | - +-----+""") + results = run(executor, "\\f test-a") + assert_result_equal(results, + title="> select * from test where a like 'a%'", + headers=['a'], rows=[('abc',)], auto_status=False) results = run(executor, "\\fd test-a") - assert results == ['test-a: Deleted'] + assert_result_equal(results, status='test-a: Deleted') @dbtest @@ -168,27 +143,20 @@ def test_favorite_query_multiple_statement(executor): run(executor, "insert into test values('abc')") run(executor, "insert into test values('def')") - results = run(executor, "\\fs test-ad select * from test where a like 'a%'; " - "select * from test where a like 'd%'") - assert results == ['Saved.'] - - results = run(executor, "\\f test-ad", join=True) - assert results == dedent("""\ - > select * from test where a like 'a%' - +-----+ - | a | - +-----+ - | abc | - +-----+ - > select * from test where a like 'd%' - +-----+ - | a | - +-----+ - | def | - +-----+""") + results = run(executor, + "\\fs test-ad select * from test where a like 'a%'; " + "select * from test where a like 'd%'") + assert_result_equal(results, status='Saved.') + + results = run(executor, "\\f test-ad") + expected = [{'title': "> select * from test where a like 'a%'", + 'headers': ['a'], 'rows': [('abc',)], 'status': None}, + {'title': "> select * from test where a like 'd%'", + 'headers': ['a'], 'rows': [('def',)], 'status': None}] + assert expected == results results = run(executor, "\\fd test-ad") - assert results == ['test-ad: Deleted'] + assert_result_equal(results, status='test-ad: Deleted') @dbtest @@ -198,156 +166,89 @@ def test_favorite_query_expanded_output(executor): run(executor, '''insert into test values('abc')''') results = run(executor, "\\fs test-ae select * from test") - assert results == ['Saved.'] - - results = run(executor, "\\f test-ae \G", join=True) - - expected_results = set([ - dedent("""\ - > select * from test - -[ RECORD 0 ] - a | abc - """), - dedent("""\ - > select * from test - ***************************[ 1. row ]*************************** - a | abc - """), - ]) - set_expanded_output(False) + assert_result_equal(results, status='Saved.') + + results = run(executor, "\\f test-ae \G") + assert is_expanded_output() is True + assert_result_equal(results, title='> select * from test', + headers=['a'], rows=[('abc',)], auto_status=False) - assert results in expected_results + set_expanded_output(False) results = run(executor, "\\fd test-ae") - assert results == ['test-ae: Deleted'] + assert_result_equal(results, status='test-ae: Deleted') @dbtest def test_special_command(executor): results = run(executor, '\\?') - expected_line = u'\n| help' - assert len(results) == 1 - assert expected_line in results[0] + assert_result_equal(results, rows=('quit', '\\q', 'Quit.'), + headers='Command', assert_contains=True, + auto_status=False) @dbtest def test_cd_command_without_a_folder_name(executor): results = run(executor, 'system cd') - expected_line = 'No folder name was provided.' - assert len(results) == 1 - assert expected_line in results[0] + assert_result_equal(results, status='No folder name was provided.') @dbtest def test_system_command_not_found(executor): results = run(executor, 'system xyz') - assert len(results) == 1 - expected_line = 'OSError:' - assert expected_line in results[0] + assert_result_equal(results, status='OSError: No such file or directory', + assert_contains=True) @dbtest def test_system_command_output(executor): test_file_path = os.path.join(os.path.abspath('.'), 'test', 'test.txt') results = run(executor, 'system cat {0}'.format(test_file_path)) - assert len(results) == 1 - expected_line = u'mycli rocks!\n' - assert expected_line == results[0] + assert_result_equal(results, status='mycli rocks!\n') @dbtest def test_cd_command_current_dir(executor): test_path = os.path.join(os.path.abspath('.'), 'test') - results = run(executor, 'system cd {0}'.format(test_path)) + run(executor, 'system cd {0}'.format(test_path)) assert os.getcwd() == test_path @dbtest def test_unicode_support(executor): - assert u'日本語' in run(executor, u"SELECT '日本語' AS japanese;", join=True) - - -@dbtest -def test_favorite_query_multiline_statement(executor): - set_expanded_output(False) - run(executor, "create table test(a text)") - run(executor, "insert into test values('abc')") - run(executor, "insert into test values('def')") - - results = run(executor, "\\fs test-ad select * from test where a like 'a%';\n" - "select * from test where a like 'd%'") - assert results == ['Saved.'] - - results = run(executor, "\\f test-ad", join=True) - assert results == dedent("""\ - > select * from test where a like 'a%' - +-----+ - | a | - +-----+ - | abc | - +-----+ - > select * from test where a like 'd%' - +-----+ - | a | - +-----+ - | def | - +-----+""") - - results = run(executor, "\\fd test-ad") - assert results == ['test-ad: Deleted'] + results = run(executor, u"SELECT '日本語' AS japanese;") + assert_result_equal(results, headers=['japanese'], rows=[(u'日本語',)]) @dbtest def test_timestamp_null(executor): run(executor, '''create table ts_null(a timestamp)''') run(executor, '''insert into ts_null values(0)''') - results = run(executor, '''select * from ts_null''', join=True) - assert results == dedent("""\ - +---------------------+ - | a | - +---------------------+ - | 0000-00-00 00:00:00 | - +---------------------+ - 1 row in set""") + results = run(executor, '''select * from ts_null''') + assert_result_equal(results, headers=['a'], + rows=[('0000-00-00 00:00:00',)]) @dbtest def test_datetime_null(executor): run(executor, '''create table dt_null(a datetime)''') run(executor, '''insert into dt_null values(0)''') - results = run(executor, '''select * from dt_null''', join=True) - assert results == dedent("""\ - +---------------------+ - | a | - +---------------------+ - | 0000-00-00 00:00:00 | - +---------------------+ - 1 row in set""") + results = run(executor, '''select * from dt_null''') + assert_result_equal(results, headers=['a'], + rows=[('0000-00-00 00:00:00',)]) @dbtest def test_date_null(executor): run(executor, '''create table date_null(a date)''') run(executor, '''insert into date_null values(0)''') - results = run(executor, '''select * from date_null''', join=True) - assert results == dedent("""\ - +------------+ - | a | - +------------+ - | 0000-00-00 | - +------------+ - 1 row in set""") + results = run(executor, '''select * from date_null''') + assert_result_equal(results, headers=['a'], rows=[('0000-00-00',)]) @dbtest def test_time_null(executor): run(executor, '''create table time_null(a time)''') run(executor, '''insert into time_null values(0)''') - results = run(executor, '''select * from time_null''', join=True) - assert results == dedent("""\ - +----------+ - | a | - +----------+ - | 00:00:00 | - +----------+ - 1 row in set""") + results = run(executor, '''select * from time_null''') + assert_result_equal(results, headers=['a'], rows=[('00:00:00',)]) diff --git a/test/utils.py b/test/utils.py index 0d6b6a9..c7a91b5 100644 --- a/test/utils.py +++ b/test/utils.py @@ -3,7 +3,7 @@ from os import getenv import pymysql import pytest -from mycli.main import MyCli, special +from mycli.main import special PASSWORD = getenv('PYTEST_PASSWORD') USER = getenv('PYTEST_USER', 'root') @@ -13,8 +13,8 @@ CHARSET = getenv('PYTEST_CHARSET', 'utf8') def db_connection(dbname=None): - conn = pymysql.connect(user=USER, host=HOST, port=PORT, database=dbname, password=PASSWORD, - charset=CHARSET, + conn = pymysql.connect(user=USER, host=HOST, port=PORT, database=dbname, + password=PASSWORD, charset=CHARSET, local_infile=False) conn.autocommit = True return conn @@ -40,22 +40,23 @@ def create_db(dbname): pass -def run(executor, sql, join=False): +def run(executor, sql, rows_as_list=True): """Return string output for the sql to be run.""" result = [] - # TODO: this needs to go away. `run()` should not test formatted output. - # It should test raw results. - mycli = MyCli() for title, rows, headers, status in executor.run(sql): - result.extend(mycli.format_output(title, rows, headers, status, - special.is_expanded_output())) + rows = list(rows) if (rows_as_list and rows) else rows + result.append({'title': title, 'rows': rows, 'headers': headers, + 'status': status}) - if join: - result = '\n'.join(result) return result def set_expanded_output(is_expanded): """Pass-through for the tests.""" return special.set_expanded_output(is_expanded) + + +def is_expanded_output(): + """Pass-through for the tests.""" + return special.is_expanded_output() |