summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSaif Hakim <saif@benchling.com>2021-09-30 12:00:31 -0700
committerGitHub <noreply@github.com>2021-09-30 12:00:31 -0700
commit123e00a086d91281a2ed8cfd84452a796f22ad91 (patch)
tree753f29409f65eb4acb9f92def0bb199184c22f5a
parentc65495716d9fa6914f689d410d3c737d7661053d (diff)
Re-run last query with bare `\watch` (#1283)
* Re-run last query with bare `\watch` * add test * clean up post test refactor * lint * rerun tests
-rw-r--r--changelog.rst1
-rw-r--r--pgcli/main.py45
-rw-r--r--tests/test_main.py57
3 files changed, 89 insertions, 14 deletions
diff --git a/changelog.rst b/changelog.rst
index 3cfeb865..961400e2 100644
--- a/changelog.rst
+++ b/changelog.rst
@@ -5,6 +5,7 @@ Features:
---------
* Add `max_field_width` setting to config, to enable more control over field truncation ([related issue](https://github.com/dbcli/pgcli/issues/1250)).
+* Re-run last query via bare `\watch`. (Thanks: `Saif Hakim`_)
Bug fixes:
----------
diff --git a/pgcli/main.py b/pgcli/main.py
index 124cac5c..5395f673 100644
--- a/pgcli/main.py
+++ b/pgcli/main.py
@@ -772,18 +772,7 @@ class PGCli:
click.secho(str(e), err=True, fg="red")
continue
- # Initialize default metaquery in case execution fails
- self.watch_command, timing = special.get_watch_command(text)
- if self.watch_command:
- while self.watch_command:
- try:
- query = self.execute_command(self.watch_command)
- click.echo(f"Waiting for {timing} seconds before repeating")
- sleep(timing)
- except KeyboardInterrupt:
- self.watch_command = None
- else:
- query = self.execute_command(text)
+ self.handle_watch_command(text)
self.now = dt.datetime.today()
@@ -791,12 +780,40 @@ class PGCli:
with self._completer_lock:
self.completer.extend_query_history(text)
- self.query_history.append(query)
-
except (PgCliQuitError, EOFError):
if not self.less_chatty:
print("Goodbye!")
+ def handle_watch_command(self, text):
+ # Initialize default metaquery in case execution fails
+ self.watch_command, timing = special.get_watch_command(text)
+
+ # If we run \watch without a command, apply it to the last query run.
+ if self.watch_command is not None and not self.watch_command.strip():
+ try:
+ self.watch_command = self.query_history[-1].query
+ except IndexError:
+ click.secho(
+ "\\watch cannot be used with an empty query", err=True, fg="red"
+ )
+ self.watch_command = None
+
+ # If there's a command to \watch, run it in a loop.
+ if self.watch_command:
+ while self.watch_command:
+ try:
+ query = self.execute_command(self.watch_command)
+ click.echo(f"Waiting for {timing} seconds before repeating")
+ sleep(timing)
+ except KeyboardInterrupt:
+ self.watch_command = None
+
+ # Otherwise, execute it as a regular command.
+ else:
+ query = self.execute_command(text)
+
+ self.query_history.append(query)
+
def _build_cli(self, history):
key_bindings = pgcli_bindings(self)
diff --git a/tests/test_main.py b/tests/test_main.py
index 2526062f..9b3a84b2 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -297,6 +297,63 @@ def test_i_works(tmpdir, executor):
run(executor, statement, pgspecial=cli.pgspecial)
+@dbtest
+def test_watch_works(executor):
+ cli = PGCli(pgexecute=executor)
+
+ def run_with_watch(
+ query, target_call_count=1, expected_output="", expected_timing=None
+ ):
+ """
+ :param query: Input to the CLI
+ :param target_call_count: Number of times the user lets the command run before Ctrl-C
+ :param expected_output: Substring expected to be found for each executed query
+ :param expected_timing: value `time.sleep` expected to be called with on every invocation
+ """
+ with mock.patch.object(cli, "echo_via_pager") as mock_echo, mock.patch(
+ "pgcli.main.sleep"
+ ) as mock_sleep:
+ mock_sleep.side_effect = [None] * (target_call_count - 1) + [
+ KeyboardInterrupt
+ ]
+ cli.handle_watch_command(query)
+ # Validate that sleep was called with the right timing
+ for i in range(target_call_count - 1):
+ assert mock_sleep.call_args_list[i][0][0] == expected_timing
+ # Validate that the output of the query was expected
+ assert mock_echo.call_count == target_call_count
+ for i in range(target_call_count):
+ assert expected_output in mock_echo.call_args_list[i][0][0]
+
+ # With no history, it errors.
+ with mock.patch("pgcli.main.click.secho") as mock_secho:
+ cli.handle_watch_command(r"\watch 2")
+ mock_secho.assert_called()
+ assert (
+ r"\watch cannot be used with an empty query"
+ in mock_secho.call_args_list[0][0][0]
+ )
+
+ # Usage 1: Run a query and then re-run it with \watch across two prompts.
+ run_with_watch("SELECT 111", expected_output="111")
+ run_with_watch(
+ "\\watch 10", target_call_count=2, expected_output="111", expected_timing=10
+ )
+
+ # Usage 2: Run a query and \watch via the same prompt.
+ run_with_watch(
+ "SELECT 222; \\watch 4",
+ target_call_count=3,
+ expected_output="222",
+ expected_timing=4,
+ )
+
+ # Usage 3: Re-run the last watched command with a new timing
+ run_with_watch(
+ "\\watch 5", target_call_count=4, expected_output="222", expected_timing=5
+ )
+
+
def test_missing_rc_dir(tmpdir):
rcfile = str(tmpdir.join("subdir").join("rcfile"))