Peak SDK Caching

The Peak SDK provides a comprehensive caching architecture designed to optimize performance across different layers of your FastAPI application. This multi-tier caching system helps reduce database load, improves response times, and enhances user experience by intelligently storing frequently accessed data.

Note: The cache decorators (@cache_result and @invalidate_cache) work exclusively with FastAPI applications and are not compatible with other web frameworks.

Architecture Overview

The Peak SDK caching system consists of multiple integrated components that work together:

Core Components:

  1. Platform Cache - Distributed cloud-native caching via Peak platform infrastructure

  2. In-Memory Cache - Thread-safe local caching with TTL support

  3. Cache Decorators - Function-level caching with automatic key generation

  4. Cache Invalidation - Smart invalidation patterns for data consistency

Key Features

  • Distributed Caching: Cloud-native platform cache shared across instances

  • Fallback Strategy: Automatic fallback to in-memory cache

  • Smart Key Generation: Entity-aware cache keys with deployment isolation

  • Pattern-Based Invalidation: Flexible targeted and broad invalidation strategies

  • Async/Sync Support: Works with both synchronous and asynchronous functions

  • TTL Flexibility: Static, dynamic, and HTTP-controlled expiration

  • Cache-Control Integration: Respects HTTP caching directives

  • Error Resilience: Graceful degradation when cache is unavailable

Quick Start

Basic Caching

from peak.decorators import cache_result


# Cache a function result for 1 hour
@cache_result(ttl=3600)
def get_user_data(user_id: str):
    # Expensive operation here
    return {"user_id": user_id, "data": "expensive_data"}


# The result will be cached for 1 hour
result = get_user_data("123")

Entity-Aware Caching

from peak.decorators import cache_result


# Cache with entity ID for targeted invalidation
@cache_result(ttl=1800, entity_id_param="user_id")
def get_user_profile(user_id: str, include_private: bool = False):
    # Creates cache key: deployment-123:user_profile:456
    return {"user_id": user_id, "profile": "data"}

Smart Cache Invalidation

from peak.decorators import invalidate_cache


# Targeted invalidation - clears specific entity caches
@invalidate_cache(
    entity_id_param="user_id", targeted_patterns=["user_profile", "user_settings"]
)
def update_user(user_id: str, data: dict):
    # Clears: deployment-123:user_profile:456, deployment-123:user_settings:456
    return {"status": "updated"}


# Broad invalidation - clears category-wide caches
@invalidate_cache(broad_patterns=["users", "analytics"])
def create_user(data: dict):
    # Clears: deployment-123:users:*, deployment-123:analytics:*
    return {"user_id": "new_user"}


# Combined invalidation strategy
@invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile"],
    broad_patterns=["user_lists"],
)
def delete_user(user_id: str):
    # Clears specific user + all user lists
    return {"status": "deleted"}

Platform Cache

Cache Result Decorator

The @cache_result decorator provides intelligent function result caching with extensive configuration options:

from peak.decorators import cache_result


@cache_result(
    ttl=3600,  # Time to live in seconds
    key_prefix="campaign_data",  # Cache key namespace
    entity_id_param="campaign_id",  # Parameter for entity-specific caching
    respect_cache_control=True,  # Honor HTTP Cache-Control headers
    custom_key_func=None,  # Custom key generation function
    additional_prefix=None,  # Additional key prefix
)
def get_campaign_data(campaign_id: str, include_analytics: bool = False):
    # Expensive operation
    return expensive_campaign_computation(campaign_id)

Parameters Reference

Parameter

Type

Default

Description

ttl

int

3600

Time to live in seconds (1 hour default)

key_prefix

str

None

Namespace for cache keys

entity_id_param

str

None

Parameter name for entity ID extraction

custom_key_func

Callable

None

Custom function for cache key generation

additional_prefix

str

None

Additional prefix for cache keys

respect_cache_control

bool

False

Honor HTTP Cache-Control headers

Cache Key Generation

The cache system automatically generates deterministic cache keys based on your configuration:

Basic Function Caching:

@cache_result(key_prefix="user_data")
def get_user_data():
    pass


# Generated key: deployment-123:user_data

Entity-Specific Caching:

@cache_result(key_prefix="user_profile", entity_id_param="user_id")
def get_user_profile(user_id: str):
    pass


# Generated key: deployment-123:user_profile:456

Parameter-Based Caching:

@cache_result(key_prefix="search_results")
def search_users(query: str, filters: list):
    pass


# Generated key: deployment-123:search_results:hash(query+filters)

Cache Behavior Patterns

Read Operations

# Simple data retrieval
@cache_result(key_prefix="product_catalog", ttl=7200)
def get_product_catalog():
    return expensive_database_query()


