"""Caching utilities for provider fetchers."""

import asyncio
import hashlib
import json
import time
from abc import ABC, abstractmethod

try:
    import redis  # Optional dependency; AsyncRedisCache raises at init if it is missing.

    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False


def generate_cache_key(*args, **kwargs):
    """Generate a deterministic cache key from JSON-serializable arguments."""
    key = json.dumps((args, kwargs), sort_keys=True)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

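
# For illustration only (not part of the original module): the key depends solely
# on the (JSON-serializable) arguments, so repeated calls with the same inputs map
# to the same 64-character hex digest; the ``start_date`` parameter below is a
# hypothetical example.
#
#     generate_cache_key("AAPL", start_date="2024-01-01") == \
#         generate_cache_key("AAPL", start_date="2024-01-01")   # True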

class OpenBBAsyncCache(ABC):
    """Abstract interface for the asynchronous caches used by fetchers."""

    @abstractmethod
    async def get(self, key):
        """Return the cached value for ``key``, or None if absent or expired."""

    @abstractmethod
    async def set(self, key, value):
        """Store ``value`` under ``key``."""

    @abstractmethod
    async def clear(self):
        """Remove every entry from the cache."""


class AsyncLRUCache(OpenBBAsyncCache):
    """Simple in-memory LRU cache with per-entry expiration.

    Used to memoize results of the async ``fetcher.fetch_data``.
    """

    def __init__(self, maxsize=100, expiration=600):
        self.cache = {}  # key -> (value, timestamp)
        self.maxsize = maxsize
        self.history = []  # Keys ordered from least to most recently used
        self.expiration = expiration  # Seconds
        self.lock = asyncio.Lock()

    async def get(self, key):
        """Return the cached value for ``key``, or None on a miss or expiry."""
        async with self.lock:
            if key in self.cache:
                item, timestamp = self.cache[key]
                if time.time() - timestamp < self.expiration:
                    # Cache hit: mark the key as most recently used.
                    self.history.remove(key)
                    self.history.append(key)
                    return item
                # The entry has expired; drop it and fall through to a miss.
                del self.cache[key]
                self.history.remove(key)
        return None

    async def set(self, key, value):
        """Insert or refresh ``key``, evicting the least recently used entry if full."""
        async with self.lock:
            if key in self.cache:
                # Refresh the position of an existing key instead of
                # duplicating it in the access history.
                self.history.remove(key)
            elif len(self.cache) >= self.maxsize:
                # Evict the least recently used key to make room.
                oldest_key = self.history.pop(0)
                del self.cache[oldest_key]
            self.cache[key] = (value, time.time())
            self.history.append(key)

    async def clear(self):
        """Empty the cache and its access history."""
        async with self.lock:
            self.cache = {}
            self.history = []

    async def cleanup(self):
        """Remove expired entries from the cache."""
        async with self.lock:
            current_time = time.time()
            keys_to_delete = [
                key
                for key, (_, timestamp) in self.cache.items()
                if current_time - timestamp >= self.expiration
            ]
            for key in keys_to_delete:
                del self.cache[key]
                self.history.remove(key)

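
# A hedged sketch (not part of the original module) of how expired entries might
# be purged in the background: a long-running service could schedule ``cleanup()``
# on the event loop with a hypothetical helper such as
#
#     async def _purge_periodically(cache: AsyncLRUCache, interval: float = 60.0):
#         while True:
#             await asyncio.sleep(interval)
#             await cache.cleanup()
#
# and start it with ``asyncio.create_task(_purge_periodically(cache))``.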

class AsyncRedisCache(OpenBBAsyncCache):
    """Cache backed by a Redis server.

    Note: this uses the synchronous ``redis`` client, so each call blocks the
    event loop for the duration of the round trip to Redis.
    """

    def __init__(self, host="localhost", port=6379, db=0, expiration=600):
        if not REDIS_AVAILABLE:
            raise RuntimeError(
                "Redis library is not installed. Please install it to use AsyncRedisCache."
            )
        self.client = redis.Redis(host=host, port=port, db=db)
        try:
            self.client.ping()
        except redis.exceptions.ConnectionError as exc:
            raise RuntimeError(
                "Could not connect to Redis. Please make sure it is running."
            ) from exc
        self.expiration = expiration  # Seconds

    async def get(self, key):
        """Return the JSON-decoded value for ``key``, or None on a miss."""
        value = self.client.get(key)
        return json.loads(value) if value else None

    async def set(self, key, value):
        """Store a JSON-serializable ``value`` under ``key`` with an expiration."""
        self.client.setex(key, self.expiration, json.dumps(value))

    async def clear(self):
        """Flush the Redis database backing this cache."""
        self.client.flushdb()
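

# The block below is not part of the original module; it is a minimal, hedged
# demo of AsyncLRUCache together with generate_cache_key. It uses only the
# in-memory cache, so it runs without a Redis server; the symbols and values
# are illustrative.
if __name__ == "__main__":

    async def _demo():
        cache = AsyncLRUCache(maxsize=2, expiration=5)
        key = generate_cache_key("AAPL", interval="1d")
        await cache.set(key, {"symbol": "AAPL", "close": 123.45})
        print(await cache.get(key))  # -> {'symbol': 'AAPL', 'close': 123.45}
        print(await cache.get(generate_cache_key("MSFT")))  # -> None (cache miss)

    asyncio.run(_demo())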