Our notification service was hitting limits. It handles 1000+ requests/second, making external API calls to send emails, SMS, and push notifications. With synchronous Flask, we maxed out at 200 req/s per instance.

I rewrote it using Python’s asyncio and aiohttp. Throughput increased to 1000+ req/s per instance - a 5x improvement.


The Synchronous Bottleneck

Original Flask service:

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

@app.route('/notify', methods=['POST'])
def send_notification():
    data = request.get_json()
    
    # Send email (blocks for 200ms)
    requests.post('https://email-api.com/send', json={
        'to': data['email'],
        'subject': data['subject']
    })
    
    # Send SMS (blocks for 150ms)
    requests.post('https://sms-api.com/send', json={
        'to': data['phone'],
        'message': data['message']
    })
    
    return jsonify({'status': 'sent'})

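Each request blocks for about 350ms waiting on the external APIs, so one synchronous worker can complete at most 1000ms / 350ms ≈ 2.9 requests per second. With 4 Gunicorn workers, the theoretical ceiling is roughly 11 req/s (4 × 2.9) whenever the external calls really take that long.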
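
The ceiling for blocking workers follows directly from the per-request latency. Here is a minimal sketch of that arithmetic, using the 200ms and 150ms figures from the handler comments; the function name `max_throughput` is illustrative and not part of the service:

```python
# Back-of-the-envelope ceiling for a pool of blocking workers.
EMAIL_MS = 200   # email API latency quoted in the handler
SMS_MS = 150     # SMS API latency quoted in the handler
WORKERS = 4


def max_throughput(workers: int, blocking_ms: float) -> float:
    """Upper bound on requests/second when every request blocks a worker for blocking_ms."""
    per_worker = 1000 / blocking_ms
    return workers * per_worker


sequential = max_throughput(WORKERS, EMAIL_MS + SMS_MS)
print(f"{sequential:.1f} req/s")  # prints "11.4 req/s"
```

Adding workers raises the ceiling linearly, but each worker is a full process with its own memory, which is why the blocking model gets expensive quickly.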

Enter Asyncio

Python 3.5+ has async/await for non-blocking I/O:

import asyncio
import aiohttp
from aiohttp import web

async def send_notification(request):
    data = await request.json()
    
    async with aiohttp.ClientSession() as session:
        # Send email and SMS concurrently
        email_task = session.post('https://email-api.com/send', json={
            'to': data['email'],
            'subject': data['subject']
        })
        
        sms_task = session.post('https://sms-api.com/send', json={
            'to': data['phone'],
            'message': data['message']
        })
        
        # Wait for both to complete
        await asyncio.gather(email_task, sms_task)
    
    return web.json_response({'status': 'sent'})

app = web.Application()
app.router.add_post('/notify', send_notification)

if __name__ == '__main__':
    web.run_app(app, port=5000)

Now, while one request waits on the email API, the event loop can process other requests. A single process can keep 1000+ requests in flight concurrently.
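
The effect is easy to reproduce without any network access. The sketch below is an illustration, not the service code: `fake_api_call` is a hypothetical stand-in that sleeps for the quoted latencies instead of calling a real API.

```python
import asyncio
import time


async def fake_api_call(name: str, latency_s: float) -> str:
    """Stand-in for an external HTTP call; sleeps instead of using the network."""
    await asyncio.sleep(latency_s)
    return name


async def handle_request() -> list:
    # Both calls wait at the same time, so the handler takes about as long
    # as the slower call (0.20s), not the sum (0.35s).
    return await asyncio.gather(
        fake_api_call("email", 0.20),
        fake_api_call("sms", 0.15),
    )


async def main(n_requests: int = 100) -> float:
    """Run n_requests simulated handlers on one event loop and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handle_request() for _ in range(n_requests)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"100 simulated requests took {asyncio.run(main()):.2f}s")
```

Because every handler spends its time suspended in `await`, the total wall time for 100 simulated requests should stay close to the latency of a single one.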

Migration Strategy

I migrated incrementally:

  1. Week 1: Prototype with aiohttp
  2. Week 2: Migrate database calls to asyncpg
  3. Week 3: Migrate Redis calls to aioredis
  4. Week 4: Load testing and deployment

Database: asyncpg

Synchronous psycopg2:

import psycopg2

conn = psycopg2.connect("postgresql://localhost/mydb")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()

Async asyncpg:

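import asyncpg

pool = None  # created once at application startup


async def init_pool():
    global pool
    pool = await asyncpg.create_pool("postgresql://localhost/mydb")


async def get_user(user_id):
    # Borrow a connection from the pool; it is returned when the block exits
    async with pool.acquire() as conn:
        return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)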

asyncpg is also faster than psycopg2: the asyncpg project's published benchmarks reported roughly a 3x improvement on average, though the gain depends on the query mix.

Redis: aioredis

Synchronous redis-py:

import redis

r = redis.Redis(host='localhost', port=6379)
value = r.get('key')

Async aioredis:

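import aioredis  # aioredis 1.x API; 2.0+ replaced create_redis_pool with from_url

redis_pool = None  # created once at application startup


async def init_redis():
    global redis_pool
    redis_pool = await aioredis.create_redis_pool('redis://localhost')


async def get_value(key):
    return await redis_pool.get(key)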

HTTP Clients: aiohttp

For calling external APIs:

import asyncio
import aiohttp

async def fetch_user_data(user_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/users/{user_id}') as resp:
            return await resp.json()

# Fetch multiple users concurrently
async def fetch_all_users():
    user_ids = [1, 2, 3, 4, 5]
    tasks = [fetch_user_data(uid) for uid in user_ids]
    return await asyncio.gather(*tasks)

This fetches 5 users concurrently instead of sequentially.

Error Handling

Async error handling is similar to sync:

import asyncio
import logging

import aiohttp

logger = logging.getLogger(__name__)

async def send_email(email, subject):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post('https://email-api.com/send', json={
                'to': email,
                'subject': subject
            }) as resp:
                if resp.status != 200:
                    raise Exception(f'Email API returned {resp.status}')
                return await resp.json()
    except aiohttp.ClientError as e:
        # Connection failures, invalid responses, and similar client-side errors
        logger.error(f'Failed to send email: {e}')
        raise
    except asyncio.TimeoutError:
        logger.error('Email API timeout')
        raise
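
One difference from synchronous code shows up when several calls run together: by default `asyncio.gather` raises the first exception it sees, so a failure in one channel hides the outcome of the other. A minimal sketch of collecting per-channel results with `return_exceptions=True` follows; `fake_send_email` and `fake_send_sms` are hypothetical stubs, and the SMS stub always fails for demonstration.

```python
import asyncio


async def fake_send_email(to: str) -> str:
    """Hypothetical stub for the email call; always succeeds."""
    await asyncio.sleep(0.01)
    return f"email sent to {to}"


async def fake_send_sms(to: str) -> str:
    """Hypothetical stub for the SMS call; always fails."""
    await asyncio.sleep(0.01)
    raise ConnectionError("sms provider unavailable")


async def notify(email: str, phone: str) -> dict:
    # return_exceptions=True places exceptions in the result list
    # instead of raising the first one out of gather().
    results = await asyncio.gather(
        fake_send_email(email),
        fake_send_sms(phone),
        return_exceptions=True,
    )
    report = {}
    for channel, result in zip(("email", "sms"), results):
        report[channel] = "failed" if isinstance(result, Exception) else "sent"
    return report


if __name__ == "__main__":
    print(asyncio.run(notify("test@example.com", "+15550100")))
```

Whether a partial failure should be reported to the caller or retried is a product decision; the sketch only makes both outcomes visible.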

Timeouts

Always set timeouts for external calls:

async def fetch_with_timeout(url, timeout=5):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                return await resp.json()
        except asyncio.TimeoutError:
            logger.error(f'Timeout fetching {url}')
            raise
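
`ClientTimeout` bounds a single HTTP request. When a handler chains several awaits, `asyncio.wait_for` can put one deadline on the whole operation. The sketch below assumes a hypothetical `slow_operation` that stands in for any awaitable work:

```python
import asyncio


async def slow_operation(delay_s: float) -> str:
    """Stand-in for a chain of awaits (API calls, DB queries, ...)."""
    await asyncio.sleep(delay_s)
    return "done"


async def call_with_deadline(delay_s: float, timeout_s: float):
    """Return the result, or None if the operation exceeds timeout_s."""
    try:
        # wait_for cancels the inner coroutine when the deadline passes
        return await asyncio.wait_for(slow_operation(delay_s), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


if __name__ == "__main__":
    print(asyncio.run(call_with_deadline(0.01, 0.5)))
    print(asyncio.run(call_with_deadline(0.5, 0.01)))
```

Returning `None` keeps the example short; in a real handler, logging and re-raising, as in the function above, is usually the safer choice.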

Connection Pooling

Reuse connections for better performance:

# Bad: creates a new session (and a new connection pool) for each request
async def fetch_user(user_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/users/{user_id}') as resp:
            return await resp.json()

# Good: reuse one session for the lifetime of the application
class APIClient:
    BASE_URL = 'https://api.example.com'

    def __init__(self):
        self.session = None

    async def start(self):
        self.session = aiohttp.ClientSession()

    async def close(self):
        await self.session.close()

    async def fetch_user(self, user_id):
        async with self.session.get(f'{self.BASE_URL}/users/{user_id}') as resp:
            return await resp.json()

# Usage (inside a coroutine)
async def main():
    client = APIClient()
    await client.start()
    try:
        user = await client.fetch_user(123)
    finally:
        await client.close()
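
The explicit start/close pair is easy to get wrong when an exception is raised in between. One possible variation is to make the client an async context manager so the session is always closed. This is a sketch, not the service code: `FakeSession` is a hypothetical stand-in for `aiohttp.ClientSession`, so the example runs without a network.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self):
        self.closed = False

    async def get_json(self, path: str) -> dict:
        await asyncio.sleep(0)  # yield to the event loop like real I/O would
        return {"path": path}

    async def close(self) -> None:
        self.closed = True


class APIClient:
    def __init__(self, session_factory=FakeSession):
        self._session_factory = session_factory
        self.session = None

    async def __aenter__(self):
        # One session is created here and shared by every call on this client
        self.session = self._session_factory()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even if the body of the `async with` block raised
        await self.session.close()

    async def fetch_user(self, user_id: int) -> dict:
        return await self.session.get_json(f"/users/{user_id}")


async def main():
    async with APIClient() as client:
        users = await asyncio.gather(*(client.fetch_user(i) for i in (1, 2, 3)))
    return users, client.session.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All three fetches share a single session, and the session is closed as soon as the block exits.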

Background Tasks

Run tasks in the background:

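# Strong references, so pending tasks are not garbage-collected mid-flight
background_tasks = set()

async def send_notification(request):
    data = await request.json()

    # Start background task and keep a reference until it finishes
    task = asyncio.create_task(send_email_async(data['email']))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # Return immediately
    return web.json_response({'status': 'queued'})

async def send_email_async(email):
    await asyncio.sleep(1)  # Simulate work
    # Send email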
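
Fire-and-forget tasks have a second hazard: if one raises, nobody is awaiting it, so the failure is easy to miss. Here is a minimal sketch of a helper that keeps a reference to each task and logs its exception when it finishes; `spawn`, `flaky_send`, and the `failures` list are illustrative names, not part of the service.

```python
import asyncio
import logging

logger = logging.getLogger("notifications")
background_tasks = set()   # strong references to pending tasks
failures = []              # collected here only so the example can inspect them


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()  # also marks the exception as retrieved
    if exc is not None:
        failures.append(exc)
        logger.error("background task failed: %r", exc)


def spawn(coro) -> asyncio.Task:
    """Schedule coro in the background and make sure failures are logged."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def flaky_send(email: str) -> None:
    """Hypothetical sender that always fails."""
    await asyncio.sleep(0.01)
    raise RuntimeError(f"could not reach provider for {email}")


async def main() -> int:
    spawn(flaky_send("test@example.com"))
    # On shutdown, wait for whatever is still pending
    await asyncio.gather(*background_tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give done-callbacks a chance to run
    return len(failures)
```

Work that must survive a process restart still belongs in a durable queue; in-process tasks are lost if the worker dies.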

Testing Async Code

Use pytest-asyncio:

import aiohttp
import pytest

# Integration test: expects the service to be running on localhost:5000
@pytest.mark.asyncio
async def test_send_notification():
    async with aiohttp.ClientSession() as session:
        async with session.post('http://localhost:5000/notify', json={
            'email': 'test@example.com',
            'subject': 'Test',
            'phone': '+15550100',
            'message': 'Test message'
        }) as resp:
            assert resp.status == 200
            data = await resp.json()
            assert data['status'] == 'sent'
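
The test above needs a live server and real downstream APIs. For fast unit tests, the sending logic can be exercised with fake senders. The sketch below swaps in the standard library's `unittest.IsolatedAsyncioTestCase` (Python 3.8+) instead of pytest-asyncio, and assumes a hypothetical `send_all` function that takes the two senders as arguments.

```python
import asyncio
import unittest


async def send_all(email_sender, sms_sender, data: dict) -> dict:
    """Hypothetical core of the handler, with both senders injected."""
    await asyncio.gather(
        email_sender(data["email"], data["subject"]),
        sms_sender(data["phone"], data["message"]),
    )
    return {"status": "sent"}


class SendAllTest(unittest.IsolatedAsyncioTestCase):
    async def test_calls_both_channels(self):
        calls = []

        async def fake_email(to, subject):
            calls.append(("email", to))

        async def fake_sms(to, message):
            calls.append(("sms", to))

        result = await send_all(fake_email, fake_sms, {
            "email": "test@example.com",
            "subject": "Test",
            "phone": "+15550100",
            "message": "Hi",
        })

        self.assertEqual(result, {"status": "sent"})
        self.assertEqual(
            sorted(calls),
            [("email", "test@example.com"), ("sms", "+15550100")],
        )
```

Saved as a test module, this runs with `python -m unittest` and needs no server, network, or third-party plugin.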

Performance Results

Load testing with wrk:

# Synchronous Flask (4 workers)
wrk -t4 -c100 -d30s http://localhost:5000/notify
Requests/sec: 187.23

# Async aiohttp (single process)
wrk -t4 -c100 -d30s http://localhost:5000/notify
Requests/sec: 1043.67

That is roughly a 5.6x improvement (1043.67 / 187.23).

Resource Usage

Metric         Flask (4 workers)   aiohttp (1 process)
CPU            60%                 25%
Memory         480MB               95MB
Requests/sec   187                 1044

The async version uses fewer resources and handles more load.

When NOT to Use Async

Async isn’t always better:

Don’t use async for:

  • CPU-bound tasks (use multiprocessing)
  • Simple CRUD APIs (Flask is fine)
  • Blocking libraries (defeats the purpose)

Use async for:

  • I/O-bound tasks
  • High concurrency requirements
  • External API calls
  • WebSocket connections

Common Pitfalls

1. Blocking calls in async code

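import asyncio
import time

# Bad: blocks the event loop, so no other request runs for 1 second
async def bad_example():
    time.sleep(1)  # Blocks!
    return "done"

# Good: use async sleep, which yields control to the event loop
async def good_example():
    await asyncio.sleep(1)
    return "done"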
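
When a blocking library has no async equivalent, one option is to push the call onto a worker thread so the event loop stays responsive. This sketch uses `asyncio.to_thread` (Python 3.9+); `blocking_lookup` is a hypothetical stand-in for such a library call.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Stand-in for a call into a library that only offers a blocking API."""
    time.sleep(0.2)
    return key.upper()


async def main() -> tuple:
    start = time.perf_counter()
    # Each call runs in the default thread pool; the event loop is free
    # to serve other coroutines while the threads are sleeping.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in ("a", "b", "c"))
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

The three lookups overlap, so the total is close to one lookup rather than three. This helps with blocking I/O only; CPU-bound work still competes for the GIL and is better served by separate processes.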

2. Not awaiting coroutines

# Bad: doesn't wait
async def bad_example():
    fetch_data()  # Returns coroutine, doesn't execute!

# Good: await it
async def good_example():
    await fetch_data()

3. Mixing sync and async

# Bad: await is only valid inside an async function
def sync_function():
    result = await async_function()  # SyntaxError!

# Good: use asyncio.run (only when no event loop is already running in this thread)
def sync_function():
    result = asyncio.run(async_function())

Deployment

Run with Gunicorn and uvloop:

pip install gunicorn uvloop

gunicorn app:app --bind 0.0.0.0:5000 --worker-class aiohttp.GunicornUVLoopWebWorker --workers 4

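uvloop is a drop-in replacement for the default asyncio event loop, built on libuv. Its authors report that it makes asyncio 2-4x faster; actual gains depend on the workload.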

Monitoring

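Track per-request latency and volume for each handler:

import time

# `metrics` and `process_notification` are defined elsewhere in the service
async def send_notification(request):
    start = time.perf_counter()  # monotonic clock, suited to measuring durations

    # Process request
    await process_notification(request)

    duration = time.perf_counter() - start
    metrics.histogram('notification.duration', duration)
    metrics.increment('notification.count')

    return web.json_response({'status': 'sent'})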
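
One metric that is specific to async services is event-loop lag: how late a coroutine wakes up compared with when it asked to. High lag usually means something is blocking the loop. A minimal sketch, where `measure_loop_lag` and `blocking_handler` are illustrative names and the handler contains a deliberate blocking call:

```python
import asyncio
import time


async def measure_loop_lag(interval_s: float, samples: int) -> list:
    """Sleep for interval_s repeatedly and record how late each wake-up was."""
    lags = []
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval_s)
        lags.append(max(0.0, time.perf_counter() - start - interval_s))
    return lags


async def blocking_handler() -> None:
    await asyncio.sleep(0.01)
    time.sleep(0.2)  # deliberate mistake: blocks the whole event loop


async def main() -> float:
    monitor = asyncio.create_task(measure_loop_lag(0.05, 5))
    await blocking_handler()
    lags = await monitor
    return max(lags)


if __name__ == "__main__":
    print(f"worst loop lag: {asyncio.run(main()) * 1000:.0f}ms")
```

In a real service the lag samples would be sent to the same metrics client as the request timings, and an alert on sustained lag tends to catch accidental blocking calls early.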

Conclusion

Asyncio transformed our notification service. We handle 5x more load with fewer resources.

Key takeaways:

  1. Use async for I/O-bound workloads
  2. Migrate incrementally, not all at once
  3. Use async libraries (asyncpg, aioredis, aiohttp)
  4. Always set timeouts
  5. Test thoroughly - async bugs are subtle

Async Python is powerful but has a learning curve. Start with a small service, learn the patterns, then scale up.

For our use case, the performance gains were worth the migration effort.