from __future__ import unicode_literals
from __future__ import print_function
import os
import sys
import traceback
import logging
import threading
from time import time
from datetime import datetime
from io import open
from collections import namedtuple
from sqlite3 import OperationalError, sqlite_version
import shutil
from cli_helpers.tabular_output import TabularOutputFormatter
from cli_helpers.tabular_output import preprocessors
import click
import sqlparse
from prompt_toolkit.completion import DynamicCompleter
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
from prompt_toolkit.shortcuts import PromptSession, CompleteStyle
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.document import Document
from prompt_toolkit.filters import HasFocus, IsDone
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.layout.processors import (
HighlightMatchingBracketProcessor,
ConditionalProcessor,
)
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.history import FileHistory
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from .packages.special.main import NO_QUERY
from .packages.prompt_utils import confirm, confirm_destructive_query
from .packages import special
from .sqlcompleter import SQLCompleter
from .clitoolbar import create_toolbar_tokens_func
from .clistyle import style_factory, style_factory_output
from .sqlexecute import SQLExecute
from .clibuffer import cli_is_multiline
from .completion_refresher import CompletionRefresher
from .config import config_location, ensure_dir_exists, get_config
from .key_bindings import cli_bindings
from .encodingutils import utf8tounicode, text_type
from .lexer import LiteCliLexer
from .__init__ import __version__
from .packages.filepaths import dir_path_exists
import itertools
click.disable_unicode_literals_warning = True
# Query tuples are used for maintaining history
Query = namedtuple("Query", ["query", "successful", "mutating"])
PACKAGE_ROOT = os.path.abspath(os.path.dirname(__file__))
class LiteCli(object):
default_prompt = "\\d> "
max_len_prompt = 45
def __init__(
self,
sqlexecute=None,
prompt=None,
logfile=None,
auto_vertical_output=False,
warn=None,
liteclirc=None,
):
self.sqlexecute = sqlexecute
self.logfile = logfile
# Load config.
c = self.config = get_config(liteclirc)
self.multi_line = c["main"].as_bool("multi_line")
self.key_bindings = c["main"]["key_bindings"]
special.set_favorite_queries(self.config)
self.formatter = TabularOutputFormatter(format_name=c["main"]["table_format"])
self.formatter.litecli = self
self.syntax_style = c["main"]["syntax_style"]
self.less_chatty = c["main"].as_bool("less_chatty")
self.show_bottom_toolbar = c["main"].as_bool("show_bottom_toolbar")
self.cli_style = c["colors"]
self.output_style = style_factory_output(self.syntax_style, self.cli_style)
self.wider_completion_menu = c["main"].as_bool("wider_completion_menu")
self.autocompletion = c["main"].as_bool("autocompletion")
c_dest_warning = c["main"].as_bool("destructive_warning")
self.destructive_warning = c_dest_warning if warn is None else warn
self.login_path_as_host = c["main"].as_bool("login_path_as_host")
# read from cli argument or user config file
self.auto_vertical_output = auto_vertical_output or c["main"].as_bool(
"auto_vertical_output"
)
# audit log
if self.logfile is None and "audit_log" in c["main"]:
try:
self.logfile = open(os.path.expanduser(c["main"]["audit_log"]), "a")
except (IOError, OSError):
self.echo(
"Error: Unable to open the audit log file. Your queries will not be logged.",
err=True,
fg="red",
)
self.logfile = False
# Load startup commands.
try:
self.startup_commands = c["startup_commands"]
except (
KeyError
): # Redundant given the load_config() function that merges in the standard config, but put here to avoid fail if user do not have updated config file.
self.startup_commands = None
self.completion_refresher = CompletionRefresher()
self.logger = logging.getLogger(__name__)
self.initialize_logging()
prompt_cnf = self.read_my_cnf_files(["prompt"])["prompt"]
self.prompt_format = (
prompt or prompt_cnf or c["main"]["prompt"] or self.default_prompt
)
self.prompt_continuation_format = c["main"]["prompt_continuation"]
keyword_casing = c["main"].get("keyword_casing", "auto")
self.query_history = []
# Initialize completer.
self.completer = SQLCompleter(
supported_formats=self.formatter.supported_formats,
keyword_casing=keyword_casing,
)
self._completer_lock = threading.Lock()
# Register custom special commands.