Cache

The Peak SDK Cache resource provides Redis-backed caching with tenant isolation, TTL (Time To Live) management, and automatic connection handling.

Features

  • Tenant Isolation: Automatic tenant-based key prefixing

  • TTL Management: Flexible expiration time controls

  • Bulk Operations: Support for multiple key operations

  • Pattern Matching: Advanced key filtering and deletion

  • Connection Management: Automatic connection handling and pooling

  • Redis Cluster Support: Optimized for Redis cluster mode with hash tags

Getting Started

Import the cache resource module and instantiate a cache client:

[ ]:
from __future__ import annotations

import contextlib
from datetime import datetime, timezone
from typing import Any

from peak.resources import cache

# Create a cache client instance
cache_client = cache.CacheClient()

Test Connection

Before performing operations, test the connection:

[ ]:
# Test the connection
ping_result = cache_client.ping()
print(f"Cache connection status: {ping_result}")

Basic Operations

Setting and Getting Values

[ ]:
# Set a simple key-value pair
cache_client.set("user:123", "john_doe")
print("Set user:123 = john_doe")

# Get the value
value = cache_client.get("user:123")
print(f"Retrieved value: {value}")

# Set with TTL (expires in 60 seconds)
cache_client.set("session:abc123", "active", ttl=60)
print("Set session:abc123 with 60 second TTL")

Working with Complex Data

[ ]:
# Store a dictionary
user_data = {
    "id": 123,
    "name": "John Doe",
    "email": "john@example.com",
    "preferences": {"theme": "dark", "notifications": True},
}

cache_client.set("user_profile:123", user_data)
print("Stored user profile data")

# Retrieve and display
retrieved_data = cache_client.get("user_profile:123")
print(f"Retrieved user profile: {retrieved_data}")
print(f"User's theme preference: {retrieved_data['preferences']['theme']}")

Key Management

Checking Key Existence and TTL

[ ]:
# Check if a key exists
exists = cache_client.exists("user:123")
print(f"Key 'user:123' exists: {exists}")

# Check TTL (Time To Live)
ttl = cache_client.ttl("session:abc123")
print(f"TTL for 'session:abc123': {ttl} seconds")

# Set expiration on existing key
cache_client.expire("user:123", 300)  # Expire in 5 minutes
print("Set expiration on user:123 to 5 minutes")

Deleting Keys

[ ]:
# Delete a single key
deleted = cache_client.delete("session:abc123")
print(f"Deleted 'session:abc123': {deleted} key(s) removed")

# Verify deletion
exists_after_delete = cache_client.exists("session:abc123")
print(f"Key exists after deletion: {exists_after_delete}")

Bulk Operations

Multiple Set and Get Operations

[ ]:
# Set multiple keys at once
key_value_pairs = {
    "product:1": {"name": "Laptop", "price": 999.99},
    "product:2": {"name": "Mouse", "price": 29.99},
    "product:3": {"name": "Keyboard", "price": 79.99},
}

cache_client.mset(key_value_pairs)
print("Set multiple product keys")

# Get multiple keys at once
keys_to_get = ["product:1", "product:2", "product:3"]
products = cache_client.mget(*keys_to_get)

print("Retrieved products:")
for key, product in zip(keys_to_get, products):
    if product:
        print(f"  {key}: {product['name']} - ${product['price']}")
    else:
        print(f"  {key}: Not found")

Redis Cluster Mode and Hash Tags

The Peak SDK Cache uses Redis cluster mode, which requires special consideration for bulk operations (mset and mget). In cluster mode, keys must hash to the same slot to be processed together.

Important: If you encounter a "CROSSSLOT Keys in request don't hash to the same slot" error, use hash tags to ensure the keys hash to the same slot.

[ ]:
# This might fail in cluster mode (keys may hash to different slots)
# problematic_keys = {
#     "user:1": "alice",
#     "user:2": "bob",
#     "session:abc": "active"
# }
# cache_client.mset(problematic_keys)  # May cause CROSSSLOT error

# Use hash tags to ensure keys hash to the same slot
cluster_safe_keys = {
    "{users}:1": "alice",
    "{users}:2": "bob",
    "{users}:3": "charlie",
}

cache_client.mset(cluster_safe_keys)
print("Successfully set multiple keys using hash tags")

# Retrieve multiple keys with hash tags
user_keys = ["{users}:1", "{users}:2", "{users}:3"]
users = cache_client.mget(*user_keys)

print("Retrieved users:")
for key, user in zip(user_keys, users):
    print(f"  {key}: {user}")

Hash Tag Rules:

  • Use curly braces {tag} to group keys: {users}:1, {users}:2, {users}:3

  • Keys with the same hash tag (content within {}) will hash to the same slot

  • Only the content within the first pair of curly braces is used for hashing (see the slot-calculation sketch below)

  • This ensures bulk operations work correctly in Redis cluster mode
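
To make these rules concrete, here is an illustrative, SDK-independent sketch of how Redis cluster assigns slots: CRC16 (XMODEM) of the key modulo 16384, where a non-empty {...} section replaces the full key for hashing. This is standard Redis behavior, not a Peak SDK API:

[ ]:
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM), the checksum Redis uses for key slots."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
        crc &= 0xFFFF
    return crc


def hash_slot(key: str) -> int:
    """Slot = CRC16(key) mod 16384; only the first non-empty {tag} is hashed."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # a non-empty tag replaces the key for hashing
            key = key[start + 1 : end]
    return crc16_xmodem(key.encode()) % 16384


print(hash_slot("{users}:1") == hash_slot("{users}:2"))  # True: shared tag
print(hash_slot("user:1") == hash_slot("user:2"))  # likely False: no tag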

Mixed Bulk Operations Example

[ ]:
# Example: E-commerce cart operations with hash tags
cart_data = {
    "{cart:user123}:items": ["laptop", "mouse", "keyboard"],
    "{cart:user123}:total": 1109.97,
    "{cart:user123}:currency": "USD",
    "{cart:user123}:timestamp": datetime.now(timezone.utc).isoformat(),
}

# Set all cart data atomically
cache_client.mset(cart_data)
print("Set complete cart data")

# Retrieve all cart data at once
cart_keys = list(cart_data.keys())
cart_values = cache_client.mget(*cart_keys)

print("Cart contents:")
for key, value in zip(cart_keys, cart_values):
    field_name = key.split(":")[-1]
    print(f"  {field_name}: {value}")

Advanced Features

Using Additional Prefixes

[ ]:
# Set an additional prefix for better organization
cache_client.set_additional_prefix("app:v1")
print("Set additional prefix to 'app:v1'")

# Now all keys will be prefixed with 'app:v1'
cache_client.set(
    "config:database",
    {"host": "localhost", "port": 5432, "database": "myapp"},
)
print("Set database config with prefix")

# Retrieve with the same prefix context
config = cache_client.get("config:database")
print(f"Database config: {config}")

# Clear the additional prefix
cache_client.set_additional_prefix(None)
print("Cleared additional prefix")

Pattern-Based Operations

[ ]:
# Create some test keys
test_keys = {
    "temp:file1": "data1",
    "temp:file2": "data2",
    "temp:file3": "data3",
    "cache:user1": "user_data1",
    "cache:user2": "user_data2",
}

cache_client.mset(test_keys)
print("Created test keys")

# Delete all keys matching 'temp:*' pattern
deleted_count = cache_client.flush_by_pattern("temp:*")
print(f"Deleted {deleted_count} keys matching 'temp:*' pattern")

# Verify temp keys are gone but cache keys remain
remaining_keys = cache_client.mget("temp:file1", "cache:user1")
print(f"temp:file1 exists: {remaining_keys[0] is not None}")
print(f"cache:user1 exists: {remaining_keys[1] is not None}")

Caching API Responses

[ ]:
def cache_api_response(
    endpoint: str,
    response_data: dict[str, Any],
    cache_duration: int = 300,
) -> str:
    """Cache API response with metadata."""
    cache_key = f"api:response:{endpoint.replace('/', ':')}"

    cached_response = {
        "data": response_data,
        "cached_at": datetime.now(timezone.utc).isoformat(),
        "endpoint": endpoint,
        "cache_duration": cache_duration,
    }

    cache_client.set(cache_key, cached_response, ttl=cache_duration)
    return cache_key


def get_cached_response(endpoint: str) -> dict[str, Any] | None:
    """Retrieve cached API response."""
    cache_key = f"api:response:{endpoint.replace('/', ':')}"
    return cache_client.get(cache_key)


# Example usage
api_data = {
    "users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
    "total": 2,
}

# Cache the response
cache_key = cache_api_response("/api/users", api_data, cache_duration=180)
print(f"Cached API response: {cache_key}")

# Retrieve cached response
cached_data = get_cached_response("/api/users")
if cached_data:
    print(f"Found cached data from {cached_data['cached_at']}")
    print(f"User count: {len(cached_data['data']['users'])}")
else:
    print("No cached data found")

Cleanup

Clean up test data:

[ ]:
# Clean up test keys
cleanup_keys = [
    "user:123",
    "user_profile:123",
    "product:1",
    "product:2",
    "product:3",
    "config:database",
    "cache:user1",
    "cache:user2",
    "{users}:1",
    "{users}:2",
    "{users}:3",
    "{cart:user123}:items",
    "{cart:user123}:total",
    "{cart:user123}:currency",
    "{cart:user123}:timestamp",
]

for key in cleanup_keys:
    with contextlib.suppress(Exception):
        cache_client.delete(key)

# Clean up pattern-based keys
cache_client.flush_by_pattern("session:*")
cache_client.flush_by_pattern("api:*")

print("Cleanup completed")

Summary

The Peak SDK Cache resource provides:

  • Simple Operations: Basic set/get/delete operations

  • Bulk Operations: Efficient multiple key operations with mset and mget

  • Redis Cluster Support: Hash tags for bulk operations in cluster mode

  • TTL Management: Flexible expiration controls with expire and ttl

  • Pattern Matching: Advanced key filtering with flush_by_pattern

  • Organization: Key prefixing for better organization

  • Real-world Applications: Session management, API response caching, and more

Key Takeaways for Bulk Operations:

  • Use hash tags ({tag}) whenever you call mset or mget across multiple keys

  • Hash tags ensure keys hash to the same slot in Redis cluster mode

  • This prevents CROSSSLOT errors and ensures reliable bulk operations

For more information, see the Cache Resource Documentation.