From a0d580ab25f91657b05acdadc47cd0999de102c7 Mon Sep 17 00:00:00 2001 From: montezdesousa <79287829+montezdesousa@users.noreply.github.com> Date: Wed, 1 May 2024 22:17:02 +0100 Subject: [Feature] - Display command providers (#6355) * fix: avoid calling Session() multiple times * rename session refs * fix: cmd text * fix: avoid Session() * fix: add providers by cmd * fix: timezones --------- Co-authored-by: Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> --- cli/openbb_cli/config/console.py | 10 +- cli/openbb_cli/config/menu_text.py | 169 +++++++++++---------- cli/openbb_cli/controllers/base_controller.py | 99 ++++++------ .../controllers/base_platform_controller.py | 26 ++-- cli/openbb_cli/controllers/choices.py | 8 +- cli/openbb_cli/controllers/cli_controller.py | 77 +++++----- .../controllers/feature_flags_controller.py | 87 ++++++----- cli/openbb_cli/controllers/hub_service.py | 13 +- cli/openbb_cli/controllers/script_parser.py | 6 +- 9 files changed, 254 insertions(+), 241 deletions(-) diff --git a/cli/openbb_cli/config/console.py b/cli/openbb_cli/config/console.py index f19904cedc4..cbb0f8f2696 100644 --- a/cli/openbb_cli/config/console.py +++ b/cli/openbb_cli/config/console.py @@ -31,12 +31,8 @@ class Console: self.menu_text = "" self.menu_path = "" - def capture(self): - """Capture the console output.""" - return self._console.capture() - @staticmethod - def filter_rich_tags(text): + def _filter_rich_tags(text): """Filter out rich tags from text.""" for val in RICH_TAGS: text = text.replace(val, "") @@ -44,7 +40,7 @@ class Console: return text @staticmethod - def blend_text( + def _blend_text( message: str, color1: Tuple[int, int, int], color2: Tuple[int, int, int] ) -> Text: """Blend text from one color to another.""" @@ -85,7 +81,7 @@ class Console: else: self._console.print(kwargs["text"]) else: - print(self.filter_rich_tags(kwargs["text"])) # noqa: T201 + print(self._filter_rich_tags(kwargs["text"])) # noqa: T201 elif not self._settings.TEST_MODE: self._console.print(*args, **kwargs) else: diff --git a/cli/openbb_cli/config/menu_text.py b/cli/openbb_cli/config/menu_text.py index 39bfa5787e9..d9bdc59656c 100644 --- a/cli/openbb_cli/config/menu_text.py +++ b/cli/openbb_cli/config/menu_text.py @@ -2,7 +2,7 @@ __docformat__ = "numpy" -from typing import Dict, List, Optional, Union +from typing import Dict, List import i18n from openbb import obb @@ -29,18 +29,18 @@ RICH_TAGS = [ USE_COLOR = True -def get_ordered_list_sources(command_path: str) -> List: - """Return the preferred source for the given command. +def get_ordered_providers(command_path: str) -> List: + """Return the preferred provider for the given command. Parameters ---------- command_path: str - The command to find the source for. E.g. "/equity/price/historical + The command to find the provider for. E.g. "/equity/price/historical Returns ------- List - The list of sources for the given command. + The list of providers for the given command. """ command_reference = obb.reference.get("paths", {}).get(command_path, {}) # type: ignore if command_reference: @@ -52,7 +52,12 @@ def get_ordered_list_sources(command_path: str) -> List: class MenuText: """Create menu text with rich colors to be displayed by CLI.""" - def __init__(self, path: str = "", column_sources: int = 100): + CMD_NAME_LENGTH = 18 + CMD_DESCRIPTION_LENGTH = 65 + CMD_PROVIDERS_LENGTH = 23 + SECTION_SPACING = 4 + + def __init__(self, path: str = ""): """Initialize menu help. Parameters @@ -64,7 +69,6 @@ class MenuText: """ self.menu_text = "" self.menu_path = path - self.col_src = column_sources self.warnings: List[Dict[str, str]] = [] def add_raw(self, raw_text: str): @@ -117,146 +121,153 @@ class MenuText: ) self.menu_text += f"[param]{parameter_translated}{space}:[/param] {value}\n" - def _adjust_command_length(self, key_command: str) -> str: + def _format_cmd_name(self, name: str) -> str: """Adjust the length of the command if it is too long. Parameters ---------- - key_command : str - command to be adjusted + name : str + command to be formatted Returns ------- str - adjusted command + formatted command """ - if len(key_command) > 18: - new_key_command = key_command[:18] # Default to trimming to 18 characters + if len(name) > self.CMD_NAME_LENGTH: + new_name = name[ + : self.CMD_NAME_LENGTH + ] # Default to trimming to 18 characters - if "_" in key_command: - key_command_split = key_command.split("_") + if "_" in name: + name_split = name.split("_") - new_key_command = ( - "_".join(key_command_split[:2]) - if len(key_command_split) > 2 - else key_command_split[0] + new_name = ( + "_".join(name_split[:2]) if len(name_split) > 2 else name_split[0] ) - if len(new_key_command) > 18: - new_key_command = new_key_command[:18] + if len(new_name) > self.CMD_NAME_LENGTH: + new_name = new_name[: self.CMD_NAME_LENGTH] - if new_key_command != key_command: + if new_name != name: self.warnings.append( { "warning": "Command name too long", - "command": key_command, - "trimmed_command": new_key_command, + "command": name, + "trimmed_command": new_name, } ) - key_command = new_key_command + name = new_name - return key_command + return name - def _handle_command_description( - self, key_command: str, command_description: str + def _format_cmd_description( + self, name: str, description: str, trim: bool = True ) -> str: """Handle the command description. Parameters ---------- - key_command : str + name : str command to be adjusted - command_description : str + description : str description of the command + trim : bool + If true, the description will be trimmed to the maximum length Returns ------- str adjusted command description """ - if not command_description: - command_description = i18n.t(self.menu_path + key_command) - if command_description == self.menu_path + key_command: - command_description = "" + if not description: + description = i18n.t(self.menu_path + name) + if description == self.menu_path + name: + description = "" return ( - command_description[:88] + "..." - if len(command_description) > 91 - else command_description + description[: self.CMD_DESCRIPTION_LENGTH - 3] + "..." + if len(description) > self.CMD_DESCRIPTION_LENGTH and trim + else description ) - def add_cmd( - self, key_command: str, condition: bool = True, command_description: str = "" - ): + def add_cmd(self, name: str, description: str = "", disable: bool = False): """Append command text (after translation from key) to a menu. Parameters ---------- - key_command : str + name : str key command to be executed by user. It is also used as a key to get description of command. - condition : bool - condition in which command is available to user. I.e. displays command and description. - If condition is false, the command line is greyed out. + description : str + description of the command + disable : bool + If disable is true, the command line is greyed out. """ - key_command = self._adjust_command_length(key_command) - command_description = self._handle_command_description( - key_command, command_description + formatted_name = self._format_cmd_name(name) + name_padding = (self.CMD_NAME_LENGTH - len(formatted_name)) * " " + providers = get_ordered_providers(f"{self.menu_path}{formatted_name}") + formatted_description = self._format_cmd_description( + formatted_name, + description, + bool(providers), ) - spacing = (23 - (len(key_command) + 4)) * " " - - cmd = f"{key_command}{spacing}{command_description}" - cmd = f"[cmds] {cmd}[/cmds]" if condition else f"[unvl] {cmd}[/unvl]" - - sources = get_ordered_list_sources(f"{self.menu_path}{key_command}") - - if sources: - space = (self.col_src - len(cmd)) * " " if self.col_src > len(cmd) else " " - cmd += f"{space}[src][{', '.join(sources)}][/src]" + description_padding = ( + self.CMD_DESCRIPTION_LENGTH - len(formatted_description) + ) * " " + spacing = self.SECTION_SPACING * " " + description_padding = ( + self.CMD_DESCRIPTION_LENGTH - len(formatted_description) + ) * " " + cmd = f"{spacing}{formatted_name + name_padding}{spacing}{formatted_description+description_padding}" + cmd = f"[unvl]{cmd}[/unvl]" if disable else f"[cmds]{cmd}[/cmds]" + + if providers: + cmd += rf"{spacing}[src]\[{', '.join(providers)}][/src]" self.menu_text += cmd + "\n" def add_menu( self, - key_menu: str, - condition: Optional[Union[bool, str]] = True, - menu_description: str = "", + name: str, + description: str = "", + disable: bool = False, ): """Append menu text (after translation from key) to a menu. Parameters ---------- - key_menu : str + name : str key menu to be executed by user. It is also used as a key to get description of menu. - condition : bool - condition in which menu is available to user. I.e. displays menu and description. - If condition is false, the menu line is greyed out. + disable : bool + If disable is true, the menu line is greyed out. """ - spacing = (23 - (len(key_menu) + 4)) * " " + spacing = (self.CMD_NAME_LENGTH - len(name) + self.SECTION_SPACING) * " " - if menu_description: - menu = f"{key_menu}{spacing}{menu_description}" + if description: + menu = f"{name}{spacing}{description}" else: - menu_description = i18n.t(self.menu_path + key_menu) - if menu_description == self.menu_path + key_menu: - menu_description = "" - menu = f"{key_menu}{spacing}{menu_description}" + description = i18n.t(self.menu_path + name) + if description == self.menu_path + name: + description = "" + menu = f"{name}{spacing}{description}" - if condition: - self.menu_text += f"[menu]> {menu}[/menu]\n" - else: + if disable: self.menu_text += f"[unvl]> {menu}[/unvl]\n" + else: + self.menu_text += f"[menu]> {menu}[/menu]\n" - def add_setting(self, key_setting: str, status: bool = True): + def add_setting(self, name: str, status: bool = True): """Append menu text (after translation from key) to a menu. Parameters ---------- - key_setting : str + name : str key setting to be set by user. It is also used as a key to get description of the setting. status : bool status of the current setting. If true the line will be green, otherwise red. """ - spacing = (23 - (len(key_setting) + 4)) * " " + spacing = (self.CMD_NAME_LENGTH - len(name) + self.SECTION_SPACING) * " " + indentation = self.SECTION_SPACING * " " if status: - self.menu_text += f"[green] {key_setting}{spacing}{i18n.t(self.menu_path + key_setting)}[/green]\n" + self.menu_text += f"[green]{indentation}{name}{spacing}{i18n.t(self.menu_path + name)}[/green]\n" else: - self.menu_text += f"[red] {key_setting}{spacing}{i18n.t(self.menu_path + key_setting)}[/red]\n" + self.menu_text += f"[red]{indentation}{name}{spacing}{i18n.t(self.menu_path + name)}[/red]\n" diff --git a/cli/openbb_cli/controllers/base_controller.py b/cli/openbb_cli/controllers/base_controller.py index 7d0c3fe27cc..a3d8b20aa16 100644 --- a/cli/openbb_cli/controllers/base_controller.py +++ b/cli/openbb_cli/controllers/base_controller.py @@ -31,6 +31,7 @@ from prompt_toolkit.styles import Style # pylint: disable=R0912 controllers: Dict[str, Any] = {} +session = Session() # TODO: We should try to avoid these global variables @@ -132,7 +133,7 @@ class BaseController(metaclass=ABCMeta): def load_class(self, class_ins, *args, **kwargs): """Check for an existing instance of the controller before creating a new one.""" - settings = Session().settings + settings = session.settings self.save_class() arguments = len(args) + len(kwargs) # Due to the 'arguments == 1' condition, we actually NEVER load a class @@ -267,7 +268,7 @@ class BaseController(metaclass=ABCMeta): def save_class(self) -> None: """Save the current instance of the class to be loaded later.""" - if Session().settings.REMEMBER_CONTEXTS: + if session.settings.REMEMBER_CONTEXTS: controllers[self.PATH] = self def custom_reset(self) -> List[str]: @@ -321,7 +322,7 @@ class BaseController(metaclass=ABCMeta): actions = self.parse_input(an_input) if an_input and an_input != "reset": - Session().console.print() + session.console.print() # Empty command if len(actions) == 0: @@ -374,7 +375,7 @@ class BaseController(metaclass=ABCMeta): not self.queue or (self.queue and self.queue[0] not in ("quit", "help")) ) ): - Session().console.print() + session.console.print() return self.queue @@ -385,7 +386,7 @@ class BaseController(metaclass=ABCMeta): def call_home(self, _) -> None: """Process home command.""" self.save_class() - if self.PATH.count("/") == 1 and Session().settings.ENABLE_EXIT_AUTO_HELP: + if self.PATH.count("/") == 1 and session.settings.ENABLE_EXIT_AUTO_HELP: self.print_help() for _ in range(self.PATH.count("/") - 1): self.queue.insert(0, "quit") @@ -406,9 +407,9 @@ class BaseController(metaclass=ABCMeta): for _ in range(self.PATH.count("/")): self.queue.insert(0, "quit") - if not Session().is_local(): + if not session.is_local(): remove_file( - Path(Session().user.preferences.export_directory, "routines", "hub") + Path(session.user.preferences.export_directory, "routines", "hub") ) def call_reset(self, _) -> None: @@ -502,7 +503,7 @@ class BaseController(metaclass=ABCMeta): if ns_parser: if not ns_parser.name: - Session().console.print( + session.console.print( "[red]Set a routine title by using the '-n' flag. E.g. 'record -n Morning routine'[/red]" ) return @@ -513,7 +514,7 @@ class BaseController(metaclass=ABCMeta): else ns_parser.tag1 ) if tag1 and tag1 not in SCRIPT_TAGS: - Session().console.print( + session.console.print( f"[red]The parameter 'tag1' needs to be one of the following {', '.join(SCRIPT_TAGS)}[/red]" ) return @@ -524,7 +525,7 @@ class BaseController(metaclass=ABCMeta): else ns_parser.tag2 ) if tag2 and tag2 not in SCRIPT_TAGS: - Session().console.print( + session.console.print( f"[red]The parameter 'tag2' needs to be one of the following {', '.join(SCRIPT_TAGS)}[/red]" ) return @@ -535,19 +536,19 @@ class BaseController(metaclass=ABCMeta): else ns_parser.tag3 ) if tag3 and tag3 not in SCRIPT_TAGS: - Session().console.print( + session.console.print( f"[red]The parameter 'tag3' needs to be one of the following {', '.join(SCRIPT_TAGS)}[/red]" ) return - if Session().is_local() and not ns_parser.local: - Session().console.print( + if session.is_local() and not ns_parser.local: + session.console.print( "[red]Recording session to the OpenBB Hub is not supported in guest mode.[/red]" ) - Session().console.print( + session.console.print( "\n[yellow]Sign to OpenBB Hub to register: http://openbb.co[/yellow]" ) - Session().console.print( + session.console.print( "\n[yellow]Otherwise set the flag '-l' to save the file locally.[/yellow]" ) return @@ -556,7 +557,7 @@ class BaseController(metaclass=ABCMeta): title = " ".join(ns_parser.name) if ns_parser.name else "" pattern = re.compile(r"^[a-zA-Z0-9\s]+$") if not pattern.match(title): - Session().console.print( + session.console.print( f"[red]Title '{title}' has invalid format. Please use only digits, characters and whitespaces.[/]" ) return @@ -582,10 +583,10 @@ class BaseController(metaclass=ABCMeta): SESSION_RECORDED_PUBLIC = ns_parser.public - Session().console.print( + session.console.print( f"[green]The routine '{title}' is successfully being recorded.[/green]" ) - Session().console.print( + session.console.print( "\n[yellow]Remember to run 'stop' command when you are done!\n[/yellow]" ) @@ -595,15 +596,15 @@ class BaseController(metaclass=ABCMeta): global SESSION_RECORDED # noqa: PLW0603 if not RECORD_SESSION: - Session().console.print( + session.console.print( "[red]There is no session being recorded. Start one using the command 'record'[/red]\n" ) elif len(SESSION_RECORDED) < 5: - Session().console.print( + session.console.print( "[red]Run at least 4 commands before stopping recording a session.[/red]\n" ) else: - current_user = Session().user + current_user = session.user # Check if the user just wants to store routine locally # This works regardless of whether they are logged in or not @@ -620,14 +621,14 @@ class BaseController(metaclass=ABCMeta): # If file already exists, add a timestamp to the name if os.path.isfile(routine_file): - i = Session().console.input( + i = session.console.input( "A local routine with the same name already exists, " "do you want to override it? (y/n): " ) - Session().console.print("") + session.console.print("") while i.lower() not in ["y", "yes", "n", "no"]: - i = Session().console.input("Select 'y' or 'n' to proceed: ") - Session().console.print("") + i = session.console.input("Select 'y' or 'n' to proceed: ") + session.console.print("") if i.lower() in ["n", "no"]: new_name = ( @@ -639,7 +640,7 @@ class BaseController(metaclass=ABCMeta): "routines", new_name, ) - Session().console.print( + session.console.print( f"[yellow]The routine name has been updated to '{new_name}'[/yellow]\n" ) @@ -650,7 +651,7 @@ class BaseController(metaclass=ABCMeta): lines = ["# OpenBB Platform CLI - Routine", "\n"] username = getattr( - Session().user.profile.hub_session, "username", "local" + session.user.profile.hub_session, "username", "local" ) lines += [f"# Author: {username}", "\n\n"] if username else ["\n"] @@ -666,13 +667,13 @@ class BaseController(metaclass=ABCMeta): # Writing data to a file file1.writelines(lines) - Session().console.print( + session.console.print( f"[green]Your routine has been recorded and saved here: {routine_file}[/green]\n" ) # If user doesn't specify they want to store routine locally # Confirm that the user is logged in - elif not Session().is_local(): + elif not session.is_local(): routine = "\n".join(SESSION_RECORDED[:-1]) hub_session = current_user.profile.hub_session @@ -692,16 +693,16 @@ class BaseController(metaclass=ABCMeta): } response = upload_routine(**kwargs) # type: ignore if response is not None and response.status_code == 409: - i = Session().console.input( + i = session.console.input( "A routine with the same name already exists, " "do you want to replace it? (y/n): " ) - Session().console.print("") + session.console.print("") if i.lower() in ["y", "yes"]: kwargs["override"] = True # type: ignore response = upload_routine(**kwargs) # type: ignore else: - Session().console.print("[info]Aborted.[/info]") + session.console.print("[info]Aborted.[/info]") # Clear session to be recorded again RECORD_SESSION = False @@ -718,14 +719,14 @@ class BaseController(metaclass=ABCMeta): ns_parser = self.parse_simple_args(parser, other_args) if ns_parser: - current_user = Session().user - local_user = Session().is_local() + current_user = session.user + local_user = session.is_local() if not local_user: hub_session = current_user.profile.hub_session - Session().console.print( + session.console.print( f"[info]email:[/info] {hub_session.email if hub_session else 'N/A'}" ) - Session().console.print( + session.console.print( f"[info]uuid:[/info] {hub_session.user_uuid if hub_session else 'N/A'}" ) else: @@ -751,23 +752,23 @@ class BaseController(metaclass=ABCMeta): "-h", "--help", action="store_true", help="show this help message" ) - if Session().settings.USE_CLEAR_AFTER_CMD: + if session.settings.USE_CLEAR_AFTER_CMD: system_clear() try: (ns_parser, l_unknown_args) = parser.parse_known_args(other_args) except SystemExit: # In case the command has required argument that isn't specified - Session().console.print("\n") + session.console.print("\n") return None if ns_parser.help: txt_help = parser.format_help() - Session().console.print(f"[help]{txt_help}[/help]") + session.console.print(f"[help]{txt_help}[/help]") return None if l_unknown_args: - Session().console.print( + session.console.print( f"The following args couldn't be interpreted: {l_unknown_args}\n" ) @@ -870,7 +871,7 @@ class BaseController(metaclass=ABCMeta): help="Number of entries to show in data.", type=check_positive, ) - if Session().settings.USE_CLEAR_AFTER_CMD: + if session.settings.USE_CLEAR_AFTER_CMD: system_clear() if "--help" in other_args or "-h" in other_args: @@ -880,7 +881,7 @@ class BaseController(metaclass=ABCMeta): f"For more information and examples, use 'about {parser.prog}' " f"to access the related guide.\n" ) - Session().console.print(f"[help]{txt_help}[/help]") + session.console.print(f"[help]{txt_help}[/help]") return None try: @@ -910,14 +911,14 @@ class BaseController(metaclass=ABCMeta): setup.set_last_legend(" ".join(ns_parser.hold_legend_str)) if l_unknown_args: - Session().console.print( + session.console.print( f"The following args couldn't be interpreted: {l_unknown_args}" ) return ns_parser def menu(self, custom_path_menu_above: str = ""): """Enter controller menu.""" - settings = Session().settings + settings = session.settings an_input = "HELP_ME" while True: @@ -948,7 +949,7 @@ class BaseController(metaclass=ABCMeta): and an_input != "help" and an_input.split(" ")[0] in self.controller_choices ): - Session().console.print( + session.console.print( f"{get_flair_and_username()} {self.PATH} $ {an_input}" ) @@ -959,7 +960,7 @@ class BaseController(metaclass=ABCMeta): self.print_help() try: - prompt_session = Session().prompt_session + prompt_session = session.prompt_session if prompt_session and settings.USE_PROMPT_TOOLKIT: # Check if toolbar hint was enabled if settings.TOOLBAR_HINT: @@ -1002,7 +1003,7 @@ class BaseController(metaclass=ABCMeta): self.queue = self.switch(an_input) except SystemExit: - Session().console.print( + session.console.print( f"[red]The command '{an_input}' doesn't exist on the {self.PATH} menu.[/red]\n", ) similar_cmd = difflib.get_close_matches( @@ -1019,14 +1020,14 @@ class BaseController(metaclass=ABCMeta): if candidate_input == an_input: an_input = "" self.queue = [] - Session().console.print("\n") + session.console.print("\n") continue an_input = candidate_input else: an_input = similar_cmd[0] - Session().console.print( + session.console.print( f"[green]Replacing by '{an_input}'.[/green]\n" ) self.queue.insert(0, an_input) diff --git a/cli/openbb_cli/controllers/base_platform_controller.py b/cli/openbb_cli/controllers/base_platform_controller.py index c6b7d6b0374..0fc55c0ff1c 100644 --- a/cli/openbb_cli/controllers/base_platform_controller.py +++ b/cli/openbb_cli/controllers/base_platform_controller.py @@ -18,6 +18,8 @@ from openbb_cli.controllers.base_controller import BaseController from openbb_cli.controllers.utils import export_data, print_rich_table from openbb_cli.session import Session +session = Session() + class DummyTranslation: """Dummy Translation for testing.""" @@ -68,7 +70,7 @@ class PlatformController(BaseController): self._generate_commands() self._generate_sub_controllers() - if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT: choices: dict = self.choices_default self.completer = NestedCompleter.from_nested_dict(choices) @@ -180,7 +182,7 @@ class PlatformController(BaseController): print_rich_table(df=df, show_index=True, title=title) elif obbject: - Session().console.print(obbject) + session.console.print(obbject) if hasattr(ns_parser, "export") and ns_parser.export: sheet_name = getattr(ns_parser, "sheet_name", None) @@ -194,7 +196,7 @@ class PlatformController(BaseController): ) except Exception as e: - Session().console.print(f"[red]{e}[/]\n") + session.console.print(f"[red]{e}[/]\n") return # Bind the method to the class @@ -268,25 +270,25 @@ class PlatformController(BaseController): if self.CHOICES_MENUS: for menu in self.CHOICES_MENUS: - menu_description = self._get_menu_description(menu) - mt.add_menu(key_menu=menu, menu_description=menu_description) + description = self._get_menu_description(menu) + mt.add_menu(name=menu, description=description) if self.CHOICES_COMMANDS: mt.add_raw("\n") for command in self.CHOICES_COMMANDS: command_description = self._get_command_description(command) mt.add_cmd( - key_command=command.replace(f"{self._name}_", ""), - command_description=command_description, + name=command.replace(f"{self._name}_", ""), + description=command_description, ) - Session().console.print(text=mt.menu_text, menu=self._name) + session.console.print(text=mt.menu_text, menu=self._name) - settings = Session().settings + settings = session.settings dev_mode = settings.DEBUG_MODE or settings.TEST_MODE if mt.warnings and dev_mode: - Session().console.print("") + session.console.print("") for w in mt.warnings: w_str = str(w).replace("{", "").replace("}", "").replace("'", "") - Session().console.print(f"[yellow]{w_str}[/yellow]") - Session().console.print("") + session.console.print(f"[yellow]{w_str}[/yellow]") + session.console.print("") diff --git a/cli/openbb_cli/controllers/choices.py b/cli/openbb_cli/controllers/choices.py index 7e66660ff79..aee144cf477 100644 --- a/cli/openbb_cli/controllers/choices.py +++ b/cli/openbb_cli/controllers/choices.py @@ -13,6 +13,8 @@ from openbb_cli.controllers.utils import ( ) from openbb_cli.session import Session +session = Session() + def __mock_parse_known_args_and_warn( controller, # pylint: disable=unused-argument @@ -206,7 +208,7 @@ def __patch_controller_functions(controller): ), ] - if not Session().settings.DEBUG_MODE: + if not session.settings.DEBUG_MODE: rich.start() patched_function_list = [] for patcher in patcher_list: @@ -214,7 +216,7 @@ def __patch_controller_functions(controller): yield patched_function_list - if not Session().settings.DEBUG_MODE: + if not session.settings.DEBUG_MODE: rich.stop() for patcher in patcher_list: patcher.stop() @@ -314,7 +316,7 @@ def build_controller_choice_map(controller) -> dict: argument_parser=argument_parser ) except Exception as exception: - if Session().settings.DEBUG_MODE: + if session.settings.DEBUG_MODE: raise Exception( f"On command : `{command}`.\n{str(exception)}" ) from exception diff --git a/cli/openbb_cli/controllers/cli_controller.py b/cli/openbb_cli/controllers/cli_controller.py index f6e0998fc2a..8c79ae6f842 100644 --- a/cli/openbb_cli/controllers/cli_controller.py +++ b/cli/openbb_cli/controllers/cli_controller.py @@ -66,6 +66,7 @@ DATA_PROCESSING_ROUTERS = ["technical", "quantitative", "econometrics"] logger = logging.getLogger(__name__) env_file = str(ENV_FILE_SETTINGS) +session = Session() if is_installer(): # Necessary for installer so that it can locate the correct certificates for @@ -163,11 +164,9 @@ class CLIController(BaseController): def update_runtime_choices(self): """Update runtime choices.""" - routines_directory = Path( - Session().user.preferences.export_directory, "routines" - ) + routines_directory = Path(session.user.preferences.export_directory, "routines") - if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT: # choices: dict = self.choices_default choices: dict = {c: {} for c in self.controller_choices} # type: ignore choices["hold"] = {c: None for c in ["on", "off", "-s", "--sameaxis"]} @@ -241,8 +240,8 @@ class CLIController(BaseController): .get("description") ) or "" mt.add_menu( - key_menu=router, - menu_description=menu_description.split(".")[0].lower(), + name=router, + description=menu_description.split(".")[0].lower(), ) else: mt.add_cmd(router) @@ -259,8 +258,8 @@ class CLIController(BaseController): .get("description") ) or "" mt.add_menu( - key_menu=router, - menu_description=menu_description.split(".")[0].lower(), + name=router, + description=menu_description.split(".")[0].lower(), ) else: mt.add_cmd(router) @@ -276,8 +275,8 @@ class CLIController(BaseController): .get("description") ) or "" mt.add_menu( - key_menu=router, - menu_description=menu_description.split(".")[0].lower(), + name=router, + description=menu_description.split(".")[0].lower(), ) else: mt.add_cmd(router) @@ -285,7 +284,7 @@ class CLIController(BaseController): mt.add_raw("\n cached results (OBBjects)\n") mt.add_cmd("results") - Session().console.print(text=mt.menu_text, menu="Home") + session.console.print(text=mt.menu_text, menu="Home") self.update_runtime_choices() def parse_input(self, an_input: str) -> List: @@ -314,7 +313,7 @@ class CLIController(BaseController): other_args += self.queue if not other_args: - Session().console.print( + session.console.print( "[info]Provide a path to the routine you wish to execute. For an example, please use " "`exe --example` and for documentation and to learn how create your own script " "type `about exe`.\n[/info]" @@ -369,7 +368,7 @@ class CLIController(BaseController): if ns_parser: if ns_parser.example: routine_path = ASSETS_DIRECTORY / "routines" / "routine_example.openbb" - Session().console.print( + session.console.print( "[info]Executing an example, please type `about exe` " "to learn how to create your own script.[/info]\n" ) @@ -389,14 +388,12 @@ class CLIController(BaseController): final_url = f"{url}?raw=true" response = requests.get(final_url, timeout=10) if response.status_code != 200: - Session().console.print( + session.console.print( "[red]Could not find the requested script.[/red]" ) return routine_text = response.json()["script"] - file_path = Path( - Session().user.preferences.export_directory, "routines" - ) + file_path = Path(session.user.preferences.export_directory, "routines") routine_path = file_path / file_name with open(routine_path, "w") as file: file.write(routine_text) @@ -441,7 +438,7 @@ class CLIController(BaseController): # issue in parsing the routine and therefore we don't want to feed it # to the terminal if err: - Session().console.print(err) + session.console.print(err) return self.queue = [ @@ -466,12 +463,12 @@ class CLIController(BaseController): # Check if the directory exists if os.path.isdir(export_path): - Session().console.print( + session.console.print( f"Export data to be saved in the selected folder: '{export_path}'" ) else: os.makedirs(export_path) - Session().console.print( + session.console.print( f"[green]Folder '{export_path}' successfully created.[/green]" ) self.queue = self.queue[1:] @@ -485,7 +482,7 @@ class CLIController(BaseController): df, show_index=True, index_name="stack index", title="OBBject Results" ) else: - Session().console.print("[info]No results found.[/info]") + session.console.print("[info]No results found.[/info]") def handle_job_cmds(jobs_cmds: Optional[List[str]]) -> Optional[List[str]]: @@ -513,12 +510,12 @@ def handle_job_cmds(jobs_cmds: Optional[List[str]]) -> Optional[List[str]]: # Check if the directory exists if os.path.isdir(export_path): - Session().console.print( + session.console.print( f"Export data to be saved in the selected folder: '{export_path}'" ) else: os.makedirs(export_path) - Session().console.print( + session.console.print( f"[green]Folder '{export_path}' successfully created.[/green]" ) return jobs_cmds @@ -560,16 +557,16 @@ def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False): # Print the current location because this was an instruction and we want user to know what was the action if an_input and an_input.split(" ")[0] in t_controller.CHOICES_COMMANDS: - Session().console.print(f"{get_flair_and_username()} / $ {an_input}") + session.console.print(f"{get_flair_and_username()} / $ {an_input}") # Get input command from user else: try: # Get input from user using auto-completion - if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT: # Check if toolbar hint was enabled - if Session().settings.TOOLBAR_HINT: - an_input = Session().prompt_session.prompt( # type: ignore[union-attr] + if session.settings.TOOLBAR_HINT: + an_input = session.prompt_session.prompt( # type: ignore[union-attr] f"{get_flair_and_username()} / $ ", completer=t_controller.completer, search_ignore_case=True, @@ -588,7 +585,7 @@ def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False): ), ) else: - an_input = Session().prompt_session.prompt( # type: ignore[union-attr] + an_input = session.prompt_session.prompt( # type: ignore[union-attr] f"{get_flair_and_username()} / $ ", completer=t_controller.completer, search_ignore_case=True, @@ -620,7 +617,7 @@ def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False): "The command '%s' doesn't exist on the / menu.", an_input, ) - Session().console.print( + session.console.print( f"[red]The command '{an_input}' doesn't exist on the / menu.[/red]\n", ) similar_cmd = difflib.get_close_matches( @@ -638,11 +635,11 @@ def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False): if candidate_input == an_input: an_input = "" t_controller.queue = [] - Session().console.print("\n") + session.console.print("\n") continue an_input = candidate_input - Session().console.print(f"[green]Replacing by '{an_input}'.[/green]") + session.console.print(f"[green]Replacing by '{an_input}'.[/green]") t_controller.queue.insert(0, an_input) @@ -682,7 +679,7 @@ def run_scripts( Whether to log tests to txt files """ if not path.exists(): - Session().console.print(f"File '{path}' doesn't exist. Launching base CLI.\n") + session.console.print(f"File '{path}' doesn't exist. Launching base CLI.\n") if not test_mode: run_cli() @@ -736,7 +733,7 @@ def run_scripts( run_cli(file_cmds, test_mode=True) else: with suppress_stdout(): - Session().console.print(f"To ensure: {output}") + session.console.print(f"To ensure: {output}") if output: timestamp = datetime.now().timestamp() stamp_str = str(timestamp).replace(".", "") @@ -777,7 +774,7 @@ def replace_dynamic(match: re.Match, special_arguments: Dict[str, str]) -> str: def run_routine(file: str, routines_args=Optional[str]): """Execute command routine from .openbb file.""" - user_routine_path = Path(Session().user.preferences.export_directory, "routines") + user_routine_path = Path(session.user.preferences.export_directory, "routines") default_routine_path = ASSETS_DIRECTORY / "routines" / file if user_routine_path.exists(): @@ -785,7 +782,7 @@ def run_routine(file: str, routines_args=Optional[str]): elif default_routine_path.exists(): run_scripts(path=default_routine_path, routines_args=routines_args) else: - Session().console.print( + session.console.print( f"Routine not found, please put your `.openbb` file into : {user_routine_path}." ) @@ -819,12 +816,12 @@ def main( E.g. GME,AMC,BTC-USD """ if debug: - Session().settings.DEBUG_MODE = True + session.settings.DEBUG_MODE = True if dev: - Session().settings.DEV_BACKEND = True - Session().settings.BASE_URL = "https://payments.openbb.dev/" - Session().settings.HUB_URL = "https://my.openbb.dev" + session.settings.DEV_BACKEND = True + session.settings.BASE_URL = "https://payments.openbb.dev/" + session.settings.HUB_URL = "https://my.openbb.dev" if isinstance(path_list, list) and path_list[0].endswith(".openbb"): run_routine(file=path_list[0], routines_args=routines_args) @@ -917,7 +914,7 @@ def parse_args_and_run(): # Use -d flag if you want to see the unknown args. if unknown: if ns_parser.debug: - Session().console.print(unknown) + session.console.print(unknown) else: sys.exit(-1) diff --git a/cli/openbb_cli/controllers/feature_flags_controller.py b/cli/openbb_cli/controllers/feature_flags_controller.py index 860cd98a819..d6de826964b 100644 --- a/cli/openbb_cli/controllers/feature_flags_controller.py +++ b/cli/openbb_cli/controllers/feature_flags_controller.py @@ -13,6 +13,8 @@ from openbb_cli.controllers.base_controller import BaseController from openbb_cli.controllers.utils import all_timezones, is_timezone_valid from openbb_cli.session import Session +session = Session() + class FeatureFlagsController(BaseController): """Feature Flags Controller class.""" @@ -46,13 +48,13 @@ class FeatureFlagsController(BaseController): """Initialize the Constructor.""" super().__init__(queue) - if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT: + if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT: choices: dict = self.choices_default self.completer = NestedCompleter.from_nested_dict(choices) def print_help(self): """Print help.""" - settings = Session().settings + settings = session.settings mt = MenuText("settings/") mt.add_info("_info_") @@ -76,63 +78,61 @@ class FeatureFlagsController(BaseController): mt.add_cmd("n_rows") mt.add_cmd("n_cols") - Session().console.print(text=mt.menu_text, menu="Feature Flags") + session.console.print(text=mt.menu_text, menu="Feature Flags") def call_overwrite(self, _): """Process overwrite command.""" - Session().settings.set_item( - "FILE_OVERWRITE", not Session().settings.FILE_OVERWRITE - ) + session.settings.set_item("FILE_OVERWRITE", not session.settings.FILE_OVERWRITE) def call_version(self, _): """Process version command.""" - Session().settings.SHOW_VERSION = not Session().settings.SHOW_VERSION + session.settings.SHOW_VERSION = not session.settings.SHOW_VERSION def call_interactive(self, _): """Process interactive command.""" - Session().settings.set_item( - "USE_INTERACTIVE_DF", not Session().settings.USE_INTERACTIVE_DF + session.settings.set_item( + "USE_INTERACTIVE_DF", not session.settings.USE_INTERACTIVE_DF ) def call_cls(self, _): """Process cls command.""" - Session().settings.set_item( - "USE_CLEAR_AFTER_CMD", not Session().settings.USE_CLEAR_AFTER_CMD + session.settings.set_item( + "USE_CLEAR_AFTER_CMD", not session.settings.USE_CLEAR_AFTER_CMD ) def call_promptkit(self, _): """Process promptkit command.""" - Session().settings.set_item( - "USE_PROMPT_TOOLKIT", not Session().settings.USE_PROMPT_TOOLKIT + session.settings.set_item( + "USE_PROMPT_TOOLKIT", not session.settings.USE_PROMPT_TOOLKIT ) def call_exithelp(self, _): """Process exithelp command.""" - Session().settings.set_item( - "ENABLE_EXIT_AUTO_HELP", not Session().settings.ENABLE_EXIT_AUTO_HELP + session.settings.set_item( + "ENABLE_EXIT_AUTO_HELP", not session.settings.ENABLE_EXIT_AUTO_HELP ) def call_rcontext(self, _): """Process rcontext command.""" - Session().settings.set_item( - "REMEMBER_CONTEXTS", not Session().settings.REMEMBER_CONTEXTS + session.settings.set_item( + "REMEMBER_CONTEXTS", not session.settings.REMEMBER_CONTEXTS ) def call_dt(self, _): """Process dt command.""" - Session().settings.set_item("USE_DATETIME", not Session().settings.USE_DATETIME) + session.settings.set_item("USE_DATETIME", not session.settings.USE_DATETIME) def call_richpanel(self, _): """Process richpanel command.""" - Session().settings.set_item( - "ENABLE_RICH_PANEL", not Session().settings.ENABLE_RICH_PANEL + session.settings.set_item( + "ENABLE_RICH_PANEL", not session.settings.ENABLE_RICH_PANEL ) def call_tbhint(self, _): """Process tbhint command.""" - if Session().settings.TOOLBAR_HINT: - Session().console.print("Will take effect when running CLI again.") - Session().settings.set_item("TOOLBAR_HINT", not Session().settings.TOOLBAR_HINT) + if session.settings.TOOLBAR_HINT: + session.console.print("Will take effect when running CLI again.") + session.settings.set_item("TOOLBAR_HINT", not session.settings.TOOLBAR_HINT) def call_console_style(self, other_args: List[str]) -> None: """Process cosole_style command.""" @@ -148,16 +148,16 @@ class FeatureFlagsController(BaseController): dest="style", action="store", required=False, - choices=Session().style.available_styles, + choices=session.style.available_styles, ) ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.style: - Session().style.apply(ns_parser.style) - Session().settings.set_item("RICH_STYLE", ns_parser.style) + session.style.apply(ns_parser.style) + session.settings.set_item("RICH_STYLE", ns_parser.style) elif not other_args: - Session().console.print( - f"Current console style: {Session().settings.RICH_STYLE}" + session.console.print( + f"Current console style: {session.settings.RICH_STYLE}" ) def call_flair(self, other_args: List[str]) -> None: @@ -179,9 +179,9 @@ class FeatureFlagsController(BaseController): ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.flair: - Session().settings.set_item("FLAIR", ns_parser.flair) + session.settings.set_item("FLAIR", ns_parser.flair) elif not other_args: - Session().console.print(f"Current flair: {Session().settings.FLAIR}") + session.console.print(f"Current flair: {session.settings.FLAIR}") def call_timezone(self, other_args: List[str]) -> None: """Process timezone command.""" @@ -198,21 +198,22 @@ class FeatureFlagsController(BaseController): action="store", required=False, type=str, + choices=all_timezones, ) ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.timezone: if is_timezone_valid(ns_parser.timezone): - Session().settings.set_item("TIMEZONE", ns_parser.timezone) + session.settings.set_item("TIMEZONE", ns_parser.timezone) else: - Session().console.print( + session.console.print( "Invalid timezone. Please enter a valid timezone." ) - Session().console.print( + session.console.print( f"Available timezones are: {', '.join(all_timezones)}" ) elif not other_args: - Session().console.print(f"Current timezone: {Session().settings.TIMEZONE}") + session.console.print(f"Current timezone: {session.settings.TIMEZONE}") def call_language(self, other_args: List[str]) -> None: """Process language command.""" @@ -233,12 +234,10 @@ class FeatureFlagsController(BaseController): ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.language: - Session().settings.set_item("USE_LANGUAGE", ns_parser.language) + session.settings.set_item("USE_LANGUAGE", ns_parser.language) elif not other_args: - Session().console.print( - f"Current language: {Session().settings.USE_LANGUAGE}" - ) + session.console.print(f"Current language: {session.settings.USE_LANGUAGE}") def call_n_rows(self, other_args: List[str]) -> None: """Process n_rows command.""" @@ -259,11 +258,11 @@ class FeatureFlagsController(BaseController): ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.rows: - Session().settings.set_item("ALLOWED_NUMBER_OF_ROWS", ns_parser.rows) + session.settings.set_item("ALLOWED_NUMBER_OF_ROWS", ns_parser.rows) elif not other_args: - Session().console.print( - f"Current number of rows: {Session().settings.ALLOWED_NUMBER_OF_ROWS}" + session.console.print( + f"Current number of rows: {session.settings.ALLOWED_NUMBER_OF_ROWS}" ) def call_n_cols(self, other_args: List[str]) -> None: @@ -285,9 +284,9 @@ class FeatureFlagsController(BaseController): ns_parser = self.parse_simple_args(parser, other_args) if ns_parser and ns_parser.columns: - Session().settings.set_item("ALLOWED_NUMBER_OF_COLUMNS", ns_parser.columns) + session.settings.set_item("ALLOWED_NUMBER_OF_COLUMNS", ns_parser.columns) elif not other_args: - Session().console.print( - f"Current number of columns: {Session().settings.ALLOWED_NUMBER_OF_COLUMNS}" + session.console.print( + f"Current number of columns: {session.settings.ALLOWED_NUMBER_OF_COLUMNS}" ) diff --git a/cli/openbb_cli/controllers/hub_service.py b/cli/openbb_cli/controllers/hub_service.py index aa44ebcc41c..8a63f701604 100644 --- a/cli/openbb_cli/controllers/hub_service.py +++ b/cli/openbb_cli/controllers/hub_service.py @@ -16,6 +16,9 @@ from openbb_cli.session import Session # created new directory structure to account for personal and default routines +session = Session() + + # pylint: disable=too-many-arguments def upload_routine( auth_header: str, @@ -57,26 +60,26 @@ def upload_routine( "script": routine, "override": override, "tags": tags, - "version": Session().settings.VERSION, + "version": session.settings.VERSION, "public": public, } - _console = Session().console + _console = session.console try: response = requests.post( headers={"Authorization": auth_header}, - url=Session().settings.BASE_URL + "/terminal/script", + url=session.settings.BASE_URL + "/terminal/script", json=data, timeout=timeout, ) if response.status_code == 200: - username = getattr(Session().user.profile.hub_session, "username", None) + username = getattr(session.user.profile.hub_session, "username", None) if not username: _console.print("[red]No username found.[/red]") _console.print("[red]Failed to upload your routine.[/red]") return None _console.print("[green]Successfully uploaded your routine.[/]") - hub_url = Session().settings.HUB_URL + hub_url = session.settings.HUB_URL if public: _console.print( diff --git a/cli/openbb_cli/controllers/script_parser.py b/cli/openbb_cli/controllers/script_parser.py index 1dd48925253..69134fc6d8f 100644 --- a/cli/openbb_cli/controllers/script_parser.py +++ b/cli/openbb_cli/controllers/script_parser.py @@ -7,6 +7,8 @@ from typing import Dict, List, Match, Optional, Tuple, Union from dateutil.relativedelta import relativedelta from openbb_cli.session import Session +session = Session() + # pylint: disable=too-many-statements,eval-used,consider-iterating-dictionary # pylint: disable=too-many-branches,too-many-return-statements @@ -194,7 +196,7 @@ def parse_openbb_script( # noqa: PLR0911,PLR0912 # Just throw a warning when user uses wrong convention numdollars = len(re.findall(r"\$", line)) if numdollars > 1: - Session().console.print( + session.console.print( f"The variable {VAR_NAME} should not be declared as " f"{'$' * numdollars}{VAR_NAME}. Instead it will be " f"converted into ${VAR_NAME}." @@ -445,7 +447,7 @@ def parse_openbb_script( # noqa: PLR0911,PLR0912 final_lines.append(foreach_line_loop.strip()) if not varused_inside: - Session().console.print( + session.console.print( f"The variable {varname} was used in foreach header " "but it wasn't used inside the loop." ) -- cgit v1.2.3