1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""Test crypto API endpoints."""
import base64
import pytest
import requests
from extensions.tests.conftest import parametrize
from openbb_core.env import Env
from openbb_core.provider.utils.helpers import get_querystring
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="session")
def headers():
userpass = f"{Env().API_USERNAME}:{Env().API_PASSWORD}"
userpass_bytes = userpass.encode("ascii")
base64_bytes = base64.b64encode(userpass_bytes)
return {"Authorization": f"Basic {base64_bytes.decode('ascii')}"}
@parametrize(
"params",
[
({"query": "asd"}),
({"query": "btc", "provider": "fmp"}),
],
)
@pytest.mark.integration
def test_crypto_search(params, headers):
params = {p: v for p, v in params.items() if v}
query_str = get_querystring(params, [])
url = f"http://0.0.0.0:8000/api/v1/crypto/search?{query_str}"
result = requests.get(url, headers=headers, timeout=10)
assert isinstance(result, requests.Response)
assert result.status_code == 200
@parametrize(
"params",
[
(
{
"interval": "1d",
"provider": "fmp",
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-01-02",
}
),
(
{
"interval": "1h",
"provider": "fmp",
"symbol": "BTCUSD,ETHUSD",
"start_date": None,
"end_date": None,
}
),
(
{
"interval": "1m",
"sort": "desc",
"limit": 49999,
"provider": "polygon",
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-01-02",
}
),
(
{
"interval": "1d",
"sort": "desc",
"limit": 49999,
"provider": "polygon",
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-06-06",
}
),
(
{
"interval": "1d",
"provider": "yfinance",
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-01-04",
}
),
(
{
"provider": "tiingo",
"interval": "1d",
"exchanges": None,
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-06-06",
}
),
(
{
"provider": "tiingo",
"interval": "1h",
"exchanges": ["POLONIEX", "GDAX"],
"symbol": "BTCUSD",
"start_date": "2023-01-01",
"end_date": "2023-01-02",
}
),
],
)
@pytest.mark.integration
def test_crypto_price_historical(params, headers):
params = {p: v for p, v in params.items() if v}
query_str = get_querystring(params, [])
url = f"http://0.0.0.0:8000/api/v1/crypto/price/historical?{query_str}"
result = requests.get(url, headers=headers, timeout=10)
assert isinstance(result, requests.Response)
assert result.status_code == 200
|