summaryrefslogtreecommitdiffstats
path: root/openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py
diff options
context:
space:
mode:
Diffstat (limited to 'openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py')
-rw-r--r--openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py b/openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py
new file mode 100644
index 00000000000..d83f43cccb4
--- /dev/null
+++ b/openbb_platform/providers/fred/openbb_fred/models/university_of_michigan.py
@@ -0,0 +1,154 @@
+"""FRED University of Michigan Survey Model."""
+
+# pylint: disable=unused-argument
+
+from datetime import datetime
+from typing import Any, Dict, List, Literal, Optional, Union
+
+from openbb_core.provider.abstract.annotated_result import AnnotatedResult
+from openbb_core.provider.abstract.fetcher import Fetcher
+from openbb_core.provider.standard_models.university_of_michigan import (
+ UofMichiganData,
+ UofMichiganQueryParams,
+)
+from openbb_core.provider.utils.errors import EmptyDataError
+from openbb_fred.models.series import FredSeriesFetcher
+from pandas import DataFrame
+from pydantic import Field
+
+
+class FredUofMichiganQueryParams(UofMichiganQueryParams):
+ """FRED University of Michigan Survey Query. Data from FRED is delayed by 1 month."""
+
+ frequency: Union[
+ None,
+ Literal[
+ "annual",
+ "quarter",
+ ],
+ ] = Field(
+ default=None,
+ description="""
+ Frequency aggregation to convert monthly data to lower frequency. None is monthly.
+ """,
+ json_schema_extra={
+ "choices": [
+ "annual",
+ "quarter",
+ ]
+ },
+ )
+ aggregation_method: Union[None, Literal["avg", "sum", "eop"]] = Field(
+ default=None,
+ description="""
+ A key that indicates the aggregation method used for frequency aggregation.
+ avg = Average
+ sum = Sum
+ eop = End of Period
+ """,
+ json_schema_extra={"choices": ["avg", "sum", "eop"]},
+ )
+ transform: Union[
+ None, Literal["chg", "ch1", "pch", "pc1", "pca", "cch", "cca", "log"]
+ ] = Field(
+ default=None,
+ description="""
+ Transformation type
+ None = No transformation
+ chg = Change
+ ch1 = Change from Year Ago
+ pch = Percent Change
+ pc1 = Percent Change from Year Ago
+ pca = Compounded Annual Rate of Change
+ cch = Continuously Compounded Rate of Change
+ cca = Continuously Compounded Annual Rate of Change
+ log = Natural Log
+ """,
+ json_schema_extra={
+ "choices": ["chg", "ch1", "pch", "pc1", "pca", "cch", "cca", "log"]
+ },
+ )
+
+
+class FredUofMichiganData(UofMichiganData):
+ """FRED University of Michigan Survey Data."""
+
+
+class FredUofMichiganFetcher(
+ Fetcher[FredUofMichiganQueryParams, List[FredUofMichiganData]]
+):
+ """FRED University of Michigan Survey Fetcher."""
+
+ @staticmethod
+ def transform_query(params: Dict[str, Any]) -> FredUofMichiganQueryParams:
+ """Transform query."""
+ return FredUofMichiganQueryParams(**params)
+
+ @staticmethod
+ async def aextract_data(
+ query: FredUofMichiganQueryParams,
+ credentials: Optional[Dict[str, str]],
+ **kwargs: Any
+ ) -> Dict:
+ """Extract data."""
+ ids = ["UMCSENT", "MICH"]
+ frequency = query.frequency[:1].lower() if query.frequency else None
+ if (
+ query.start_date and query.start_date < datetime(1978, 1, 1).date()
+ ) or not query.start_date:
+ ids = ids + ["UMCSENT1"]
+ try:
+ response = await FredSeriesFetcher.fetch_data(
+ dict(
+ symbol=",".join(ids),
+ start_date=query.start_date,
+ end_date=query.end_date,
+ transform=query.transform,
+ frequency=frequency,
+ aggregation_method=query.aggregation_method,
+ ),
+ credentials,
+ )
+ except Exception as e:
+ raise e from e
+
+ return {
+ "metadata": response.metadata,
+ "data": [d.model_dump() for d in response.result],
+ }
+
+ @staticmethod
+ def transform_data(
+ query: FredUofMichiganQueryParams, data: Dict, **kwargs: Any
+ ) -> List[FredUofMichiganData]:
+ """Transform data."""
+ df = DataFrame(data.get("data", []))
+ if df.empty:
+ raise EmptyDataError(
+ "There was an error with the request and was returned empty."
+ )
+ metadata = data.get("metadata", {})
+ # Combine the legacy series with the new one.
+ if "UMCSENT1" in df.columns:
+ df["UMCSENT"] = df["UMCSENT"].fillna(df["UMCSENT1"])
+ df = df.drop(columns=["UMCSENT1"])
+ metadata.pop("UMCSENT1", None)
+
+ # Normalize the percent values.
+ df["MICH"] = df["MICH"] / 100
+ if query.transform and query.transform not in ["chg", "ch1", "log"]:
+ df["UMCSENT"] = df["UMCSENT"] / 100
+
+ df = df.rename(
+ columns={"UMCSENT": "consumer_sentiment", "MICH": "inflation_expectation"}
+ )
+ records = (
+ df.sort_values(by="date")
+ .fillna("N/A")
+ .replace("N/A", None)
+ .to_dict(orient="records")
+ )
+ return AnnotatedResult(
+ result=[FredUofMichiganData.model_validate(r) for r in records],
+ metadata=metadata,
+ )