diff options
Diffstat (limited to 'openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py')
-rw-r--r-- | openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py | 286 |
1 files changed, 141 insertions, 145 deletions
diff --git a/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py b/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py index d68b413fa43..2da810d1acf 100644 --- a/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py +++ b/openbb_platform/providers/cboe/openbb_cboe/models/index_historical.py @@ -1,37 +1,54 @@ -"""CBOE Index Historical Model.""" +"""Cboe Market Indices Model.""" +import warnings from datetime import datetime, timedelta from typing import Any, Dict, List, Literal, Optional -import pandas as pd from openbb_cboe.utils.helpers import ( TICKER_EXCEPTIONS, - get_cboe_index_directory, - get_ticker_info, + get_index_directory, ) from openbb_core.provider.abstract.fetcher import Fetcher from openbb_core.provider.standard_models.index_historical import ( IndexHistoricalData, IndexHistoricalQueryParams, ) -from openbb_core.provider.utils.helpers import make_request +from openbb_core.provider.utils.descriptions import QUERY_DESCRIPTIONS +from openbb_core.provider.utils.errors import EmptyDataError +from openbb_core.provider.utils.helpers import amake_requests +from pandas import DataFrame, Series, concat, to_datetime from pydantic import Field +_warn = warnings.warn + class CboeIndexHistoricalQueryParams(IndexHistoricalQueryParams): - """CBOE Index Historical Query. + """CBOE Market Indices Query. Source: https://www.cboe.com/ """ - interval: Literal["1d", "1m"] = Field( - description="Use interval, 1m, for intraday prices during the most recent trading period.", + interval: Literal["1m", "1d"] = Field( default="1d", + description=( + QUERY_DESCRIPTIONS.get("interval", "") + + " The most recent trading day is not including in daily historical data." + + " Intraday data is only available for the most recent trading day at 1 minute intervals." + ), + ) + use_cache: bool = Field( + default=True, + description="When True, the company directories will be cached for 24 hours and are used to validate symbols." + + " The results of the function are not cached. Set as False to bypass.", ) class CboeIndexHistoricalData(IndexHistoricalData): - """CBOE Index Historical Data.""" + """CBOE Market Indices Data.""" + + __alias_dict__ = { + "volume": "stock_volume", + } calls_volume: Optional[float] = Field( default=None, @@ -57,156 +74,135 @@ class CboeIndexHistoricalFetcher( @staticmethod def transform_query(params: Dict[str, Any]) -> CboeIndexHistoricalQueryParams: - """Transform the query. Setting the start and end dates for a 1 year period.""" - return CboeIndexHistoricalQueryParams(**params) + """Transform the query.""" + transformed_params = params.copy() + now = datetime.now() + if ( + len(params.get("symbol", "").split(",")) > 1 + and params.get("start_date") is None + ): + transformed_params["start_date"] = ( + transformed_params["start_date"] + if transformed_params["start_date"] + else (now - timedelta(days=720)).strftime("%Y-%m-%d") + ) + if transformed_params.get("start_date") is None: + transformed_params["start_date"] = ( + transformed_params["start_date"] + if transformed_params.get("start_date") + else "1950-01-01" + ) + if params.get("end_date") is None: + transformed_params["end_date"] = ( + transformed_params["end_date"] + if transformed_params.get("end_date") + else now.strftime("%Y-%m-%d") + ) + + return CboeIndexHistoricalQueryParams(**transformed_params) @staticmethod - def extract_data( - query: CboeIndexHistoricalQueryParams, # pylint: disable=unused-argument + async def aextract_data( + query: CboeIndexHistoricalQueryParams, credentials: Optional[Dict[str, str]], **kwargs: Any, ) -> List[Dict]: - """Return the raw data from the CBOE endpoint.""" - # Symbol directories are cached for seven days and are used for error handling and URL generation. - INDEXES = get_cboe_index_directory().index.to_list() - query.symbol = query.symbol.upper() - data = pd.DataFrame() - if "^" in query.symbol: - query.symbol = query.symbol.replace("^", "") - query.interval = ( - "1m" if query.symbol == "NDX" and query.interval == "1d" else query.interval - ) + """Return the raw data from the Cboe endpoint.""" - now = datetime.now() - query.start_date = ( - query.start_date if query.start_date else now - timedelta(days=50000) - ) - query.end_date = query.end_date if query.end_date else now - - if query.symbol not in INDEXES and query.symbol not in TICKER_EXCEPTIONS: - raise RuntimeError( - f"The symbol, {query.symbol}, was not found in the CBOE index directory. " - "Use `index_search()` to find supported indices. If the index is European, try `european_index()`." - ) + symbols = query.symbol.split(",") + INDEXES = await get_index_directory(use_cache=query.use_cache) + INDEXES = INDEXES.set_index("index_symbol") + # Create a list of European indices. + EU_INDEXES = INDEXES[INDEXES["source"] == "eu_proprietary_index"] - def __generate_historical_prices_url( + INTERVAL_DICT = {"1m": "intraday", "1d": "historical"} + + def _generate_historical_prices_url( symbol, - data_type: Optional[Literal["intraday", "historical"]] = "historical", + interval_type: Literal["intraday", "historical"] = "historical", ) -> str: - """Generate the final URL for historical prices data.""" - url: str = ( - f"https://cdn.cboe.com/api/global/delayed_quotes/charts/{data_type}" - ) - url = ( - url + f"/_{symbol}.json" - if symbol in TICKER_EXCEPTIONS or symbol in INDEXES - else url + f"/{symbol}.json" - ) + """Generate the URL for the data.""" + if symbol.replace("^", "") in TICKER_EXCEPTIONS: + interval_type = "intraday" if len(symbols) == 1 else "historical" + _warn( + "Only the most recent trading day is available for this ticker, " + + symbol + ) + if symbol.replace("^", "") in EU_INDEXES.index: + base_url = "https://cdn.cboe.com/api/global/european_indices/" + url = ( + base_url + "index_history/" + if interval_type == "historical" + else base_url + "intraday_chart_data/" + ) + url += f"{symbol.replace('^', '')}.json" + else: + base_url: str = f"https://cdn.cboe.com/api/global/delayed_quotes/charts/{interval_type}" + url = ( + base_url + f"/_{symbol.replace('^', '')}.json" + if symbol.replace("^", "") in TICKER_EXCEPTIONS + or symbol.replace("^", "") in INDEXES.index + else base_url + f"/{symbol.replace('^', '')}.json" + ) + return url - url = ( - __generate_historical_prices_url(query.symbol, "intraday") - if query.interval == "1m" - else __generate_historical_prices_url(query.symbol) - ) - r = make_request(url) - - if r.status_code != 200: - raise RuntimeError(r.status_code) - - if query.interval == "1d": - data = ( - pd.DataFrame(r.json()["data"])[ - ["date", "open", "high", "low", "close", "volume"] - ] - ).set_index("date") - - # Fill in missing data from current or most recent trading session. - - today = pd.to_datetime(datetime.now().date()) - if today.weekday() > 4: - day_minus = today.weekday() - 4 - today = pd.to_datetime(today - timedelta(days=day_minus)) - if today != data.index[-1]: - _today = pd.Series(get_ticker_info(query.symbol)) - today_df = pd.Series(dtype="object") - today_df["open"] = round(_today["open"], 2) - today_df["high"] = round(_today["high"], 2) - today_df["low"] = round(_today["low"], 2) - today_df["close"] = round(_today["close"], 2) - if ( - query.symbol not in INDEXES - and query.symbol not in TICKER_EXCEPTIONS - ): - data = data[data["volume"] > 0] - today_df["volume"] = _today["volume"] - today_df["date"] = today.date() - today_df = pd.DataFrame(today_df).transpose().set_index("date") - - data = pd.concat([data, today_df], axis=0) - - # If ticker is an index there is no volume data and the types must be set. - - if query.symbol in INDEXES or query.symbol in TICKER_EXCEPTIONS: - data = data[["open", "high", "low", "close", "volume"]] - data["open"] = round(data.open.astype(float), 2) - data["high"] = round(data.high.astype(float), 2) - data["low"] = round(data.low.astype(float), 2) - data["close"] = round(data.close.astype(float), 2) - data["volume"] = 0 - - data.index = pd.to_datetime(data.index) - data = data[data["open"] > 0] - - data = data[ - (data.index >= pd.to_datetime(query.start_date)) - & (data.index <= pd.to_datetime(query.end_date)) - ] - - if query.interval == "1m": - data_list = r.json()["data"] - date: List[datetime] = [] - open_: List[float] = [] - high: List[float] = [] - low: List[float] = [] - close: List[float] = [] - volume: List[float] = [] - calls_volume: List[float] = [] - puts_volume: List[float] = [] - total_options_volume: List[float] = [] - - for data in data_list: - date.append(data["datetime"]) - price = data["price"] - volume_data = data["volume"] - open_.append(price["open"]) - high.append(price["high"]) - low.append(price["low"]) - close.append(price["close"]) - volume.append(volume_data["stock_volume"]) - calls_volume.append(volume_data["calls_volume"]) - puts_volume.append(volume_data["puts_volume"]) - total_options_volume.append(volume_data["total_options_volume"]) - - data = pd.DataFrame() - data["date"] = pd.to_datetime(date) - data["open"] = open_ - data["high"] = high - data["low"] = low - data["close"] = close - data["volume"] = volume - data["calls_volume"] = calls_volume - data["puts_volume"] = puts_volume - data["total_options_volume"] = total_options_volume - data = data.set_index("date").sort_index() - data.index = data.index.astype(str) - data = data[data["open"] > 0] - - return data.reset_index().to_dict("records") + urls = [ + _generate_historical_prices_url(symbol, INTERVAL_DICT[query.interval]) + for symbol in symbols + ] + + return await amake_requests(urls, **kwargs) @staticmethod def transform_data( query: CboeIndexHistoricalQueryParams, data: List[Dict], **kwargs: Any ) -> List[CboeIndexHistoricalData]: """Transform the data to the standard format.""" - return [CboeIndexHistoricalData.model_validate(d) for d in data] + if not data: + raise EmptyDataError() + results = DataFrame() + symbols = query.symbol.split(",") + # Results will be different depending on the interval. + # We will also parse the output from multiple symbols. + for i, item in enumerate(data): + result = DataFrame() + _symbol = symbols[i] + _temp = item["data"] + if query.interval == "1d": + result = DataFrame(_temp) + result["symbol"] = _symbol.replace("_", "").replace("^", "") + result = result.set_index("date") + # Remove the volume column if it exists because volume will a string 0. + if "volume" in result.columns: + result = result.drop(columns="volume") + results = concat([results, result]) + if query.interval == "1m": + _datetime = Series([d["datetime"] for d in _temp]).rename("date") + _price = DataFrame(d["price"] for d in _temp) + result = _price.join(_datetime) + result["symbol"] = _symbol.replace("_", "").replace("^", "") + result = result.set_index("date") + results = concat([results, result]) + results = results.set_index("symbol", append=True).sort_index() + + for c in ["open", "high", "low", "close"]: + if c in results.columns: + results[c] = results[c].astype(float).replace(0, None) + + output = results.dropna(how="all", axis=1).reset_index() + + # When there is only one ticker symbol, the symbol column is redundant. + if len(query.symbol.split(",")) == 1: + output = output.drop(columns="symbol") + # Finally, we apply the user-specified date range because it is not filtered at the source. + output = output[ + (to_datetime(output["date"]) >= to_datetime(query.start_date)) + & ( + to_datetime(output["date"]) + <= to_datetime(query.end_date + timedelta(days=1)) + ) + ] + return [ + CboeIndexHistoricalData.model_validate(d) for d in output.to_dict("records") + ] |