Production AI Agents: Lessons from Running 10 Agents at Scale
Running AI agents in production is nothing like running demos. I deployed 10 agents serving 100K users per day and learned hard lessons about reliability, cost, and scale.
Here’s what actually works in production.
Production Requirements
Non-Negotiables:
- 99.9% uptime
- <2s response time
- Cost predictability
- Error recovery
- Monitoring/alerting
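In practice these targets live in per-agent configuration rather than prose. A minimal sketch of what that config might look like (the dataclass name, fields, and defaults are illustrative, not lifted from the production system):

```python
from dataclasses import dataclass


@dataclass
class AgentSLOConfig:
    """Per-agent operational targets; values are illustrative defaults."""
    timeout_seconds: float = 2.0                # hard cap on end-to-end latency
    max_retries: int = 3                        # bounded retries before falling back
    daily_budget_dollars: float = 100.0         # per-agent spend ceiling
    error_rate_alert_threshold: float = 0.01    # alert if >1% of requests fail


config = AgentSLOConfig(timeout_seconds=1.5, daily_budget_dollars=250.0)
print(config)
```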
Architecture
```python
import asyncio
import uuid


class ProductionAgent:
    def __init__(self, name, config):
        self.name = name
        self.config = config
        self.llm = self._init_llm()
        self.tools = self._init_tools()
        self.memory = self._init_memory()
        self.metrics = MetricsCollector(name)
        self.circuit_breaker = CircuitBreaker()

    async def execute(self, task):
        """Execute task with full production safeguards."""
        request_id = str(uuid.uuid4())
        try:
            # Check circuit breaker
            if not self.circuit_breaker.allow_request():
                return self._fallback_response("Service temporarily unavailable")

            # Rate limiting
            if not await self._check_rate_limit():
                return self._fallback_response("Rate limit exceeded")

            # Execute with timeout
            result = await asyncio.wait_for(
                self._execute_with_retry(task),
                timeout=self.config['timeout']
            )

            # Record success
            self.metrics.record_success(request_id)
            self.circuit_breaker.record_success()
            return result

        except asyncio.TimeoutError:
            self.metrics.record_timeout(request_id)
            return self._fallback_response("Request timeout")
        except Exception as e:
            self.metrics.record_error(request_id, e)
            self.circuit_breaker.record_failure()
            return self._handle_error(e)

    async def _execute_with_retry(self, task, max_retries=3):
        """Execute with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                return await self._execute_task(task)
            except RetryableError as e:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s backoff
        raise Exception("Max retries exceeded")
```
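`_check_rate_limit` is referenced above but not shown. One possible implementation, sketched here as an in-process token bucket (the class name and defaults are my own, not from the production system); a shared store such as Redis would be needed once an agent runs in multiple replicas:

```python
import asyncio
import time


class TokenBucketRateLimiter:
    """Simple in-process token bucket rate limiter."""

    def __init__(self, rate_per_second: float = 50.0, burst: int = 100):
        self.rate = rate_per_second
        self.capacity = burst
        self.tokens = float(burst)
        self.updated_at = time.monotonic()
        self._lock = asyncio.Lock()

    async def allow(self) -> bool:
        async with self._lock:
            now = time.monotonic()
            # Refill tokens based on elapsed time, capped at bucket capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
            self.updated_at = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
```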
Monitoring
```python
from prometheus_client import Counter, Histogram, Gauge
import logging


class MetricsCollector:
    def __init__(self, agent_name):
        self.agent_name = agent_name

        # Metrics. Note: prometheus_client registers metric names globally, so with
        # multiple agents these objects should be created once at module level and shared.
        self.requests_total = Counter(
            'agent_requests_total',
            'Total requests',
            ['agent', 'status']
        )
        self.request_duration = Histogram(
            'agent_request_duration_seconds',
            'Request duration',
            ['agent']
        )
        self.active_requests = Gauge(
            'agent_active_requests',
            'Active requests',
            ['agent']
        )
        self.llm_tokens = Counter(
            'agent_llm_tokens_total',
            'LLM tokens used',
            ['agent', 'type']
        )
        self.cost = Counter(
            'agent_cost_dollars',
            'Cost in dollars',
            ['agent']
        )

    def record_success(self, request_id):
        """Record successful request."""
        self.requests_total.labels(agent=self.agent_name, status='success').inc()
        logging.info(f"Agent {self.agent_name} request {request_id} succeeded")

    def record_timeout(self, request_id):
        """Record a timed-out request."""
        self.requests_total.labels(agent=self.agent_name, status='timeout').inc()
        logging.warning(f"Agent {self.agent_name} request {request_id} timed out")

    def record_error(self, request_id, error):
        """Record error."""
        self.requests_total.labels(agent=self.agent_name, status='error').inc()
        logging.error(f"Agent {self.agent_name} request {request_id} failed: {error}")

    def record_tokens(self, input_tokens, output_tokens):
        """Record token usage."""
        self.llm_tokens.labels(agent=self.agent_name, type='input').inc(input_tokens)
        self.llm_tokens.labels(agent=self.agent_name, type='output').inc(output_tokens)

        # Convert tokens to dollars ($0.01 per 1K input tokens, $0.03 per 1K output tokens)
        cost = (input_tokens * 0.00001) + (output_tokens * 0.00003)
        self.cost.labels(agent=self.agent_name).inc(cost)
```
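For Prometheus to scrape any of this, each agent process also has to expose an HTTP metrics endpoint. A minimal sketch, assuming the MetricsCollector above lives in the same module and port 8000 is free:

```python
from prometheus_client import start_http_server

# Expose /metrics on port 8000 for Prometheus to scrape.
start_http_server(8000)

metrics = MetricsCollector("customer-support")
metrics.record_tokens(input_tokens=1200, output_tokens=350)
```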
Error Handling
```python
import logging


class ErrorHandler:
    # RateLimitError, LLMError, and ToolError are application-defined exception
    # types raised by the LLM client and tool wrappers.
    def __init__(self):
        self.fallback_responses = {
            'rate_limit': "I'm experiencing high demand. Please try again in a moment.",
            'timeout': "This is taking longer than expected. Let me try a simpler approach.",
            'llm_error': "I encountered an issue. Let me try differently.",
            'tool_error': "I couldn't complete that action. Please try again."
        }

    def handle_error(self, error):
        """Map error types to user-facing fallback responses."""
        if isinstance(error, RateLimitError):
            return self.fallback_responses['rate_limit']
        elif isinstance(error, TimeoutError):
            return self.fallback_responses['timeout']
        elif isinstance(error, LLMError):
            return self.fallback_responses['llm_error']
        elif isinstance(error, ToolError):
            return self.fallback_responses['tool_error']
        else:
            logging.error(f"Unexpected error: {error}")
            return "I encountered an unexpected issue. Please contact support."
```
Circuit Breaker
```python
import time
import logging


class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half_open

    def allow_request(self):
        """Check if request should be allowed."""
        if self.state == 'closed':
            return True
        if self.state == 'open':
            # Move to half-open once the cooldown has passed
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half_open'
                return True
            return False
        # half_open: let a probe request through
        return True

    def record_success(self):
        """Record successful request."""
        if self.state == 'half_open':
            self.state = 'closed'
            self.failures = 0

    def record_failure(self):
        """Record failed request."""
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            logging.warning(f"Circuit breaker opened after {self.failures} failures")
```
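A quick sanity check of the state transitions, using the CircuitBreaker above with a short timeout so the half-open probe is observable:

```python
import time

breaker = CircuitBreaker(failure_threshold=3, timeout=1)

for _ in range(3):
    breaker.record_failure()
print(breaker.state, breaker.allow_request())   # open False

time.sleep(1.1)
print(breaker.allow_request(), breaker.state)   # True half_open

breaker.record_success()
print(breaker.state)                            # closed
```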
Cost Optimization
```python
from cachetools import LRUCache  # third-party: pip install cachetools


class CostOptimizer:
    # Cache-key, cost-estimation, and execution helpers are elided here.
    def __init__(self):
        self.cache = LRUCache(maxsize=10000)
        self.daily_budget = 1000  # $1,000/day
        self.current_spend = 0

    async def execute_with_cost_control(self, task):
        """Execute task with cost controls."""
        # Check cache first
        cache_key = self._get_cache_key(task)
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Check budget
        if self.current_spend >= self.daily_budget:
            return self._budget_exceeded_response()

        # Estimate cost and pick the cheapest model that can handle the task
        estimated_cost = self._estimate_cost(task)
        model = self._select_model(task, estimated_cost)

        # Execute
        result = await self._execute(task, model)

        # Cache result and update spend
        self.cache[cache_key] = result
        self.current_spend += estimated_cost
        return result

    def _select_model(self, task, budget):
        """Select appropriate model based on task and budget."""
        if self._is_simple_task(task) or budget < 0.01:
            return 'gpt-3.5-turbo'  # cheap
        elif budget < 0.05:
            return 'claude-3-sonnet'  # mid-tier
        else:
            return 'gpt-4'  # expensive but most capable
```
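The cache key has to be stable across semantically identical requests. A minimal sketch of what `_get_cache_key` could do, assuming tasks are dicts (the field names in the example call are illustrative):

```python
import hashlib
import json


def get_cache_key(task: dict) -> str:
    """Hash the normalized task payload; identical inputs map to the same key."""
    normalized = json.dumps(task, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


key = get_cache_key({"prompt": "Summarize this ticket", "user_tier": "free"})
print(key[:16])  # stable prefix for identical inputs
```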
Scaling
```python
from kubernetes import client, config


class AgentScaler:
    def __init__(self):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()

    def scale_agents(self, agent_name, replicas):
        """Scale agent deployment."""
        self.apps_v1.patch_namespaced_deployment_scale(
            name=f"{agent_name}-deployment",
            namespace="default",
            body={'spec': {'replicas': replicas}}
        )

    def autoscale_based_on_metrics(self):
        """Auto-scale based on metrics."""
        metrics = self._get_current_metrics()  # per-agent stats from the monitoring stack; helper elided

        for agent_name, agent_metrics in metrics.items():
            current_replicas = agent_metrics['replicas']
            avg_latency = agent_metrics['avg_latency']
            cpu_usage = agent_metrics['cpu_usage']

            # Scale up if latency or CPU is high
            if avg_latency > 2.0 or cpu_usage > 80:
                new_replicas = min(current_replicas * 2, 10)
                self.scale_agents(agent_name, new_replicas)
            # Scale down if underutilized
            elif avg_latency < 0.5 and cpu_usage < 30:
                new_replicas = max(current_replicas // 2, 1)
                self.scale_agents(agent_name, new_replicas)
```
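`_get_current_metrics` is where the latency figure comes from. One way to fetch it, sketched against the `agent_request_duration_seconds` histogram defined earlier and Prometheus's HTTP query API (the Prometheus address and time window are illustrative):

```python
import requests

PROMETHEUS_URL = "http://prometheus:9090"  # illustrative address


def average_latency(agent: str, window: str = "5m") -> float:
    """Average request latency in seconds for one agent over the given window."""
    query = (
        f'rate(agent_request_duration_seconds_sum{{agent="{agent}"}}[{window}]) '
        f'/ rate(agent_request_duration_seconds_count{{agent="{agent}"}}[{window}])'
    )
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else 0.0
```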
Load Balancing
```python
class AgentLoadBalancer:
    def __init__(self, agents):
        self.agents = agents
        self.current_loads = {agent.name: 0 for agent in agents}

    async def route_request(self, task):
        """Route request to the least loaded agent."""
        # Find the agent with the fewest in-flight requests
        agent = min(self.agents, key=lambda a: self.current_loads[a.name])

        # Track the in-flight request while it executes
        self.current_loads[agent.name] += 1
        try:
            return await agent.execute(task)
        finally:
            self.current_loads[agent.name] -= 1
```
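A quick check of the routing behavior, using the AgentLoadBalancer above with stub agents in place of real ProductionAgent instances (DummyAgent is illustrative):

```python
import asyncio


class DummyAgent:
    def __init__(self, name, delay):
        self.name = name
        self.delay = delay

    async def execute(self, task):
        await asyncio.sleep(self.delay)
        return f"{self.name} handled {task}"


async def main():
    balancer = AgentLoadBalancer([DummyAgent("fast", 0.1), DummyAgent("slow", 0.5)])
    results = await asyncio.gather(*(balancer.route_request(f"task-{i}") for i in range(4)))
    print(results)  # requests alternate between the two agents


asyncio.run(main())
```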
Real Results
10 Agents in Production:
- Users: 100K/day
- Requests: 500K/day
- Uptime: 99.95%
- Avg latency: 1.2s
- Cost: $2,500/day
Performance by Agent:
| Agent | Requests/Day | Avg Latency | Success Rate | Cost/Day |
|---|---|---|---|---|
| Customer Support | 200K | 0.8s | 96% | $800 |
| Code Review | 150K | 1.5s | 94% | $900 |
| Data Analyst | 80K | 2.0s | 92% | $400 |
| Content Writer | 40K | 1.8s | 95% | $250 |
| Others (6) | 30K | 1.0s | 93% | $150 |
Incidents and Lessons
Incident 1: LLM API Outage
- Impact: 30min downtime
- Solution: Circuit breaker + fallback model
- Lesson: Always have backup
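Concretely, "circuit breaker + fallback model" means that when the breaker for the primary provider is open, the same prompt is routed to a secondary model instead of failing outright. A minimal sketch under that assumption (the two call functions are placeholders for real provider clients):

```python
async def call_with_fallback(prompt, primary_breaker, call_primary, call_fallback):
    """Try the primary LLM provider; fall back to a secondary model when the
    primary's circuit breaker is open or the call fails."""
    if primary_breaker.allow_request():
        try:
            result = await call_primary(prompt)
            primary_breaker.record_success()
            return result
        except Exception:
            primary_breaker.record_failure()
    # Primary unavailable: degrade to the backup provider.
    return await call_fallback(prompt)
```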
Incident 2: Cost Spike
- Impact: $5K unexpected bill
- Solution: Daily budget limits
- Lesson: Monitor costs in real-time
Incident 3: Memory Leak
- Impact: Gradual performance degradation
- Solution: Restart pods daily
- Lesson: Monitor memory usage
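Daily restarts only masked the leak; catching it earlier means watching resident memory from inside the process. A small sketch using psutil (the threshold is illustrative); exiting non-zero lets Kubernetes restart the pod:

```python
import os
import sys
import psutil  # third-party: pip install psutil

MEMORY_LIMIT_MB = 1500  # illustrative threshold, set below the pod's memory limit


def check_memory():
    """Exit the process if resident memory exceeds the threshold."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    if rss_mb > MEMORY_LIMIT_MB:
        sys.stderr.write(f"RSS {rss_mb:.0f} MB over limit, exiting for restart\n")
        sys.exit(1)
```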
Best Practices
- Circuit breakers: Prevent cascading failures
- Caching: 40% cost reduction
- Monitoring: Prometheus + Grafana
- Budgets: Daily spending limits
- Fallbacks: Always have plan B
- Auto-scaling: Handle traffic spikes
- Load balancing: Distribute load
- Error handling: Graceful degradation
Lessons Learned
- Production is different: Demos ≠ reality
- Monitoring critical: Can’t fix what you can’t see
- Costs add up: $2.5K/day = $75K/month
- Failures happen: Plan for them
- Scale gradually: Don’t rush
Conclusion
Production AI agents require robust infrastructure. Monitoring, error handling, and cost control are essential.
Key takeaways:
- 99.95% uptime achieved
- 500K requests/day handled
- $2.5K/day cost (controlled)
- Circuit breakers prevent cascading failures
- Caching reduces costs by 40%
Build for production. Not just demos.