# Entity-specific data
@cache_result(key_prefix="product_details", entity_id_param="product_id")
def get_product_details(product_id: str):
    return get_product_from_database(product_id)


# Complex queries with parameters
@cache_result(key_prefix="product_search", ttl=1800)
def search_products(query: str, category: str, price_range: tuple):
    return complex_product_search(query, category, price_range)

Async Function Support

import asyncio
from peak.decorators import cache_result


@cache_result(key_prefix="async_data", ttl=3600)
async def get_async_data(user_id: str):
    # Async operations are fully supported
    await asyncio.sleep(1)  # Simulate async work
    return {"user_id": user_id, "data": "async_result"}

Cache Invalidation

Invalidate Cache Decorator

The @invalidate_cache decorator provides flexible cache invalidation with multiple strategies for maintaining data consistency:

from peak.decorators import invalidate_cache


@invalidate_cache(
    entity_id_param="user_id",  # Parameter containing entity ID
    targeted_patterns=["user_profile"],  # Specific patterns + entity ID
    broad_patterns=["user_lists"],  # General category patterns
)
def update_user(user_id: str, data: dict):
    # Updates user and automatically invalidates related caches
    return perform_user_update(user_id, data)

Parameters Reference

Parameter

Type

Description

entity_id_param

str

Parameter name containing entity ID for targeted invalidation

targeted_patterns

List[str]

Specific patterns combined with entity ID

broad_patterns

List[str]

General patterns for category-wide invalidation

Invalidation Strategies

Targeted Patterns - Entity-specific invalidation:

@invalidate_cache(
    entity_id_param="campaign_id",
    targeted_patterns=["campaign_details", "campaign_products", "campaign_kpis"],
)
def update_campaign(campaign_id: str, data: dict):
    # For campaign_id = "123", invalidates:
    # - deployment-456:campaign_details:123
    # - deployment-456:campaign_products:123
    # - deployment-456:campaign_kpis:123
    pass

Broad Patterns - Category-wide invalidation:

@invalidate_cache(broad_patterns=["campaigns", "analytics", "reports"])
def create_campaign(data: dict):
    # Invalidates ALL keys matching:
    # - deployment-456:campaigns:*
    # - deployment-456:analytics:*
    # - deployment-456:reports:*
    pass

Combined Strategy - Both targeted and broad:

@invalidate_cache(
    entity_id_param="campaign_id",
    targeted_patterns=["campaign_details"],  # Specific entity
    broad_patterns=["campaign_lists"],  # All campaign lists
)
def delete_campaign(campaign_id: str):
    # Clears specific campaign + all campaign list caches
    pass

Cache Invalidation Flow

class ProductController:

    @cache_result(key_prefix="product_details", entity_id_param="product_id")
    def get_product(self, product_id: str):
        return self.product_service.get_by_id(product_id)

    @cache_result(key_prefix="product_catalog")
    def get_product_catalog(self):
        return self.product_service.get_all()

    @invalidate_cache(
        entity_id_param="product_id",
        targeted_patterns=["product_details"],
        broad_patterns=["product_catalog", "recommendations"],
    )
    def update_product_price(self, product_id: str, new_price: float):
        # This will:
        # 1. Update the product price
        # 2. Clear product_details:123 (targeted)
        # 3. Clear all product_catalog:* (broad)
        # 4. Clear all recommendations:* (broad)
        return self.product_service.update_price(product_id, new_price)

Standalone Cache Invalidation

You can also use invalidation without decorating functions:

from peak.decorators import invalidate_cache

# Direct cache invalidation
invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile"],
    broad_patterns=["user_lists"],
)(
    "123"
)  # Pass user_id directly

Advanced Usage

Custom Cache Repository

The Peak SDK provides flexible cache repository option too:

from peak.decorators import PlatformCacheRepository

# Singleton platform cache (recommended)
cache_repo = PlatformCacheRepository()
# Direct cache operations
cache_repo.set("custom_key", {"data": "value"}, ttl=3600)
cached_data = cache_repo.get("custom_key")
cache_repo.delete("custom_key")
cache_repo.delete_by_prefix("user_profile:*")

Custom Key Generation

For complex caching or key generation scenarios, implement custom key functions:

def custom_key_generator(func, args, kwargs):
    """Generate custom cache keys based on business logic."""
    user_id = kwargs.get("user_id")
    filters = kwargs.get("filters", {})

    # Create deterministic key from filters
    filter_hash = hashlib.md5(json.dumps(filters, sort_keys=True).encode()).hexdigest()[
        :8
    ]

    return f"search_results:{user_id}:{filter_hash}"


@cache_result(custom_key_func=custom_key_generator, ttl=1800)
def complex_search(user_id: str, filters: dict):
    return perform_complex_search(user_id, filters)

