Running AI agents in production is nothing like running demos. I deployed 10 agents serving 100K users a day and learned hard lessons about reliability, cost, and scale.

Here’s what actually works in production.

Production Requirements

Non-Negotiables:

  1. 99.9% uptime
  2. <2s response time
  3. Cost predictability
  4. Error recovery
  5. Monitoring/alerting
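These non-negotiables are easier to enforce when they live in one explicit config object instead of constants scattered across the codebase. A minimal sketch (field names and defaults are illustrative, not the exact values from the deployment described here):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AgentConfig:
    """Production targets encoded as explicit, reviewable values."""
    timeout: float = 2.0             # seconds; enforces the <2s response target
    max_retries: int = 3             # bounded retries for transient failures
    rate_limit_per_min: int = 600    # per-agent request cap
    daily_budget_usd: float = 1000   # hard spend ceiling
    error_rate_alert: float = 0.001  # alert above 0.1% errors (99.9% uptime target)

config = AgentConfig()
```

Freezing the dataclass means a config change has to go through a deploy, which keeps these targets auditable.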

Architecture

import asyncio
import uuid

class ProductionAgent:
    def __init__(self, name, config):
        self.name = name
        self.config = config
        self.llm = self._init_llm()
        self.tools = self._init_tools()
        self.memory = self._init_memory()
        self.metrics = MetricsCollector(name)
        self.circuit_breaker = CircuitBreaker()
    
    async def execute(self, task):
        """Execute task with full production safeguards."""
        request_id = str(uuid.uuid4())
        
        try:
            # Check circuit breaker
            if not self.circuit_breaker.allow_request():
                return self._fallback_response("Service temporarily unavailable")
            
            # Rate limiting
            if not await self._check_rate_limit():
                return self._fallback_response("Rate limit exceeded")
            
            # Execute with timeout
            result = await asyncio.wait_for(
                self._execute_with_retry(task),
                timeout=self.config['timeout']
            )
            
            # Record success
            self.metrics.record_success(request_id)
            self.circuit_breaker.record_success()
            
            return result
            
        except asyncio.TimeoutError:
            self.metrics.record_timeout(request_id)
            return self._fallback_response("Request timeout")
            
        except Exception as e:
            self.metrics.record_error(request_id, e)
            self.circuit_breaker.record_failure()
            return self._handle_error(e)
    
    async def _execute_with_retry(self, task, max_retries=3):
        """Execute with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                return await self._execute_task(task)
            except RetryableError as e:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
        
        raise Exception("Max retries exceeded")
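One caveat on the fixed 2 ** attempt backoff above: when many requests fail at once, they all retry on the same schedule and hit the provider in synchronized bursts. Adding jitter spreads the retries out. A sketch of the same loop with capped full jitter (RetryableError stands in for whatever transient-error class you use):

```python
import asyncio
import random

class RetryableError(Exception):
    """Transient failure worth retrying (placeholder for the real class)."""

async def execute_with_retry(do_task, max_retries=3, base=1.0, cap=30.0):
    """Retry with capped exponential backoff plus full jitter."""
    for attempt in range(max_retries):
        try:
            return await do_task()
        except RetryableError:
            if attempt == max_retries - 1:
                raise
            # Sleep a random amount in [0, min(cap, base * 2**attempt)]
            await asyncio.sleep(random.uniform(0, min(cap, base * 2 ** attempt)))
```

Non-retryable exceptions still propagate immediately, same as in the class above.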

Monitoring

from prometheus_client import Counter, Histogram, Gauge
import logging

class MetricsCollector:
    def __init__(self, agent_name):
        self.agent_name = agent_name
        
        # Metrics
        self.requests_total = Counter(
            'agent_requests_total',
            'Total requests',
            ['agent', 'status']
        )
        
        self.request_duration = Histogram(
            'agent_request_duration_seconds',
            'Request duration',
            ['agent']
        )
        
        self.active_requests = Gauge(
            'agent_active_requests',
            'Active requests',
            ['agent']
        )
        
        self.llm_tokens = Counter(
            'agent_llm_tokens_total',
            'LLM tokens used',
            ['agent', 'type']
        )
        
        self.cost = Counter(
            'agent_cost_dollars',
            'Cost in dollars',
            ['agent']
        )
    
    def record_success(self, request_id):
        """Record successful request."""
        self.requests_total.labels(agent=self.agent_name, status='success').inc()
        logging.info(f"Agent {self.agent_name} request {request_id} succeeded")
    
    def record_error(self, request_id, error):
        """Record error."""
        self.requests_total.labels(agent=self.agent_name, status='error').inc()
        logging.error(f"Agent {self.agent_name} request {request_id} failed: {error}")
    
    def record_tokens(self, input_tokens, output_tokens):
        """Record token usage."""
        self.llm_tokens.labels(agent=self.agent_name, type='input').inc(input_tokens)
        self.llm_tokens.labels(agent=self.agent_name, type='output').inc(output_tokens)
        
        # Calculate cost (example per-token rates; substitute your model's pricing)
        cost = (input_tokens * 0.00001) + (output_tokens * 0.00003)
        self.cost.labels(agent=self.agent_name).inc(cost)

Error Handling

class ErrorHandler:
    def __init__(self):
        self.fallback_responses = {
            'rate_limit': "I'm experiencing high demand. Please try again in a moment.",
            'timeout': "This is taking longer than expected. Let me try a simpler approach.",
            'llm_error': "I encountered an issue. Let me try differently.",
            'tool_error': "I couldn't complete that action. Please try again."
        }
    
    def handle_error(self, error):
        """Handle different error types."""
        if isinstance(error, RateLimitError):
            return self.fallback_responses['rate_limit']
        elif isinstance(error, TimeoutError):
            return self.fallback_responses['timeout']
        elif isinstance(error, LLMError):
            return self.fallback_responses['llm_error']
        elif isinstance(error, ToolError):
            return self.fallback_responses['tool_error']
        else:
            logging.error(f"Unexpected error: {error}")
            return "I encountered an unexpected issue. Please contact support."
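The handler above dispatches on exception types that aren't shown. One plausible hierarchy that fits both this handler and the retry loop earlier (the names match the code; the inheritance structure is my assumption):

```python
class AgentError(Exception):
    """Base class for all agent failures."""

class RetryableError(AgentError):
    """Transient failure; safe to retry with backoff."""

class RateLimitError(RetryableError):
    """The provider throttled the request."""

class LLMError(RetryableError):
    """The model call failed (5xx, malformed response, etc.)."""

class ToolError(AgentError):
    """A tool invocation failed; retrying may not help."""
```

Making RateLimitError and LLMError subclasses of RetryableError means _execute_with_retry picks them up automatically, while ToolError falls through to the user-facing fallback on the first failure.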

Circuit Breaker

import logging
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half_open
    
    def allow_request(self):
        """Check if request should be allowed."""
        if self.state == 'closed':
            return True
        
        if self.state == 'open':
            # Check if timeout has passed
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half_open'
                return True
            return False
        
        if self.state == 'half_open':
            return True
    
    def record_success(self):
        """Record successful request."""
        if self.state == 'half_open':
            self.state = 'closed'
            self.failures = 0
    
    def record_failure(self):
        """Record failed request."""
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            logging.warning(f"Circuit breaker opened after {self.failures} failures")

Cost Optimization

from cachetools import LRUCache

class CostOptimizer:
    def __init__(self):
        self.cache = LRUCache(maxsize=10000)
        self.daily_budget = 1000  # $1,000/day
        self.current_spend = 0  # reset daily, e.g. by a scheduled job
    
    async def execute_with_cost_control(self, task):
        """Execute task with cost controls."""
        # Check cache first
        cache_key = self._get_cache_key(task)
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Check budget
        if self.current_spend >= self.daily_budget:
            return self._budget_exceeded_response()
        
        # Estimate cost
        estimated_cost = self._estimate_cost(task)
        
        # Use cheaper model if possible
        model = self._select_model(task, estimated_cost)
        
        # Execute
        result = await self._execute(task, model)
        
        # Cache result
        self.cache[cache_key] = result
        
        # Update spend
        self.current_spend += estimated_cost
        
        return result
    
    def _select_model(self, task, budget):
        """Select appropriate model based on task and budget."""
        if self._is_simple_task(task) or budget < 0.01:
            return 'gpt-3.5-turbo'  # Cheap
        elif budget < 0.05:
            return 'claude-3-sonnet'  # Medium
        else:
            return 'gpt-4'  # Expensive but best

Scaling

from kubernetes import client, config

class AgentScaler:
    def __init__(self):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
    
    def scale_agents(self, agent_name, replicas):
        """Scale agent deployment."""
        self.apps_v1.patch_namespaced_deployment_scale(
            name=f"{agent_name}-deployment",
            namespace="default",
            body={'spec': {'replicas': replicas}}
        )
    
    def autoscale_based_on_metrics(self):
        """Auto-scale based on metrics."""
        metrics = self._get_current_metrics()
        
        for agent_name, agent_metrics in metrics.items():
            current_replicas = agent_metrics['replicas']
            avg_latency = agent_metrics['avg_latency']
            cpu_usage = agent_metrics['cpu_usage']
            
            # Scale up if needed
            if avg_latency > 2.0 or cpu_usage > 80:
                new_replicas = min(current_replicas * 2, 10)
                self.scale_agents(agent_name, new_replicas)
            
            # Scale down if underutilized
            elif avg_latency < 0.5 and cpu_usage < 30:
                new_replicas = max(current_replicas // 2, 1)
                self.scale_agents(agent_name, new_replicas)
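The scale-up/down thresholds above can be factored into a pure function, which makes the policy unit-testable without a cluster. Same thresholds and bounds as the code above:

```python
def decide_replicas(current: int, avg_latency: float, cpu_usage: float,
                    min_replicas: int = 1, max_replicas: int = 10) -> int:
    """Return the target replica count for one agent deployment."""
    if avg_latency > 2.0 or cpu_usage > 80:
        return min(current * 2, max_replicas)   # scale up: double, capped
    if avg_latency < 0.5 and cpu_usage < 30:
        return max(current // 2, min_replicas)  # scale down: halve, floored
    return current                              # in band: leave alone
```

The dead band between the two thresholds is deliberate: without it, a deployment hovering near a boundary flaps between replica counts on every evaluation.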

Load Balancing

class AgentLoadBalancer:
    def __init__(self, agents):
        self.agents = agents
        self.current_loads = {agent.name: 0 for agent in agents}
    
    async def route_request(self, task):
        """Route request to least loaded agent."""
        # Find least loaded agent
        agent = min(self.agents, key=lambda a: self.current_loads[a.name])
        
        # Update load
        self.current_loads[agent.name] += 1
        
        try:
            # Execute
            result = await agent.execute(task)
            return result
        finally:
            # Decrease load
            self.current_loads[agent.name] -= 1

Real Results

10 Agents in Production:

  • Users: 100K/day
  • Requests: 500K/day
  • Uptime: 99.95%
  • Avg latency: 1.2s
  • Cost: $2,500/day

Performance by Agent:

  Agent             Requests/Day   Avg Latency   Success Rate   Cost/Day
  Customer Support  200K           0.8s          96%            $800
  Code Review       150K           1.5s          94%            $900
  Data Analyst      80K            2.0s          92%            $400
  Content Writer    40K            1.8s          95%            $250
  Others (6)        30K            1.0s          93%            $150

Incidents and Lessons

Incident 1: LLM API Outage

  • Impact: 30min downtime
  • Solution: Circuit breaker + fallback model
  • Lesson: Always have backup

Incident 2: Cost Spike

  • Impact: $5K unexpected bill
  • Solution: Daily budget limits
  • Lesson: Monitor costs in real-time
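Monitoring costs in real time means projecting the daily total as spend accrues, not reading the bill the next morning. A minimal sketch of a run-rate projection with an early-warning threshold (the 80% threshold is illustrative):

```python
def projected_daily_spend(spend_so_far: float, hours_elapsed: float) -> float:
    """Linear projection of today's total spend from the run rate so far."""
    if hours_elapsed <= 0:
        return spend_so_far
    return spend_so_far * 24 / hours_elapsed

def should_alert(spend_so_far: float, hours_elapsed: float,
                 daily_budget: float, threshold: float = 0.8) -> bool:
    """Alert when the projection crosses a fraction of the daily budget."""
    return projected_daily_spend(spend_so_far, hours_elapsed) >= threshold * daily_budget
```

A $500 spend by 6 a.m. projects to $2,000 for the day, so a $1,000 budget would fire the alert hours before the hard limit kicks in.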

Incident 3: Memory Leak

  • Impact: Gradual performance degradation
  • Solution: Restart pods daily
  • Lesson: Monitor memory usage

Best Practices

  1. Circuit breakers: Prevent cascading failures
  2. Caching: 40% cost reduction
  3. Monitoring: Prometheus + Grafana
  4. Budgets: Daily spending limits
  5. Fallbacks: Always have plan B
  6. Auto-scaling: Handle traffic spikes
  7. Load balancing: Distribute load
  8. Error handling: Graceful degradation

Lessons Learned

  1. Production is different: Demos ≠ reality
  2. Monitoring critical: Can’t fix what you can’t see
  3. Costs add up: $2.5K/day = $75K/month
  4. Failures happen: Plan for them
  5. Scale gradually: Don’t rush

Conclusion

Production AI agents require robust infrastructure. Monitoring, error handling, and cost control are essential.

Key takeaways:

  1. 99.95% uptime achieved
  2. 500K requests/day handled
  3. $2.5K/day cost (controlled)
  4. Circuit breakers prevent cascading failures
  5. Caching reduces costs by 40%

Build for production. Not just demos.