diff options
author | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-24 22:58:46 +0200 |
---|---|---|
committer | Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com> | 2024-05-24 22:58:46 +0200 |
commit | 1286f07fdd6236afa2f4eb7b8c8c6d29fdaf5c4d (patch) | |
tree | 18058d12ad5131c6fdabc8b7177432c31efb9d27 | |
parent | aeeae61df9f055536c03cdaaa4765260fc316923 (diff) |
Getting closer
4 files changed, 152 insertions, 100 deletions
diff --git a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py index fd0497293f9..934ac1f233f 100644 --- a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py +++ b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py @@ -232,62 +232,62 @@ class Fetcher(Generic[Q, R]): ), f"Transformed data must be of the correct type. Expected: {cls.return_type} Got: {type(transformed_data)}" -class StreamFetcher(Generic[Q, R]): - """Class to fetch live streaming data using WebSocket connections.""" - - @classmethod - async def connect( - cls, - uri: str, - ): - """Connect to a WebSocket server.""" - cls.websocket = await websockets.connect(uri) - print("Connected to WebSocket server.") - asyncio.create_task(cls.receive_data()) - - @staticmethod - def transform_data(data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]: - """Transform the provider-specific data.""" - raise NotImplementedError - - @classmethod - async def receive_data(cls, **kwargs): - """Receive data from the WebSocket server.""" - try: - while True: - message = await cls.websocket.recv() - processed_data = await cls.process_message(message, **kwargs) - if processed_data: - print(processed_data) - - except websockets.exceptions.ConnectionClosed: - print("WebSocket connection closed.") - - @classmethod - async def process_message(cls, message: str, **kwargs) -> Optional[R]: - """Process incoming WebSocket messages.""" - try: - json_data = json.loads(message) - transformed_data = cls.transform_data(json_data, **kwargs) - return transformed_data - except Exception as e: - print(f"Error processing message: {e}") - return None - - @classmethod - async def disconnect(cls): - """Disconnect the WebSocket.""" - await cls.websocket.close() - - @classmethod - async def fetch_data( - cls, # pylint: disable=unused-argument - params: Dict[str, Any], - credentials: Optional[Dict[str, str]] = None, # pylint: disable=unused-argument - **kwargs, - ) -> Union[R, AnnotatedResult[R]]: - """Fetch live data from a provider.""" - # In a streaming context, this method may just ensure the connection is open. - if not hasattr(cls, "websocket"): - await cls.connect(params.get("uri")) - # Data handling is asynchronous and managed by `receive_data`. +# class StreamFetcher(Generic[Q, R]): +# """Class to fetch live streaming data using WebSocket connections.""" + +# @classmethod +# async def connect( +# cls, +# uri: str, +# ): +# """Connect to a WebSocket server.""" +# cls.websocket = await websockets.connect(uri) +# print("Connected to WebSocket server.") +# asyncio.create_task(cls.receive_data()) + +# @staticmethod +# def transform_data(data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]: +# """Transform the provider-specific data.""" +# raise NotImplementedError + +# @classmethod +# async def receive_data(cls, **kwargs): +# """Receive data from the WebSocket server.""" +# try: +# while True: +# message = await cls.websocket.recv() +# processed_data = await cls.process_message(message, **kwargs) +# if processed_data: +# print(processed_data) + +# except websockets.exceptions.ConnectionClosed: +# print("WebSocket connection closed.") + +# @classmethod +# async def process_message(cls, message: str, **kwargs) -> Optional[R]: +# """Process incoming WebSocket messages.""" +# try: +# json_data = json.loads(message) +# transformed_data = cls.transform_data(json_data, **kwargs) +# return transformed_data +# except Exception as e: +# print(f"Error processing message: {e}") +# return None + +# @classmethod +# async def disconnect(cls): +# """Disconnect the WebSocket.""" +# await cls.websocket.close() + +# @classmethod +# async def fetch_data( +# cls, # pylint: disable=unused-argument +# params: Dict[str, Any], +# credentials: Optional[Dict[str, str]] = None, # pylint: disable=unused-argument +# **kwargs, +# ) -> Union[R, AnnotatedResult[R]]: +# """Fetch live data from a provider.""" +# # In a streaming context, this method may just ensure the connection is open. +# if not hasattr(cls, "websocket"): +# await cls.connect(params.get("uri")) +# # Data handling is asynchronous and managed by `receive_data`. diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py index d11097f6960..258be3cec44 100644 --- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py +++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py @@ -1,6 +1,6 @@ """Crypto Router.""" -import asyncio +# import asyncio from openbb_core.app.model.command_context import CommandContext from openbb_core.app.model.example import APIEx @@ -12,8 +12,9 @@ from openbb_core.app.provider_interface import ( ) from openbb_core.app.query import Query from openbb_core.app.router import Router + from providers.binance.openbb_binance.models.crypto_historical import ( - BinanceStreamFetcher, + BinanceCryptoHistoricalFetcher, ) from openbb_crypto.price.price_router import router as price_router @@ -41,21 +42,10 @@ async def search( # pylint: disable=unused-argument +@router.command(method=["POST"]) async def crypto_historical(): "Define the POC." - full_url = "wss://stream.binance.com:9443/ws/ethbtc@miniTicker" - await BinanceStreamFetcher.connect(full_url) - try: - await asyncio.sleep(10) # Keep connection open for 60 seconds - finally: - await BinanceStreamFetcher.disconnect() - - # Adjusted setup for existing asyncio event loops - loop = asyncio.get_event_loop() - - if loop.is_running(): - # Scheduling the coroutine to run and handling with the existing event loop - loop.create_task(crypto_historical()) - else: - # If the loop is not running, run until complete - loop.run_until_complete(crypto_historical()) + yield BinanceCryptoHistoricalFetcher().fetch_data( + params={"symbol": "ethbtc", "lifetime": 10}, + credentials=None, + ) diff --git a/openbb_platform/providers/binance/openbb_binance/__init__.py b/openbb_platform/providers/binance/openbb_binance/__init__.py index 444b35b9353..34ffffa6658 100644 --- a/openbb_platform/providers/binance/openbb_binance/__init__.py +++ b/openbb_platform/providers/binance/openbb_binance/__init__.py @@ -1,6 +1,8 @@ """Biztoc provider module.""" -from openbb_binance.models.crypto_historical import BinanceStreamFetcher +# from openbb_binance.models.crypto_historical import ( +# BinanceCryptoHistoricalFetcher, +# ) from openbb_core.provider.abstract.provider import Provider binance_provider = Provider( @@ -17,7 +19,7 @@ If you're not a developer but would still like to use Biztoc outside of the main we've partnered with OpenBB, allowing you to pull in BizToc's news stream in their Terminal.""", # credentials=["api_key"], fetcher_dict={ - "bcrypto_historical": BinanceStreamFetcher, + # "bcrypto_historical": BinanceCryptoHistoricalFetcher, }, repr_name="Binance", instructions="The BizToc API is hosted on RapidAPI. To set up, go to: https://rapidapi.com/thma/api/binance.\n\n![binance0](https://github.com/marban/OpenBBTerminal/assets/18151143/04cdd423-f65e-4ad8-ad5a-4a59b0f5ddda)\n\nIn the top right, select 'Sign Up'. After answering some questions, you will be prompted to select one of their plans.\n\n![binance1](https://github.com/marban/OpenBBTerminal/assets/18151143/9f3b72ea-ded7-48c5-aa33-bec5c0de8422)\n\nAfter signing up, navigate back to https://rapidapi.com/thma/api/binance. If you are logged in, you will see a header called X-RapidAPI-Key.\n\n![binance2](https://github.com/marban/OpenBBTerminal/assets/18151143/0f3b6c91-07e0-447a-90cd-a9e23522929f)", # noqa: E501 pylint: disable=line-too-long diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py index f80b0709a74..b39cfa61ddd 100644 --- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py +++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py @@ -1,19 +1,33 @@ """Binance Crypto Historical WS Data.""" +import asyncio import json -from datetime import datetime -from typing import Any, Dict, Optional +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional from openbb_core.provider.standard_models.crypto_historical import ( CryptoHistoricalData, + CryptoHistoricalQueryParams, ) from pydantic import Field +import websockets -from openbb_platform.core.openbb_core.provider.abstract.fetcher import StreamFetcher +from openbb_platform.core.openbb_core.provider.abstract.fetcher import ( + # StreamFetcher, + Fetcher, +) # pylint: disable=unused-kwargs +class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams): + """Binance Crypto Historical Query Params""" + + lifetime: Optional[int] = Field( + default=60, description="Lifetime of WebSocket in seconds" + ) + + class BinanceCryptoHistoricalData(CryptoHistoricalData): """Binance Crypto Historical Data.""" @@ -37,25 +51,71 @@ class BinanceCryptoHistoricalData(CryptoHistoricalData): ) -class BinanceStreamFetcher(StreamFetcher): - """Define Binance Stream Fetcher.""" +class BinanceCryptoHistoricalFetcher(Fetcher): + """Define Binance Crypto Historical Fetcher.""" + + @staticmethod + def transform_query(params: Dict[str, Any]) -> BinanceCryptoHistoricalQueryParams: + """Transform the query params.""" + return BinanceCryptoHistoricalQueryParams(**params) + + @staticmethod + async def aextract_data( + query: BinanceCryptoHistoricalQueryParams, + credentials: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> BinanceCryptoHistoricalData: + """Return the raw data from the Binance endpoint.""" + async with websockets.connect( + f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker" + ) as websocket: + print("Connected to WebSocket server.") + end_time = datetime.now() + timedelta(seconds=query.lifetime) + try: + while datetime.now() < end_time: + message = await websocket.recv() + data = json.loads(message) + transformed_data = BinanceCryptoHistoricalFetcher.transform_data( + query, data + ) + # print(transformed_data) + yield transformed_data + except websockets.exceptions.ConnectionClosed: + print("WebSocket connection closed.") + finally: + print("WebSocket connection closed.") @staticmethod - def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData: - """Transform the incoming data.""" - if "date" not in data: - data["date"] = datetime.now().isoformat() + def transform_data( + query: BinanceCryptoHistoricalQueryParams, + data: Dict[str, Any], + ) -> BinanceCryptoHistoricalData: + """Return the transformed data.""" + data["date"] = ( + datetime.now().isoformat() if "date" not in data else data["date"] + ) return BinanceCryptoHistoricalData(**data) - @classmethod - async def process_message( - cls, message: str, **kwargs - ) -> Optional[BinanceCryptoHistoricalData]: - """Process incoming WebSocket messages.""" - try: - json_data = json.loads(message) - transformed_data = cls.transform_data(json_data) - return transformed_data - except Exception as e: - print(f"Error processing message from Binance: {e}") - return None + +# class BinanceStreamFetcher(StreamFetcher): +# """Define Binance Stream Fetcher.""" + +# @staticmethod +# def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData: +# """Transform the incoming data.""" +# if "date" not in data: +# data["date"] = datetime.now().isoformat() +# return BinanceCryptoHistoricalData(**data) + +# @classmethod +# async def process_message( +# cls, message: str, **kwargs +# ) -> Optional[BinanceCryptoHistoricalData]: +# """Process incoming WebSocket messages.""" +# try: +# json_data = json.loads(message) +# transformed_data = cls.transform_data(json_data) +# return transformed_data +# except Exception as e: +# print(f"Error processing message from Binance: {e}") +# return None |