summaryrefslogtreecommitdiffstats
path: root/tests/test_pgexecute.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pgexecute.py')
-rw-r--r--tests/test_pgexecute.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/tests/test_pgexecute.py b/tests/test_pgexecute.py
index 513e6192..109674cb 100644
--- a/tests/test_pgexecute.py
+++ b/tests/test_pgexecute.py
@@ -520,6 +520,21 @@ class BrokenConnection:
raise psycopg2.InterfaceError("I'm broken!")
+class VirtualCursor:
+ """Mock a cursor to virtual database like pgbouncer."""
+
+ def __init__(self):
+ self.protocol_error = False
+ self.protocol_message = ""
+ self.description = None
+ self.status = None
+ self.statusmessage = "Error"
+
+ def execute(self, *args, **kwargs):
+ self.protocol_error = True
+ self.protocol_message = "Command not supported"
+
+
@dbtest
def test_exit_without_active_connection(executor):
quit_handler = MagicMock()
@@ -542,3 +557,12 @@ def test_exit_without_active_connection(executor):
# an exception should be raised when running a query without active connection
with pytest.raises(psycopg2.InterfaceError):
run(executor, "select 1", pgspecial=pgspecial)
+
+
+@dbtest
+def test_virtual_database(executor):
+ virtual_connection = MagicMock()
+ virtual_connection.cursor.return_value = VirtualCursor()
+ with patch.object(executor, "conn", virtual_connection):
+ result = run(executor, "select 1")
+ assert "Command not supported" in result