summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-27 14:07:41 +0200
committerIgor Radovanovic <74266147+IgorWounds@users.noreply.github.com>2024-05-27 14:07:41 +0200
commit38f4316749b9976bb83a727cef96d5e0d572a1d7 (patch)
tree5d65e30189ecb5f04198ca61cad210b2455b6947
parent459bb70c983001736f56fb2467664916c2f15c4e (diff)
Closer
-rw-r--r--openbb_platform/core/openbb_core/api/router/commands.py13
-rw-r--r--openbb_platform/core/openbb_core/app/command_runner.py32
-rw-r--r--openbb_platform/core/openbb_core/app/model/obbject.py5
-rw-r--r--openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py3
-rw-r--r--openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py31
5 files changed, 44 insertions, 40 deletions
diff --git a/openbb_platform/core/openbb_core/api/router/commands.py b/openbb_platform/core/openbb_core/api/router/commands.py
index ce1cab484a8..16b1380adf6 100644
--- a/openbb_platform/core/openbb_core/api/router/commands.py
+++ b/openbb_platform/core/openbb_core/api/router/commands.py
@@ -3,9 +3,10 @@
import inspect
from functools import partial, wraps
from inspect import Parameter, Signature, signature
-from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, Union
from fastapi import APIRouter, Depends, Header
+from fastapi.responses import StreamingResponse
from fastapi.routing import APIRoute
from openbb_core.app.command_runner import CommandRunner
from openbb_core.app.model.command_context import CommandContext
@@ -188,7 +189,9 @@ def build_api_wrapper(
func.__annotations__ = new_annotations_map
@wraps(wrapped=func)
- async def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> OBBject:
+ async def wrapper(
+ *args: Tuple[Any], **kwargs: Dict[str, Any]
+ ) -> Union[OBBject, StreamingResponse]:
user_settings: UserSettings = UserSettings.model_validate(
kwargs.pop(
"__authenticated_user_settings",
@@ -196,9 +199,11 @@ def build_api_wrapper(
)
)
execute = partial(command_runner.run, path, user_settings)
- output: OBBject = await execute(*args, **kwargs)
+ output = await execute(*args, **kwargs)
- return validate_output(output)
+ if isinstance(output, OBBject):
+ return validate_output(output)
+ return output
return wrapper
diff --git a/openbb_platform/core/openbb_core/app/command_runner.py b/openbb_platform/core/openbb_core/app/command_runner.py
index 89433e45156..7a5df24dea0 100644
--- a/openbb_platform/core/openbb_core/app/command_runner.py
+++ b/openbb_platform/core/openbb_core/app/command_runner.py
@@ -8,9 +8,10 @@ from datetime import datetime
from inspect import Parameter, signature
from sys import exc_info
from time import perf_counter_ns
-from typing import Any, Callable, Dict, List, Optional, Tuple, Type
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from warnings import catch_warnings, showwarning, warn
+from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict, create_model
from openbb_core.app.logs.logging_service import LoggingService
@@ -420,7 +421,7 @@ class StaticCommandRunner:
/,
*args,
**kwargs,
- ) -> OBBject:
+ ) -> Union[OBBject, StreamingResponse]:
"""Run a command and return the OBBject as output."""
timestamp = datetime.now()
start_ns = perf_counter_ns()
@@ -429,7 +430,7 @@ class StaticCommandRunner:
route = execution_context.route
if func := command_map.get_command(route=route):
- obbject = await cls._execute_func(
+ result = await cls._execute_func(
route=route,
args=args, # type: ignore
execution_context=execution_context,
@@ -442,19 +443,20 @@ class StaticCommandRunner:
duration = perf_counter_ns() - start_ns
if execution_context.user_settings.preferences.metadata:
- try:
- obbject.extra["metadata"] = Metadata(
- arguments=kwargs,
- duration=duration,
- route=route,
- timestamp=timestamp,
- )
- except Exception as e:
- if Env().DEBUG_MODE:
- raise OpenBBError(e) from e
- warn(str(e), OpenBBWarning)
+ if isinstance(result, OBBject):
+ try:
+ result.extra["metadata"] = Metadata(
+ arguments=kwargs,
+ duration=duration,
+ route=route,
+ timestamp=timestamp,
+ )
+ except Exception as e:
+ if Env().DEBUG_MODE:
+ raise OpenBBError(e) from e
+ warn(str(e), OpenBBWarning)
- return obbject
+ return result
class CommandRunner:
diff --git a/openbb_platform/core/openbb_core/app/model/obbject.py b/openbb_platform/core/openbb_core/app/model/obbject.py
index 0a1d09d01a9..67f41e9d15d 100644
--- a/openbb_platform/core/openbb_core/app/model/obbject.py
+++ b/openbb_platform/core/openbb_core/app/model/obbject.py
@@ -275,11 +275,6 @@ class OBBject(Tagged, Generic[T]):
show_function: Callable = getattr(self.chart.fig, "show")
show_function(**kwargs)
- async def stream(self) -> None:
- """Stream response."""
- async for result in self.results: # pylint: disable=not-an-iterable
- print(result)
-
@classmethod
async def from_query(cls, query: "Query") -> "OBBject":
"""Create OBBject from query.
diff --git a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
index 863e7dcbedc..b27eaf0c5d9 100644
--- a/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
+++ b/openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
@@ -17,6 +17,7 @@ from providers.binance.openbb_binance.models.crypto_historical import (
)
from openbb_crypto.price.price_router import router as price_router
+from fastapi.responses import StreamingResponse
router = Router(prefix="", description="Cryptocurrency market data.")
router.include_router(price_router)
@@ -49,4 +50,4 @@ async def stream_price(symbol: str = "ethbtc", lifetime: int = 10) -> OBBject:
params={"symbol": symbol, "lifetime": lifetime},
credentials=None,
)
- return OBBject(results=generator, provider="binance")
+ return StreamingResponse(generator, media_type="application/x-ndjson")
diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
index bf34da35e44..3c8d440c166 100644
--- a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
+++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py
@@ -2,7 +2,7 @@
import json
from datetime import datetime, timedelta
-from typing import Any, Dict, Optional
+from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional
import websockets
from openbb_core.provider.standard_models.crypto_historical import (
@@ -20,7 +20,7 @@ class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams):
"""Binance Crypto Historical Query Params."""
lifetime: Optional[int] = Field(
- default=60, description="Lifetime of WebSocket in seconds"
+ default=60, description="Lifetime of WebSocket in seconds."
)
@@ -56,11 +56,23 @@ class BinanceCryptoHistoricalFetcher(Fetcher):
return BinanceCryptoHistoricalQueryParams(**params)
@staticmethod
+ def transform_data(
+ query: BinanceCryptoHistoricalQueryParams,
+ data: Dict[str, Any],
+ ) -> BinanceCryptoHistoricalData:
+ """Return the transformed data."""
+ data["date"] = (
+ datetime.now().isoformat() if "date" not in data else data["date"]
+ )
+
+ return BinanceCryptoHistoricalData(**data)
+
+ @staticmethod
async def aextract_data(
query: BinanceCryptoHistoricalQueryParams,
credentials: Optional[Dict[str, str]] = None,
**kwargs: Any,
- ) -> BinanceCryptoHistoricalData:
+ ) -> AsyncIterator[str]:
"""Return the raw data from the Binance endpoint."""
async with websockets.connect(
f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker"
@@ -74,20 +86,9 @@ class BinanceCryptoHistoricalFetcher(Fetcher):
transformed_data = BinanceCryptoHistoricalFetcher.transform_data(
query, data
)
- yield transformed_data
+ yield transformed_data.model_dump_json() + "\n"
except websockets.exceptions.ConnectionClosed as e:
print("WebSocket connection closed.")
raise e
finally:
print("WebSocket connection closed.")
-
- @staticmethod
- def transform_data(
- query: BinanceCryptoHistoricalQueryParams,
- data: Dict[str, Any],
- ) -> BinanceCryptoHistoricalData:
- """Return the transformed data."""
- data["date"] = (
- datetime.now().isoformat() if "date" not in data else data["date"]
- )
- return BinanceCryptoHistoricalData(**data)