Redis Caching Strategies: From 2s to 50ms Response Time
Our product API was slow - 2 seconds to load a product page. The database was the bottleneck, running complex queries with multiple joins.
I implemented a comprehensive caching strategy with Redis. Response time dropped to 50ms - a 40x improvement.
The Problem
Product page query:
```sql
SELECT
    p.*,
    c.name AS category_name,
    b.name AS brand_name,
    AVG(r.rating) AS avg_rating,
    COUNT(r.id) AS review_count
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN brands b ON p.brand_id = b.id
LEFT JOIN reviews r ON r.product_id = p.id
WHERE p.id = 123
GROUP BY p.id, c.name, b.name;
```
This query takes 1.8s with 100k products and 500k reviews.
Cache-Aside Pattern
The most common caching pattern: read from the cache first, fall back to the database on a miss, and populate the cache on the way out.
```python
import json
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_product(product_id):
    # Try cache first
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - query database
    # (db is a placeholder DB helper; use parameterized queries in real code)
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    # Store in cache for 1 hour
    r.setex(cache_key, 3600, json.dumps(product))
    return product
```
Result: First request 1.8s, subsequent requests <5ms.
Cache Invalidation
“There are only two hard things in Computer Science: cache invalidation and naming things.” - Phil Karlton
When a product is updated, invalidate the cache:
```python
def update_product(product_id, data):
    # Update database ("..." stands for the changed columns)
    db.execute(f'UPDATE products SET ... WHERE id = {product_id}')
    # Invalidate cache so the next read repopulates it
    r.delete(f'product:{product_id}')
```
Write-Through Cache
Update cache and database together:
```python
def update_product(product_id, data):
    # Update database
    db.execute(f'UPDATE products SET ... WHERE id = {product_id}')
    # Re-read the fresh row and update the cache in the same code path
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    r.setex(f'product:{product_id}', 3600, json.dumps(product))
```
The cache is refreshed on every write, so reads never see stale data, at the cost of an extra query per update.
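A variation worth considering: if the update payload already contains every cached field, the read-back query can be skipped by serializing `data` directly. A minimal sketch, assuming `data` holds the full object:

```python
def update_product_direct(product_id, data):
    # Persist the change first ("..." stands for the changed columns, as above)
    db.execute(f'UPDATE products SET ... WHERE id = {product_id}')
    # Write the new state straight to the cache, skipping the read-back query.
    # Assumes `data` contains every field the cached object normally holds.
    r.setex(f'product:{product_id}', 3600, json.dumps({**data, 'id': product_id}))
```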
Cache Stampede Prevention
Problem: When cache expires, multiple requests hit the database simultaneously.
Solution: Use locking:
```python
import time

def get_product_safe(product_id):
    cache_key = f'product:{product_id}'
    lock_key = f'lock:{cache_key}'
    # Try cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Try to acquire the lock (NX = only set if not already held,
    # EX = 10s safety expiry in case the holder crashes)
    if r.set(lock_key, '1', nx=True, ex=10):
        try:
            # We got the lock - fetch from database
            product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
            r.setex(cache_key, 3600, json.dumps(product))
            return product
        finally:
            r.delete(lock_key)
    else:
        # Someone else is fetching - wait and retry
        time.sleep(0.1)
        return get_product_safe(product_id)
```
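redis-py also ships a built-in `Lock` helper that wraps this acquire/release dance and blocks up to a timeout instead of sleep-and-retry. A sketch of the same flow using it, with a second cache check after acquiring since another worker may have filled the cache while we waited:

```python
def get_product_locked(product_id):
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # timeout: auto-release if the holder crashes;
    # blocking_timeout: maximum seconds to wait for the lock
    with r.lock(f'lock:{cache_key}', timeout=10, blocking_timeout=5):
        # Re-check: the cache may have been populated while we waited
        cached = r.get(cache_key)
        if cached:
            return json.loads(cached)
        product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
        r.setex(cache_key, 3600, json.dumps(product))
        return product
```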
Probabilistic Early Expiration
Prevent stampede by refreshing cache before it expires:
```python
import json
import random
import threading

def refresh_cache(product_id):
    # Re-query the database and reset the TTL
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    r.setex(f'product:{product_id}', 3600, json.dumps(product))

def get_product_with_early_refresh(product_id):
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    if cached:
        # Get remaining TTL
        ttl = r.ttl(cache_key)
        # Refresh if TTL < 10% of the original 3600s, with a 10% random chance
        if ttl < 360 and random.random() < 0.1:
            # Refresh in a background thread so this request still returns fast
            # (asyncio.create_task would need a running event loop here)
            threading.Thread(target=refresh_cache, args=(product_id,),
                             daemon=True).start()
        return json.loads(cached)
    # Cache miss
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    r.setex(cache_key, 3600, json.dumps(product))
    return product
```
Caching Lists
For product listings:
```python
def get_products_by_category(category_id, page=1, per_page=20):
    cache_key = f'category:{category_id}:page:{page}'
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Query database
    offset = (page - 1) * per_page
    products = db.query(f'''
        SELECT * FROM products
        WHERE category_id = {category_id}
        LIMIT {per_page} OFFSET {offset}
    ''')
    # Cache for 5 minutes (shorter TTL for lists)
    r.setex(cache_key, 300, json.dumps(products))
    return products
```
Invalidate all pages when category changes:
```python
def invalidate_category_cache(category_id):
    # Find all cache keys for this category
    # (KEYS blocks Redis while it scans; see the SCAN sketch below)
    pattern = f'category:{category_id}:page:*'
    keys = r.keys(pattern)
    if keys:
        r.delete(*keys)
```
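One caveat: `KEYS` walks the entire keyspace in a single blocking call, which can stall a busy Redis instance. A non-blocking sketch using redis-py's `scan_iter`, which iterates the keyspace in small batches:

```python
def invalidate_category_cache_scan(category_id):
    # SCAN visits keys incrementally instead of blocking like KEYS
    pattern = f'category:{category_id}:page:*'
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)
```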
Redis Data Structures
Hashes for Objects
Instead of JSON strings, use hashes:
```python
# Store product as a hash
r.hset(f'product:{product_id}', mapping={
    'name': 'Product Name',
    'price': '29.99',
    'stock': '100'
})

# Get a specific field
price = r.hget(f'product:{product_id}', 'price')

# Get all fields
product = r.hgetall(f'product:{product_id}')
```
Benefits:
- Update individual fields without fetching entire object
- Less memory usage
- Atomic operations on individual fields (example below)
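For example, `HINCRBY` adjusts a single field atomically on the server, so concurrent clients cannot clobber each other with read-modify-write races:

```python
# Atomically decrement stock when an order is placed
r.hincrby(f'product:{product_id}', 'stock', -1)

# Update just the price without rewriting the whole object
r.hset(f'product:{product_id}', 'price', '24.99')
```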
Sets for Tags
```python
# Add tags to a product
r.sadd(f'product:{product_id}:tags', 'electronics', 'sale', 'featured')

# Get all tags
tags = r.smembers(f'product:{product_id}:tags')

# Reverse index: find products with a specific tag
r.sadd('tag:electronics:products', product_id)
products = r.smembers('tag:electronics:products')
```
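The per-tag reverse index also makes multi-tag queries cheap: `SINTER` computes set intersections server-side, so finding products that carry several tags is a single round trip:

```python
# Products tagged both 'electronics' and 'sale', intersected inside Redis
on_sale_electronics = r.sinter('tag:electronics:products', 'tag:sale:products')
```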
Sorted Sets for Rankings
```python
# Add product to bestsellers (score = sales count)
r.zadd('bestsellers', {product_id: sales_count})

# Get top 10 bestsellers (highest score first)
top_products = r.zrevrange('bestsellers', 0, 9, withscores=True)

# Get a product's rank (0 = bestseller)
rank = r.zrevrank('bestsellers', product_id)
```
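Keeping the ranking current is a single call per sale: `ZINCRBY` bumps the member's score in place, creating it if absent:

```python
# Record one sale: increment the product's score by 1
r.zincrby('bestsellers', 1, product_id)
```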
Cache Warming
Pre-populate cache for popular items:
```python
def warm_cache():
    # Get the 100 most-viewed products
    popular_products = db.query('''
        SELECT id FROM products
        ORDER BY view_count DESC
        LIMIT 100
    ''')
    for product in popular_products:
        # Load into cache via the normal read path
        get_product(product['id'])
    print(f'Warmed cache with {len(popular_products)} products')

# Run on startup
warm_cache()
```
Multi-Level Caching
Combine Redis with an application-level cache:
```python
import json
from functools import lru_cache

import redis

r = redis.Redis()

# L1: in-process memory cache (fastest, smallest; note lru_cache has no TTL,
# so entries live until evicted by size or a process restart)
@lru_cache(maxsize=100)
def get_product_l1(product_id):
    return get_product_l2(product_id)

# L2: Redis cache (slower than local memory, larger, shared across processes)
def get_product_l2(product_id):
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # L3: database (slowest)
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    r.setex(cache_key, 3600, json.dumps(product))
    return product
```
Cache Metrics
Monitor cache performance:
```python
def get_product_with_metrics(product_id):
    cache_key = f'product:{product_id}'
    cached = r.get(cache_key)
    if cached:
        metrics.increment('cache.hit')  # `metrics` is a placeholder stats client
        return json.loads(cached)
    metrics.increment('cache.miss')
    product = db.query(f'SELECT * FROM products WHERE id = {product_id}')
    r.setex(cache_key, 3600, json.dumps(product))
    return product

# Calculate hit rate
hit_rate = cache_hits / (cache_hits + cache_misses)
```
Target: >95% hit rate for production.
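Redis also tracks hits and misses itself, so the server-wide rate can be read without any application instrumentation. A small sketch using `INFO stats` (the counters are cumulative since server start):

```python
def redis_hit_rate():
    # keyspace_hits / keyspace_misses come from the INFO stats section
    stats = r.info('stats')
    hits, misses = stats['keyspace_hits'], stats['keyspace_misses']
    total = hits + misses
    return hits / total if total else 0.0
```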
TTL Strategies
Different data needs different TTLs:
```python
# Static data - long TTL
r.setex(f'category:{category_id}', 86400, data)  # 24 hours

# Dynamic data - short TTL
r.setex(f'cart:{user_id}', 1800, data)  # 30 minutes

# Real-time data - very short TTL
r.setex(f'stock:{product_id}', 60, data)  # 1 minute

# Permanent data - no expiration
r.set(f'config:{key}', value)
```
Redis Persistence
Configure Redis for durability:
```conf
# redis.conf

# RDB: snapshot if at least 100 keys changed within 300 seconds (5 minutes)
save 300 100

# AOF: append-only file for durability, fsynced once per second
appendonly yes
appendfsync everysec
```
Redis Cluster for Scale
When a single Redis instance isn't enough:
```python
# Uses the redis-py-cluster package; note that redis-py >= 4.1 provides
# the same functionality built in as redis.cluster.RedisCluster
from rediscluster import RedisCluster

startup_nodes = [
    {"host": "redis1", "port": "6379"},
    {"host": "redis2", "port": "6379"},
    {"host": "redis3", "port": "6379"},
]

rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Same API as a single instance; keys are sharded across nodes automatically
rc.set('key', 'value')
value = rc.get('key')
```
Results
After implementing caching:
| Metric | Before | After | Improvement |
|---|---|---|---|
| Response time (p50) | 1.8s | 45ms | 97% |
| Response time (p99) | 3.2s | 120ms | 96% |
| Database load | 1200 qps | 50 qps | 96% |
| Cache hit rate | - | 98% | - |
Lessons Learned
- Start simple - Cache-aside pattern covers 80% of cases
- Monitor hit rate - Low hit rate means wrong caching strategy
- Invalidate carefully - Stale cache is worse than no cache
- Use appropriate TTLs - Balance freshness and performance
- Prevent stampedes - Use locking or probabilistic refresh
Common Mistakes
1. Caching everything
Don’t cache data that changes frequently or is rarely accessed.
2. Forgetting to invalidate
Always invalidate cache when data changes.
3. No monitoring
Track hit rate, memory usage, evictions.
4. Serialization overhead
Use Redis data structures instead of JSON when possible.
Conclusion
Redis caching transformed our API performance: a 40x improvement with relatively simple changes.
Key takeaways:
- Use cache-aside pattern for most cases
- Invalidate cache when data changes
- Prevent cache stampedes with locking
- Monitor cache hit rate
- Use appropriate TTLs for different data types
Caching is one of the highest-ROI optimizations you can make. If your API is slow, add caching.