diff options
author | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-27 14:07:41 +0200 |
---|---|---|
committer | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-27 14:07:41 +0200 |
commit | 38f4316749b9976bb83a727cef96d5e0d572a1d7 (patch) | |
tree | 5d65e30189ecb5f04198ca61cad210b2455b6947 | |
parent | 459bb70c983001736f56fb2467664916c2f15c4e (diff) |
Closer
5 files changed, 44 insertions, 40 deletions
diff --git a/openbb_platform/core/openbb_core/api/router/commands.py b/openbb_platform/core/openbb_core/api/router/commands.py index ce1cab484a8..16b1380adf6 100644 --- a/openbb_platform/core/openbb_core/api/router/commands.py +++ b/openbb_platform/core/openbb_core/api/router/commands.py @@ -3,9 +3,10 @@ import inspect from functools import partial, wraps from inspect import Parameter, Signature, signature -from typing import Any, Callable, Dict, Optional, Tuple, TypeVar +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, Union from fastapi import APIRouter, Depends, Header +from fastapi.responses import StreamingResponse from fastapi.routing import APIRoute from openbb_core.app.command_runner import CommandRunner from openbb_core.app.model.command_context import CommandContext @@ -188,7 +189,9 @@ def build_api_wrapper( func.__annotations__ = new_annotations_map @wraps(wrapped=func) - async def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> OBBject: + async def wrapper( + *args: Tuple[Any], **kwargs: Dict[str, Any] + ) -> Union[OBBject, StreamingResponse]: user_settings: UserSettings = UserSettings.model_validate( kwargs.pop( "__authenticated_user_settings", @@ -196,9 +199,11 @@ def build_api_wrapper( ) ) execute = partial(command_runner.run, path, user_settings) - output: OBBject = await execute(*args, **kwargs) + output = await execute(*args, **kwargs) - return validate_output(output) + if isinstance(output, OBBject): + return validate_output(output) + return output return wrapper diff --git a/openbb_platform/core/openbb_core/app/command_runner.py b/openbb_platform/core/openbb_core/app/command_runner.py index 89433e45156..7a5df24dea0 100644 --- a/openbb_platform/core/openbb_core/app/command_runner.py +++ b/openbb_platform/core/openbb_core/app/command_runner.py @@ -8,9 +8,10 @@ from datetime import datetime from inspect import Parameter, signature from sys import exc_info from time import perf_counter_ns -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from warnings import catch_warnings, showwarning, warn +from fastapi.responses import StreamingResponse from pydantic import BaseModel, ConfigDict, create_model from openbb_core.app.logs.logging_service import LoggingService @@ -420,7 +421,7 @@ class StaticCommandRunner: /, *args, **kwargs, - ) -> OBBject: + ) -> Union[OBBject, StreamingResponse]: """Run a command and return the OBBject as output.""" timestamp = datetime.now() start_ns = perf_counter_ns() @@ -429,7 +430,7 @@ class StaticCommandRunner: route = execution_context.route if func := command_map.get_command(route=route): - obbject = await cls._execute_func( + result = await cls._execute_func( route=route, args=args, # type: ignore execution_context=execution_context, @@ -442,19 +443,20 @@ class StaticCommandRunner: duration = perf_counter_ns() - start_ns if execution_context.user_settings.preferences.metadata: - try: - obbject.extra["metadata"] = Metadata( - arguments=kwargs, - duration=duration, - route=route, - timestamp=timestamp, - ) - except Exception as e: - if Env().DEBUG_MODE: - raise OpenBBError(e) from e - warn(str(e), OpenBBWarning) + if isinstance(result, OBBject): + try: + result.extra["metadata"] = Metadata( + arguments=kwargs, + duration=duration, + route=route, + timestamp=timestamp, + ) + except Exception as e: + if Env().DEBUG_MODE: + raise OpenBBError(e) from e + warn(str(e), OpenBBWarning) - return obbject + return result class CommandRunner: diff --git a/openbb_platform/core/openbb_core/app/model/obbject.py b/openbb_platform/core/openbb_core/app/model/obbject.py index 0a1d09d01a9..67f41e9d15d 100644 --- a/openbb_platform/core/openbb_core/app/model/obbject.py +++ b/openbb_platform/core/openbb_core/app/model/obbject.py @@ -275,11 +275,6 @@ class OBBject(Tagged, Generic[T]): show_function: Callable = getattr(self.chart.fig, "show") show_function(**kwargs) - async def stream(self) -> None: - """Stream response.""" - async for result in self.results: # pylint: disable=not-an-iterable - print(result) - @classmethod async def from_query(cls, query: "Query") -> "OBBject": """Create OBBject from query. diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py index 863e7dcbedc..b27eaf0c5d9 100644 --- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py +++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py @@ -17,6 +17,7 @@ from providers.binance.openbb_binance.models.crypto_historical import ( ) from openbb_crypto.price.price_router import router as price_router +from fastapi.responses import StreamingResponse router = Router(prefix="", description="Cryptocurrency market data.") router.include_router(price_router) @@ -49,4 +50,4 @@ async def stream_price(symbol: str = "ethbtc", lifetime: int = 10) -> OBBject: params={"symbol": symbol, "lifetime": lifetime}, credentials=None, ) - return OBBject(results=generator, provider="binance") + return StreamingResponse(generator, media_type="application/x-ndjson") diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py index bf34da35e44..3c8d440c166 100644 --- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py +++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py @@ -2,7 +2,7 @@ import json from datetime import datetime, timedelta -from typing import Any, Dict, Optional +from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional import websockets from openbb_core.provider.standard_models.crypto_historical import ( @@ -20,7 +20,7 @@ class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams): """Binance Crypto Historical Query Params.""" lifetime: Optional[int] = Field( - default=60, description="Lifetime of WebSocket in seconds" + default=60, description="Lifetime of WebSocket in seconds." ) @@ -56,11 +56,23 @@ class BinanceCryptoHistoricalFetcher(Fetcher): return BinanceCryptoHistoricalQueryParams(**params) @staticmethod + def transform_data( + query: BinanceCryptoHistoricalQueryParams, + data: Dict[str, Any], + ) -> BinanceCryptoHistoricalData: + """Return the transformed data.""" + data["date"] = ( + datetime.now().isoformat() if "date" not in data else data["date"] + ) + + return BinanceCryptoHistoricalData(**data) + + @staticmethod async def aextract_data( query: BinanceCryptoHistoricalQueryParams, credentials: Optional[Dict[str, str]] = None, **kwargs: Any, - ) -> BinanceCryptoHistoricalData: + ) -> AsyncIterator[str]: """Return the raw data from the Binance endpoint.""" async with websockets.connect( f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker" @@ -74,20 +86,9 @@ class BinanceCryptoHistoricalFetcher(Fetcher): transformed_data = BinanceCryptoHistoricalFetcher.transform_data( query, data ) - yield transformed_data + yield transformed_data.model_dump_json() + "\n" except websockets.exceptions.ConnectionClosed as e: print("WebSocket connection closed.") raise e finally: print("WebSocket connection closed.") - - @staticmethod - def transform_data( - query: BinanceCryptoHistoricalQueryParams, - data: Dict[str, Any], - ) -> BinanceCryptoHistoricalData: - """Return the transformed data.""" - data["date"] = ( - datetime.now().isoformat() if "date" not in data else data["date"] - ) - return BinanceCryptoHistoricalData(**data) |