HTTP Cache-Control Integration

The cache system respects HTTP Cache-Control headers:

from peak.decorators import cache_result
from starlette.requests import Request


@cache_result(ttl=3600, respect_cache_control=True)
def get_data(request: Request):
    # Honors Cache-Control: no-cache, max-age, etc.
    # Skips caching if Cache-Control headers indicate no-cache
    return {"data": "value"}


# Dynamic TTL from Cache-Control headers
@cache_result(respect_cache_control=True)
def api_endpoint(request: Request):
    # TTL extracted from Cache-Control: max-age header
    return expensive_computation()

Dynamic TTL Management

# TTL from function result
@cache_result(key_prefix="dynamic_data")
def get_data_with_ttl():
    result = expensive_operation()
    result._cache_ttl = 7200  # 2 hours
    return result


# TTL based on data freshness
@cache_result(key_prefix="time_sensitive")
def get_time_sensitive_data():
    data = fetch_data()
    # Cache for shorter time if data is stale
    ttl = 300 if data.is_stale else 3600
    data._cache_ttl = ttl
    return data

Environment-Based Configuration

import os
from peak.decorators import cache_result

# TTL based on environment
TTL_CONFIG = {
    "development": 300,  # 5 minutes
    "staging": 1800,  # 30 minutes
    "production": 3600,  # 1 hour
}

env = os.getenv("ENVIRONMENT", "development")
cache_ttl = TTL_CONFIG[env]


@cache_result(ttl=cache_ttl, key_prefix="user_data")
def get_user_data(user_id: str):
    return expensive_user_lookup(user_id)

Best Practices

DO

Cache Expensive Operations

# Cache database queries
@cache_result(key_prefix="user_profile", entity_id_param="user_id", ttl=3600)
def get_user_profile(user_id: str):
    return database.query("SELECT * FROM users WHERE id = ?", user_id)


# Cache complex calculations
@cache_result(key_prefix="analytics", ttl=1800)
def calculate_monthly_analytics():
    return perform_heavy_analytics_computation()


# Cache external API calls
@cache_result(key_prefix="weather_data", ttl=600)
def get_weather_data(city: str):
    return external_weather_api.get_current(city)

Use Appropriate TTL

# Short TTL for frequently changing data
@cache_result(key_prefix="stock_prices", ttl=30)  # 30 seconds
def get_stock_price(symbol: str):
    return stock_api.get_price(symbol)


# Long TTL for static data
@cache_result(key_prefix="country_list", ttl=86400)  # 24 hours
def get_countries():
    return reference_data.get_countries()

Implement Smart Invalidation

# Clear related caches on updates
@invalidate_cache(
    entity_id_param="user_id",
    targeted_patterns=["user_profile", "user_settings"],
    broad_patterns=["user_listings", "analytics"],
)
def update_user(user_id: str, data: dict):
    return user_service.update(user_id, data)

DON’T

Don’t Cache Side Effects

# Don't cache functions with side effects
@cache_result(key_prefix="email_send")  # Wrong!
def send_email(to: str, subject: str):
    email_service.send(to, subject)  # Side effect!
    return {"status": "sent"}


# Only cache the lookup, not the action
@cache_result(key_prefix="user_email", entity_id_param="user_id")
def get_user_email(user_id: str):
    return user_service.get_email(user_id)


def send_email_to_user(user_id: str, subject: str):
    email = get_user_email(user_id)  # Cached lookup
    email_service.send(email, subject)  # Uncached action

Don’t Cache Sensitive Data

# Don't cache sensitive information
@cache_result(key_prefix="user_password")  # Wrong!
def get_user_password(user_id: str):
    return user.password_hash


# Don't cache personal data
@cache_result(key_prefix="credit_card")  # Wrong!
def get_user_payment_info(user_id: str):
    return user.credit_card_details

Don’t Over-Cache Dynamic Data

# Don't cache frequently changing data with long TTL
@cache_result(key_prefix="real_time_data", ttl=3600)  # Wrong!
def get_real_time_stock_price(symbol: str):
    return stock_api.get_real_time_price(symbol)


# Use appropriate TTL for data volatility
@cache_result(key_prefix="real_time_data", ttl=5)  # 5 seconds
def get_real_time_stock_price(symbol: str):
    return stock_api.get_real_time_price(symbol)

Error Resilience

The Peak SDK cache decorator is designed for graceful degradation:

@cache_result(key_prefix="resilient_operation", ttl=3600)
def resilient_function(param: str):
    # If cache fails, function executes normally
    # No application disruption from cache issues
    return critical_business_logic(param)


# Cache errors are logged but don't propagate:
# logger.exception("Error in cache wrapper")
# Function continues with normal execution

This ensures your application remains functional even when caching infrastructure experiences issues.