diff options
Diffstat (limited to 'cli/openbb_cli/controllers/cli_controller.py')
-rw-r--r-- | cli/openbb_cli/controllers/cli_controller.py | 947 |
1 files changed, 947 insertions, 0 deletions
diff --git a/cli/openbb_cli/controllers/cli_controller.py b/cli/openbb_cli/controllers/cli_controller.py new file mode 100644 index 00000000000..f6e0998fc2a --- /dev/null +++ b/cli/openbb_cli/controllers/cli_controller.py @@ -0,0 +1,947 @@ +#!/usr/bin/env python +"""Main CLI Module.""" + +import argparse +import contextlib +import difflib +import logging +import os +import re +import sys +import time +import webbrowser +from datetime import datetime +from functools import partial, update_wrapper +from pathlib import Path +from types import MethodType +from typing import Any, Dict, List, Optional + +import certifi +import pandas as pd +import requests +from openbb import obb +from openbb_cli.argparse_translator.obbject_registry import Registry +from openbb_cli.config import constants +from openbb_cli.config.completer import NestedCompleter +from openbb_cli.config.constants import ( + ASSETS_DIRECTORY, + ENV_FILE_SETTINGS, + HOME_DIRECTORY, + REPOSITORY_DIRECTORY, +) +from openbb_cli.config.menu_text import MenuText +from openbb_cli.controllers.base_controller import BaseController +from openbb_cli.controllers.platform_controller_factory import ( + PlatformControllerFactory, +) +from openbb_cli.controllers.script_parser import is_reset, parse_openbb_script +from openbb_cli.controllers.utils import ( + bootup, + first_time_user, + get_flair_and_username, + is_installer, + parse_and_split_input, + print_goodbye, + print_rich_table, + reset, + suppress_stdout, + welcome_message, +) +from openbb_cli.session import Session +from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.styles import Style +from pydantic import BaseModel + +PLATFORM_ROUTERS = { + d: "menu" if not isinstance(getattr(obb, d), BaseModel) else "command" + for d in dir(obb) + if "_" not in d +} +NON_DATA_ROUTERS = ["coverage", "account", "reference", "system", "user"] +DATA_PROCESSING_ROUTERS = ["technical", "quantitative", "econometrics"] + +# pylint: disable=too-many-public-methods,import-outside-toplevel, too-many-function-args +# pylint: disable=too-many-branches,no-member,C0302,too-many-return-statements, inconsistent-return-statements + +logger = logging.getLogger(__name__) + +env_file = str(ENV_FILE_SETTINGS) + +if is_installer(): + # Necessary for installer so that it can locate the correct certificates for + # API calls and https + # https://stackoverflow.com/questions/27835619/urllib-and-ssl-certificate-verify-failed-error/73270162#73270162 + os.environ["REQUESTS_CA_BUNDLE"] = certifi.where() + os.environ["SSL_CERT_FILE"] = certifi.where() + + +class CLIController(BaseController): + """CLI Controller class.""" + + CHOICES_COMMANDS = ["record", "stop", "exe", "results"] + CHOICES_MENUS = [ + "settings", + ] + + for router, value in PLATFORM_ROUTERS.items(): + if value == "menu": + CHOICES_MENUS.append(router) + else: + CHOICES_COMMANDS.append(router) + + PATH = "/" + CHOICES_GENERATION = False + + def __init__(self, jobs_cmds: Optional[List[str]] = None): + """Construct CLI controller.""" + self.ROUTINE_FILES: Dict[str, str] = dict() + self.ROUTINE_DEFAULT_FILES: Dict[str, str] = dict() + self.ROUTINE_PERSONAL_FILES: Dict[str, str] = dict() + self.ROUTINE_CHOICES: Dict[str, Any] = dict() + + super().__init__(jobs_cmds) + + self.queue: List[str] = list() + + if jobs_cmds: + self.queue = parse_and_split_input( + an_input=" ".join(jobs_cmds), custom_filters=[] + ) + + self.update_success = False + + self._generate_platform_commands() + + self.update_runtime_choices() + + def _generate_platform_commands(self): + """Generate Platform based commands/menus.""" + + def method_call_class(self, _, controller, name, parent_path, target): + self.queue = self.load_class( + controller, name, parent_path, target, self.queue + ) + + # pylint: disable=unused-argument + def method_call_command(self, _, router: str): + """Call command.""" + mdl = getattr(obb, router) + df = pd.DataFrame.from_dict(mdl.model_dump(), orient="index") + return print_rich_table(df, show_index=True) + + for router, value in PLATFORM_ROUTERS.items(): + target = getattr(obb, router) + + if value == "menu": + pcf = PlatformControllerFactory( + target, reference=obb.reference["paths"] # type: ignore + ) + DynamicController = pcf.create() + + # Bind the method to the class + bound_method = MethodType(method_call_class, self) + + # Update the wrapper and set the attribute + bound_method = update_wrapper( # type: ignore + partial( + bound_method, + controller=DynamicController, + name=router, + target=target, + parent_path=self.path, + ), + method_call_class, + ) + else: + bound_method = MethodType(method_call_command, self) + bound_method = update_wrapper( # type: ignore + partial(bound_method, router=router), + method_call_command, + ) + + setattr(self, f"call_{router}", bound_method) + + def update_runtime_choices(self): + """Update runtime choices.""" + routines_directory = Path( + Session().user.preferences.export_directory, "routines" + ) + + if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + # choices: dict = self.choices_default + choices: dict = {c: {} for c in self.controller_choices} # type: ignore + choices["hold"] = {c: None for c in ["on", "off", "-s", "--sameaxis"]} + choices["hold"]["off"] = {"--title": None} + + self.ROUTINE_FILES = { + filepath.name: filepath # type: ignore + for filepath in routines_directory.rglob("*.openbb") + } + self.ROUTINE_DEFAULT_FILES = { + filepath.name: filepath # type: ignore + for filepath in Path(routines_directory / "hub" / "default").rglob( + "*.openbb" + ) + } + self.ROUTINE_PERSONAL_FILES = { + filepath.name: filepath # type: ignore + for filepath in Path(routines_directory / "hub" / "personal").rglob( + "*.openbb" + ) + } + + choices["exe"] = { + "--file": { + filename: {} for filename in list(self.ROUTINE_FILES.keys()) + }, + "-f": "--file", + "--example": None, + "-e": "--example", + "--input": None, + "-i": "--input", + "--url": None, + } + + choices["record"] = { + "--name": None, + "-n": "--name", + "--description": None, + "-d": "--description", + "--public": None, + "-p": "--public", + "--local": None, + "-l": "--local", + "--tag1": {c: None for c in constants.SCRIPT_TAGS}, + "--tag2": {c: None for c in constants.SCRIPT_TAGS}, + "--tag3": {c: None for c in constants.SCRIPT_TAGS}, + } + + self.completer = NestedCompleter.from_nested_dict(choices) + + def print_help(self): + """Print help.""" + mt = MenuText("") + mt.add_info("_configure_") + mt.add_menu("settings") + mt.add_raw("\n") + mt.add_info("_scripts_") + mt.add_cmd("record") + mt.add_cmd("stop") + mt.add_cmd("exe") + mt.add_raw("\n") + mt.add_info("Platform CLI") + mt.add_raw(" data\n") + for router, value in PLATFORM_ROUTERS.items(): + if router in NON_DATA_ROUTERS or router in DATA_PROCESSING_ROUTERS: + continue + if value == "menu": + menu_description = ( + obb.reference["routers"] # type: ignore + .get(f"{self.PATH}{router}", {}) + .get("description") + ) or "" + mt.add_menu( + key_menu=router, + menu_description=menu_description.split(".")[0].lower(), + ) + else: + mt.add_cmd(router) + + if any(router in PLATFORM_ROUTERS for router in DATA_PROCESSING_ROUTERS): + mt.add_raw("\n data processing\n") + for router, value in PLATFORM_ROUTERS.items(): + if router not in DATA_PROCESSING_ROUTERS: + continue + if value == "menu": + menu_description = ( + obb.reference["routers"] # type: ignore + .get(f"{self.PATH}{router}", {}) + .get("description") + ) or "" + mt.add_menu( + key_menu=router, + menu_description=menu_description.split(".")[0].lower(), + ) + else: + mt.add_cmd(router) + + mt.add_raw("\n configuration\n") + for router, value in PLATFORM_ROUTERS.items(): + if router not in NON_DATA_ROUTERS or router == "reference": + continue + if value == "menu": + menu_description = ( + obb.reference["routers"] # type: ignore + .get(f"{self.PATH}{router}", {}) + .get("description") + ) or "" + mt.add_menu( + key_menu=router, + menu_description=menu_description.split(".")[0].lower(), + ) + else: + mt.add_cmd(router) + + mt.add_raw("\n cached results (OBBjects)\n") + mt.add_cmd("results") + + Session().console.print(text=mt.menu_text, menu="Home") + self.update_runtime_choices() + + def parse_input(self, an_input: str) -> List: + """Overwrite the BaseController parse_input for `askobb` and 'exe'. + + This will allow us to search for something like "P/E" ratio. + """ + # Filtering out sorting parameters with forward slashes like P/E + sort_filter = r"((\ -q |\ --question|\ ).*?(/))" + # Filter out urls + url = r"(exe (--url )?(https?://)?my\.openbb\.(dev|co)/u/.*/routine/.*)" + custom_filters = [sort_filter, url] + return parse_and_split_input(an_input=an_input, custom_filters=custom_filters) + + def call_settings(self, _): + """Process feature flags command.""" + from openbb_cli.controllers.feature_flags_controller import ( + FeatureFlagsController, + ) + + self.queue = self.load_class(FeatureFlagsController, self.queue) + + def call_exe(self, other_args: List[str]): + """Process exe command.""" + # Merge rest of string path to other_args and remove queue since it is a dir + other_args += self.queue + + if not other_args: + Session().console.print( + "[info]Provide a path to the routine you wish to execute. For an example, please use " + "`exe --example` and for documentation and to learn how create your own script " + "type `about exe`.\n[/info]" + ) + return + parser = argparse.ArgumentParser( + add_help=False, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + prog="exe", + description="Execute automated routine script. For an example, please use " + "`exe --example` and for documentation and to learn how create your own script " + "type `about exe`.", + ) + parser.add_argument( + "--file", + "-f", + help="The path or .openbb file to run.", + dest="file", + required="-h" not in other_args + and "--help" not in other_args + and "-e" not in other_args + and "--example" not in other_args + and "--url" not in other_args + and "my.openbb" not in other_args[0], + type=str, + nargs="+", + ) + parser.add_argument( + "-i", + "--input", + help="Select multiple inputs to be replaced in the routine and separated by commas. E.g. GME,AMC,BTC-USD", + dest="routine_args", + type=lambda s: [str(item) for item in s.split(",")], + ) + parser.add_argument( + "-e", + "--example", + help="Run an example script to understand how routines can be used.", + dest="example", + action="store_true", + default=False, + ) + parser.add_argument( + "--url", help="URL to run openbb script from.", dest="url", type=str + ) + if other_args and "-" not in other_args[0][0]: + if other_args[0].startswith("my.") or other_args[0].startswith("http"): + other_args.insert(0, "--url") + else: + other_args.insert(0, "--file") + ns_parser = self.parse_known_args_and_warn(parser, other_args) + if ns_parser: + if ns_parser.example: + routine_path = ASSETS_DIRECTORY / "routines" / "routine_example.openbb" + Session().console.print( + "[info]Executing an example, please type `about exe` " + "to learn how to create your own script.[/info]\n" + ) + time.sleep(3) + elif ns_parser.url: + if not ns_parser.url.startswith( + "https" + ) and not ns_parser.url.startswith("http:"): + url = "https://" + ns_parser.url + elif ns_parser.url.startswith("http://"): + url = ns_parser.url.replace("http://", "https://") + else: + url = ns_parser.url + username = url.split("/")[-3] + script_name = url.split("/")[-1] + file_name = f"{username}_{script_name}.openbb" + final_url = f"{url}?raw=true" + response = requests.get(final_url, timeout=10) + if response.status_code != 200: + Session().console.print( + "[red]Could not find the requested script.[/red]" + ) + return + routine_text = response.json()["script"] + file_path = Path( + Session().user.preferences.export_directory, "routines" + ) + routine_path = file_path / file_name + with open(routine_path, "w") as file: + file.write(routine_text) + self.update_runtime_choices() + + elif ns_parser.file: + file_path = " ".join(ns_parser.file) # type: ignore + # if string is not in this format "default/file.openbb" then check for files in ROUTINE_FILES + full_path = file_path + hub_routine = file_path.split("/") # type: ignore + # Change with: my.openbb.co + if hub_routine[0] == "default": + routine_path = Path( + self.ROUTINE_DEFAULT_FILES.get(hub_routine[1], full_path) + ) + elif hub_routine[0] == "personal": + routine_path = Path( + self.ROUTINE_PERSONAL_FILES.get(hub_routine[1], full_path) + ) + else: + routine_path = Path(self.ROUTINE_FILES.get(file_path, full_path)) # type: ignore + else: + return + + with open(routine_path) as fp: + raw_lines = list(fp) + + # Capture ARGV either as list if args separated by commas or as single value + if ns_parser.routine_args: + script_inputs = ( + ns_parser.routine_args + if "," not in ns_parser.routine_args + else ns_parser.routine_args.split(",") + ) + + err, parsed_script = parse_openbb_script( + raw_lines=raw_lines, + script_inputs=script_inputs if ns_parser.routine_args else None, + ) + + # If there err output is not an empty string then it means there was an + # issue in parsing the routine and therefore we don't want to feed it + # to the terminal + if err: + Session().console.print(err) + return + + self.queue = [ + val + for val in parse_and_split_input( + an_input=parsed_script, custom_filters=[] + ) + if val + ] + + if "export" in self.queue[0]: + export_path = self.queue[0].split(" ")[1] + # If the path selected does not start from the user root, give relative location from root + if export_path[0] == "~": + export_path = export_path.replace( + "~", HOME_DIRECTORY.as_posix() + ) + elif export_path[0] != "/": + export_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), export_path + ) + + # Check if the directory exists + if os.path.isdir(export_path): + Session().console.print( + f"Export data to be saved in the selected folder: '{export_path}'" + ) + else: + os.makedirs(export_path) + Session().console.print( + f"[green]Folder '{export_path}' successfully created.[/green]" + ) + self.queue = self.queue[1:] + + def call_results(self, _): + """Process results command.""" + results = Registry().all + if results: + df = pd.DataFrame.from_dict(results, orient="index") + print_rich_table( + df, show_index=True, index_name="stack index", title="OBBject Results" + ) + else: + Session().console.print("[info]No results found.[/info]") + + +def handle_job_cmds(jobs_cmds: Optional[List[str]]) -> Optional[List[str]]: + """Handle job commands.""" + # If the path selected does not start from the user root, + # give relative location from root + if jobs_cmds is not None and jobs_cmds: + logger.info("INPUT: %s", "/".join(jobs_cmds)) + + export_path = "" + if jobs_cmds and "export" in jobs_cmds[0]: + commands = jobs_cmds[0].split("/") + first_split = commands[0].split(" ") + if len(first_split) > 1: + export_path = first_split[1] + jobs_cmds = ["/".join(commands[1:])] + if not export_path: + return jobs_cmds + if export_path[0] == "~": + export_path = export_path.replace("~", HOME_DIRECTORY.as_posix()) + elif export_path[0] != "/": + export_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), export_path + ) + + # Check if the directory exists + if os.path.isdir(export_path): + Session().console.print( + f"Export data to be saved in the selected folder: '{export_path}'" + ) + else: + os.makedirs(export_path) + Session().console.print( + f"[green]Folder '{export_path}' successfully created.[/green]" + ) + return jobs_cmds + + +# pylint: disable=unused-argument +def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False): + """Run the CLI menu.""" + ret_code = 1 + t_controller = CLIController(jobs_cmds) + an_input = "" + + jobs_cmds = handle_job_cmds(jobs_cmds) + + bootup() + if not jobs_cmds: + welcome_message() + + if first_time_user(): + with contextlib.suppress(EOFError): + webbrowser.open( + "https://docs.openbb.co/terminal/usage/overview/structure-and-navigation" + ) + + t_controller.print_help() + + while ret_code: + + # There is a command in the queue + if t_controller.queue and len(t_controller.queue) > 0: + # If the command is quitting the menu we want to return in here + if t_controller.queue[0] in ("q", "..", "quit"): + print_goodbye() + break + + # Consume 1 element from the queue + an_input = t_controller.queue[0] + t_controller.queue = t_controller.queue[1:] + + # Print the current location because this was an instruction and we want user to know what was the action + if an_input and an_input.split(" ")[0] in t_controller.CHOICES_COMMANDS: + Session().console.print(f"{get_flair_and_username()} / $ {an_input}") + + # Get input command from user + else: + try: + # Get input from user using auto-completion + if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + # Check if toolbar hint was enabled + if Session().settings.TOOLBAR_HINT: + an_input = Session().prompt_session.prompt( # type: ignore[union-attr] + f"{get_flair_and_username()} / $ ", + completer=t_controller.completer, + search_ignore_case=True, + bottom_toolbar=HTML( + '<style bg="ansiblack" fg="ansiwhite">[h]</style> help menu ' + '<style bg="ansiblack" fg="ansiwhite">[q]</style> return to previous menu ' + '<style bg="ansiblack" fg="ansiwhite">[e]</style> exit the program ' + '<style bg="ansiblack" fg="ansiwhite">[cmd -h]</style> ' + "see usage and available options " + '<style bg="ansiblack" fg="ansiwhite">[about (cmd/menu)]</style> ' + ), + style=Style.from_dict( + { + "bottom-toolbar": "#ffffff bg:#333333", + } + ), + ) + else: + an_input = Session().prompt_session.prompt( # type: ignore[union-attr] + f"{get_flair_and_username()} / $ ", + completer=t_controller.completer, + search_ignore_case=True, + ) + + # Get input from user without auto-completion + else: + an_input = input(f"{get_flair_and_username()} / $ ") + + except (KeyboardInterrupt, EOFError): + print_goodbye() + break + + try: + # Process the input command + t_controller.queue = t_controller.switch(an_input) + + if an_input in ("q", "quit", "..", "exit", "e"): + print_goodbye() + break + + # Check if the user wants to reset application + if an_input in ("r", "reset") or t_controller.update_success: + reset(t_controller.queue if t_controller.queue else []) + break + + except SystemExit: + logger.exception( + "The command '%s' doesn't exist on the / menu.", + an_input, + ) + Session().console.print( + f"[red]The command '{an_input}' doesn't exist on the / menu.[/red]\n", + ) + similar_cmd = difflib.get_close_matches( + an_input.split(" ")[0] if " " in an_input else an_input, + t_controller.controller_choices, + n=1, + cutoff=0.7, + ) + if similar_cmd: + an_input = similar_cmd[0] + if " " in an_input: + candidate_input = ( + f"{similar_cmd[0]} {' '.join(an_input.split(' ')[1:])}" + ) + if candidate_input == an_input: + an_input = "" + t_controller.queue = [] + Session().console.print("\n") + continue + an_input = candidate_input + + Session().console.print(f"[green]Replacing by '{an_input}'.[/green]") + t_controller.queue.insert(0, an_input) + + +def insert_start_slash(cmds: List[str]) -> List[str]: + """Insert a slash at the beginning of a command sequence.""" + if not cmds[0].startswith("/"): + cmds[0] = f"/{cmds[0]}" + if cmds[0].startswith("/home"): + cmds[0] = f"/{cmds[0][5:]}" + return cmds + + +def run_scripts( + path: Path, + test_mode: bool = False, + verbose: bool = False, + routines_args: Optional[List[str]] = None, + special_arguments: Optional[Dict[str, str]] = None, + output: bool = True, +): + """Run given .openbb scripts. + + Parameters + ---------- + path : str + The location of the .openbb file + test_mode : bool + Whether the CLI is in test mode + verbose : bool + Whether to run tests in verbose mode + routines_args : List[str] + One or multiple inputs to be replaced in the routine and separated by commas. + E.g. GME,AMC,BTC-USD + special_arguments: Optional[Dict[str, str]] + Replace `${key=default}` with `value` for every key in the dictionary + output: bool + Whether to log tests to txt files + """ + if not path.exists(): + Session().console.print(f"File '{path}' doesn't exist. Launching base CLI.\n") + if not test_mode: + run_cli() + + # THIS NEEDS TO BE REFACTORED!!! - ITS USED FOR TESTING + with path.open() as fp: + raw_lines = [x for x in fp if (not is_reset(x)) and ("#" not in x) and x] + raw_lines = [ + raw_line.strip("\n") for raw_line in raw_lines if raw_line.strip("\n") + ] + + if routines_args: + lines = [] + for rawline in raw_lines: + templine = rawline + for i, arg in enumerate(routines_args): + templine = templine.replace(f"$ARGV[{i}]", arg) + lines.append(templine) + # Handle new testing arguments: + elif special_arguments: + lines = [] + for line in raw_lines: + new_line = re.sub( + r"\${[^{]+=[^{]+}", + lambda x: replace_dynamic(x, special_arguments), # type: ignore + line, + ) + lines.append(new_line) + + else: + lines = raw_lines + + if test_mode and "exit" not in lines[-1]: + lines.append("exit") + + # Deals with the export with a path with "/" in it + export_folder = "" + if "export" in lines[0]: + export_folder = lines[0].split("export ")[1].rstrip() + lines = lines[1:] + + simulate_argv = f"/{'/'.join([line.rstrip() for line in lines])}" + file_cmds = simulate_argv.replace("//", "/home/").split() + file_cmds = insert_start_slash(file_cmds) if file_cmds else file_cmds + file_cmds = ( + [f"export {export_folder}{' '.join(file_cmds)}"] + if export_folder + else [" ".join(file_cmds)] + ) + + if not test_mode or verbose: + run_cli(file_cmds, test_mode=True) + else: + with suppress_stdout(): + Session().console.print(f"To ensure: {output}") + if output: + timestamp = datetime.now().timestamp() + stamp_str = str(timestamp).replace(".", "") + whole_path = Path(REPOSITORY_DIRECTORY / "integration_test_output") + whole_path.mkdir(parents=True, exist_ok=True) + first_cmd = file_cmds[0].split("/")[1] + with open( + whole_path / f"{stamp_str}_{first_cmd}_output.txt", "w" + ) as output_file, contextlib.redirect_stdout(output_file): + run_cli(file_cmds, test_mode=True) + else: + run_cli(file_cmds, test_mode=True) + + +def replace_dynamic(match: re.Match, special_arguments: Dict[str, str]) -> str: + """Replace ${key=default} with value in special_arguments if it exists, else with default. + + Parameters + ---------- + match: re.Match[str] + The match object |