> **Info:** In-memory application layer cache
```bash
pip install async-cache
```

See full documentation at https://async-cache.readthedocs.io/
Use `AsyncCache` for flexible caching:

```python
from cache import AsyncCache

cache = AsyncCache(maxsize=1000, default_ttl=300)  # TTL in seconds

async def get_data(key):
    return await cache.get(
        key,
        loader=lambda: db_query(key),  # auto-caches on miss
    )

# Warm up hot keys at startup
await cache.warmup({"hot:key": lambda: preload_hot()})

# Metrics for observability
print(cache.get_metrics())  # hits, misses, size, hit_rate
```

- Thundering Herd Protection
Prevents duplicate work under concurrent load (e.g., a popular key expiring). Without it, 100 concurrent misses mean 100 DB hits; with it, just one.
```python
import asyncio

cache = AsyncCache()

async def loader():
    return await db_query()  # expensive

# 100 concurrent gets -> 1 loader call
results = await asyncio.gather(*[
    cache.get('key', loader=loader) for _ in range(100)
])
```
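Under the hood this is the classic single-flight pattern. Here is a minimal standalone sketch of the idea in plain asyncio, illustrative only and not the library's internals (`db_query` is the same placeholder as above):

```python
import asyncio

_inflight: dict[str, asyncio.Task] = {}

async def single_flight(key, loader):
    # The first caller for a key starts the task; later callers await it.
    task = _inflight.get(key)
    if task is None:
        task = asyncio.create_task(loader())
        _inflight[key] = task
        # Drop the entry once done so a later miss reloads fresh data.
        task.add_done_callback(lambda _: _inflight.pop(key, None))
    return await task
```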
- DataLoader-Style Batching
Groups concurrent gets into a single batch call, reducing DB load; the batching window and maximum batch size are configurable.
```python
async def batch_loader(keys):
    rows = await db_batch_query(keys)  # one DB query for the whole batch
    return {k: rows[k] for k in keys}

# gets issued within the 5 ms window are auto-grouped into one batch
await asyncio.gather(
    cache.get(1, batch_loader=batch_loader),
    cache.get(2, batch_loader=batch_loader),
)
```
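For intuition, here is a toy standalone batcher showing how a time window can coalesce concurrent requests. This is an illustrative sketch, not the library's implementation (`TinyBatcher` is a made-up name, and error handling is omitted for brevity):

```python
import asyncio

class TinyBatcher:
    """Coalesces gets that arrive within `window` seconds into one call."""

    def __init__(self, batch_loader, window=0.005):
        self.batch_loader = batch_loader
        self.window = window
        self.pending = {}  # key -> Future awaiting the batched result

    async def get(self, key):
        loop = asyncio.get_running_loop()
        first_in_window = not self.pending
        fut = self.pending.get(key)
        if fut is None:
            fut = loop.create_future()
            self.pending[key] = fut
        if first_in_window:
            # Schedule one flush for everything collected in the window.
            loop.call_later(self.window,
                            lambda: asyncio.ensure_future(self._flush()))
        return await fut

    async def _flush(self):
        batch, self.pending = self.pending, {}
        results = await self.batch_loader(list(batch))
        for key, fut in batch.items():
            fut.set_result(results[key])
```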
- Cache Warmup
Preload at startup to avoid cold misses.
```python
await cache.warmup({
    "user:1": lambda: load_user(1),
    "config:global": lambda: load_config(),
})
```
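When warming many keys, the mapping can be built dynamically. Note the `uid=uid` default argument, which sidesteps Python's late-binding lambda pitfall (`hot_user_ids` and `load_user` are illustrative placeholders):

```python
hot_user_ids = [1, 2, 3]

# Bind uid per-lambda via a default argument; a bare `lambda: load_user(uid)`
# would capture the loop variable and load user 3 three times.
await cache.warmup({
    f"user:{uid}": (lambda uid=uid: load_user(uid))
    for uid in hot_user_ids
})
```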
- Metrics
Observability for hit rate, size, etc. (global or per-function).
```python
metrics = cache.get_metrics()  # or func.get_metrics() for a decorated function
# {'hits': 950, 'misses': 50, 'size': 200, 'hit_rate': 0.95}
# Feed these into Prometheus or another monitoring system
```
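As a concrete wiring example, the metrics dict maps naturally onto Prometheus gauges via the `prometheus_client` package. This is a sketch assuming `get_metrics()` returns the dict shown above; the metric names are made up:

```python
from prometheus_client import Gauge

CACHE_HIT_RATE = Gauge("app_cache_hit_rate", "Cache hit rate (0-1)")
CACHE_SIZE = Gauge("app_cache_size", "Number of cached entries")

def export_cache_metrics(cache):
    # Call periodically (or from a custom collector) to refresh the gauges.
    m = cache.get_metrics()
    CACHE_HIT_RATE.set(m["hit_rate"])
    CACHE_SIZE.set(m["size"])
```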
- TTL & Invalidation
Per-key control + size-based eviction.
```python
await cache.set('key', value, ttl=60)  # override the default TTL
await cache.delete('key')              # or func.invalidate_cache(args) for decorated functions
cache.clear()                          # drop everything
```
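A common pattern is invalidating on write so the next read repopulates through the loader (a sketch; `db_update_user` is a placeholder):

```python
async def update_user(uid, data):
    await db_update_user(uid, data)    # write to the source of truth first
    await cache.delete(f"user:{uid}")  # next get() misses and reloads fresh data
```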
For simpler call sites, decorator shortcuts are available (they use the core API under the hood):

```python
from cache import AsyncLRU, AsyncTTL

@AsyncLRU(maxsize=128)
async def func(*args):
    ...

@AsyncTTL(time_to_live=60, skip_args=1)  # skip_args=1 skips 'self' in the cache key
async def method(self, arg):
    ...
```
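Usage looks like any other async call; repeated calls with the same arguments are served from the cache. A sketch following the decorator API above (`db_query` is a placeholder):

```python
@AsyncTTL(time_to_live=60)
async def get_user(user_id):
    return await db_query(user_id)

user = await get_user(42)  # miss: runs the query and caches the result
user = await get_user(42)  # hit: served from cache until the TTL expires
```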