diff options
Diffstat (limited to 'openbb_platform/core/openbb_core/app/static/package_builder.py')
-rw-r--r-- | openbb_platform/core/openbb_core/app/static/package_builder.py | 451 |
1 files changed, 213 insertions, 238 deletions
diff --git a/openbb_platform/core/openbb_core/app/static/package_builder.py b/openbb_platform/core/openbb_core/app/static/package_builder.py index a64c0fd1fd2..b6e08c83696 100644 --- a/openbb_platform/core/openbb_core/app/static/package_builder.py +++ b/openbb_platform/core/openbb_core/app/static/package_builder.py @@ -10,7 +10,6 @@ from inspect import Parameter, _empty, isclass, signature from json import dumps, load from pathlib import Path from typing import ( - Any, Callable, Dict, List, @@ -23,7 +22,6 @@ from typing import ( TypeVar, Union, get_args, - get_origin, get_type_hints, ) @@ -40,8 +38,9 @@ from openbb_core.app.model.custom_parameter import ( OpenBBCustomChoices, OpenBBCustomParameter, ) +from openbb_core.app.model.example import Example from openbb_core.app.provider_interface import ProviderInterface -from openbb_core.app.router import CommandMap, RouterLoader +from openbb_core.app.router import RouterLoader from openbb_core.app.static.utils.console import Console from openbb_core.app.static.utils.linters import Linters from openbb_core.env import Env @@ -66,6 +65,13 @@ DataProcessingSupportedTypes = TypeVar( Data, ) +TAB = " " + + +def create_indent(n: int) -> str: + """Create n indentation space.""" + return TAB * n + class PackageBuilder: """Build the extension package for the Platform.""" @@ -405,8 +411,8 @@ class ClassDefinition: if route.openapi_extra else None ), - examples=route.openapi_extra.get("examples", None), - ) # type: ignore + examples=(route.openapi_extra.get("examples", []) or []), + ) else: doc += " /" if path else " /" doc += c.split("/")[-1] + "\n" @@ -550,9 +556,6 @@ class MethodDefinition: path: str, parameter_map: Dict[str, Parameter] ) -> OrderedDict[str, Parameter]: """Format the params.""" - DEFAULT_REPLACEMENT = { - "provider": None, - } parameter_map.pop("cc", None) # we need to add the chart parameter here bc of the docstring generation @@ -560,7 +563,12 @@ class MethodDefinition: parameter_map["chart"] = Parameter( name="chart", kind=Parameter.POSITIONAL_OR_KEYWORD, - annotation=bool, + annotation=Annotated[ + bool, + OpenBBCustomParameter( + description="Whether to create a chart or not, by default False." + ), + ], default=False, ) @@ -569,6 +577,28 @@ class MethodDefinition: for name, param in parameter_map.items(): if name == "extra_params": formatted[name] = Parameter(name="kwargs", kind=Parameter.VAR_KEYWORD) + elif name == "provider_choices": + fields = param.annotation.__args__[0].__dataclass_fields__ + field = fields["provider"] + type_ = getattr(field, "type") + args = getattr(type_, "__args__") + first = args[0] if args else None + formatted["provider"] = Parameter( + name="provider", + kind=Parameter.POSITIONAL_OR_KEYWORD, + annotation=Annotated[ + Union[MethodDefinition.get_type(field), None], + OpenBBCustomParameter( + description=( + "The provider to use for the query, by default None.\n" + f" If None, the provider specified in defaults is selected or '{first}' if there is\n" + " no default." + "" + ) + ), + ], + default=None, + ) elif MethodDefinition.is_annotated_dc(param.annotation): fields = param.annotation.__args__[0].__dataclass_fields__ for field_name, field in fields.items(): @@ -584,7 +614,7 @@ class MethodDefinition: name=field_name, kind=Parameter.POSITIONAL_OR_KEYWORD, annotation=updated_type, - default=DEFAULT_REPLACEMENT.get(field_name, default), + default=default, ) else: new_type = MethodDefinition.get_expanded_type(name) @@ -602,7 +632,7 @@ class MethodDefinition: name=name, kind=Parameter.POSITIONAL_OR_KEYWORD, annotation=updated_type, - default=DEFAULT_REPLACEMENT.get(name, param.default), + default=param.default, ) return MethodDefinition.reorder_params(params=formatted) @@ -719,20 +749,26 @@ class MethodDefinition: @staticmethod def build_command_method_doc( + path: str, func: Callable, formatted_params: OrderedDict[str, Parameter], model_name: Optional[str] = None, - examples: Optional[List[str]] = None, + examples: Optional[List[Example]] = None, ): """Build the command method docstring.""" doc = func.__doc__ doc = DocstringGenerator.generate( + path=path, func=func, formatted_params=formatted_params, model_name=model_name, examples=examples, ) - code = f' """{doc} """ # noqa: E501\n\n' if doc else "" + code = ( + f'{create_indent(2)}"""{doc}{create_indent(2)}""" # noqa: E501\n\n' + if doc + else "" + ) return code @@ -770,9 +806,9 @@ class MethodDefinition: available = field.type.__args__ code += " provider_choices={\n" code += ' "provider": self._get_provider(\n' - code += " provider,\n" - code += f' "{path}",\n' - code += f" {available},\n" + code += " provider,\n" + code += f' "{path}",\n' + code += f" {available},\n" code += " )\n" code += " },\n" elif MethodDefinition.is_annotated_dc(param.annotation): @@ -812,7 +848,7 @@ class MethodDefinition: raise ValueError( "multiple_items_allowed requires the original type to be specified." ) - return List[original_type] + return List[original_type] # type: ignore return cls.TYPE_EXPANSION.get(field_name, ...) @classmethod @@ -821,7 +857,7 @@ class MethodDefinition: path: str, func: Callable, model_name: Optional[str] = None, - examples: Optional[List[str]] = None, + examples: Optional[List[Example]] = None, ) -> str: """Build the command method.""" func_name = func.__name__ @@ -839,6 +875,7 @@ class MethodDefinition: model_name=model_name, ) code += cls.build_command_method_doc( + path=path, func=func, formatted_params=formatted_params, model_name=model_name, @@ -856,37 +893,114 @@ class DocstringGenerator: provider_interface = ProviderInterface() @staticmethod - def get_OBBject_description(results_type: str, providers: Optional[str]) -> str: + def get_field_type( + field: FieldInfo, target: Literal["docstring", "website"] = "docstring" + ) -> str: + """Get the implicit data type of a defined Pydantic field. + + Args + ---- + field (FieldInfo): Pydantic field object containing field information. + target (Literal["docstring", "website"], optional): Target to return type for. Defaults to "docstring". + + Returns + ------- + str: String representation of the field type. + """ + is_optional = not field.is_required() if target == "docstring" else False + + try: + _type = field.annotation + + if "BeforeValidator" in str(_type): + _type = "Optional[int]" if is_optional else "int" # type: ignore + + field_type = ( + str(_type) + .replace("<class '", "") + .replace("'>", "") + .replace("typing.", "") + .replace("pydantic.types.", "") + .replace("datetime.datetime", "datetime") + .replace("datetime.date", "date") + .replace("NoneType", "None") + .replace(", None", "") + ) + field_type = ( + f"Optional[{field_type}]" + if is_optional and "Optional" not in str(_type) + else field_type + ) + except TypeError: + # Fallback to the annotation if the repr fails + field_type = field.annotation # type: ignore + + return field_type + + @staticmethod + def get_OBBject_description( + results_type: str, + providers: Optional[str], + ) -> str: """Get the command output description.""" available_providers = providers or "Optional[str]" obbject_description = ( - " OBBject\n" - f" results : {results_type}\n" - " Serializable results.\n" - f" provider : {available_providers}\n" - " Provider name.\n" - " warnings : Optional[List[Warning_]]\n" - " List of warnings.\n" - " chart : Optional[Chart]\n" - " Chart object.\n" - " extra : Dict[str, Any]\n" - " Extra info.\n" + f"{create_indent(2)}OBBject\n" + f"{create_indent(3)}results : {results_type}\n" + f"{create_indent(4)}Serializable results.\n" + f"{create_indent(3)}provider : {available_providers}\n" + f"{create_indent(4)}Provider name.\n" + f"{create_indent(3)}warnings : Optional[List[Warning_]]\n" + f"{create_indent(4)}List of warnings.\n" + f"{create_indent(3)}chart : Optional[Chart]\n" + f"{create_indent(4)}Chart object.\n" + f"{create_indent(3)}extra : Dict[str, Any]\n" + f"{create_indent(4)}Extra info.\n" ) obbject_description = obbject_description.replace("NoneType", "None") return obbject_description + @staticmethod + def build_examples( + func_path: str, + param_types: Dict[str, type], + examples: Optional[List[Example]], + target: Literal["docstring", "website"] = "docstring", + ) -> str: + """Get the example section from the examples.""" + if examples: + if target == "docstring": + prompt = ">>> " + indent = create_indent(2) + else: + prompt = "\n```python\n" + indent = create_indent(0) + + doc = f"\n{indent}Examples\n" + doc += f"{indent}--------\n" + doc += f"{indent}{prompt}from openbb import obb\n" + + for e in examples: + doc += e.to_python( + func_path=func_path, + param_types=param_types, + indentation=indent, + prompt=">>> " if target == "docstring" else "", + ) + return doc if target == "docstring" else doc + "```\n\n" + return "" + @classmethod def generate_model_docstring( cls, model_name: str, summary: str, explicit_params: dict, - params: dict, + kwarg_params: dict, returns: Dict[str, FieldInfo], results_type: str, - examples: Optional[List[str]] = None, ) -> str: """Create the docstring for model.""" @@ -902,260 +1016,121 @@ class DocstringGenerator: def format_description(description: str) -> str: """Format description in docstrings.""" - description = description.replace("\n", "\n ") + description = description.replace("\n", f"\n{create_indent(2)}") return description - standard_dict = params["standard"].__dataclass_fields__ - extra_dict = params["extra"].__dataclass_fields__ - - if examples: - example_docstring = "\n Example\n -------\n" - example_docstring += " >>> from openbb import obb\n" - for example in examples: - example_docstring += f" >>> {example}\n" + def get_param_info(parameter: Parameter) -> Tuple[str, str]: + """Get the parameter info.""" + annotation = getattr(parameter, "_annotation", None) + if isinstance(annotation, _AnnotatedAlias): + args = getattr(annotation, "__args__", []) if annotation else [] + p_type = args[0] if args else None + else: + p_type = annotation + type_ = ( + getattr(p_type, "__name__", "") if inspect.isclass(p_type) else p_type + ) + metadata = getattr(annotation, "__metadata__", []) + description = getattr(metadata[0], "description", "") if metadata else "" + return type_, description - docstring = summary.strip("\n") + docstring = summary.strip("\n").replace("\n ", f"\n{create_indent(2)}") docstring += "\n\n" - docstring += " Parameters\n" - docstring += " ----------\n" + docstring += f"{create_indent(2)}Parameters\n" + docstring += f"{create_indent(2)}----------\n" # Explicit parameters for param_name, param in explicit_params.items(): - if param_name in standard_dict: - # pylint: disable=W0212 - p_type = param._annotation.__args__[0] - type_ = p_type.__name__ if inspect.isclass(p_type) else p_type - description = getattr( - param._annotation.__metadata__[0], "description", "" - ) - elif param_name == "provider": - # pylint: disable=W0212 - type_ = param._annotation - default = param._annotation.__args__[0].__args__[0] - description = f"""The provider to use for the query, by default None. - If None, the provider specified in defaults is selected or '{default}' if there is - no default.""" - elif param_name == "chart": - type_ = "bool" - description = "Whether to create a chart or not, by default False." - else: - type_ = "" - description = "" - - type_str = format_type(type_, char_limit=79) # type: ignore - docstring += f" {param_name} : {type_str}\n" - docstring += f" {format_description(description)}\n" + type_, description = get_param_info(param) + type_str = format_type(str(type_), char_limit=79) + docstring += f"{create_indent(2)}{param_name} : {type_str}\n" + docstring += f"{create_indent(3)}{format_description(description)}\n" # Kwargs - for param_name, param in extra_dict.items(): - p_type = param.type - type_ = p_type.__name__ if inspect.isclass(p_type) else p_type + for param_name, param in kwarg_params.items(): + p_type = getattr(param, "type", "") + type_ = ( + getattr(p_type, "__name__", "") if inspect.isclass(p_type) else p_type + ) if "NoneType" in str(type_): type_ = f"Optional[{type_}]".replace(", NoneType", "") - description = getattr(param.default, "description", "") - - docstring += f" {param_name} : {type_}\n" - docstring += f" {format_description(description)}\n" + default = getattr(param, "default", "") + description = getattr(default, "description", "") + docstring += f"{create_indent(2)}{param_name} : {type_}\n" + docstring += f"{create_indent(3)}{format_description(description)}\n" # Returns docstring += "\n" - docstring += " Returns\n" - docstring += " -------\n" - provider_param = explicit_params.get("provider", None) - available_providers = getattr(provider_param, "_annotation", None) - - docstring += cls.get_OBBject_description(results_type, available_providers) + docstring += f"{create_indent(2)}Returns\n" + docstring += f"{create_indent(2)}-------\n" + providers, _ = get_param_info(explicit_params.get("provider", None)) + docstring += cls.get_OBBject_description(results_type, providers) # Schema underline = "-" * len(model_name) - docstring += f"\n {model_name}\n {underline}\n" + docstring += f"\n{create_indent(2)}{model_name}\n" + docstring += f"{create_indent(2)}{underline}\n" for name, field in returns.items(): - try: - _type = field.annotation - is_optional = not field.is_required() - if "BeforeValidator" in str(_type): - _type = "Optional[int]" if is_optional else "int" # type: ignore - - field_type = ( - str(_type) - .replace("<class '", "") - .replace("'>", "") - .replace("typing.", "") - .replace("pydantic.types.", "") - .replace("datetime.datetime", "datetime") - .replace("datetime.date", "date") - .replace("NoneType", "None") - .replace(", None", "") - ) - field_type = ( - f"Optional[{field_type}]" - if is_optional and "Optional" not in str(_type) - else field_type - ) - except TypeError: - # Fallback to the annotation if the repr fails - field_type = field.annotation # type: ignore - + field_type = cls.get_field_type(field) description = getattr(field, "description", "") - - docstring += f" {field.alias or name} : {field_type}\n" - docstring += f" {format_description(description)}\n" - - if examples: - docstring += example_docstring - + docstring += f"{create_indent(2)}{field.alias or name} : {field_type}\n" + docstring += f"{create_indent(3)}{format_description(description)}\n" return docstring @classmethod def generate( cls, + path: str, func: Callable, formatted_params: OrderedDict[str, Parameter], model_name: Optional[str] = None, - examples: Optional[List[str]] = None, + examples: Optional[List[Example]] = None, ) -> Optional[str]: """Generate the docstring for the function.""" - doc = func.__doc__ + doc = func.__doc__ or "" + param_types = {} + + # Parameters explicit in the function signature + explicit_params = dict(formatted_params) + explicit_params.pop("extra_params", None) + # Map of parameter names to types + param_types = {k: v.annotation for k, v in explicit_params.items()} + if model_name: - params = cls.provider_interface.params.get(model_name, None) + params = cls.provider_interface.params.get(model_name, {}) return_schema = cls.provider_interface.return_schema.get(model_name, None) if params and return_schema: - explicit_dict = dict(formatted_params) - explicit_dict.pop("extra_params", None) + # Parameters passed as **kwargs + kwarg_params = params["extra"].__dataclass_fields__ + param_types.update({k: v.type for k, v in kwarg_params.items()}) returns = return_schema.model_fields results_type = func.__annotations__.get("return", model_name) if hasattr(results_type, "results_type_repr"): results_type = results_type.results_type_repr() - return cls.generate_model_docstring( + doc = cls.generate_model_docstring( model_name=model_name, summary=func.__doc__ or "", - explicit_params=explicit_dict, - params=params, + explicit_params=explicit_params, + kwarg_params=kwarg_params, returns=returns, results_type=results_type, - examples=examples, ) - return doc - if examples and examples != [""] and doc: - doc += "\n Examples\n --------\n" - doc += " >>> from openbb import obb\n" - for example in examples: - if example != "": - doc += f" >>> {example}\n" - return doc - - @staticmethod - def get_model_standard_params(param_fields: Dict[str, FieldInfo]) -> Dict[str, Any]: - """Get the test params for the fetcher based on the required standard params.""" - test_params: Dict[str, Any] = {} - for field_name, field in param_fields.items(): - if field.default and field.default is not PydanticUndefined: - test_params[field_name] = field.default - elif field.default and field.default is PydanticUndefined: - example_dict = { - "symbol": "AAPL", - "symbols": "AAPL,MSFT", - "start_date": "2023-01-01", - "end_date": "2023-06-06", - "country": "Portugal", - "date": "2023-01-01", - "countries": ["portugal", "spain"], - } - if field_name in example_dict: - test_params[field_name] = example_dict[field_name] - elif field.annotation == str: - test_params[field_name] = "TEST_STRING" - elif field.annotation == int: - test_params[field_name] = 1 - elif field.annotation == float: - test_params[field_name] = 1.0 - elif field.annotation == bool: - test_params[field_name] = True - elif get_origin(field.annotation) is Literal: # type: ignore - option = field.annotation.__args__[0] # type: ignore - if isinstance(option, str): - test_params[field_name] = f'"{option}"' - else: - test_params[field_name] = option - - return test_params - - @staticmethod - def get_full_command_name(route: str) -> str: - """Get the full command name.""" - cmd_parts = route.split("/") - del cmd_parts[0] - - menu = cmd_parts[0] - command = cmd_parts[-1] - sub_menus = cmd_parts[1:-1] - - sub_menu_str_cmd = f".{'.'.join(sub_menus)}" if sub_menus else "" - - full_command = f"{menu}{sub_menu_str_cmd}.{command}" - - return full_command - - @classmethod - def generate_example( - cls, - model_name: str, - standard_params: Dict[str, FieldInfo], - ) -> str: - """Generate the example for the command.""" - # find the model router here - cm = CommandMap() - commands_model = cm.commands_model - route = [k for k, v in commands_model.items() if v == model_name] - - if not route: - return "" - - full_command_name = cls.get_full_command_name(route=route[0]) - example_params = cls.get_model_standard_params(param_fields=standard_params) - - # Edge cases (might find more) - if "crypto" in route[0] and "symbol" in example_params: - example_params["symbol"] = "BTCUSD" - elif "currency" in route[0] and "symbol" in example_params: - example_params["symbol"] = "EURUSD" - elif ( - "index" in route[0] - and "european" not in route[0] - and "symbol" in example_params - ): - example_params["symbol"] = "SPX" - elif ( - "index" in route[0] - and "european" in route[0] - and "symbol" in example_params - ): - example_params["symbol"] = "BUKBUS" - elif ( - "futures" in route[0] and "curve" in route[0] and "symbol" in example_params - ): - example_params["symbol"] = "VX" - elif "futures" in route[0] and "symbol" in example_params: - example_params["symbol"] = "ES" - - example = "\n Example\n -------\n" - example += " >>> from openbb import obb\n" - example += f" >>> obb.{full_command_name}(" - for param_name, param_value in example_params.items(): - if isinstance(param_value, str): - param_value = f'"{param_value}"' # noqa: PLW2901 - example += f"{param_name}={param_value}, " - if example_params: - example = example[:-2] + ")\n" else: - example += ")\n" + doc = doc.replace("\n ", f"\n{create_indent(2)}") + + if doc and examples: + doc += cls.build_examples( + path.replace("/", "."), + param_types, + examples, + ) - return example + return doc class PathHandler: |