summaryrefslogtreecommitdiffstats
path: root/pgcli
diff options
context:
space:
mode:
authorIrina Truong <i.chernyavska@gmail.com>2019-05-25 13:08:56 -0700
committerGitHub <noreply@github.com>2019-05-25 13:08:56 -0700
commit8cb7009bcd0f0062942932c853706a36178f566c (patch)
treecb5bb42674ccce90b173e7a2f09bf72157ae4a86 /pgcli
parenta5e607b6fc889afd3f8960ca3903ae16b641c304 (diff)
black all the things. (#1049)
* added black to develop guide * no need for pep8radius. * changelog. * Add pre-commit checkbox. * Add pre-commit to dev reqs. * Add pyproject.toml for black. * Pre-commit config. * Add black to travis and dev reqs. * Install and run black in travis. * Remove black from dev reqs. * Lower black target version. * Re-format with black.
Diffstat (limited to 'pgcli')
-rw-r--r--pgcli/__init__.py2
-rw-r--r--pgcli/completion_refresher.py54
-rw-r--r--pgcli/config.py23
-rw-r--r--pgcli/encodingutils.py4
-rw-r--r--pgcli/key_bindings.py32
-rw-r--r--pgcli/magic.py26
-rw-r--r--pgcli/main.py1019
-rw-r--r--pgcli/packages/parseutils/__init__.py5
-rw-r--r--pgcli/packages/parseutils/ctes.py20
-rw-r--r--pgcli/packages/parseutils/meta.py101
-rw-r--r--pgcli/packages/parseutils/tables.py60
-rw-r--r--pgcli/packages/parseutils/utils.py35
-rw-r--r--pgcli/packages/pgliterals/main.py2
-rw-r--r--pgcli/packages/prioritization.py6
-rw-r--r--pgcli/packages/prompt_utils.py5
-rw-r--r--pgcli/packages/sqlcompletion.py281
-rw-r--r--pgcli/pgbuffer.py21
-rw-r--r--pgcli/pgcompleter.py536
-rw-r--r--pgcli/pgexecute.py256
-rw-r--r--pgcli/pgstyle.py78
-rw-r--r--pgcli/pgtoolbar.py47
21 files changed, 1486 insertions, 1127 deletions
diff --git a/pgcli/__init__.py b/pgcli/__init__.py
index a33997dd..9aa3f903 100644
--- a/pgcli/__init__.py
+++ b/pgcli/__init__.py
@@ -1 +1 @@
-__version__ = '2.1.0'
+__version__ = "2.1.0"
diff --git a/pgcli/completion_refresher.py b/pgcli/completion_refresher.py
index 388bb295..a70d2f2b 100644
--- a/pgcli/completion_refresher.py
+++ b/pgcli/completion_refresher.py
@@ -14,8 +14,7 @@ class CompletionRefresher(object):
self._completer_thread = None
self._restart_refresh = threading.Event()
- def refresh(self, executor, special, callbacks, history=None,
- settings=None):
+ def refresh(self, executor, special, callbacks, history=None, settings=None):
"""
Creates a PGCompleter object and populates it with the relevant
completion suggestions in a background thread.
@@ -30,27 +29,29 @@ class CompletionRefresher(object):
"""
if self.is_refreshing():
self._restart_refresh.set()
- return [(None, None, None, 'Auto-completion refresh restarted.')]
+ return [(None, None, None, "Auto-completion refresh restarted.")]
else:
self._completer_thread = threading.Thread(
target=self._bg_refresh,
args=(executor, special, callbacks, history, settings),
- name='completion_refresh')
+ name="completion_refresh",
+ )
self._completer_thread.setDaemon(True)
self._completer_thread.start()
- return [(None, None, None,
- 'Auto-completion refresh started in the background.')]
+ return [
+ (None, None, None, "Auto-completion refresh started in the background.")
+ ]
def is_refreshing(self):
return self._completer_thread and self._completer_thread.is_alive()
- def _bg_refresh(self, pgexecute, special, callbacks, history=None,
- settings=None):
+ def _bg_refresh(self, pgexecute, special, callbacks, history=None, settings=None):
settings = settings or {}
- completer = PGCompleter(smart_completion=True, pgspecial=special,
- settings=settings)
+ completer = PGCompleter(
+ smart_completion=True, pgspecial=special, settings=settings
+ )
- if settings.get('single_connection'):
+ if settings.get("single_connection"):
executor = pgexecute
else:
# Create a new pgexecute method to populate the completions.
@@ -88,55 +89,58 @@ def refresher(name, refreshers=CompletionRefresher.refreshers):
"""Decorator to populate the dictionary of refreshers with the current
function.
"""
+
def wrapper(wrapped):
refreshers[name] = wrapped
return wrapped
+
return wrapper
-@refresher('schemata')
+@refresher("schemata")
def refresh_schemata(completer, executor):
completer.set_search_path(executor.search_path())
completer.extend_schemata(executor.schemata())
-@refresher('tables')
+@refresher("tables")
def refresh_tables(completer, executor):
- completer.extend_relations(executor.tables(), kind='tables')
- completer.extend_columns(executor.table_columns(), kind='tables')
+ completer.extend_relations(executor.tables(), kind="tables")
+ completer.extend_columns(executor.table_columns(), kind="tables")
completer.extend_foreignkeys(executor.foreignkeys())
-@refresher('views')
+@refresher("views")
def refresh_views(completer, executor):
- completer.extend_relations(executor.views(), kind='views')
- completer.extend_columns(executor.view_columns(), kind='views')
+ completer.extend_relations(executor.views(), kind="views")
+ completer.extend_columns(executor.view_columns(), kind="views")
+
-@refresher('types')
+@refresher("types")
def refresh_types(completer, executor):
completer.extend_datatypes(executor.datatypes())
-@refresher('databases')
+@refresher("databases")
def refresh_databases(completer, executor):
completer.extend_database_names(executor.databases())
-@refresher('casing')
+@refresher("casing")
def refresh_casing(completer, executor):
casing_file = completer.casing_file
if not casing_file:
return
generate_casing_file = completer.generate_casing_file
if generate_casing_file and not os.path.isfile(casing_file):
- casing_prefs = '\n'.join(executor.casing())
- with open(casing_file, 'w') as f:
+ casing_prefs = "\n".join(executor.casing())
+ with open(casing_file, "w") as f:
f.write(casing_prefs)
if os.path.isfile(casing_file):
- with open(casing_file, 'r') as f:
+ with open(casing_file, "r") as f:
completer.extend_casing([line.strip() for line in f])
-@refresher('functions')
+@refresher("functions")
def refresh_functions(completer, executor):
completer.extend_functions(executor.functions())
diff --git a/pgcli/config.py b/pgcli/config.py
index 1303eed2..a2c0b0b2 100644
--- a/pgcli/config.py
+++ b/pgcli/config.py
@@ -7,18 +7,18 @@ from configobj import ConfigObj
def config_location():
- if 'XDG_CONFIG_HOME' in os.environ:
- return '%s/pgcli/' % expanduser(os.environ['XDG_CONFIG_HOME'])
- elif platform.system() == 'Windows':
- return os.getenv('USERPROFILE') + '\\AppData\\Local\\dbcli\\pgcli\\'
+ if "XDG_CONFIG_HOME" in os.environ:
+ return "%s/pgcli/" % expanduser(os.environ["XDG_CONFIG_HOME"])
+ elif platform.system() == "Windows":
+ return os.getenv("USERPROFILE") + "\\AppData\\Local\\dbcli\\pgcli\\"
else:
- return expanduser('~/.config/pgcli/')
+ return expanduser("~/.config/pgcli/")
def load_config(usr_cfg, def_cfg=None):
cfg = ConfigObj()
cfg.merge(ConfigObj(def_cfg, interpolation=False))
- cfg.merge(ConfigObj(expanduser(usr_cfg), interpolation=False, encoding='utf-8'))
+ cfg.merge(ConfigObj(expanduser(usr_cfg), interpolation=False, encoding="utf-8"))
cfg.filename = expanduser(usr_cfg)
return cfg
@@ -51,18 +51,19 @@ def upgrade_config(config, def_config):
def get_config(pgclirc_file=None):
from pgcli import __file__ as package_root
+
package_root = os.path.dirname(package_root)
- pgclirc_file = pgclirc_file or '%sconfig' % config_location()
+ pgclirc_file = pgclirc_file or "%sconfig" % config_location()
- default_config = os.path.join(package_root, 'pgclirc')
+ default_config = os.path.join(package_root, "pgclirc")
write_default_config(default_config, pgclirc_file)
return load_config(pgclirc_file, default_config)
def get_casing_file(config):
- casing_file = config['main']['casing_file']
- if casing_file == 'default':
- casing_file = config_location() + 'casing'
+ casing_file = config["main"]["casing_file"]
+ if casing_file == "default":
+ casing_file = config_location() + "casing"
return casing_file
diff --git a/pgcli/encodingutils.py b/pgcli/encodingutils.py
index 5de97b7e..279da3a8 100644
--- a/pgcli/encodingutils.py
+++ b/pgcli/encodingutils.py
@@ -13,7 +13,7 @@ def unicode2utf8(arg):
"""
if PY2 and isinstance(arg, unicode):
- return arg.encode('utf-8')
+ return arg.encode("utf-8")
return arg
@@ -24,5 +24,5 @@ def utf8tounicode(arg):
"""
if PY2 and isinstance(arg, str):
- return arg.decode('utf-8')
+ return arg.decode("utf-8")
return arg
diff --git a/pgcli/key_bindings.py b/pgcli/key_bindings.py
index d7750432..cc078977 100644
--- a/pgcli/key_bindings.py
+++ b/pgcli/key_bindings.py
@@ -12,32 +12,32 @@ def pgcli_bindings(pgcli):
"""Custom key bindings for pgcli."""
kb = KeyBindings()
- tab_insert_text = ' ' * 4
+ tab_insert_text = " " * 4
- @kb.add('f2')
+ @kb.add("f2")
def _(event):
"""Enable/Disable SmartCompletion Mode."""
- _logger.debug('Detected F2 key.')
+ _logger.debug("Detected F2 key.")
pgcli.completer.smart_completion = not pgcli.completer.smart_completion
- @kb.add('f3')
+ @kb.add("f3")
def _(event):
"""Enable/Disable Multiline Mode."""
- _logger.debug('Detected F3 key.')
+ _logger.debug("Detected F3 key.")
pgcli.multi_line = not pgcli.multi_line
- @kb.add('f4')
+ @kb.add("f4")
def _(event):
"""Toggle between Vi and Emacs mode."""
- _logger.debug('Detected F4 key.')
+ _logger.debug("Detected F4 key.")
pgcli.vi_mode = not pgcli.vi_mode
event.app.editing_mode = EditingMode.VI if pgcli.vi_mode else EditingMode.EMACS
- @kb.add('tab')
+ @kb.add("tab")
def _(event):
"""Force autocompletion at cursor on non-empty lines."""
- _logger.debug('Detected <Tab> key.')
+ _logger.debug("Detected <Tab> key.")
buff = event.app.current_buffer
doc = buff.document
@@ -50,17 +50,15 @@ def pgcli_bindings(pgcli):
else:
buff.insert_text(tab_insert_text, fire_event=False)
- @kb.add('escape', filter=has_completions)
+ @kb.add("escape", filter=has_completions)
def _(event):
"""Force closing of autocompletion."""
- _logger.debug('Detected <Esc> key.')
+ _logger.debug("Detected <Esc> key.")
event.current_buffer.complete_state = None
event.app.current_buffer.complete_state = None
-
-
- @kb.add('c-space')
+ @kb.add("c-space")
def _(event):
"""
Initialize autocompletion at cursor.
@@ -70,7 +68,7 @@ def pgcli_bindings(pgcli):
If the menu is showing, select the next completion.
"""
- _logger.debug('Detected <C-Space> key.')
+ _logger.debug("Detected <C-Space> key.")
b = event.app.current_buffer
if b.complete_state:
@@ -78,7 +76,7 @@ def pgcli_bindings(pgcli):
else:
b.start_completion(select_first=False)
- @kb.add('enter', filter=completion_is_selected)
+ @kb.add("enter", filter=completion_is_selected)
def _(event):
"""Makes the enter key work as the tab key only when showing the menu.
@@ -87,7 +85,7 @@ def pgcli_bindings(pgcli):
(accept current selection).
"""
- _logger.debug('Detected enter key.')
+ _logger.debug("Detected enter key.")
event.current_buffer.complete_state = None
event.app.current_buffer.complete_state = None
diff --git a/pgcli/magic.py b/pgcli/magic.py
index 899fe733..3e9a4021 100644
--- a/pgcli/magic.py
+++ b/pgcli/magic.py
@@ -10,40 +10,40 @@ def load_ipython_extension(ipython):
"""This is called via the ipython command '%load_ext pgcli.magic'"""
# first, load the sql magic if it isn't already loaded
- if not ipython.find_line_magic('sql'):
- ipython.run_line_magic('load_ext', 'sql')
+ if not ipython.find_line_magic("sql"):
+ ipython.run_line_magic("load_ext", "sql")
# register our own magic
- ipython.register_magic_function(pgcli_line_magic, 'line', 'pgcli')
+ ipython.register_magic_function(pgcli_line_magic, "line", "pgcli")
def pgcli_line_magic(line):
- _logger.debug('pgcli magic called: %r', line)
+ _logger.debug("pgcli magic called: %r", line)
parsed = sql.parse.parse(line, {})
# "get" was renamed to "set" in ipython-sql:
# https://github.com/catherinedevlin/ipython-sql/commit/f4283c65aaf68f961e84019e8b939e4a3c501d43
- if hasattr(sql.connection.Connection, 'get'):
- conn = sql.connection.Connection.get(parsed['connection'])
+ if hasattr(sql.connection.Connection, "get"):
+ conn = sql.connection.Connection.get(parsed["connection"])
else:
- conn = sql.connection.Connection.set(parsed['connection'])
+ conn = sql.connection.Connection.set(parsed["connection"])
try:
# A corresponding pgcli object already exists
pgcli = conn._pgcli
- _logger.debug('Reusing existing pgcli')
+ _logger.debug("Reusing existing pgcli")
except AttributeError:
# I can't figure out how to get the underylying psycopg2 connection
# from the sqlalchemy connection, so just grab the url and make a
# new connection
pgcli = PGCli()
u = conn.session.engine.url
- _logger.debug('New pgcli: %r', str(u))
+ _logger.debug("New pgcli: %r", str(u))
pgcli.connect(u.database, u.host, u.username, u.port, u.password)
conn._pgcli = pgcli
# For convenience, print the connection alias
- print('Connected: {}'.format(conn.name))
+ print ("Connected: {}".format(conn.name))
try:
pgcli.run_cli()
@@ -56,12 +56,12 @@ def pgcli_line_magic(line):
q = pgcli.query_history[-1]
if not q.successful:
- _logger.debug('Unsuccessful query - ignoring')
+ _logger.debug("Unsuccessful query - ignoring")
return
if q.meta_changed or q.db_changed or q.path_changed:
- _logger.debug('Dangerous query detected -- ignoring')
+ _logger.debug("Dangerous query detected -- ignoring")
return
ipython = get_ipython()
- return ipython.run_cell_magic('sql', line, q.query)
+ return ipython.run_cell_magic("sql", line, q.query)
diff --git a/pgcli/main.py b/pgcli/main.py
index 2a000cd9..fdbbc15c 100644
--- a/pgcli/main.py
+++ b/pgcli/main.py
@@ -5,7 +5,7 @@ import warnings
from pgspecial.namedqueries import NamedQueries
-warnings.filterwarnings("ignore", category=UserWarning, module='psycopg2')
+warnings.filterwarnings("ignore", category=UserWarning, module="psycopg2")
import os
import re
@@ -20,12 +20,13 @@ import datetime as dt
import itertools
from time import time, sleep
from codecs import open
+
keyring = None # keyring will be loaded later
from cli_helpers.tabular_output import TabularOutputFormatter
-from cli_helpers.tabular_output.preprocessors import (align_decimals,
- format_numbers)
+from cli_helpers.tabular_output.preprocessors import align_decimals, format_numbers
import click
+
try:
import setproctitle
except ImportError:
@@ -36,14 +37,16 @@ from prompt_toolkit.shortcuts import PromptSession, CompleteStyle
from prompt_toolkit.document import Document
from prompt_toolkit.filters import HasFocus, IsDone
from prompt_toolkit.lexers import PygmentsLexer
-from prompt_toolkit.layout.processors import (ConditionalProcessor,
- HighlightMatchingBracketProcessor,
- TabsProcessor)
+from prompt_toolkit.layout.processors import (
+ ConditionalProcessor,
+ HighlightMatchingBracketProcessor,
+ TabsProcessor,
+)
from prompt_toolkit.history import FileHistory
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from pygments.lexers.sql import PostgresLexer
-from pgspecial.main import (PGSpecial, NO_QUERY, PAGER_OFF, PAGER_LONG_OUTPUT)
+from pgspecial.main import PGSpecial, NO_QUERY, PAGER_OFF, PAGER_LONG_OUTPUT
import pgspecial as special
from .pgcompleter import PGCompleter
@@ -52,8 +55,13 @@ from .pgstyle import style_factory, style_factory_output
from .pgexecute import PGExecute
from .pgbuffer import pg_is_multiline
from .completion_refresher import CompletionRefresher
-from .config import (get_casing_file,
- load_config, config_location, ensure_dir_exists, get_config)
+from .config import (
+ get_casing_file,
+ load_config,
+ config_location,
+ ensure_dir_exists,
+ get_config,
+)
from .key_bindings import pgcli_bindings
from .encodingutils import utf8tounicode
from .encodingutils import text_type
@@ -76,30 +84,38 @@ from collections import namedtuple
from textwrap import dedent
# Ref: https://stackoverflow.com/questions/30425105/filter-special-chars-such-as-color-codes-from-shell-output
-COLOR_CODE_REGEX = re.compile(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))')
+COLOR_CODE_REGEX = re.compile(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))")
# Query tuples are used for maintaining history
MetaQuery = namedtuple(
- 'Query',
+ "Query",
[
- 'query', # The entire text of the command
- 'successful', # True If all subqueries were successful
- 'total_time', # Time elapsed executing the query and formatting results
- 'execution_time', # Time elapsed executing the query
- 'meta_changed', # True if any subquery executed create/alter/drop
- 'db_changed', # True if any subquery changed the database
- 'path_changed', # True if any subquery changed the search path
- 'mutated', # True if any subquery executed insert/update/delete
- 'is_special', # True if the query is a special command
- ])
-MetaQuery.__new__.__defaults__ = ('', False, 0, 0, False, False, False, False)
+ "query", # The entire text of the command
+ "successful", # True If all subqueries were successful
+ "total_time", # Time elapsed executing the query and formatting results
+ "execution_time", # Time elapsed executing the query
+ "meta_changed", # True if any subquery executed create/alter/drop
+ "db_changed", # True if any subquery changed the database
+ "path_changed", # True if any subquery changed the search path
+ "mutated", # True if any subquery executed insert/update/delete
+ "is_special", # True if the query is a special command
+ ],
+)
+MetaQuery.__new__.__defaults__ = ("", False, 0, 0, False, False, False, False)
OutputSettings = namedtuple(
- 'OutputSettings',
- 'table_format dcmlfmt floatfmt missingval expanded max_width case_function style_output'
+ "OutputSettings",
+ "table_format dcmlfmt floatfmt missingval expanded max_width case_function style_output",
)
OutputSettings.__new__.__defaults__ = (
- None, None, None, '<null>', False, None, lambda x: x, None
+ None,
+ None,
+ None,
+ "<null>",
+ False,
+ None,
+ lambda x: x,
+ None,
)
@@ -109,34 +125,49 @@ class PgCliQuitError(Exception):
class PGCli(object):
- default_prompt = '\\u@\\h:\\d> '
+ default_prompt = "\\u@\\h:\\d> "
max_len_prompt = 30
def set_default_pager(self, config):
- configured_pager = config['main'].get('pager')
- os_environ_pager = os.environ.get('PAGER')
+ configured_pager = config["main"].get("pager")
+ os_environ_pager = os.environ.get("PAGER")
if configured_pager:
self.logger.info(
- 'Default pager found in config file: "{}"'.format(configured_pager))
- os.environ['PAGER'] = configured_pager
+ 'Default pager found in config file: "{}"'.format(configured_pager)
+ )
+ os.environ["PAGER"] = configured_pager
elif os_environ_pager:
- self.logger.info('Default pager found in PAGER environment variable: "{}"'.format(
- os_environ_pager))
- os.environ['PAGER'] = os_environ_pager
+ self.logger.info(
+ 'Default pager found in PAGER environment variable: "{}"'.format(
+ os_environ_pager
+ )
+ )
+ os.environ["PAGER"] = os_environ_pager
else:
self.logger.info(
- 'No default pager found in environment. Using os default pager')
+ "No default pager found in environment. Using os default pager"
+ )
# Set default set of less recommended options, if they are not already set.
# They are ignored if pager is different than less.
- if not os.environ.get('LESS'):
- os.environ['LESS'] = '-SRXF'
-
- def __init__(self, force_passwd_prompt=False, never_passwd_prompt=False,
- pgexecute=None, pgclirc_file=None, row_limit=None,
- single_connection=False, less_chatty=None, prompt=None, prompt_dsn=None,
- auto_vertical_output=False, warn=None):
+ if not os.environ.get("LESS"):
+ os.environ["LESS"] = "-SRXF"
+
+ def __init__(
+ self,
+ force_passwd_prompt=False,
+ never_passwd_prompt=False,
+ pgexecute=None,
+ pgclirc_file=None,
+ row_limit=None,
+ single_connection=False,
+ less_chatty=None,
+ prompt=None,
+ prompt_dsn=None,
+ auto_vertical_output=False,
+ warn=None,
+ ):
self.force_passwd_prompt = force_passwd_prompt
self.never_passwd_prompt = never_passwd_prompt
@@ -156,40 +187,43 @@ class PGCli(object):
self.output_file = None
self.pgspecial = PGSpecial()
- self.multi_line = c['main'].as_bool('multi_line')
- self.multiline_mode = c['main'].get('multi_line_mode', 'psql')
- self.vi_mode = c['main'].as_bool('vi')
- self.auto_expand = auto_vertical_output or c['main'].as_bool(
- 'auto_expand')
- self.expanded_output = c['main'].as_bool('expand')
- self.pgspecial.timing_enabled = c['main'].as_bool('timing')
+ self.multi_line = c["main"].as_bool("multi_line")
+ self.multiline_mode = c["main"].get("multi_line_mode", "psql")
+ self.vi_mode = c["main"].as_bool("vi")
+ self.auto_expand = auto_vertical_output or c["main"].as_bool("auto_expand")
+ self.expanded_output = c["main"].as_bool("expand")
+ self.pgspecial.timing_enabled = c["main"].as_bool("timing")
if row_limit is not None:
self.row_limit = row_limit
else:
- self.row_limit = c['main'].as_int('row_limit')
-
- self.min_num_menu_lines = c['main'].as_int('min_num_menu_lines')
- self.multiline_continuation_char = c['main']['multiline_continuation_char']
- self.table_format = c['main']['table_format']
- self.syntax_style = c['main']['syntax_style']
- self.cli_style = c['colors']
- self.wider_completion_menu = c['main'].as_bool('wider_completion_menu')
- c_dest_warning = c['main'].as_bool('destructive_warning')
+ self.row_limit = c["main"].as_int("row_limit")
+
+ self.min_num_menu_lines = c["main"].as_int("min_num_menu_lines")
+ self.multiline_continuation_char = c["main"]["multiline_continuation_char"]
+ self.table_format = c["main"]["table_format"]
+ self.syntax_style = c["main"]["syntax_style"]
+ self.cli_style = c["colors"]
+ self.wider_completion_menu = c["main"].as_bool("wider_completion_menu")
+ c_dest_warning = c["main"].as_bool("destructive_warning")
self.destructive_warning = c_dest_warning if warn is None else warn
- self.less_chatty = bool(less_chatty) or c['main'].as_bool('less_chatty')
- self.null_string = c['main'].get('null_string', '<null>')
- self.prompt_format = prompt if prompt is not None else c['main'].get('prompt', self.default_prompt)
+ self.less_chatty = bool(less_chatty) or c["main"].as_bool("less_chatty")
+ self.null_string = c["main"].get("null_string", "<null>")
+ self.prompt_format = (
+ prompt
+ if prompt is not None
+ else c["main"].get("prompt", self.default_prompt)
+ )
self.prompt_dsn_format = prompt_dsn
- self.on_error = c['main']['on_error'].upper()
- self.decimal_format = c['data_formats']['decimal']
- self.float_format = c['data_formats']['float']
+ self.on_error = c["main"]["on_error"].upper()
+ self.decimal_format = c["data_formats"]["decimal"]
+ self.float_format = c["data_formats"]["float"]
self.initialize_keyring()
- self.pgspecial.pset_pager(self.config['main'].as_bool(
- 'enable_pager') and "on" or "off")
+ self.pgspecial.pset_pager(
+ self.config["main"].as_bool("enable_pager") and "on" or "off"
+ )
- self.style_output = style_factory_output(
- self.syntax_style, c['colors'])
+ self.style_output = style_factory_output(self.syntax_style, c["colors"])
self.now = dt.datetime.today()
@@ -198,23 +232,24 @@ class PGCli(object):
self.query_history = []
# Initialize completer
- smart_completion = c['main'].as_bool('smart_completion')
- keyword_casing = c['main']['keyword_casing']
+ smart_completion = c["main"].as_bool("smart_completion")
+ keyword_casing = c["main"]["keyword_casing"]
self.settings = {
- 'casing_file': get_casing_file(c),
- 'generate_casing_file': c['main'].as_bool('generate_casing_file'),
- 'generate_aliases': c['main'].as_bool('generate_aliases'),
- 'asterisk_column_order': c['main']['asterisk_column_order'],
- 'qualify_columns': c['main']['qualify_columns'],
- 'case_column_headers': c['main'].as_bool('case_column_headers'),
- 'search_path_filter': c['main'].as_bool('search_path_filter'),
- 'single_connection': single_connection,
- 'less_chatty': less_chatty,
- 'keyword_casing': keyword_casing,
+ "casing_file": get_casing_file(c),
+ "generate_casing_file": c["main"].as_bool("generate_casing_file"),
+ "generate_aliases": c["main"].as_bool("generate_aliases"),
+ "asterisk_column_order": c["main"]["asterisk_column_order"],
+ "qualify_columns": c["main"]["qualify_columns"],
+ "case_column_headers": c["main"].as_bool("case_column_headers"),
+ "search_path_filter": c["main"].as_bool("search_path_filter"),
+ "single_connection": single_connection,
+ "less_chatty": less_chatty,
+ "keyword_casing": keyword_casing,
}
- completer = PGCompleter(smart_completion, pgspecial=self.pgspecial,
- settings=self.settings)
+ completer = PGCompleter(
+ smart_completion, pgspecial=self.pgspecial, settings=self.settings
+ )
self.completer = completer
self._completer_lock = threading.Lock()
self.register_special_commands()
@@ -227,57 +262,93 @@ class PGCli(object):
def register_special_commands(self):
self.pgspecial.register(
- self.change_db, '\\c', '\\c[onnect] database_name',
- 'Change to a new database.', aliases=('use', '\\connect', 'USE'))
-
- refresh_callback = lambda: self.refresh_completions(
- persist_priorities='all')
-
- self.pgspecial.register(self.quit, '\\q', '\\q',
- 'Quit pgcli.', arg_type=NO_QUERY, case_sensitive=True,
- aliases=(':q',))
- self.pgspecial.register(self.quit, 'quit', 'quit',
- 'Quit pgcli.', arg_type=NO_QUERY, case_sensitive=False,
- aliases=('exit',))
- self.pgspecial.register(refresh_callback, '\\#', '\\#',
- 'Refresh auto-completions.', arg_type=NO_QUERY)
- self.pgspecial.register(refresh_callback, '\\refresh', '\\refresh',
- 'Refresh auto-completions.', arg_type=NO_QUERY)
- self.pgspecial.register(self.execute_from_file, '\\i', '\\i filename',
- 'Execute commands from file.')
- self.pgspecial.register(self.write_to_file, '\\o', '\\o [filename]',
- 'Send all query results to file.')
- self.pgspecial.register(self.info_connection, '\\conninfo',
- '\\conninfo', 'Get connection details')
- self.pgspecial.register(self.change_table_format, '\\T', '\\T [format]',
- 'Change the table format used to output results')
+ self.change_db,
+ "\\c",
+ "\\c[onnect] database_name",
+ "Change to a new database.",
+ aliases=("use", "\\connect", "USE"),
+ )
+
+ refresh_callback = lambda: self.refresh_completions(persist_priorities="all")
+
+ self.pgspecial.register(
+ self.quit,
+ "\\q",
+ "\\q",
+ "Quit pgcli.",
+ arg_type=NO_QUERY,
+ case_sensitive=True,
+ aliases=(":q",),
+ )
+ self.pgspecial.register(
+ self.quit,
+ "quit",
+ "quit",
+ "Quit pgcli.",
+ arg_type=NO_QUERY,
+ case_sensitive=False,
+ aliases=("exit",),
+ )
+ self.pgspecial.register(
+ refresh_callback,
+ "\\#",
+ "\\#",
+ "Refresh auto-completions.",
+ arg_type=NO_QUERY,
+ )
+ self.pgspecial.register(
+ refresh_callback,
+ "\\refresh",
+ "\\refresh",
+ "Refresh auto-completions.",
+ arg_type=NO_QUERY,
+ )
+ self.pgspecial.register(
+ self.execute_from_file, "\\i", "\\i filename", "Execute commands from file."
+ )
+ self.pgspecial.register(
+ self.write_to_file,
+ "\\o",
+ "\\o [filename]",
+ "Send all query results to file.",
+ )
+ self.pgspecial.register(
+ self.info_connection, "\\conninfo", "\\conninfo", "Get connection details"
+ )
+ self.pgspecial.register(
+ self.change_table_format,
+ "\\T",
+ "\\T [format]",
+ "Change the table format used to output results",
+ )
def change_table_format(self, pattern, **_):
try:
if pattern not in TabularOutputFormatter().supported_formats:
raise ValueError()
self.table_format = pattern
- yield (None, None, None,
- 'Changed table format to {}'.format(pattern))
+ yield (None, None, None, "Changed table format to {}".format(pattern))
except ValueError:
- msg = 'Table format {} not recognized. Allowed formats:'.format(
- pattern)
+ msg = "Table format {} not recognized. Allowed formats:".format(pattern)
for table_type in TabularOutputFormatter().supported_formats:
msg += "\n\t{}".format(table_type)
- msg += '\nCurrently set to: %s' % self.table_format
+ msg += "\nCurrently set to: %s" % self.table_format
yield (None, None, None, msg)
def info_connection(self, **_):
- if self.pgexecute.host.startswith('/'):
+ if self