summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-24 22:58:46 +0200
committerIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-24 22:58:46 +0200
commit1286f07fdd6236afa2f4eb7b8c8c6d29fdaf5c4d (patch)
tree18058d12ad5131c6fdabc8b7177432c31efb9d27
parentaeeae61df9f055536c03cdaaa4765260fc316923 (diff)
Getting closer
-rw-r--r--openbb_platform/core/openbb_core/provider/abstract/fetcher.py118
-rw-r--r--openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py26
-rw-r--r--openbb_platform/providers/binance/openbb_binance/__init__.py6
-rw-r--r--openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py102
4 files changed, 152 insertions, 100 deletions
diff --git a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
index fd0497293f9..934ac1f233f 100644
--- a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
+++ b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
@@ -232,62 +232,62 @@ class Fetcher(Generic[Q, R]):
), f"Transformed data must be of the correct type. Expected: {cls.return_type} Got: {type(transformed_data)}"
-class StreamFetcher(Generic[Q, R]):
- """Class to fetch live streaming data using WebSocket connections."""
-
- @classmethod
- async def connect(
- cls,
- uri: str,
- ):
- """Connect to a WebSocket server."""
- cls.websocket = await websockets.connect(uri)
- print("Connected to WebSocket server.")
- asyncio.create_task(cls.receive_data())
-
- @staticmethod
- def transform_data(data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]:
- """Transform the provider-specific data."""
- raise NotImplementedError
-
- @classmethod
- async def receive_data(cls, **kwargs):
- """Receive data from the WebSocket server."""
- try:
- while True:
- message = await cls.websocket.recv()
- processed_data = await cls.process_message(message, **kwargs)
- if processed_data:
- print(processed_data)
-
- except websockets.exceptions.ConnectionClosed:
- print("WebSocket connection closed.")
-
- @classmethod
- async def process_message(cls, message: str, **kwargs) -> Optional[R]:
- """Process incoming WebSocket messages."""
- try:
- json_data = json.loads(message)
- transformed_data = cls.transform_data(json_data, **kwargs)
- return transformed_data
- except Exception as e:
- print(f"Error processing message: {e}")
- return None
-
- @classmethod
- async def disconnect(cls):
- """Disconnect the WebSocket."""
- await cls.websocket.close()
-
- @classmethod
- async def fetch_data(
- cls, # pylint: disable=unused-argument
- params: Dict[str, Any],
- credentials: Optional[Dict[str, str]] = None, # pylint: disable=unused-argument
- **kwargs,
- ) -> Union[R, AnnotatedResult[R]]:
- """Fetch live data from a provider."""
- # In a streaming context, this method may just ensure the connection is open.
- if not hasattr(cls, "websocket"):
- await cls.connect(params.get("uri"))
- # Data handling is asynchronous and managed by `receive_data`.
+# class StreamFetcher(Generic[Q, R]):
+# """Class to fetch live streaming data using WebSocket connections."""
+
+# @classmethod
+# async def connect(
+# cls,
+# uri: str,
+# ):
+# """Connect to a WebSocket server."""
+# cls.websocket = await websockets.connect(uri)
+# print("Connected to WebSocket server.")
+# asyncio.create_task(cls.receive_data())
+
+# @staticmethod
+# def transform_data(data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]:
+# """Transform the provider-specific data."""
+# raise NotImplementedError
+
+# @classmethod
+# async def receive_data(cls, **kwargs):
+# """Receive data from the WebSocket server."""
+# try:
+# while True:
+# message = await cls.websocket.recv()
+# processed_data = await cls.process_message(message, **kwargs)
+# if processed_data:
+# print(processed_data)
+
+# except websockets.exceptions.ConnectionClosed:
+# print("WebSocket connection closed.")
+
+# @classmethod
+# async def process_message(cls, message: str, **kwargs) -> Optional[R]:
+# """Process incoming WebSocket messages."""
+# try:
+# json_data = json.loads(message)
+# transformed_data = cls.transform_data(json_data, **kwargs)
+# return transformed_data
+# except Exception as e:
+# print(f"Error processing message: {e}")
+# return None
+
+# @classmethod
+# async def disconnect(cls):
+# """Disconnect the WebSocket."""
+# await cls.websocket.close()
+
+# @classmethod
+# async def fetch_data(
+# cls, # pylint: disable=unused-argument
+# params: Dict[str, Any],
+# credentials: Optional[Dict[str, str]] = None, # pylint: disable=unused-argument
+# **kwargs,
+# ) -> Union[R, AnnotatedResult[R]]:
+# """Fetch live data from a provider."""
+# # In a streaming context, this method may just ensure the connection is open.
+# if not hasattr(cls, "websocket"):
+# await cls.connect(params.get("uri"))
+# # Data handling is asynchronous and managed by `receive_data`.
diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
index d11097f6960..258be3cec44 100644
--- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
+++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
@@ -1,6 +1,6 @@
"""Crypto Router."""
-import asyncio
+# import asyncio
from openbb_core.app.model.command_context import CommandContext
from openbb_core.app.model.example import APIEx
@@ -12,8 +12,9 @@ from openbb_core.app.provider_interface import (
)
from openbb_core.app.query import Query
from openbb_core.app.router import Router
+
from providers.binance.openbb_binance.models.crypto_historical import (
- BinanceStreamFetcher,
+ BinanceCryptoHistoricalFetcher,
)
from openbb_crypto.price.price_router import router as price_router
@@ -41,21 +42,10 @@ async def search(
# pylint: disable=unused-argument
+@router.command(method=["POST"])
async def crypto_historical():
"Define the POC."
- full_url = "wss://stream.binance.com:9443/ws/ethbtc@miniTicker"
- await BinanceStreamFetcher.connect(full_url)
- try:
- await asyncio.sleep(10) # Keep connection open for 60 seconds
- finally:
- await BinanceStreamFetcher.disconnect()
-
- # Adjusted setup for existing asyncio event loops
- loop = asyncio.get_event_loop()
-
- if loop.is_running():
- # Scheduling the coroutine to run and handling with the existing event loop
- loop.create_task(crypto_historical())
- else:
- # If the loop is not running, run until complete
- loop.run_until_complete(crypto_historical())
+ yield BinanceCryptoHistoricalFetcher().fetch_data(
+ params={"symbol": "ethbtc", "lifetime": 10},
+ credentials=None,
+ )
diff --git a/openbb_platform/providers/binance/openbb_binance/__init__.py b/openbb_platform/providers/binance/openbb_binance/__init__.py
index 444b35b9353..34ffffa6658 100644
--- a/openbb_platform/providers/binance/openbb_binance/__init__.py
+++ b/openbb_platform/providers/binance/openbb_binance/__init__.py
@@ -1,6 +1,8 @@
"""Biztoc provider module."""
-from openbb_binance.models.crypto_historical import BinanceStreamFetcher
+# from openbb_binance.models.crypto_historical import (
+# BinanceCryptoHistoricalFetcher,
+# )
from openbb_core.provider.abstract.provider import Provider
binance_provider = Provider(
@@ -17,7 +19,7 @@ If you're not a developer but would still like to use Biztoc outside of the main
we've partnered with OpenBB, allowing you to pull in BizToc's news stream in their Terminal.""",
# credentials=["api_key"],
fetcher_dict={
- "bcrypto_historical": BinanceStreamFetcher,
+ # "bcrypto_historical": BinanceCryptoHistoricalFetcher,
},
repr_name="Binance",
instructions="The BizToc API is hosted on RapidAPI. To set up, go to: https://rapidapi.com/thma/api/binance.\n\n![binance0](https://github.com/marban/OpenBBTerminal/assets/18151143/04cdd423-f65e-4ad8-ad5a-4a59b0f5ddda)\n\nIn the top right, select 'Sign Up'. After answering some questions, you will be prompted to select one of their plans.\n\n![binance1](https://github.com/marban/OpenBBTerminal/assets/18151143/9f3b72ea-ded7-48c5-aa33-bec5c0de8422)\n\nAfter signing up, navigate back to https://rapidapi.com/thma/api/binance. If you are logged in, you will see a header called X-RapidAPI-Key.\n\n![binance2](https://github.com/marban/OpenBBTerminal/assets/18151143/0f3b6c91-07e0-447a-90cd-a9e23522929f)", # noqa: E501 pylint: disable=line-too-long
diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
index f80b0709a74..b39cfa61ddd 100644
--- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
+++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
@@ -1,19 +1,33 @@
"""Binance Crypto Historical WS Data."""
+import asyncio
import json
-from datetime import datetime
-from typing import Any, Dict, Optional
+from datetime import datetime, timedelta
+from typing import Any, Dict, List, Optional
from openbb_core.provider.standard_models.crypto_historical import (
CryptoHistoricalData,
+ CryptoHistoricalQueryParams,
)
from pydantic import Field
+import websockets
-from openbb_platform.core.openbb_core.provider.abstract.fetcher import StreamFetcher
+from openbb_platform.core.openbb_core.provider.abstract.fetcher import (
+ # StreamFetcher,
+ Fetcher,
+)
# pylint: disable=unused-kwargs
+class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams):
+ """Binance Crypto Historical Query Params"""
+
+ lifetime: Optional[int] = Field(
+ default=60, description="Lifetime of WebSocket in seconds"
+ )
+
+
class BinanceCryptoHistoricalData(CryptoHistoricalData):
"""Binance Crypto Historical Data."""
@@ -37,25 +51,71 @@ class BinanceCryptoHistoricalData(CryptoHistoricalData):
)
-class BinanceStreamFetcher(StreamFetcher):
- """Define Binance Stream Fetcher."""
+class BinanceCryptoHistoricalFetcher(Fetcher):
+ """Define Binance Crypto Historical Fetcher."""
+
+ @staticmethod
+ def transform_query(params: Dict[str, Any]) -> BinanceCryptoHistoricalQueryParams:
+ """Transform the query params."""
+ return BinanceCryptoHistoricalQueryParams(**params)
+
+ @staticmethod
+ async def aextract_data(
+ query: BinanceCryptoHistoricalQueryParams,
+ credentials: Optional[Dict[str, str]] = None,
+ **kwargs: Any,
+ ) -> BinanceCryptoHistoricalData:
+ """Return the raw data from the Binance endpoint."""
+ async with websockets.connect(
+ f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker"
+ ) as websocket:
+ print("Connected to WebSocket server.")
+ end_time = datetime.now() + timedelta(seconds=query.lifetime)
+ try:
+ while datetime.now() < end_time:
+ message = await websocket.recv()
+ data = json.loads(message)
+ transformed_data = BinanceCryptoHistoricalFetcher.transform_data(
+ query, data
+ )
+ # print(transformed_data)
+ yield transformed_data
+ except websockets.exceptions.ConnectionClosed:
+ print("WebSocket connection closed.")
+ finally:
+ print("WebSocket connection closed.")
@staticmethod
- def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData:
- """Transform the incoming data."""
- if "date" not in data:
- data["date"] = datetime.now().isoformat()
+ def transform_data(
+ query: BinanceCryptoHistoricalQueryParams,
+ data: Dict[str, Any],
+ ) -> BinanceCryptoHistoricalData:
+ """Return the transformed data."""
+ data["date"] = (
+ datetime.now().isoformat() if "date" not in data else data["date"]
+ )
return BinanceCryptoHistoricalData(**data)
- @classmethod
- async def process_message(
- cls, message: str, **kwargs
- ) -> Optional[BinanceCryptoHistoricalData]:
- """Process incoming WebSocket messages."""
- try:
- json_data = json.loads(message)
- transformed_data = cls.transform_data(json_data)
- return transformed_data
- except Exception as e:
- print(f"Error processing message from Binance: {e}")
- return None
+
+# class BinanceStreamFetcher(StreamFetcher):
+# """Define Binance Stream Fetcher."""
+
+# @staticmethod
+# def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData:
+# """Transform the incoming data."""
+# if "date" not in data:
+# data["date"] = datetime.now().isoformat()
+# return BinanceCryptoHistoricalData(**data)
+
+# @classmethod
+# async def process_message(
+# cls, message: str, **kwargs
+# ) -> Optional[BinanceCryptoHistoricalData]:
+# """Process incoming WebSocket messages."""
+# try:
+# json_data = json.loads(message)
+# transformed_data = cls.transform_data(json_data)
+# return transformed_data
+# except Exception as e:
+# print(f"Error processing message from Binance: {e}")
+# return None