diff options
Diffstat (limited to 'openbb_platform/providers/intrinio/openbb_intrinio/models/options_snapshots.py')
-rw-r--r-- | openbb_platform/providers/intrinio/openbb_intrinio/models/options_snapshots.py | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/openbb_platform/providers/intrinio/openbb_intrinio/models/options_snapshots.py b/openbb_platform/providers/intrinio/openbb_intrinio/models/options_snapshots.py new file mode 100644 index 00000000000..cee1273238f --- /dev/null +++ b/openbb_platform/providers/intrinio/openbb_intrinio/models/options_snapshots.py @@ -0,0 +1,329 @@ +"""Intrinio Options Snapshots Model.""" + +# pylint: disable=unused-argument + +import gzip +from datetime import ( + date as dateType, + datetime, +) +from io import BytesIO +from typing import Any, Dict, List, Optional, Union + +import numpy as np +from openbb_core.app.model.abstract.error import OpenBBError +from openbb_core.provider.abstract.fetcher import Fetcher +from openbb_core.provider.standard_models.options_snapshots import ( + OptionsSnapshotsData, + OptionsSnapshotsQueryParams, +) +from openbb_core.provider.utils.helpers import amake_request +from pandas import DataFrame, NaT, Series, read_csv, to_datetime +from pydantic import Field +from pytz import timezone + + +class IntrinioOptionsSnapshotsQueryParams(OptionsSnapshotsQueryParams): + """Intrinio Options Snapshots Query. + + Source: https://docs.intrinio.com/documentation/web_api/get_options_snapshots_v2 + """ + + date: Optional[Union[dateType, datetime, str]] = Field( + default=None, + description="The date of the data. Can be a datetime or an ISO datetime string." + + " Data appears to go back to around 2022-06-01" + + " Example: '2024-03-08T12:15:00+0400'", + ) + only_traded: bool = Field( + default=True, + description="Only include options that have been traded during the session, default is True." + + " Setting to false will dramatically increase the size of the response - use with caution.", + ) + + +class IntrinioOptionsSnapshotsData(OptionsSnapshotsData): + """Intrinio Options Snapshots Data. Warning: This is a large file.""" + + bid: Optional[float] = Field( + default=None, + description="The last bid price at the time.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + bid_size: Optional[int] = Field( + default=None, + description="The size of the last bid price.", + ) + bid_timestamp: Optional[datetime] = Field( + default=None, + description="The timestamp of the last bid price.", + ) + ask: Optional[float] = Field( + default=None, + description="The last ask price at the time.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + ask_size: Optional[int] = Field( + default=None, + description="The size of the last ask price.", + ) + ask_timestamp: Optional[datetime] = Field( + default=None, + description="The timestamp of the last ask price.", + ) + total_bid_volume: Optional[int] = Field( + default=None, + description="Total volume of bids.", + ) + bid_high: Optional[float] = Field( + default=None, + description="The highest bid price.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + bid_low: Optional[float] = Field( + default=None, + description="The lowest bid price.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + total_ask_volume: Optional[int] = Field( + default=None, + description="Total volume of asks.", + ) + ask_high: Optional[float] = Field( + default=None, + description="The highest ask price.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + ask_low: Optional[float] = Field( + default=None, + description="The lowest ask price.", + json_schema_extra={"x-unit_measurement": "currency"}, + ) + + +class IntrinioOptionsSnapshotsFetcher( + Fetcher[ + IntrinioOptionsSnapshotsQueryParams, + List[IntrinioOptionsSnapshotsData], + ] +): + """Intrinio Options Snapshots Fetcher.""" + + @staticmethod + def transform_query(params: Dict[str, Any]) -> IntrinioOptionsSnapshotsQueryParams: + """Transform the query params.""" + transformed_params = params.copy() + if "date" in transformed_params: + if isinstance(transformed_params["date"], datetime): + dt = transformed_params["date"] + dt = dt.astimezone(tz=timezone("America/New_York")) + if isinstance(transformed_params["date"], dateType): + dt = transformed_params["date"] # type: ignore + if isinstance(dt, dateType): + dt = datetime( + dt.year, + dt.month, + dt.day, + 16, + 15, + 0, + 0, + tzinfo=timezone("America/New_York"), + ) + if isinstance(transformed_params["date"], str): + dt = datetime.fromisoformat(transformed_params["date"]) + else: + try: + dt = datetime.fromisoformat(str(transformed_params["date"])) # type: ignore + except ValueError as exc: + raise OpenBBError( + "Invalid date format. Please use '2024-03-08T12:15-0400'." + ) from exc + + transformed_params["date"] = ( + dt.strftime("%Y-%m-%dT%H:%M:%S.%f%z") + .replace("+", "-") + .replace("T00:", "T20:") + if isinstance(dt, datetime) + else dt + ) + return IntrinioOptionsSnapshotsQueryParams(**transformed_params) + + @staticmethod + async def aextract_data( + query: IntrinioOptionsSnapshotsQueryParams, + credentials: Optional[Dict[str, str]], + **kwargs: Any, + ) -> DataFrame: + """Return the raw data from the Intrinio endpoint.""" + api_key = credentials.get("intrinio_api_key") if credentials else "" + + # This gets the URL to the actual file. + url = f"https://api-v2.intrinio.com/options/snapshots?api_key={api_key}" + if query.date: + url += f"&at_datetime={query.date}" + + try: + response = await amake_request(url, **kwargs) + except Exception as exc: + raise OpenBBError("Could not fetch data from Intrinio.") from exc + + if isinstance(response, dict) and "error" in response: + raise OpenBBError( + f"{response.get('error')}. Message: {response.get('message')}" + ) + urls = [] + # Get the URL to the CSV file. + if response.get("snapshots"): # type: ignore + for d in response["snapshots"]: # type: ignore + if d.get("files"): + for f in d["files"]: + if f.get("url"): + urls.append(f.get("url")) + if not urls: + raise OpenBBError("No snapshots found.") + + async def response_callback(response, _): + """Response Callback.""" + return await response.read() + + async def get_csv(url) -> DataFrame: + """Return the CSV data.""" + try: + response = await amake_request( + url, response_callback=response_callback, **kwargs + ) + df = DataFrame() + if isinstance(response, bytes): + file = gzip.decompress(response) + df = read_csv(BytesIO(file)) + + return df + + except Exception as exc: + try: + df = read_csv(response) + return df + except Exception: + raise OpenBBError("Could not read file from URL.") from exc + + # There should only be one URL with this bulk data. + return await get_csv(urls[0]) + + @staticmethod + def transform_data( + query: IntrinioOptionsSnapshotsQueryParams, + data: DataFrame, + **kwargs: Any, + ) -> List[IntrinioOptionsSnapshotsData]: + """Return the transformed data.""" + df = data + if df.empty: + raise OpenBBError("Empty CSV file") + COL_MAP = { + "CONTRACT ID": "contract_symbol", + "OPEN INTEREST": "open_interest", + "TRADE PRICE": "last_price", + "TRADE SIZE": "last_size", + "TOTAL TRADE VOLUME": "volume", + "LAST TRADE TIMESTAMP": "last_timestamp", + "TRADE HIGH PRICE": "high", + "TRADE LOW PRICE": "low", + "ASK PRICE": "ask", + "ASK SIZE": "ask_size", + "LAST ASK TIMESTAMP": "ask_timestamp", + "BID PRICE": "bid", + "BID SIZE": "bid_size", + "LAST BID TIMESTAMP": "bid_timestamp", + "TOTAL ASK VOLUME": "total_ask_volume", + "ASK HIGH PRICE": "ask_high", + "ASK LOW PRICE": "ask_low", + "TOTAL BID VOLUME": "total_bid_volume", + "BID HIGH PRICE": "bid_high", + "BID LOW PRICE": "bid_low", + } + df = df.rename(columns=COL_MAP) + to_drop_na = ( + ["bid_timestamp", "ask_timestamp", "last_timestamp"] + if query.only_traded is True + else ["bid_timestamp", "ask_timestamp"] + ) + df = df.dropna(subset=to_drop_na + ["contract_symbol"]) + for col in ["last_timestamp", "bid_timestamp", "ask_timestamp"]: + # Convert Unix timestamp to tz-aware datetime + df[col] = ( + to_datetime(df[col].replace("", None).astype(float), unit="s") + .dt.tz_localize(timezone("UTC")) + .dt.tz_convert(timezone("America/New_York")) + .dt.floor("s") + ) + + # Extract the underlying symbol, expiration, option type, and strike. + symbols = Series(df["contract_symbol"].copy()) + df["underlying_symbol"] = symbols.str.extract(r"^(?P<underlying_symbol>[^_]*)") + split_symbols = symbols.str.rsplit("_", n=1).str[-1] + df["expiration"] = to_datetime( + [symbol[:6] for symbol in split_symbols], + format="%y%m%d", + ) + df["option_type"] = split_symbols.str.extract( + r"^\d*(?P<option_type>\D)" + ).replace({"C": "call", "P": "put"}) + df["strike"] = [ + ( + symbol[7:].lstrip("0")[:-3] + "." + symbol[7:].lstrip("0")[-3:] + if "." not in symbol[7:] + else symbol[7:] + ) + for symbol in split_symbols + ] + + def calculate_dte(df): + """Calculate the DTE.""" + new_df = df[ + ["expiration", "last_timestamp", "bid_timestamp", "ask_timestamp"] + ].copy() + conditions = [ + new_df["last_timestamp"].notna(), + new_df["bid_timestamp"].notna(), + new_df["ask_timestamp"].notna(), + ] + choices = [ + (new_df["expiration"].dt.date - new_df["last_timestamp"].dt.date) + .apply(lambda x: x) + .dt.days, + (new_df["expiration"].dt.date - new_df["bid_timestamp"].dt.date) + .apply(lambda x: x) + .dt.days, + (new_df["expiration"].dt.date - new_df["ask_timestamp"].dt.date) + .apply(lambda x: x) + .dt.days, + ] + new_df["dte"] = np.select(conditions, choices, default=None) + return new_df["dte"] + + df["dte"] = calculate_dte(df) + + def apply_contract_symbol(x): + """Construct the OCC Contract Symbol.""" + symbol = x.split("_")[0].replace("_", "") + exp = x.rsplit("_")[-1][:6] + cp = x.rsplit("_")[-1][6] + strike = x.rsplit("_")[-1][7:] + _strike = strike.split(".") + front = "0" * (5 - len(_strike[0])) + back = "0" * (3 - len(_strike[1])) + strike = f"{front}{_strike[0]}{_strike[1]}{back}" + return symbol + exp + cp + strike + + if symbols.str.contains("\.").any(): # noqa + df["contract_symbol"] = df["contract_symbol"].apply(apply_contract_symbol) + else: + df["contract_symbol"] = symbols.str.replace("_", "") + df = df.replace({NaT: None, np.nan: None}) + df = df.sort_values(by="volume", ascending=False) + + return [ + IntrinioOptionsSnapshotsData.model_validate(d) + for d in df.to_dict(orient="records") + ] |