Peak SDK Caching
The Peak SDK provides a comprehensive caching architecture designed to optimize performance across different layers of your FastAPI application. This multi-tier caching system reduces database load, improves response times, and enhances user experience by intelligently storing frequently accessed data.
Note: The cache decorators (`@cache_result` and `@invalidate_cache`) work exclusively with FastAPI applications and are not compatible with other web frameworks.
Architecture Overview
The Peak SDK caching system consists of multiple integrated components that work together:
Core Components:
Platform Cache - Distributed cloud-native caching via Peak platform infrastructure
In-Memory Cache - Thread-safe local caching with TTL support
Cache Decorators - Function-level caching with automatic key generation
Cache Invalidation - Smart invalidation patterns for data consistency
Key Features
Distributed Caching: Cloud-native platform cache shared across instances
Fallback Strategy: Automatic fallback to in-memory cache
Smart Key Generation: Entity-aware cache keys with deployment isolation
Pattern-Based Invalidation: Flexible targeted and broad invalidation strategies
Async/Sync Support: Works with both synchronous and asynchronous functions
TTL Flexibility: Static, dynamic, and HTTP-controlled expiration
Cache-Control Integration: Respects HTTP caching directives
Error Resilience: Graceful degradation when cache is unavailable
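The fallback strategy listed above can be sketched as follows. This is an illustrative stand-in, not the SDK's implementation: `InMemoryCache` and `get_with_fallback` are hypothetical names, shown only to make the strategy concrete — try the distributed platform cache first, and degrade to a TTL-aware local cache on error.

```python
import time


class InMemoryCache:
    """Minimal TTL-aware local cache (illustrative, not the SDK's class)."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def set(self, key, value, ttl=3600):
        self._store[key] = (value, time.monotonic() + ttl)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # evict the expired entry lazily
            return None
        return value


def get_with_fallback(platform_cache, local_cache, key):
    """Prefer the distributed cache; fall back to the local cache on error."""
    try:
        return platform_cache.get(key)
    except Exception:
        # Platform cache unavailable -- degrade gracefully
        return local_cache.get(key)
```
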
Quick Start
Basic Caching
```python
from peak.decorators import cache_result

# Cache a function result for 1 hour
@cache_result(ttl=3600)
def get_user_data(user_id: str):
    # Expensive operation here
    return {"user_id": user_id, "data": "expensive_data"}

# The result will be cached for 1 hour
result = get_user_data("123")
```
Entity-Aware Caching
```python
from peak.decorators import cache_result

# Cache with entity ID for targeted invalidation
@cache_result(ttl=1800, entity_id_param="user_id")
def get_user_profile(user_id: str, include_private: bool = False):
    # Creates cache key: deployment-123:user_profile:456
    return {"user_id": user_id, "profile": "data"}
```
Smart Cache Invalidation
```python
from peak.decorators import invalidate_cache

# Targeted invalidation - clears specific entity caches
@invalidate_cache(
    entity_id_param="user_id", targeted_patterns=["user_profile", "user_settings"]
)
def update_user(user_id: str, data: dict):
    # Clears: deployment-123:user_profile:456, deployment-123:user_settings:456
    return {"status": "updated"}

# Broad invalidation - clears category-wide caches
@invalidate_cache(broad_patterns=["users", "analytics"])
def create_user(data: dict):
    # Clears: deployment-123:users:*, deployment-123:analytics:*
    return {"user_id": "new_user"}

# Combined invalidation strategy
@invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile"],
    broad_patterns=["user_lists"],
)
def delete_user(user_id: str):
    # Clears specific user + all user lists
    return {"status": "deleted"}
```
Platform Cache
Cache Result Decorator
The `@cache_result` decorator provides intelligent function result caching with extensive configuration options:
```python
from peak.decorators import cache_result

@cache_result(
    ttl=3600,                       # Time to live in seconds
    key_prefix="campaign_data",     # Cache key namespace
    entity_id_param="campaign_id",  # Parameter for entity-specific caching
    respect_cache_control=True,     # Honor HTTP Cache-Control headers
    custom_key_func=None,           # Custom key generation function
    additional_prefix=None,         # Additional key prefix
)
def get_campaign_data(campaign_id: str, include_analytics: bool = False):
    # Expensive operation
    return expensive_campaign_computation(campaign_id)
```
Parameters Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| `ttl` | `int` | `3600` | Time to live in seconds (1 hour default) |
| `key_prefix` | `str` | `None` | Namespace for cache keys |
| `entity_id_param` | `str` | `None` | Parameter name for entity ID extraction |
| `custom_key_func` | `callable` | `None` | Custom function for cache key generation |
| `additional_prefix` | `str` | `None` | Additional prefix for cache keys |
| `respect_cache_control` | `bool` | `True` | Honor HTTP Cache-Control headers |
Cache Key Generation
The cache system automatically generates deterministic cache keys based on your configuration:
Basic Function Caching:
```python
@cache_result(key_prefix="user_data")
def get_user_data():
    pass

# Generated key: deployment-123:user_data
```
Entity-Specific Caching:
```python
@cache_result(key_prefix="user_profile", entity_id_param="user_id")
def get_user_profile(user_id: str):
    pass

# Generated key: deployment-123:user_profile:456
```
Parameter-Based Caching:
```python
@cache_result(key_prefix="search_results")
def search_users(query: str, filters: list):
    pass

# Generated key: deployment-123:search_results:hash(query+filters)
```
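The key patterns above suggest a builder along these lines. This is a sketch of the general technique, not the SDK's actual key function: `make_cache_key` and its parameters are illustrative names, and the hash algorithm is an assumption.

```python
import hashlib
import json


def make_cache_key(deployment_id, key_prefix, entity_id=None, params=None):
    """Build a deterministic cache key from a prefix, entity ID, and parameters.

    Illustrative only -- mirrors the key shapes shown above, not the SDK's
    internal implementation.
    """
    parts = [deployment_id, key_prefix]
    if entity_id is not None:
        parts.append(str(entity_id))
    if params:
        # Serialize parameters with sorted keys so equivalent calls
        # always hash to the same bounded-length key segment.
        blob = json.dumps(params, sort_keys=True, default=str)
        parts.append(hashlib.sha256(blob.encode()).hexdigest()[:12])
    return ":".join(parts)
```

Sorting the serialized keys is what makes the key deterministic: `{"query": "a", "filters": [1]}` and `{"filters": [1], "query": "a"}` produce the same cache entry.
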
Cache Behavior Patterns
Read Operations
```python
# Simple data retrieval
@cache_result(key_prefix="product_catalog", ttl=7200)
def get_product_catalog():
    return expensive_database_query()

# Entity-specific data
@cache_result(key_prefix="product_details", entity_id_param="product_id")
def get_product_details(product_id: str):
    return get_product_from_database(product_id)

# Complex queries with parameters
@cache_result(key_prefix="product_search", ttl=1800)
def search_products(query: str, category: str, price_range: tuple):
    return complex_product_search(query, category, price_range)
```
Async Function Support
```python
import asyncio

from peak.decorators import cache_result

@cache_result(key_prefix="async_data", ttl=3600)
async def get_async_data(user_id: str):
    # Async operations are fully supported
    await asyncio.sleep(1)  # Simulate async work
    return {"user_id": user_id, "data": "async_result"}
```
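A decorator can serve both sync and async call styles by checking whether the wrapped function is a coroutine function. The sketch below shows that general technique with a plain dict as the store; `cached` is a hypothetical name, not the SDK's decorator.

```python
import asyncio
import functools
import inspect


def cached(store):
    """Sketch: one decorator that wraps both sync and async functions."""

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                key = (func.__name__, args, tuple(sorted(kwargs.items())))
                if key not in store:
                    store[key] = await func(*args, **kwargs)  # await before caching
                return store[key]
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            key = (func.__name__, args, tuple(sorted(kwargs.items())))
            if key not in store:
                store[key] = func(*args, **kwargs)
            return store[key]
        return sync_wrapper

    return decorator
```

The coroutine check happens once, at decoration time, so there is no per-call dispatch overhead.
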
Cache Invalidation
Invalidate Cache Decorator
The `@invalidate_cache` decorator provides flexible cache invalidation with multiple strategies for maintaining data consistency:
```python
from peak.decorators import invalidate_cache

@invalidate_cache(
    entity_id_param="user_id",           # Parameter containing entity ID
    targeted_patterns=["user_profile"],  # Specific patterns + entity ID
    broad_patterns=["user_lists"],       # General category patterns
)
def update_user(user_id: str, data: dict):
    # Updates user and automatically invalidates related caches
    return perform_user_update(user_id, data)
```
Parameters Reference
| Parameter | Type | Description |
|---|---|---|
| `entity_id_param` | `str` | Parameter name containing entity ID for targeted invalidation |
| `targeted_patterns` | `list[str]` | Specific patterns combined with entity ID |
| `broad_patterns` | `list[str]` | General patterns for category-wide invalidation |
Invalidation Strategies
Targeted Patterns - Entity-specific invalidation:
```python
@invalidate_cache(
    entity_id_param="campaign_id",
    targeted_patterns=["campaign_details", "campaign_products", "campaign_kpis"],
)
def update_campaign(campaign_id: str, data: dict):
    # For campaign_id = "123", invalidates:
    # - deployment-456:campaign_details:123
    # - deployment-456:campaign_products:123
    # - deployment-456:campaign_kpis:123
    pass
```
Broad Patterns - Category-wide invalidation:
```python
@invalidate_cache(broad_patterns=["campaigns", "analytics", "reports"])
def create_campaign(data: dict):
    # Invalidates ALL keys matching:
    # - deployment-456:campaigns:*
    # - deployment-456:analytics:*
    # - deployment-456:reports:*
    pass
```
Combined Strategy - Both targeted and broad:
```python
@invalidate_cache(
    entity_id_param="campaign_id",
    targeted_patterns=["campaign_details"],  # Specific entity
    broad_patterns=["campaign_lists"],       # All campaign lists
)
def delete_campaign(campaign_id: str):
    # Clears specific campaign + all campaign list caches
    pass
```
Cache Invalidation Flow
```python
class ProductController:
    @cache_result(key_prefix="product_details", entity_id_param="product_id")
    def get_product(self, product_id: str):
        return self.product_service.get_by_id(product_id)

    @cache_result(key_prefix="product_catalog")
    def get_product_catalog(self):
        return self.product_service.get_all()

    @invalidate_cache(
        entity_id_param="product_id",
        targeted_patterns=["product_details"],
        broad_patterns=["product_catalog", "recommendations"],
    )
    def update_product_price(self, product_id: str, new_price: float):
        # This will:
        # 1. Update the product price
        # 2. Clear product_details:123 (targeted)
        # 3. Clear all product_catalog:* (broad)
        # 4. Clear all recommendations:* (broad)
        return self.product_service.update_price(product_id, new_price)
```
Standalone Cache Invalidation
You can also use invalidation without decorating functions:
```python
from peak.decorators import invalidate_cache

# Direct cache invalidation
invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile"],
    broad_patterns=["user_lists"],
)("123")  # Pass the user_id directly
```
Advanced Usage
Custom Cache Repository
The Peak SDK also exposes the cache repository directly for manual cache operations:
```python
from peak.decorators import PlatformCacheRepository

# Singleton platform cache (recommended)
cache_repo = PlatformCacheRepository()

# Direct cache operations
cache_repo.set("custom_key", {"data": "value"}, ttl=3600)
cached_data = cache_repo.get("custom_key")
cache_repo.delete("custom_key")
cache_repo.delete_by_prefix("user_profile:*")
```
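To make the repository surface concrete, here is a dict-backed stand-in with the same four operations. It is illustrative only (no TTL handling, and `DictCacheRepository` is a hypothetical name), but it shows the trailing-wildcard semantics that `delete_by_prefix("user_profile:*")` implies.

```python
class DictCacheRepository:
    """Dict-backed sketch of the repository interface (not the SDK's class)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, ttl=3600):
        self._data[key] = value  # ttl accepted but ignored in this sketch

    def get(self, key):
        return self._data.get(key)

    def delete(self, key):
        self._data.pop(key, None)

    def delete_by_prefix(self, pattern):
        # Treat a trailing "*" as a wildcard, e.g. "user_profile:*"
        prefix = pattern.rstrip("*")
        for key in [k for k in self._data if k.startswith(prefix)]:
            del self._data[key]
```
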
Custom Key Generation
For complex caching scenarios, implement a custom key-generation function:
```python
import hashlib
import json

from peak.decorators import cache_result

def custom_key_generator(func, args, kwargs):
    """Generate custom cache keys based on business logic."""
    user_id = kwargs.get("user_id")
    filters = kwargs.get("filters", {})
    # Create a deterministic key from the filters
    filter_hash = hashlib.md5(
        json.dumps(filters, sort_keys=True).encode()
    ).hexdigest()[:8]
    return f"search_results:{user_id}:{filter_hash}"

@cache_result(custom_key_func=custom_key_generator, ttl=1800)
def complex_search(user_id: str, filters: dict):
    return perform_complex_search(user_id, filters)
```
HTTP Cache-Control Integration
The cache system respects HTTP Cache-Control headers:
```python
from peak.decorators import cache_result
from starlette.requests import Request

@cache_result(ttl=3600, respect_cache_control=True)
def get_data(request: Request):
    # Honors Cache-Control: no-cache, max-age, etc.
    # Skips caching if Cache-Control headers indicate no-cache
    return {"data": "value"}

# Dynamic TTL from Cache-Control headers
@cache_result(respect_cache_control=True)
def api_endpoint(request: Request):
    # TTL extracted from the Cache-Control: max-age header
    return expensive_computation()
```
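A minimal sketch of how a TTL can be derived from a Cache-Control value, assuming `no-cache`/`no-store` disable caching and `max-age` overrides the default. This is illustrative of the directive semantics, not the SDK's actual parser.

```python
def ttl_from_cache_control(header, default_ttl=3600):
    """Return None when caching should be skipped, otherwise a TTL in seconds.

    Illustrative sketch of Cache-Control handling, not the SDK's parser.
    """
    if not header:
        return default_ttl
    directives = [d.strip().lower() for d in header.split(",")]
    if "no-cache" in directives or "no-store" in directives:
        return None  # caller should bypass the cache entirely
    for directive in directives:
        if directive.startswith("max-age="):
            try:
                return int(directive.split("=", 1)[1])
            except ValueError:
                return default_ttl  # malformed max-age: fall back to default
    return default_ttl
```
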
Dynamic TTL Management
```python
# TTL from the function result
@cache_result(key_prefix="dynamic_data")
def get_data_with_ttl():
    result = expensive_operation()
    result._cache_ttl = 7200  # 2 hours
    return result

# TTL based on data freshness
@cache_result(key_prefix="time_sensitive")
def get_time_sensitive_data():
    data = fetch_data()
    # Cache for a shorter time if the data is stale
    ttl = 300 if data.is_stale else 3600
    data._cache_ttl = ttl
    return data
```
Environment-Based Configuration
```python
import os

from peak.decorators import cache_result

# TTL based on environment
TTL_CONFIG = {
    "development": 300,   # 5 minutes
    "staging": 1800,      # 30 minutes
    "production": 3600,   # 1 hour
}

env = os.getenv("ENVIRONMENT", "development")
cache_ttl = TTL_CONFIG.get(env, TTL_CONFIG["development"])  # safe for unknown envs

@cache_result(ttl=cache_ttl, key_prefix="user_data")
def get_user_data(user_id: str):
    return expensive_user_lookup(user_id)
```
Best Practices
DO
Cache Expensive Operations
```python
# Cache database queries
@cache_result(key_prefix="user_profile", entity_id_param="user_id", ttl=3600)
def get_user_profile(user_id: str):
    return database.query("SELECT * FROM users WHERE id = ?", user_id)

# Cache complex calculations
@cache_result(key_prefix="analytics", ttl=1800)
def calculate_monthly_analytics():
    return perform_heavy_analytics_computation()

# Cache external API calls
@cache_result(key_prefix="weather_data", ttl=600)
def get_weather_data(city: str):
    return external_weather_api.get_current(city)
```
Use Appropriate TTL
```python
# Short TTL for frequently changing data
@cache_result(key_prefix="stock_prices", ttl=30)  # 30 seconds
def get_stock_price(symbol: str):
    return stock_api.get_price(symbol)

# Long TTL for static data
@cache_result(key_prefix="country_list", ttl=86400)  # 24 hours
def get_countries():
    return reference_data.get_countries()
```
Implement Smart Invalidation
```python
# Clear related caches on updates
@invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile", "user_settings"],
    broad_patterns=["user_listings", "analytics"],
)
def update_user(user_id: str, data: dict):
    return user_service.update(user_id, data)
```
DON’T
Don’t Cache Side Effects
```python
# Don't cache functions with side effects
@cache_result(key_prefix="email_send")  # Wrong!
def send_email(to: str, subject: str):
    email_service.send(to, subject)  # Side effect!
    return {"status": "sent"}

# Only cache the lookup, not the action
@cache_result(key_prefix="user_email", entity_id_param="user_id")
def get_user_email(user_id: str):
    return user_service.get_email(user_id)

def send_email_to_user(user_id: str, subject: str):
    email = get_user_email(user_id)  # Cached lookup
    email_service.send(email, subject)  # Uncached action
```
Don’t Cache Sensitive Data
```python
# Don't cache sensitive information
@cache_result(key_prefix="user_password")  # Wrong!
def get_user_password(user_id: str):
    return user.password_hash

# Don't cache personal data
@cache_result(key_prefix="credit_card")  # Wrong!
def get_user_payment_info(user_id: str):
    return user.credit_card_details
```
Don’t Over-Cache Dynamic Data
```python
# Don't cache frequently changing data with a long TTL
@cache_result(key_prefix="real_time_data", ttl=3600)  # Wrong!
def get_real_time_stock_price(symbol: str):
    return stock_api.get_real_time_price(symbol)

# Use a TTL appropriate to the data's volatility
@cache_result(key_prefix="real_time_data", ttl=5)  # 5 seconds
def get_real_time_stock_price(symbol: str):
    return stock_api.get_real_time_price(symbol)
```
Error Resilience
The Peak SDK cache decorator is designed for graceful degradation:
```python
@cache_result(key_prefix="resilient_operation", ttl=3600)
def resilient_function(param: str):
    # If the cache fails, the function executes normally
    # No application disruption from cache issues
    return critical_business_logic(param)

# Cache errors are logged but don't propagate:
#   logger.exception("Error in cache wrapper")
# Function continues with normal execution
```
This ensures your application remains functional even when caching infrastructure experiences issues.
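The degradation pattern can be sketched as follows: every cache interaction is wrapped in its own try/except, so a failing store never prevents the wrapped function from running. `resilient_cache` is an illustrative name, not part of the SDK.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def resilient_cache(store):
    """Sketch of graceful degradation: cache errors never reach callers."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (func.__name__, args, tuple(sorted(kwargs.items())))
            try:
                if key in store:
                    return store[key]
            except Exception:
                logger.exception("Error in cache wrapper")  # logged, not raised
            result = func(*args, **kwargs)  # cache miss or cache failure
            try:
                store[key] = result
            except Exception:
                logger.exception("Error in cache wrapper")
            return result
        return wrapper

    return decorator
```

With a healthy store the decorator behaves like a normal cache; with a broken store it silently becomes a pass-through.
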