summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-25 15:09:29 +0200
committerIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-25 15:09:29 +0200
commit459bb70c983001736f56fb2467664916c2f15c4e (patch)
tree2e56926fa3e45bad5e9a638fee9e79a505af8f93
parent1286f07fdd6236afa2f4eb7b8c8c6d29fdaf5c4d (diff)
Closer
-rw-r--r--openbb_platform/core/openbb_core/app/model/obbject.py5
-rw-r--r--openbb_platform/core/openbb_core/provider/abstract/fetcher.py18
-rw-r--r--openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py15
-rw-r--r--openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py42
4 files changed, 35 insertions, 45 deletions
diff --git a/openbb_platform/core/openbb_core/app/model/obbject.py b/openbb_platform/core/openbb_core/app/model/obbject.py
index 67f41e9d15d..0a1d09d01a9 100644
--- a/openbb_platform/core/openbb_core/app/model/obbject.py
+++ b/openbb_platform/core/openbb_core/app/model/obbject.py
@@ -275,6 +275,11 @@ class OBBject(Tagged, Generic[T]):
show_function: Callable = getattr(self.chart.fig, "show")
show_function(**kwargs)
+ async def stream(self) -> None:
+ """Stream response."""
+ async for result in self.results: # pylint: disable=not-an-iterable
+ print(result)
+
@classmethod
async def from_query(cls, query: "Query") -> "OBBject":
"""Create OBBject from query.
diff --git a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
index 934ac1f233f..8f16e8b7bfa 100644
--- a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
+++ b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py
@@ -3,8 +3,6 @@
# ruff: noqa: S101, E501
# pylint: disable=E1101, C0301
-import asyncio
-import json
from typing import (
Any,
Dict,
@@ -16,7 +14,6 @@ from typing import (
get_origin,
)
-import websockets
from pandas import DataFrame
from openbb_core.provider.abstract.annotated_result import AnnotatedResult
@@ -92,6 +89,21 @@ class Fetcher(Generic[Q, R]):
)
return cls.transform_data(query=query, data=data, **kwargs)
+ @classmethod
+ async def stream_data(
+ cls,
+ params: Dict[str, Any],
+ credentials: Optional[Dict[str, str]] = None,
+ **kwargs,
+ ) -> Union[R, AnnotatedResult[R]]:
+ """Fetch data from a provider."""
+ query = cls.transform_query(params=params)
+ data = await maybe_coroutine(
+ cls.aextract_data, query=query, credentials=credentials, **kwargs
+ )
+ async for d in data:
+ yield d
+
@classproperty
def query_params_type(self) -> Q:
"""Get the type of query."""
diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
index 258be3cec44..863e7dcbedc 100644
--- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
+++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
@@ -12,7 +12,6 @@ from openbb_core.app.provider_interface import (
)
from openbb_core.app.query import Query
from openbb_core.app.router import Router
-
from providers.binance.openbb_binance.models.crypto_historical import (
BinanceCryptoHistoricalFetcher,
)
@@ -41,11 +40,13 @@ async def search(
return await OBBject.from_query(Query(**locals()))
-# pylint: disable=unused-argument
-@router.command(method=["POST"])
-async def crypto_historical():
- "Define the POC."
- yield BinanceCryptoHistoricalFetcher().fetch_data(
- params={"symbol": "ethbtc", "lifetime": 10},
+@router.command(
+ methods=["GET"],
+)
+async def stream_price(symbol: str = "ethbtc", lifetime: int = 10) -> OBBject:
+ """Define the POC."""
+ generator = BinanceCryptoHistoricalFetcher().stream_data(
+ params={"symbol": symbol, "lifetime": lifetime},
credentials=None,
)
+ return OBBject(results=generator, provider="binance")
diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
index b39cfa61ddd..bf34da35e44 100644
--- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
+++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
@@ -1,27 +1,23 @@
"""Binance Crypto Historical WS Data."""
-import asyncio
import json
from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Optional
+import websockets
from openbb_core.provider.standard_models.crypto_historical import (
CryptoHistoricalData,
CryptoHistoricalQueryParams,
)
from pydantic import Field
-import websockets
-from openbb_platform.core.openbb_core.provider.abstract.fetcher import (
- # StreamFetcher,
- Fetcher,
-)
+from openbb_platform.core.openbb_core.provider.abstract.fetcher import Fetcher
-# pylint: disable=unused-kwargs
+# pylint: disable=unused-argument, arguments-differ
class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams):
- """Binance Crypto Historical Query Params"""
+ """Binance Crypto Historical Query Params."""
lifetime: Optional[int] = Field(
default=60, description="Lifetime of WebSocket in seconds"
@@ -78,10 +74,10 @@ class BinanceCryptoHistoricalFetcher(Fetcher):
transformed_data = BinanceCryptoHistoricalFetcher.transform_data(
query, data
)
- # print(transformed_data)
yield transformed_data
- except websockets.exceptions.ConnectionClosed:
+ except websockets.exceptions.ConnectionClosed as e:
print("WebSocket connection closed.")
+ raise e
finally:
print("WebSocket connection closed.")
@@ -95,27 +91,3 @@ class BinanceCryptoHistoricalFetcher(Fetcher):
datetime.now().isoformat() if "date" not in data else data["date"]
)
return BinanceCryptoHistoricalData(**data)
-
-
-# class BinanceStreamFetcher(StreamFetcher):
-# """Define Binance Stream Fetcher."""
-
-# @staticmethod
-# def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData:
-# """Transform the incoming data."""
-# if "date" not in data:
-# data["date"] = datetime.now().isoformat()
-# return BinanceCryptoHistoricalData(**data)
-
-# @classmethod
-# async def process_message(
-# cls, message: str, **kwargs
-# ) -> Optional[BinanceCryptoHistoricalData]:
-# """Process incoming WebSocket messages."""
-# try:
-# json_data = json.loads(message)
-# transformed_data = cls.transform_data(json_data)
-# return transformed_data
-# except Exception as e:
-# print(f"Error processing message from Binance: {e}")
-# return None