summaryrefslogtreecommitdiffstats
path: root/openbb_platform/core/openbb_core/provider/abstract/cache.py
blob: d7e5cfe66da7eb32bae6f0303ede0c1d7c0c9af6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import asyncio
import time
import hashlib
import json
from abc import ABC, abstractmethod
from datetime import date, datetime

try:
    import redis

    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False


def jsonify_date(obj):
    """Serialize date-like objects that the default JSON encoder rejects.

    ``date`` and ``datetime`` instances are rendered through their
    ``isoformat()`` method. Any other type is refused.

    Raises:
        TypeError: If ``obj`` is neither a ``date`` nor a ``datetime``.
    """
    # Guard clause first: anything that is not date-like is unsupported.
    if not isinstance(obj, (datetime, date)):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()


def generate_cache_key(*args, **kwargs):
    """Build a cache key from the arguments of a request.

    The positional and keyword arguments are serialized together as JSON
    (dictionary keys sorted, ``date``/``datetime`` values handled by
    ``jsonify_date``) and the UTF-8 encoded text is hashed with SHA-256.

    Returns:
        The hexadecimal SHA-256 digest of the serialized arguments.
    """
    payload = (args, kwargs)
    serialized = json.dumps(payload, default=jsonify_date, sort_keys=True)
    digest = hashlib.sha256(serialized.encode("utf-8"))
    return digest.hexdigest()


class OpenBBAsyncCache(ABC):
    """Abstract interface for asynchronous caches.

    Concrete subclasses must implement the ``get``, ``set`` and ``clear``
    coroutines; the class itself cannot be instantiated.
    """

    @abstractmethod
    async def get(self, key):
        """Look up ``key`` in the cache."""

    @abstractmethod
    async def set(self, key, value):
        """Store ``value`` under ``key``."""

    @abstractmethod
    async def clear(self):
        """Remove the entries held by the cache."""


class AsyncLRUCache(OpenBBAsyncCache):
    """In-memory LRU cache with per-entry expiration for async callers.

    ``self.cache`` maps each key to a ``(value, timestamp)`` tuple, where
    the timestamp is the ``time.time()`` reading taken when the entry was
    stored. ``self.history`` lists the cached keys from least to most
    recently used; each cached key appears in it exactly once. Every
    operation runs while holding ``self.lock``.
    """

    def __init__(self, maxsize=100, expiration=600):
        """Create an empty cache.

        Args:
            maxsize: Maximum number of entries kept. A value of 0 or less
                disables storage (``set`` becomes a no-op).
            expiration: Lifetime of an entry, in seconds.
        """
        self.cache = {}
        self.maxsize = maxsize
        self.history = []
        self.expiration = expiration  # Seconds
        self.lock = asyncio.Lock()

    async def get(self, key):
        """Return the cached value for ``key``, or ``None``.

        A hit marks the key as most recently used. An entry whose age has
        reached ``expiration`` is dropped and reported as a miss.
        """
        async with self.lock:
            entry = self.cache.get(key)
            if entry is None:
                return None
            item, timestamp = entry
            # The key leaves its current position either way: it is
            # re-appended on a hit and discarded when expired.
            self.history.remove(key)
            if time.time() - timestamp < self.expiration:
                self.history.append(key)
                return item
            del self.cache[key]
            return None

    async def set(self, key, value):
        """Store ``value`` under ``key`` and mark it most recently used.

        When the cache is full and ``key`` is new, the least recently used
        entry is evicted first.
        """
        async with self.lock:
            if self.maxsize <= 0:
                # Nothing can be stored; avoids popping from an empty history.
                return
            if key in self.cache:
                # Overwrite: drop the old position so the key is never
                # listed twice (a duplicate would later evict a live entry
                # and then raise KeyError on the stale copy).
                self.history.remove(key)
            else:
                while self.history and len(self.cache) >= self.maxsize:
                    oldest_key = self.history.pop(0)
                    del self.cache[oldest_key]
            self.cache[key] = (value, time.time())
            self.history.append(key)

    async def clear(self):
        """Drop every entry."""
        async with self.lock:
            self.cache = {}
            self.history = []

    async def cleanup(self):
        """Remove expired entries from the cache."""
        async with self.lock:
            current_time = time.time()
            expired_keys = [
                key
                for key, (_, timestamp) in self.cache.items()
                if current_time - timestamp >= self.expiration
            ]
            for key in expired_keys:
                del self.cache[key]
                self.history.remove(key)


class AsyncRedisCache(OpenBBAsyncCache):
    """Cache that stores JSON-encoded values in a Redis database.

    The synchronous ``redis.Redis`` client is used; its blocking calls are
    dispatched to the event loop's default executor so the coroutines do
    not stall the loop while waiting on the network.
    """

    def __init__(self, host="localhost", port=6379, db=0, expiration=600):
        """Connect to Redis and verify the connection with a ping.

        Args:
            host: Redis server host.
            port: Redis server port.
            db: Redis database index.
            expiration: Time-to-live applied to stored keys, in seconds.

        Raises:
            RuntimeError: If the redis library is not installed or the
                server cannot be reached.
        """
        if not REDIS_AVAILABLE:
            raise RuntimeError(
                "Redis library is not installed. Please install it to use AsyncRedisCache."
            )
        self.client = redis.Redis(host=host, port=port, db=db)
        try:
            self.client.ping()
        except redis.exceptions.ConnectionError as err:
            # Chain the original error so the underlying cause stays visible.
            raise RuntimeError(
                "Could not connect to Redis. Please make sure it is running."
            ) from err
        self.expiration = expiration

    async def _run_blocking(self, func, *args):
        """Run a blocking client call in the loop's default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def get(self, key):
        """Return the JSON-decoded value stored under ``key``, or ``None``."""
        value = await self._run_blocking(self.client.get, key)
        return json.loads(value) if value else None

    async def set(self, key, value):
        """Store ``value`` as JSON under ``key`` with the configured TTL."""
        payload = json.dumps(value)
        await self._run_blocking(self.client.setex, key, self.expiration, payload)

    async def clear(self):
        """Flush the selected Redis database."""
        # NOTE(review): flushdb deletes every key in the selected database,
        # not only the keys written through this cache.
        await self._run_blocking(self.client.flushdb)