diff options
author | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-25 15:09:29 +0200 |
---|---|---|
committer | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-25 15:09:29 +0200 |
commit | 459bb70c983001736f56fb2467664916c2f15c4e (patch) | |
tree | 2e56926fa3e45bad5e9a638fee9e79a505af8f93 | |
parent | 1286f07fdd6236afa2f4eb7b8c8c6d29fdaf5c4d (diff) |
Closer
4 files changed, 35 insertions, 45 deletions
diff --git a/openbb_platform/core/openbb_core/app/model/obbject.py b/openbb_platform/core/openbb_core/app/model/obbject.py index 67f41e9d15d..0a1d09d01a9 100644 --- a/openbb_platform/core/openbb_core/app/model/obbject.py +++ b/openbb_platform/core/openbb_core/app/model/obbject.py @@ -275,6 +275,11 @@ class OBBject(Tagged, Generic[T]): show_function: Callable = getattr(self.chart.fig, "show") show_function(**kwargs) + async def stream(self) -> None: + """Stream response.""" + async for result in self.results: # pylint: disable=not-an-iterable + print(result) + @classmethod async def from_query(cls, query: "Query") -> "OBBject": """Create OBBject from query. diff --git a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py index 934ac1f233f..8f16e8b7bfa 100644 --- a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py +++ b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py @@ -3,8 +3,6 @@ # ruff: noqa: S101, E501 # pylint: disable=E1101, C0301 -import asyncio -import json from typing import ( Any, Dict, @@ -16,7 +14,6 @@ from typing import ( get_origin, ) -import websockets from pandas import DataFrame from openbb_core.provider.abstract.annotated_result import AnnotatedResult @@ -92,6 +89,21 @@ class Fetcher(Generic[Q, R]): ) return cls.transform_data(query=query, data=data, **kwargs) + @classmethod + async def stream_data( + cls, + params: Dict[str, Any], + credentials: Optional[Dict[str, str]] = None, + **kwargs, + ) -> Union[R, AnnotatedResult[R]]: + """Fetch data from a provider.""" + query = cls.transform_query(params=params) + data = await maybe_coroutine( + cls.aextract_data, query=query, credentials=credentials, **kwargs + ) + async for d in data: + yield d + @classproperty def query_params_type(self) -> Q: """Get the type of query.""" diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py index 258be3cec44..863e7dcbedc 100644 --- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py +++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py @@ -12,7 +12,6 @@ from openbb_core.app.provider_interface import ( ) from openbb_core.app.query import Query from openbb_core.app.router import Router - from providers.binance.openbb_binance.models.crypto_historical import ( BinanceCryptoHistoricalFetcher, ) @@ -41,11 +40,13 @@ async def search( return await OBBject.from_query(Query(**locals())) -# pylint: disable=unused-argument -@router.command(method=["POST"]) -async def crypto_historical(): - "Define the POC." - yield BinanceCryptoHistoricalFetcher().fetch_data( - params={"symbol": "ethbtc", "lifetime": 10}, +@router.command( + methods=["GET"], +) +async def stream_price(symbol: str = "ethbtc", lifetime: int = 10) -> OBBject: + """Define the POC.""" + generator = BinanceCryptoHistoricalFetcher().stream_data( + params={"symbol": symbol, "lifetime": lifetime}, credentials=None, ) + return OBBject(results=generator, provider="binance") diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py index b39cfa61ddd..bf34da35e44 100644 --- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py +++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py @@ -1,27 +1,23 @@ """Binance Crypto Historical WS Data.""" -import asyncio import json from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional +import websockets from openbb_core.provider.standard_models.crypto_historical import ( CryptoHistoricalData, CryptoHistoricalQueryParams, ) from pydantic import Field -import websockets -from openbb_platform.core.openbb_core.provider.abstract.fetcher import ( - # StreamFetcher, - Fetcher, -) +from openbb_platform.core.openbb_core.provider.abstract.fetcher import Fetcher -# pylint: disable=unused-kwargs +# pylint: disable=unused-argument, arguments-differ class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams): - """Binance Crypto Historical Query Params""" + """Binance Crypto Historical Query Params.""" lifetime: Optional[int] = Field( default=60, description="Lifetime of WebSocket in seconds" @@ -78,10 +74,10 @@ class BinanceCryptoHistoricalFetcher(Fetcher): transformed_data = BinanceCryptoHistoricalFetcher.transform_data( query, data ) - # print(transformed_data) yield transformed_data - except websockets.exceptions.ConnectionClosed: + except websockets.exceptions.ConnectionClosed as e: print("WebSocket connection closed.") + raise e finally: print("WebSocket connection closed.") @@ -95,27 +91,3 @@ class BinanceCryptoHistoricalFetcher(Fetcher): datetime.now().isoformat() if "date" not in data else data["date"] ) return BinanceCryptoHistoricalData(**data) - - -# class BinanceStreamFetcher(StreamFetcher): -# """Define Binance Stream Fetcher.""" - -# @staticmethod -# def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData: -# """Transform the incoming data.""" -# if "date" not in data: -# data["date"] = datetime.now().isoformat() -# return BinanceCryptoHistoricalData(**data) - -# @classmethod -# async def process_message( -# cls, message: str, **kwargs -# ) -> Optional[BinanceCryptoHistoricalData]: -# """Process incoming WebSocket messages.""" -# try: -# json_data = json.loads(message) -# transformed_data = cls.transform_data(json_data) -# return transformed_data -# except Exception as e: -# print(f"Error processing message from Binance: {e}") -# return None |