Python Asyncio in Production: Building High-Performance APIs
Our notification service was hitting limits. It handles 1000+ requests/second, making external API calls to send emails, SMS, and push notifications. With synchronous Flask, we maxed out at 200 req/s per instance.
I rewrote it using Python’s asyncio and aiohttp. Throughput increased to 1000+ req/s per instance - a 5x improvement.
The Synchronous Bottleneck
Original Flask service:
```python
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

@app.route('/notify', methods=['POST'])
def send_notification():
    data = request.get_json()

    # Send email (blocks for ~200ms)
    requests.post('https://email-api.com/send', json={
        'to': data['email'],
        'subject': data['subject']
    })

    # Send SMS (blocks for ~150ms)
    requests.post('https://sms-api.com/send', json={
        'to': data['phone'],
        'message': data['message']
    })

    return jsonify({'status': 'sent'})
```
Each request blocks for ~350ms waiting on external APIs. A single-threaded worker can serve at most 1000ms / 350ms ≈ 2.9 req/s, so 4 single-threaded Gunicorn workers top out around 11 req/s. Threaded workers raise that ceiling (which is roughly how we reached the ~187 req/s measured below), but every thread still spends most of its time blocked.
Enter Asyncio
Python 3.5 introduced async/await syntax for non-blocking I/O:
```python
import asyncio

import aiohttp
from aiohttp import web

async def send_notification(request):
    data = await request.json()

    async with aiohttp.ClientSession() as session:
        # Send email and SMS concurrently
        email_task = session.post('https://email-api.com/send', json={
            'to': data['email'],
            'subject': data['subject']
        })
        sms_task = session.post('https://sms-api.com/send', json={
            'to': data['phone'],
            'message': data['message']
        })

        # Wait for both to complete
        await asyncio.gather(email_task, sms_task)

    return web.json_response({'status': 'sent'})

app = web.Application()
app.router.add_post('/notify', send_notification)

if __name__ == '__main__':
    web.run_app(app, port=5000)
```
Now, while one request waits on the email API, the event loop serves other requests; a single process can keep 1000+ requests in flight.
Migration Strategy
I migrated incrementally:
- Week 1: Prototype with aiohttp
- Week 2: Migrate database calls to asyncpg
- Week 3: Migrate Redis calls to aioredis
- Week 4: Load testing and deployment
Database: asyncpg
Synchronous psycopg2:
```python
import psycopg2

conn = psycopg2.connect("postgresql://localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
```
Async asyncpg:
```python
import asyncpg

# Must run inside a coroutine: await is invalid at module level
pool = await asyncpg.create_pool("postgresql://localhost/mydb")

async def get_user(user_id):
    async with pool.acquire() as conn:
        user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        return user
```
asyncpg is also fast in its own right: the project's own benchmarks report roughly 3x the throughput of psycopg2 on simple queries.
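Since await is only valid inside a coroutine, the pool can't actually be created at module import time. One way to wire it up is aiohttp's cleanup_ctx startup/shutdown hook; in this sketch the route, handler name, and DSN are illustrative:

```python
import asyncpg
from aiohttp import web

async def db_pool(app):
    # Runs at startup: create the pool and stash it on the app
    app['pool'] = await asyncpg.create_pool("postgresql://localhost/mydb")
    yield
    # Runs at shutdown: close the pool cleanly
    await app['pool'].close()

async def get_user_handler(request):
    async with request.app['pool'].acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", int(request.match_info['id'])
        )
    return web.json_response(dict(user) if user else {})

app = web.Application()
app.cleanup_ctx.append(db_pool)
app.router.add_get('/users/{id}', get_user_handler)
```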
Redis: aioredis
Synchronous redis-py:
```python
import redis

r = redis.Redis(host='localhost', port=6379)
value = r.get('key')
```
Async aioredis:
```python
import aioredis

# aioredis 1.x API; must run inside a coroutine
redis = await aioredis.create_redis_pool('redis://localhost')

async def get_value(key):
    value = await redis.get(key)
    return value
```
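Worth knowing: aioredis has since been absorbed into redis-py (4.2+) as redis.asyncio. If you're starting fresh today, the equivalent looks roughly like this:

```python
import redis.asyncio as redis

# One shared client; from_url manages an async connection pool internally
r = redis.from_url('redis://localhost')

async def get_value(key):
    return await r.get(key)
```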
HTTP Clients: aiohttp
For calling external APIs:
```python
import asyncio

import aiohttp

async def fetch_user_data(user_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/users/{user_id}') as resp:
            return await resp.json()

# Fetch multiple users concurrently (await only works inside a coroutine)
async def fetch_users(user_ids):
    tasks = [fetch_user_data(uid) for uid in user_ids]
    return await asyncio.gather(*tasks)

users = asyncio.run(fetch_users([1, 2, 3, 4, 5]))
```
This fetches 5 users concurrently instead of sequentially.
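One caveat: gather launches every task at once, and with hundreds of IDs that can hammer the upstream API. Here is a sketch that caps concurrency with a semaphore, reusing fetch_user_data from above (the limit of 10 is arbitrary):

```python
import asyncio

sem = asyncio.Semaphore(10)  # at most 10 requests in flight at a time

async def fetch_user_bounded(user_id):
    async with sem:  # waits here if 10 fetches are already running
        return await fetch_user_data(user_id)

async def fetch_many(user_ids):
    return await asyncio.gather(*(fetch_user_bounded(uid) for uid in user_ids))
```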
Error Handling
Async error handling is similar to sync:
```python
import asyncio
import logging

import aiohttp

logger = logging.getLogger(__name__)

async def send_email(email, subject):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post('https://email-api.com/send', json={
                'to': email,
                'subject': subject
            }) as resp:
                if resp.status != 200:
                    raise Exception(f'Email API returned {resp.status}')
                return await resp.json()
    except aiohttp.ClientError as e:
        logger.error(f'Failed to send email: {e}')
        raise
    except asyncio.TimeoutError:
        logger.error('Email API timeout')
        raise
```
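For transient failures, a retry with exponential backoff on top of this is a common next step. A minimal hand-rolled sketch (the attempt count and delays are illustrative; a library like tenacity handles the edge cases better):

```python
import asyncio

import aiohttp

async def send_email_with_retry(email, subject, attempts=3):
    for attempt in range(attempts):
        try:
            return await send_email(email, subject)
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == attempts - 1:
                raise  # out of retries; let the caller decide
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(2 ** attempt)
```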
Timeouts
Always set timeouts for external calls:
```python
import asyncio
import logging

import aiohttp

logger = logging.getLogger(__name__)

async def fetch_with_timeout(url, timeout=5):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                return await resp.json()
        except asyncio.TimeoutError:
            logger.error(f'Timeout fetching {url}')
            raise
```
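You can also set the timeout once on the session, which is harder to forget than passing it per request:

```python
import aiohttp

async def fetch(url):
    # Every request made through this session inherits the 5s total timeout
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
            return await resp.json()
```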
Connection Pooling
Reuse connections for better performance:
```python
import aiohttp

# Bad: creates a new session (and connection pool) for every request
async def fetch_user(user_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/users/{user_id}') as resp:
            return await resp.json()

# Good: reuse one session for the life of the app
class APIClient:
    def __init__(self):
        self.session = None

    async def start(self):
        self.session = aiohttp.ClientSession(base_url='https://api.example.com')

    async def close(self):
        await self.session.close()

    async def fetch_user(self, user_id):
        async with self.session.get(f'/users/{user_id}') as resp:
            return await resp.json()

# Usage (inside a coroutine)
client = APIClient()
await client.start()
user = await client.fetch_user(123)
await client.close()
```
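A refinement we found useful: make the client an async context manager so callers can't forget close(). This is a sketch layered on the class above, with the base URL as a placeholder:

```python
import aiohttp

class APIClient:
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(base_url='https://api.example.com')
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.session.close()

    async def fetch_user(self, user_id):
        async with self.session.get(f'/users/{user_id}') as resp:
            return await resp.json()

async def main():
    # The session is closed even if an exception escapes the block
    async with APIClient() as client:
        user = await client.fetch_user(123)
```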
Background Tasks
Run tasks in the background:
```python
import asyncio

from aiohttp import web

async def send_notification(request):
    data = await request.json()

    # Fire off a background task and respond without waiting for it
    asyncio.create_task(send_email_async(data['email']))

    # Return immediately
    return web.json_response({'status': 'queued'})

async def send_email_async(email):
    await asyncio.sleep(1)  # Simulate work
    # Send the email here
```
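One gotcha here: the event loop holds only weak references to tasks, so a fire-and-forget task can be garbage-collected before it finishes. The asyncio docs recommend keeping a strong reference until the task completes:

```python
import asyncio

background_tasks = set()

def fire_and_forget(coro):
    # Hold a strong reference so the task isn't garbage-collected,
    # then drop it automatically once the task finishes
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task
```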
Testing Async Code
Use pytest-asyncio:
```python
import aiohttp
import pytest

# Integration test: assumes the service is already running locally
@pytest.mark.asyncio
async def test_send_notification():
    async with aiohttp.ClientSession() as session:
        async with session.post('http://localhost:5000/notify', json={
            'email': 'test@example.com',
            'subject': 'Test'
        }) as resp:
            assert resp.status == 200
            data = await resp.json()
            assert data['status'] == 'sent'
```
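For tests that shouldn't depend on a running server, the pytest-aiohttp plugin offers an aiohttp_client fixture that runs the app in-process (it also runs async tests without an explicit marker). A sketch, assuming your Application is importable as app:

```python
from app import app  # hypothetical import path for the aiohttp Application

async def test_notify_in_process(aiohttp_client):
    client = await aiohttp_client(app)
    resp = await client.post('/notify', json={
        'email': 'test@example.com',
        'subject': 'Test',
        'phone': '+15550100',
        'message': 'hi',
    })
    assert resp.status == 200
```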
Performance Results
Load testing with wrk:
```bash
# Synchronous Flask (4 workers)
wrk -t4 -c100 -d30s http://localhost:5000/notify
Requests/sec: 187.23

# Async aiohttp (single process)
wrk -t4 -c100 -d30s http://localhost:5000/notify
Requests/sec: 1043.67
```
5.5x improvement!
Resource Usage
| Metric | Flask (4 workers) | aiohttp (1 process) |
|---|---|---|
| CPU | 60% | 25% |
| Memory | 480MB | 95MB |
| Requests/sec | 187 | 1044 |
The async service handles more load with fewer resources.
When NOT to Use Async
Async isn’t always better:
Don’t use async for:
- CPU-bound tasks (use multiprocessing)
- Simple CRUD APIs (Flask is fine)
- Blocking libraries (defeats the purpose)
Use async for:
- I/O-bound tasks
- High concurrency requirements
- External API calls
- WebSocket connections
Common Pitfalls
1. Blocking calls in async code
```python
import asyncio
import time

# Bad: blocks the event loop (and every other request on it)
async def bad_example():
    time.sleep(1)  # Blocks!
    return "done"

# Good: use async sleep
async def good_example():
    await asyncio.sleep(1)
    return "done"
```
2. Not awaiting coroutines
```python
# Bad: returns a coroutine object, nothing actually executes
async def bad_example():
    fetch_data()

# Good: await it
async def good_example():
    await fetch_data()
```
3. Mixing sync and async
```python
# Bad: await inside a regular function is a SyntaxError
def sync_function():
    result = await async_function()  # SyntaxError!

# Good: bridge from sync code with asyncio.run
def sync_function():
    result = asyncio.run(async_function())
```
Deployment
Run with Gunicorn and uvloop:
```bash
pip install gunicorn uvloop
gunicorn app:app --bind 0.0.0.0:5000 --worker-class aiohttp.GunicornUVLoopWebWorker --workers 4
```

uvloop is a drop-in, faster event loop implementation; its own benchmarks report 2-4x speedups over the default loop. Note the worker class: aiohttp's plain GunicornWebWorker uses the stock loop, while GunicornUVLoopWebWorker is the one that actually runs on uvloop.
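If you run the app directly instead of under Gunicorn, uvloop can be enabled with a single call before the server starts:

```python
import uvloop
from aiohttp import web

uvloop.install()  # swap the default event loop policy for uvloop

app = web.Application()
web.run_app(app, port=5000)
```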
Monitoring
Track async-specific metrics:
```python
import time

from aiohttp import web

async def send_notification(request):
    start = time.monotonic()  # monotonic clock is safer for durations

    # Process the request (process_notification is your business logic)
    await process_notification(request)

    duration = time.monotonic() - start
    # 'metrics' is a placeholder for your metrics client (statsd, Datadog, ...)
    metrics.histogram('notification.duration', duration)
    metrics.increment('notification.count')

    return web.json_response({'status': 'sent'})
```
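Rather than instrumenting each handler by hand, an aiohttp middleware can time every request in one place (metrics is again a stand-in for your metrics client):

```python
import time

from aiohttp import web

@web.middleware
async def timing_middleware(request, handler):
    start = time.monotonic()
    try:
        return await handler(request)
    finally:
        # Recorded even when the handler raises
        duration = time.monotonic() - start
        metrics.histogram('request.duration', duration)

app = web.Application(middlewares=[timing_middleware])
```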
Conclusion
Asyncio transformed our notification service. We handle 5x more load with fewer resources.
Key takeaways:
- Use async for I/O-bound workloads
- Migrate incrementally, not all at once
- Use async libraries (asyncpg, aioredis, aiohttp)
- Always set timeouts
- Test thoroughly - async bugs are subtle
Async Python is powerful but has a learning curve. Start with a small service, learn the patterns, then scale up.
For our use case, the performance gains were worth the migration effort.