diff options
Diffstat (limited to 'openbb_platform/providers/sec/openbb_sec/utils/helpers.py')
-rw-r--r-- | openbb_platform/providers/sec/openbb_sec/utils/helpers.py | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/openbb_platform/providers/sec/openbb_sec/utils/helpers.py b/openbb_platform/providers/sec/openbb_sec/utils/helpers.py index 407a8fc773b..be306c64984 100644 --- a/openbb_platform/providers/sec/openbb_sec/utils/helpers.py +++ b/openbb_platform/providers/sec/openbb_sec/utils/helpers.py @@ -38,7 +38,7 @@ async def get_all_companies(use_cache: bool = True) -> pd.DataFrame: >>> tickers = get_all_companies() """ url = "https://www.sec.gov/files/company_tickers.json" - + response: Union[dict, List[dict]] = {} if use_cache is True: cache_dir = f"{get_user_cache_directory()}/http/sec_companies" async with CachedSession( @@ -65,6 +65,7 @@ async def get_all_ciks(use_cache: bool = True) -> pd.DataFrame: """Response callback for CIK lookup data.""" return await response.text(encoding="latin-1") + response: Union[dict, List[dict], str] = {} if use_cache is True: cache_dir = f"{get_user_cache_directory()}/http/sec_ciks" async with CachedSession( @@ -97,6 +98,7 @@ async def get_mf_and_etf_map(use_cache: bool = True) -> pd.DataFrame: symbols = pd.DataFrame() url = "https://www.sec.gov/files/company_tickers_mf.json" + response: Union[dict, List[dict]] = {} if use_cache is True: cache_dir = f"{get_user_cache_directory()}/http/sec_mf_etf_map" async with CachedSession( @@ -189,6 +191,7 @@ async def download_zip_file( """Response callback for ZIP file downloads.""" return await response.read() + response: Union[dict, List[dict]] = {} if use_cache is True: cache_dir = f"{get_user_cache_directory()}/http/sec_ftd" async with CachedSession(cache=SQLiteBackend(cache_dir)) as session: @@ -315,7 +318,7 @@ async def get_nport_candidates(symbol: str, use_cache: bool = True) -> List[Dict raise ValueError("Fund not found for, the symbol: " + symbol) url = f"https://efts.sec.gov/LATEST/search-index?q={series_id}&dateRange=all&forms=NPORT-P" - + response: Union[dict, List[dict]] = {} if use_cache is True: cache_dir = f"{get_user_cache_directory()}/http/sec_etf" async with CachedSession(cache=SQLiteBackend(cache_dir)) as session: |