summaryrefslogtreecommitdiffstats
path: root/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py
diff options
context:
space:
mode:
Diffstat (limited to 'openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py')
-rw-r--r--openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py286
1 files changed, 141 insertions, 145 deletions
diff --git a/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py b/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py
index d68b413fa43..2da810d1acf 100644
--- a/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py
+++ b/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py
@@ -1,37 +1,54 @@
-"""CBOE Index Historical Model."""
+"""Cboe Market Indices Model."""
+import warnings
from datetime import datetime, timedelta
from typing import Any, Dict, List, Literal, Optional
-import pandas as pd
from openbb_cboe.utils.helpers import (
TICKER_EXCEPTIONS,
- get_cboe_index_directory,
- get_ticker_info,
+ get_index_directory,
)
from openbb_core.provider.abstract.fetcher import Fetcher
from openbb_core.provider.standard_models.index_historical import (
IndexHistoricalData,
IndexHistoricalQueryParams,
)
-from openbb_core.provider.utils.helpers import make_request
+from openbb_core.provider.utils.descriptions import QUERY_DESCRIPTIONS
+from openbb_core.provider.utils.errors import EmptyDataError
+from openbb_core.provider.utils.helpers import amake_requests
+from pandas import DataFrame, Series, concat, to_datetime
from pydantic import Field
+_warn = warnings.warn
+
class CboeIndexHistoricalQueryParams(IndexHistoricalQueryParams):
- """CBOE Index Historical Query.
+ """CBOE Market Indices Query.
Source: https://www.cboe.com/
"""
- interval: Literal["1d", "1m"] = Field(
- description="Use interval, 1m, for intraday prices during the most recent trading period.",
+ interval: Literal["1m", "1d"] = Field(
default="1d",
+ description=(
+ QUERY_DESCRIPTIONS.get("interval", "")
+ + " The most recent trading day is not including in daily historical data."
+ + " Intraday data is only available for the most recent trading day at 1 minute intervals."
+ ),
+ )
+ use_cache: bool = Field(
+ default=True,
+ description="When True, the company directories will be cached for 24 hours and are used to validate symbols."
+ + " The results of the function are not cached. Set as False to bypass.",
)
class CboeIndexHistoricalData(IndexHistoricalData):
- """CBOE Index Historical Data."""
+ """CBOE Market Indices Data."""
+
+ __alias_dict__ = {
+ "volume": "stock_volume",
+ }
calls_volume: Optional[float] = Field(
default=None,
@@ -57,156 +74,135 @@ class CboeIndexHistoricalFetcher(
@staticmethod
def transform_query(params: Dict[str, Any]) -> CboeIndexHistoricalQueryParams:
- """Transform the query. Setting the start and end dates for a 1 year period."""
- return CboeIndexHistoricalQueryParams(**params)
+ """Transform the query."""
+ transformed_params = params.copy()
+ now = datetime.now()
+ if (
+ len(params.get("symbol", "").split(",")) > 1
+ and params.get("start_date") is None
+ ):
+ transformed_params["start_date"] = (
+ transformed_params["start_date"]
+ if transformed_params["start_date"]
+ else (now - timedelta(days=720)).strftime("%Y-%m-%d")
+ )
+ if transformed_params.get("start_date") is None:
+ transformed_params["start_date"] = (
+ transformed_params["start_date"]
+ if transformed_params.get("start_date")
+ else "1950-01-01"
+ )
+ if params.get("end_date") is None:
+ transformed_params["end_date"] = (
+ transformed_params["end_date"]
+ if transformed_params.get("end_date")
+ else now.strftime("%Y-%m-%d")
+ )
+
+ return CboeIndexHistoricalQueryParams(**transformed_params)
@staticmethod
- def extract_data(
- query: CboeIndexHistoricalQueryParams, # pylint: disable=unused-argument
+ async def aextract_data(
+ query: CboeIndexHistoricalQueryParams,
credentials: Optional[Dict[str, str]],
**kwargs: Any,
) -> List[Dict]:
- """Return the raw data from the CBOE endpoint."""
- # Symbol directories are cached for seven days and are used for error handling and URL generation.
- INDEXES = get_cboe_index_directory().index.to_list()
- query.symbol = query.symbol.upper()
- data = pd.DataFrame()
- if "^" in query.symbol:
- query.symbol = query.symbol.replace("^", "")
- query.interval = (
- "1m" if query.symbol == "NDX" and query.interval == "1d" else query.interval
- )
+ """Return the raw data from the Cboe endpoint."""
- now = datetime.now()
- query.start_date = (
- query.start_date if query.start_date else now - timedelta(days=50000)
- )
- query.end_date = query.end_date if query.end_date else now
-
- if query.symbol not in INDEXES and query.symbol not in TICKER_EXCEPTIONS:
- raise RuntimeError(
- f"The symbol, {query.symbol}, was not found in the CBOE index directory. "
- "Use `index_search()` to find supported indices. If the index is European, try `european_index()`."
- )
+ symbols = query.symbol.split(",")
+ INDEXES = await get_index_directory(use_cache=query.use_cache)
+ INDEXES = INDEXES.set_index("index_symbol")
+ # Create a list of European indices.
+ EU_INDEXES = INDEXES[INDEXES["source"] == "eu_proprietary_index"]
- def __generate_historical_prices_url(
+ INTERVAL_DICT = {"1m": "intraday", "1d": "historical"}
+
+ def _generate_historical_prices_url(
symbol,
- data_type: Optional[Literal["intraday", "historical"]] = "historical",
+ interval_type: Literal["intraday", "historical"] = "historical",
) -> str:
- """Generate the final URL for historical prices data."""
- url: str = (
- f"https://cdn.cboe.com/api/global/delayed_quotes/charts/{data_type}"
- )
- url = (
- url + f"/_{symbol}.json"
- if symbol in TICKER_EXCEPTIONS or symbol in INDEXES
- else url + f"/{symbol}.json"
- )
+ """Generate the URL for the data."""
+ if symbol.replace("^", "") in TICKER_EXCEPTIONS:
+ interval_type = "intraday" if len(symbols) == 1 else "historical"
+ _warn(
+ "Only the most recent trading day is available for this ticker, "
+ + symbol
+ )
+ if symbol.replace("^", "") in EU_INDEXES.index:
+ base_url = "https://cdn.cboe.com/api/global/european_indices/"
+ url = (
+ base_url + "index_history/"
+ if interval_type == "historical"
+ else base_url + "intraday_chart_data/"
+ )
+ url += f"{symbol.replace('^', '')}.json"
+ else:
+ base_url: str = f"https://cdn.cboe.com/api/global/delayed_quotes/charts/{interval_type}"
+ url = (
+ base_url + f"/_{symbol.replace('^', '')}.json"
+ if symbol.replace("^", "") in TICKER_EXCEPTIONS
+ or symbol.replace("^", "") in INDEXES.index
+ else base_url + f"/{symbol.replace('^', '')}.json"
+ )
+
return url
- url = (
- __generate_historical_prices_url(query.symbol, "intraday")
- if query.interval == "1m"
- else __generate_historical_prices_url(query.symbol)
- )
- r = make_request(url)
-
- if r.status_code != 200:
- raise RuntimeError(r.status_code)
-
- if query.interval == "1d":
- data = (
- pd.DataFrame(r.json()["data"])[
- ["date", "open", "high", "low", "close", "volume"]
- ]
- ).set_index("date")
-
- # Fill in missing data from current or most recent trading session.
-
- today = pd.to_datetime(datetime.now().date())
- if today.weekday() > 4:
- day_minus = today.weekday() - 4
- today = pd.to_datetime(today - timedelta(days=day_minus))
- if today != data.index[-1]:
- _today = pd.Series(get_ticker_info(query.symbol))
- today_df = pd.Series(dtype="object")
- today_df["open"] = round(_today["open"], 2)
- today_df["high"] = round(_today["high"], 2)
- today_df["low"] = round(_today["low"], 2)
- today_df["close"] = round(_today["close"], 2)
- if (
- query.symbol not in INDEXES
- and query.symbol not in TICKER_EXCEPTIONS
- ):
- data = data[data["volume"] > 0]
- today_df["volume"] = _today["volume"]
- today_df["date"] = today.date()
- today_df = pd.DataFrame(today_df).transpose().set_index("date")
-
- data = pd.concat([data, today_df], axis=0)
-
- # If ticker is an index there is no volume data and the types must be set.
-
- if query.symbol in INDEXES or query.symbol in TICKER_EXCEPTIONS:
- data = data[["open", "high", "low", "close", "volume"]]
- data["open"] = round(data.open.astype(float), 2)
- data["high"] = round(data.high.astype(float), 2)
- data["low"] = round(data.low.astype(float), 2)
- data["close"] = round(data.close.astype(float), 2)
- data["volume"] = 0
-
- data.index = pd.to_datetime(data.index)
- data = data[data["open"] > 0]
-
- data = data[
- (data.index >= pd.to_datetime(query.start_date))
- & (data.index <= pd.to_datetime(query.end_date))
- ]
-
- if query.interval == "1m":
- data_list = r.json()["data"]
- date: List[datetime] = []
- open_: List[float] = []
- high: List[float] = []
- low: List[float] = []
- close: List[float] = []
- volume: List[float] = []
- calls_volume: List[float] = []
- puts_volume: List[float] = []
- total_options_volume: List[float] = []
-
- for data in data_list:
- date.append(data["datetime"])
- price = data["price"]
- volume_data = data["volume"]
- open_.append(price["open"])
- high.append(price["high"])
- low.append(price["low"])
- close.append(price["close"])
- volume.append(volume_data["stock_volume"])
- calls_volume.append(volume_data["calls_volume"])
- puts_volume.append(volume_data["puts_volume"])
- total_options_volume.append(volume_data["total_options_volume"])
-
- data = pd.DataFrame()
- data["date"] = pd.to_datetime(date)
- data["open"] = open_
- data["high"] = high
- data["low"] = low
- data["close"] = close
- data["volume"] = volume
- data["calls_volume"] = calls_volume
- data["puts_volume"] = puts_volume
- data["total_options_volume"] = total_options_volume
- data = data.set_index("date").sort_index()
- data.index = data.index.astype(str)
- data = data[data["open"] > 0]
-
- return data.reset_index().to_dict("records")
+ urls = [
+ _generate_historical_prices_url(symbol, INTERVAL_DICT[query.interval])
+ for symbol in symbols
+ ]
+
+ return await amake_requests(urls, **kwargs)
@staticmethod
def transform_data(
query: CboeIndexHistoricalQueryParams, data: List[Dict], **kwargs: Any
) -> List[CboeIndexHistoricalData]:
"""Transform the data to the standard format."""
- return [CboeIndexHistoricalData.model_validate(d) for d in data]
+ if not data:
+ raise EmptyDataError()
+ results = DataFrame()
+ symbols = query.symbol.split(",")
+ # Results will be different depending on the interval.
+ # We will also parse the output from multiple symbols.
+ for i, item in enumerate(data):
+ result = DataFrame()
+ _symbol = symbols[i]
+ _temp = item["data"]
+ if query.interval == "1d":
+ result = DataFrame(_temp)
+ result["symbol"] = _symbol.replace("_", "").replace("^", "")
+ result = result.set_index("date")
+ # Remove the volume column if it exists because volume will a string 0.
+ if "volume" in result.columns:
+ result = result.drop(columns="volume")
+ results = concat([results, result])
+ if query.interval == "1m":
+ _datetime = Series([d["datetime"] for d in _temp]).rename("date")
+ _price = DataFrame(d["price"] for d in _temp)
+ result = _price.join(_datetime)
+ result["symbol"] = _symbol.replace("_", "").replace("^", "")
+ result = result.set_index("date")
+ results = concat([results, result])
+ results = results.set_index("symbol", append=True).sort_index()
+
+ for c in ["open", "high", "low", "close"]:
+ if c in results.columns:
+ results[c] = results[c].astype(float).replace(0, None)
+
+ output = results.dropna(how="all", axis=1).reset_index()
+
+ # When there is only one ticker symbol, the symbol column is redundant.
+ if len(query.symbol.split(",")) == 1:
+ output = output.drop(columns="symbol")
+ # Finally, we apply the user-specified date range because it is not filtered at the source.
+ output = output[
+ (to_datetime(output["date"]) >= to_datetime(query.start_date))
+ & (
+ to_datetime(output["date"])
+ <= to_datetime(query.end_date + timedelta(days=1))
+ )
+ ]
+ return [
+ CboeIndexHistoricalData.model_validate(d) for d in output.to_dict("records")
+ ]