Our product API was slow - 2 seconds to load a product page. The database was the bottleneck, running complex queries with multiple joins.

I implemented a comprehensive caching strategy with Redis. Response time dropped to 50ms - a 40x improvement.

The Problem

Product page query:

SELECT 
    p.*,
    c.name as category_name,
    b.name as brand_name,
    AVG(r.rating) as avg_rating,
    COUNT(r.id) as review_count
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN brands b ON p.brand_id = b.id
LEFT JOIN reviews r ON r.product_id = p.id
WHERE p.id = 123
GROUP BY p.id, c.name, b.name

This query takes 1.8s with 100k products and 500k reviews.

Cache-Aside Pattern

Most common caching pattern:

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id):
    # Try cache first
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # Cache miss - query database
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    
    # Store in cache for 1 hour
    r.setex(cache_key, 3600, json.dumps(product))
    
    return product

Result: First request 1.8s, subsequent requests <5ms.

Cache Invalidation

“There are only two hard things in Computer Science: cache invalidation and naming things.” - Phil Karlton

When product is updated, invalidate cache:

def update_product(product_id, data):
    # Update database
    db.execute('UPDATE products SET ... WHERE id = %s', (product_id,))
    
    # Invalidate cache
    r.delete(f'product:{product_id}')

Write-Through Cache

Update cache and database together:

def update_product(product_id, data):
    # Update database
    db.execute('UPDATE products SET ... WHERE id = %s', (product_id,))
    
    # Update cache
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    r.setex(f'product:{product_id}', 3600, json.dumps(product))

This keeps the cache fresh at all times, at the cost of an extra read and write on every update.

Cache Stampede Prevention

Problem: When cache expires, multiple requests hit the database simultaneously.

Solution: Use locking:

import time

def get_product_safe(product_id, max_wait=5.0):
    cache_key = f'product:{product_id}'
    lock_key = f'lock:{cache_key}'
    deadline = time.time() + max_wait
    
    while True:
        # Try cache first
        cached = r.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Try to acquire the rebuild lock (nx=True: only if nobody holds it, ex=10: auto-expire)
        if r.set(lock_key, '1', nx=True, ex=10):
            try:
                # We got the lock - fetch from database and repopulate the cache
                product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
                r.setex(cache_key, 3600, json.dumps(product))
                return product
            finally:
                r.delete(lock_key)
        
        # Someone else is rebuilding - wait briefly, then re-check the cache
        if time.time() >= deadline:
            # Give up waiting and fall back to the database directly
            return db.query('SELECT * FROM products WHERE id = %s', (product_id,))
        time.sleep(0.1)
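
One caveat in the code above: the finally block deletes the lock unconditionally, so if the database fetch outlives the 10-second lock TTL, we can delete a lock another worker has since acquired. A hedged sketch of a safer release - a unique token plus an atomic compare-and-delete (the helper names here are my own illustration):

import uuid

# Lua script: delete the lock only if the stored token is still ours
RELEASE_LOCK = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""

def acquire_lock(lock_key, ttl=10):
    token = str(uuid.uuid4())
    # Returns the token on success, None if someone else holds the lock
    if r.set(lock_key, token, nx=True, ex=ttl):
        return token
    return None

def release_lock(lock_key, token):
    # Atomic compare-and-delete on the server side
    r.eval(RELEASE_LOCK, 1, lock_key, token)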

Probabilistic Early Expiration

Prevent stampede by refreshing cache before it expires:

import random
import threading

def refresh_cache(product_id):
    # Re-query the database and reset the TTL
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    r.setex(f'product:{product_id}', 3600, json.dumps(product))

def get_product_with_early_refresh(product_id):
    cache_key = f'product:{product_id}'
    
    cached = r.get(cache_key)
    if cached:
        # Get remaining TTL in seconds
        ttl = r.ttl(cache_key)
        
        # Refresh if less than 10% of the original 3600s TTL remains,
        # with a 10% chance so only a few requests trigger the refresh
        if ttl < 360 and random.random() < 0.1:
            # Refresh in a background thread so this request isn't blocked
            threading.Thread(target=refresh_cache, args=(product_id,), daemon=True).start()
        
        return json.loads(cached)
    
    # Cache miss
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    r.setex(cache_key, 3600, json.dumps(product))
    return product

Caching Lists

For product listings:

def get_products_by_category(category_id, page=1, per_page=20):
    cache_key = f'category:{category_id}:page:{page}'
    
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Query database
    offset = (page - 1) * per_page
    products = db.query('''
        SELECT * FROM products 
        WHERE category_id = %s
        LIMIT %s OFFSET %s
    ''', (category_id, per_page, offset))
    
    # Cache for 5 minutes (shorter TTL for lists)
    r.setex(cache_key, 300, json.dumps(products))
    
    return products

Invalidate all pages when category changes:

def invalidate_category_cache(category_id):
    # SCAN instead of KEYS: KEYS blocks Redis while it walks the whole keyspace
    pattern = f'category:{category_id}:page:*'
    keys = list(r.scan_iter(match=pattern))
    
    if keys:
        r.delete(*keys)
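
Even with SCAN, invalidation touches every matching key. An alternative sketch - my own variation, not part of the original setup: embed a version number in the cache key and invalidate a category by bumping its version, letting the old pages simply age out via their TTL.

def category_page_key(category_id, page):
    # Current version for this category (0 if never set)
    version = r.get(f'category:{category_id}:version') or 0
    return f'category:{category_id}:v{version}:page:{page}'

def invalidate_category_cache_v2(category_id):
    # Old versioned keys are never read again and expire on their own
    r.incr(f'category:{category_id}:version')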

Redis Data Structures

Hashes for Objects

Instead of JSON strings, use hashes:

# Store product as hash
r.hset(f'product:{product_id}', mapping={
    'name': 'Product Name',
    'price': '29.99',
    'stock': '100'
})

# Get specific field
price = r.hget(f'product:{product_id}', 'price')

# Get all fields
product = r.hgetall(f'product:{product_id}')

Benefits:

  • Update individual fields without fetching entire object
  • Less memory usage
  • Atomic operations on individual fields (see the sketch below)
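
For example, stock can be decremented atomically when an order is placed, with no read-modify-write race (the field names follow the hash above):

# Atomically decrement the stock field by 1
r.hincrby(f'product:{product_id}', 'stock', -1)

# Atomically adjust a numeric field stored as a float
r.hincrbyfloat(f'product:{product_id}', 'price', -5.00)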

Sets for Tags

# Add tags to product
r.sadd(f'product:{product_id}:tags', 'electronics', 'sale', 'featured')

# Get all tags
tags = r.smembers(f'product:{product_id}:tags')

# Find products with specific tag
r.sadd(f'tag:electronics:products', product_id)
products = r.smembers(f'tag:electronics:products')
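
Tag sets also compose: Redis intersects them server-side, so finding products that carry several tags is a single call (tag names reuse the examples above):

# Products tagged both 'electronics' and 'sale'
on_sale_electronics = r.sinter('tag:electronics:products', 'tag:sale:products')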

Sorted Sets for Rankings

# Add product to bestsellers (score = sales count)
r.zadd('bestsellers', {product_id: sales_count})

# Get top 10 bestsellers
top_products = r.zrevrange('bestsellers', 0, 9, withscores=True)

# Get product rank
rank = r.zrevrank('bestsellers', product_id)
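
Since the score is the sales count, recording a sale is one atomic update rather than a read-modify-write:

# Bump the product's sales count by 1; creates the entry if missing
r.zincrby('bestsellers', 1, product_id)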

Cache Warming

Pre-populate cache for popular items:

def warm_cache():
    # Get top 100 products
    popular_products = db.query('''
        SELECT id FROM products 
        ORDER BY view_count DESC 
        LIMIT 100
    ''')
    
    for product in popular_products:
        # Load into cache
        get_product(product['id'])
    
    print(f'Warmed cache with {len(popular_products)} products')

# Run on startup
warm_cache()

Multi-Level Caching

Combine Redis with application-level cache:

from functools import lru_cache
import json

import redis

r = redis.Redis(decode_responses=True)

# L1: In-memory cache (fast, small)
# Note: lru_cache entries never expire on their own; clear with get_product_l1.cache_clear()
@lru_cache(maxsize=100)
def get_product_l1(product_id):
    return get_product_l2(product_id)

# L2: Redis cache (slower, larger)
def get_product_l2(product_id):
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # L3: Database (slowest)
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    r.setex(cache_key, 3600, json.dumps(product))
    
    return product
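
One caveat with lru_cache as L1: its entries never expire, so a process can keep serving stale data long after the Redis entry was invalidated. A minimal sketch of a time-bounded L1, assuming a small per-process dict is acceptable (the 60-second window is an arbitrary choice of mine):

import time

_l1_cache = {}   # product_id -> (stored_at, product)
L1_TTL = 60      # seconds; illustrative value

def get_product_l1_ttl(product_id):
    entry = _l1_cache.get(product_id)
    if entry and time.time() - entry[0] < L1_TTL:
        return entry[1]
    # Expired or missing - fall through to the Redis layer
    product = get_product_l2(product_id)
    _l1_cache[product_id] = (time.time(), product)
    return product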

Cache Metrics

Monitor cache performance:

def get_product_with_metrics(product_id):
    cache_key = f'product:{product_id}'
    
    cached = r.get(cache_key)
    
    if cached:
        metrics.increment('cache.hit')
        return json.loads(cached)
    
    metrics.increment('cache.miss')
    
    product = db.query('SELECT * FROM products WHERE id = %s', (product_id,))
    r.setex(cache_key, 3600, json.dumps(product))
    
    return product

# Calculate hit rate
hit_rate = cache_hits / (cache_hits + cache_misses)

Target: >95% hit rate for production.
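
If you don't have a metrics pipeline yet, Redis itself counts hits and misses across the whole keyspace; a quick sketch using the INFO command:

# Server-wide counters from INFO
stats = r.info('stats')
hits = stats['keyspace_hits']
misses = stats['keyspace_misses']
hit_rate = hits / (hits + misses) if (hits + misses) else 0.0
print(f'Overall hit rate: {hit_rate:.1%}')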

TTL Strategies

Different data needs different TTLs:

# Static data - long TTL
r.setex(f'category:{category_id}', 86400, data)  # 24 hours

# Dynamic data - short TTL
r.setex(f'cart:{user_id}', 1800, data)  # 30 minutes

# Real-time data - very short TTL
r.setex(f'stock:{product_id}', 60, data)  # 1 minute

# Permanent data - no expiration
r.set(f'config:{key}', value)
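
A related refinement: keys written at the same time (for example, by cache warming) also expire at the same time, which can send a burst of misses to the database at once. A small sketch that spreads expirations with random jitter - the 10% spread is my own choice:

import random

def setex_jittered(key, base_ttl, value):
    # Randomize TTL by +/-10% so co-created keys don't all expire together
    ttl = int(base_ttl * random.uniform(0.9, 1.1))
    r.setex(key, ttl, value)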

Redis Persistence

Configure Redis for durability:

# redis.conf

# RDB: snapshot every 5 minutes if 100 keys changed
save 300 100

# AOF: append-only file for durability
appendonly yes
appendfsync everysec

Redis Cluster for Scale

When single Redis instance isn’t enough:

from redis.cluster import RedisCluster, ClusterNode

# redis-py 4.1+ ships a built-in cluster client; the old redis-py-cluster package is deprecated
startup_nodes = [
    ClusterNode("redis1", 6379),
    ClusterNode("redis2", 6379),
    ClusterNode("redis3", 6379),
]

rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Use same API
rc.set('key', 'value')
value = rc.get('key')

Results

After implementing caching:

Metric              | Before   | After  | Improvement
--------------------|----------|--------|------------
Response time (p50) | 1.8s     | 45ms   | 97%
Response time (p99) | 3.2s     | 120ms  | 96%
Database load       | 1200 qps | 50 qps | 96%
Cache hit rate      | -        | 98%    | -

Lessons Learned

  1. Start simple - Cache-aside pattern covers 80% of cases
  2. Monitor hit rate - Low hit rate means wrong caching strategy
  3. Invalidate carefully - Stale cache is worse than no cache
  4. Use appropriate TTLs - Balance freshness and performance
  5. Prevent stampedes - Use locking or probabilistic refresh

Common Mistakes

1. Caching everything

Don’t cache data that changes frequently or is rarely accessed.

2. Forgetting to invalidate

Always invalidate cache when data changes.

3. No monitoring

Track hit rate, memory usage, evictions.

4. Serialization overhead

Use Redis data structures instead of JSON when possible.

Conclusion

Redis caching transformed our API's performance: a 40x improvement from relatively simple changes.

Key takeaways:

  1. Use cache-aside pattern for most cases
  2. Invalidate cache when data changes
  3. Prevent cache stampedes with locking
  4. Monitor cache hit rate
  5. Use appropriate TTLs for different data types

Caching is one of the highest-ROI optimizations you can make. If your API is slow, add caching.