summaryrefslogtreecommitdiffstats
path: root/openbb_platform/providers/intrinio/openbb_intrinio/models/forward_pe_estimates.py
blob: 552db46a76ff29e0fd0abaa113f1b0db8e877df0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Intrinio Forward PE Estimates Model."""

# pylint: disable=unused-argument

import asyncio
from datetime import date as dateType
from typing import Any, Dict, List, Optional
from warnings import warn

from openbb_core.provider.abstract.fetcher import Fetcher
from openbb_core.provider.standard_models.forward_pe_estimates import (
    ForwardPeEstimatesData,
    ForwardPeEstimatesQueryParams,
)
from openbb_core.provider.utils.errors import EmptyDataError
from openbb_core.provider.utils.helpers import amake_request
from openbb_intrinio.utils.helpers import response_callback
from pydantic import Field


class IntrinioForwardPeEstimatesQueryParams(ForwardPeEstimatesQueryParams):
    """Intrinio Forward PE Estimates Query.

    https://api-v2.intrinio.com/zacks/forward_pe?
    """

    # Schema hint: marks `symbol` as a parameter that allows multiple items.
    # The fetcher in this module splits the value on "," to get each symbol.
    __json_schema_extra__ = {
        "symbol": {
            "multiple_items_allowed": True,
        },
    }


class IntrinioForwardPeEstimatesData(ForwardPeEstimatesData):
    """Intrinio Forward PE Estimates Data."""

    # Model field name -> key used in the Intrinio payload.
    __alias_dict__ = {
        "symbol": "ticker",
        "name": "company_name",
        "year1": "forward_pe_year1",
        "year2": "forward_pe_year2",
        "year3": "forward_pe_year3",
        "year4": "forward_pe_year4",
        "year5": "forward_pe_year5",
        "peg_ratio_year1": "forward_peg_ratio_year1",
        "eps_ttm": "latest_ttm_eps",
        "last_udpated": "updated_date",
    }

    # Provider-specific fields declared on top of the standard model.
    peg_ratio_year1: Optional[float] = Field(
        description="Estimated Forward PEG ratio for the next fiscal year.",
        default=None,
    )
    eps_ttm: Optional[float] = Field(
        description="The latest trailing twelve months earnings per share.",
        default=None,
    )
    # NOTE(review): the field name is misspelled ("udpated"). It is kept as-is
    # because it is part of the public output schema; renaming it would break
    # consumers that read this attribute.
    last_udpated: Optional[dateType] = Field(
        description="The date the data was last updated.",
        default=None,
    )


class IntrinioForwardPeEstimatesFetcher(
    Fetcher[IntrinioForwardPeEstimatesQueryParams, List[IntrinioForwardPeEstimatesData]]
):
    """Intrinio Forward PE Estimates Fetcher."""

    @staticmethod
    def transform_query(
        params: Dict[str, Any]
    ) -> IntrinioForwardPeEstimatesQueryParams:
        """Transform the query params."""
        return IntrinioForwardPeEstimatesQueryParams(**params)

    @staticmethod
    async def aextract_data(
        query: IntrinioForwardPeEstimatesQueryParams,
        credentials: Optional[Dict[str, str]],
        **kwargs: Any,
    ) -> List[Dict]:
        """Return the raw data from the Intrinio endpoint.

        When ``query.symbol`` is set, it is split on "," and one request is
        made per symbol, concurrently. A symbol whose request raises is
        reported with a warning and skipped. When ``query.symbol`` is not set,
        a single bulk request is made.

        Parameters
        ----------
        query : IntrinioForwardPeEstimatesQueryParams
            The query; only ``symbol`` is read here.
        credentials : Optional[Dict[str, str]]
            Provider credentials; ``intrinio_api_key`` is read if present.
        **kwargs : Any
            Forwarded unchanged to ``amake_request``.

        Returns
        -------
        List[Dict]
            The raw records, one per symbol found, or the ``forward_pe`` list
            from the bulk response.

        Raises
        ------
        EmptyDataError
            If no records were obtained.
        RuntimeError
            If the bulk response body contains an ``error`` key.
        """
        api_key = credentials.get("intrinio_api_key") if credentials else ""
        BASE_URL = "https://api-v2.intrinio.com/zacks/forward_pe"

        async def get_one(symbol: str) -> Optional[Dict]:
            """Get the data for one symbol, or None if the request failed."""
            url = f"{BASE_URL}/{symbol}?api_key={api_key}"
            try:
                return await amake_request(  # type: ignore
                    url, response_callback=response_callback, **kwargs
                )
            except Exception as e:
                # Best effort: one bad symbol must not fail the whole batch.
                warn(f"Symbol Error: {symbol} --> {e}")
                return None

        if query.symbol:
            # Trim whitespace and drop blank entries so that input such as
            # "AAPL, MSFT" or "AAPL," does not produce malformed request URLs.
            symbols = [s.strip() for s in query.symbol.split(",") if s.strip()]
            # gather() returns results in the order of its arguments, so the
            # output order is deterministic rather than completion-ordered.
            fetched = await asyncio.gather(*(get_one(symbol) for symbol in symbols))
            results: List[Dict] = [item for item in fetched if item]
            if not results:
                raise EmptyDataError(
                    f"There were no results found for any of the given symbols. -> {symbols}"
                )
            return results

        async def fetch_callback(response, session):
            """Parse the bulk response, raising if the API reported an error."""
            data = await response.json()
            error = data.get("error", None)
            if error:
                message = data.get("message", None)
                raise RuntimeError(f"Error: {error} -> {message}")
            return data.get("forward_pe") or []

        # NOTE: only a single request with page_size=10000 is made; no
        # follow-up pages are requested.
        url = f"{BASE_URL}?page_size=10000&api_key={api_key}"
        bulk: List[Dict] = await amake_request(  # type: ignore
            url, response_callback=fetch_callback, **kwargs
        )

        if not bulk:
            raise EmptyDataError("The request was successful but was returned empty.")

        return bulk

    @staticmethod
    def transform_data(
        query: IntrinioForwardPeEstimatesQueryParams,
        data: List[Dict],
        **kwargs: Any,
    ) -> List[IntrinioForwardPeEstimatesData]:
        """Transform the raw data into the standard format.

        When symbols were requested, records are ordered to follow the order
        of the requested symbols; records whose ``ticker`` is not among them
        are placed last. The input list is not modified.

        Parameters
        ----------
        query : IntrinioForwardPeEstimatesQueryParams
            The query; only ``symbol`` is read here.
        data : List[Dict]
            The raw records returned by ``aextract_data``.

        Returns
        -------
        List[IntrinioForwardPeEstimatesData]
            The validated records.
        """
        symbols = (
            [s.strip() for s in query.symbol.split(",") if s.strip()]
            if query.symbol
            else []
        )
        if symbols:
            # Position of the first occurrence of each requested symbol.
            rank: Dict[str, int] = {}
            for position, symbol in enumerate(symbols):
                rank.setdefault(symbol, position)
            unmatched = len(symbols)
            # sorted() is stable and leaves the caller's list untouched.
            data = sorted(
                data,
                key=lambda item: rank.get(item.get("ticker"), unmatched),  # type: ignore
            )
        return [IntrinioForwardPeEstimatesData.model_validate(d) for d in data]