Our Go service was processing 1000 requests/sec, but CPU usage was only 20%. We weren’t using Go’s concurrency properly.

I refactored the hot path around worker pools and pipelines. Now we process 5000 requests/sec with 80% CPU usage, a 5x improvement in throughput.

The Problem

Sequential processing:

func ProcessOrders(orders []Order) {
    for _, order := range orders {
        ProcessOrder(order)  // Takes 100ms
    }
}

1000 orders × 100ms = 100 seconds, one at a time!
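Throughout, Order, Result, and ProcessOrder stand in for the real domain code. A minimal set of assumed definitions (the fields are illustrative, not from the original service):

type Order struct {
    ID     int
    Amount float64
}

type Result struct {
    OrderID int
    Error   error
}

ProcessOrder appears with two signatures below, returning a Result in the channel-based examples and an error elsewhere; both stand in for roughly 100ms of real work.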

Basic Goroutines

First attempt:

func ProcessOrders(orders []Order) {
    for _, order := range orders {
        go ProcessOrder(order)
    }
}

Problems:

  • No limit on goroutines (10,000 orders spawns 10,000+ goroutines)
  • No error handling
  • No way to wait for completion
  • Memory explosion: every goroutine starts with a few KB of stack, plus whatever it allocates (see the sketch below)
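The blow-up is easy to demonstrate; a toy sketch (imports fmt, runtime, and time; the function name is hypothetical):

func demonstrateBlowup() {
    // One goroutine per item with no cap: with 10,000 slow items,
    // all 10,000 goroutines are alive at the same time.
    for i := 0; i < 10000; i++ {
        go func() {
            time.Sleep(time.Second) // stand-in for slow work
        }()
    }
    fmt.Println("goroutines:", runtime.NumGoroutine()) // roughly 10,001
}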

Worker Pool Pattern

Fixed number of workers:

func ProcessOrders(orders []Order, numWorkers int) {
    jobs := make(chan Order, len(orders))
    results := make(chan Result, len(orders))
    
    // Start workers
    for w := 0; w < numWorkers; w++ {
        go worker(jobs, results)
    }
    
    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)
    
    // Collect results
    for i := 0; i < len(orders); i++ {
        result := <-results
        if result.Error != nil {
            log.Printf("Error: %v", result.Error)
        }
    }
}

func worker(jobs <-chan Order, results chan<- Result) {
    for order := range jobs {
        result := ProcessOrder(order)
        results <- result
    }
}

Now we control concurrency!
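How many workers? It depends on the workload. A hypothetical call site (the multiplier is a tuning knob, not a rule):

// CPU-bound work rarely benefits from more workers than cores;
// I/O-bound work (DB calls, HTTP) can profitably run many more.
numWorkers := runtime.NumCPU() * 4 // starting guess for I/O-heavy work
ProcessOrders(orders, numWorkers)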

WaitGroup for Synchronization

A better approach: the version above needs a results channel sized to len(orders) just to detect completion, while a sync.WaitGroup tracks it directly, so the jobs buffer no longer has to hold every order. Note that this version only logs errors; the next section collects them:

func ProcessOrders(orders []Order, numWorkers int) {
    jobs := make(chan Order, 100)
    var wg sync.WaitGroup
    
    // Start workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for order := range jobs {
                if err := ProcessOrder(order); err != nil {
                    log.Printf("Error processing order %d: %v", order.ID, err)
                }
            }
        }()
    }
    
    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)
    
    // Wait for completion
    wg.Wait()
}

Error Handling

Collect errors:

func ProcessOrders(orders []Order, numWorkers int) []error {
    jobs := make(chan Order, 100)
    errors := make(chan error, len(orders))
    var wg sync.WaitGroup
    
    // Start workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for order := range jobs {
                if err := ProcessOrder(order); err != nil {
                    errors <- fmt.Errorf("order %d: %w", order.ID, err)
                }
            }
        }()
    }
    
    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)
    
    // Wait and close errors channel
    go func() {
        wg.Wait()
        close(errors)
    }()
    
    // Collect errors
    var errs []error
    for err := range errors {
        errs = append(errs, err)
    }
    
    return errs
}
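If the caller wants a single error instead of a slice, Go 1.20's errors.Join collapses it (returning nil when the slice is empty or all elements are nil):

errs := ProcessOrders(orders, 10)
if err := errors.Join(errs...); err != nil {
    log.Printf("processing finished with %d error(s): %v", len(errs), err)
}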

Pipeline Pattern

Multi-stage processing, where every stage runs concurrently and streams its output to the next:

// Stage 1: Fetch orders
func fetchOrders(ctx context.Context) <-chan Order {
    out := make(chan Order)
    go func() {
        defer close(out)
        orders := GetOrdersFromDB()
        for _, order := range orders {
            select {
            case out <- order:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Stage 2: Validate orders
func validateOrders(ctx context.Context, in <-chan Order) <-chan Order {
    out := make(chan Order)
    go func() {
        defer close(out)
        for order := range in {
            if ValidateOrder(order) {
                select {
                case out <- order:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// Stage 3: Process orders
func processOrders(ctx context.Context, in <-chan Order) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for order := range in {
            result := ProcessOrder(order)
            select {
            case out <- result:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Pipeline
func OrderPipeline(ctx context.Context) {
    orders := fetchOrders(ctx)
    validated := validateOrders(ctx, orders)
    results := processOrders(ctx, validated)
    
    for result := range results {
        log.Printf("Processed order %d", result.OrderID)
    }
}

Fan-Out, Fan-In

Parallel processing, then merge:

func fanOut(ctx context.Context, in <-chan Order, numWorkers int) []<-chan Result {
    channels := make([]<-chan Result, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        channels[i] = processOrders(ctx, in)
    }
    
    return channels
}

func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for result := range c {
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Usage
func ProcessWithFanOut(ctx context.Context) {
    orders := fetchOrders(ctx)
    workers := fanOut(ctx, orders, 10)
    results := fanIn(ctx, workers...)
    
    for result := range results {
        log.Printf("Result: %v", result)
    }
}

Context for Cancellation

Graceful shutdown:

func ProcessOrders(ctx context.Context, orders []Order) error {
    jobs := make(chan Order, 100)
    var wg sync.WaitGroup
    
    // Start workers
    for w := 0; w < 10; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case order, ok := <-jobs:
                    if !ok {
                        return
                    }
                    ProcessOrder(order)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    
    // Send jobs
    for _, order := range orders {
        select {
        case jobs <- order:
        case <-ctx.Done():
            close(jobs)
            wg.Wait()
            return ctx.Err()
        }
    }
    close(jobs)
    
    wg.Wait()
    return nil
}

// Usage
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := ProcessOrders(ctx, orders); err != nil {
    log.Printf("Processing cancelled: %v", err)
}

Rate Limiting

Control request rate:

func ProcessWithRateLimit(orders []Order, rps int) {
    limiter := time.NewTicker(time.Second / time.Duration(rps))
    defer limiter.Stop()
    
    for _, order := range orders {
        <-limiter.C // at most one launch per tick
        go ProcessOrder(order) // fire-and-forget; add a WaitGroup if you need completion
    }
}

Or use golang.org/x/time/rate:

import "golang.org/x/time/rate"

func ProcessWithRateLimit(ctx context.Context, orders []Order, rps int) error {
    limiter := rate.NewLimiter(rate.Limit(rps), rps) // rps events/sec, bursts of up to rps
    
    for _, order := range orders {
        if err := limiter.Wait(ctx); err != nil {
            return err
        }
        go ProcessOrder(order)
    }
    
    return nil
}

Semaphore Pattern

Limit concurrent operations:

type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

func ProcessOrders(orders []Order, maxConcurrent int) {
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup
    
    for _, order := range orders {
        wg.Add(1)
        go func(o Order) {
            defer wg.Done()
            // All goroutines start immediately; the semaphore only caps how
            // many run ProcessOrder at once. Acquire before the go statement
            // if you also want to cap the number of goroutines.
            sem.Acquire()
            defer sem.Release()
            ProcessOrder(o)
        }(order)
    }
    
    wg.Wait()
}
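The same idea ships ready-made and context-aware in golang.org/x/sync/semaphore. A minimal sketch (ProcessOrdersWeighted is a hypothetical name; the Acquire/Release API is the package's real one):

import "golang.org/x/sync/semaphore"

func ProcessOrdersWeighted(ctx context.Context, orders []Order, maxConcurrent int64) error {
    sem := semaphore.NewWeighted(maxConcurrent)
    for _, order := range orders {
        // Blocks until a slot frees up, or fails fast on cancellation.
        if err := sem.Acquire(ctx, 1); err != nil {
            return err
        }
        go func(o Order) {
            defer sem.Release(1)
            ProcessOrder(o)
        }(order)
    }
    // Acquiring every slot waits for all in-flight work to finish.
    return sem.Acquire(ctx, maxConcurrent)
}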

Common Pitfalls

1. Goroutine leaks:

// BAD - goroutine never exits
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch  // Blocks forever if nothing sent
        fmt.Println(val)
    }()
}

// GOOD - use context
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}

2. Closing closed channel:

// BAD
close(ch)
close(ch)  // Panic!

// GOOD - guarantee a single close with sync.Once
var once sync.Once
once.Do(func() {
    close(ch)
})

The cleaner structural rule: exactly one sender owns each channel, and only it calls close. sync.Once is a safety net when ownership is murky.

3. Race conditions:

// BAD
var counter int
for i := 0; i < 1000; i++ {
    go func() {
        counter++  // Race!
    }()
}

// GOOD - use mutex
var (
    counter int
    mu      sync.Mutex
)
for i := 0; i < 1000; i++ {
    go func() {
        mu.Lock()
        counter++
        mu.Unlock()
    }()
}

// BETTER - use atomic
var counter int64
for i := 0; i < 1000; i++ {
    go func() {
        atomic.AddInt64(&counter, 1)
    }()
}
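All three snippets above still race on the final read of counter, because nothing waits for the goroutines to finish. A complete version:

var counter int64
var wg sync.WaitGroup

for i := 0; i < 1000; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        atomic.AddInt64(&counter, 1)
    }()
}

wg.Wait() // without this, reading counter races with the writers
fmt.Println(atomic.LoadInt64(&counter)) // 1000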

Real-World Example

Our order processing service:

type OrderProcessor struct {
    numWorkers int
    db         *sql.DB
    cache      *redis.Client
}

func (p *OrderProcessor) Process(ctx context.Context, orders []Order) error {
    jobs := make(chan Order, 100)
    errors := make(chan error, len(orders))
    var wg sync.WaitGroup
    
    // Start workers
    for w := 0; w < p.numWorkers; w++ {
        wg.Add(1)
        go p.worker(ctx, jobs, errors, &wg)
    }
    
    // Send jobs
    go func() {
        for _, order := range orders {
            select {
            case jobs <- order:
            case <-ctx.Done():
                close(jobs)
                return
            }
        }
        close(jobs)
    }()
    
    // Wait for completion
    go func() {
        wg.Wait()
        close(errors)
    }()
    
    // Collect errors
    var errs []error
    for err := range errors {
        errs = append(errs, err)
    }
    
    if len(errs) > 0 {
        return fmt.Errorf("processed with %d errors", len(errs))
    }
    
    return nil
}

func (p *OrderProcessor) worker(ctx context.Context, jobs <-chan Order, errors chan<- error, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for {
        select {
        case order, ok := <-jobs:
            if !ok {
                return
            }
            if err := p.processOrder(ctx, order); err != nil {
                errors <- err
            }
        case <-ctx.Done():
            return
        }
    }
}

func (p *OrderProcessor) processOrder(ctx context.Context, order Order) error {
    // Skip orders we've already processed (cache hit). The value is unused,
    // so discard it; a named `cached` variable would not compile here.
    if _, err := p.cache.Get(ctx, fmt.Sprintf("order:%d", order.ID)).Result(); err == nil {
        return nil
    }
    
    // Process order
    if err := ValidateOrder(order); err != nil {
        return err
    }
    
    if err := SaveOrder(p.db, order); err != nil {
        return err
    }
    
    // Cache the result (best effort; a failed cache write is non-fatal)
    p.cache.Set(ctx, fmt.Sprintf("order:%d", order.ID), "processed", time.Hour)
    
    return nil
}

Results

Before:

  • Sequential processing
  • 1000 requests/sec
  • 20% CPU usage
  • 100ms per order

After:

  • Worker pool (50 workers)
  • 5000 requests/sec
  • 80% CPU usage
  • 20ms effective time per order (amortized across workers)

Lessons Learned

  1. Use worker pools - Control concurrency
  2. Always use context - Enable cancellation
  3. Handle errors properly - Don’t ignore them
  4. Avoid goroutine leaks - Always have exit condition
  5. Test with race detector - go test -race (see the commands below)
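The race detector hooks into tests, builds, and plain runs alike:

go test -race ./...
go run -race main.go
go build -race ./...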

Conclusion

Go’s concurrency is powerful but requires careful design. Worker pools and pipelines are essential patterns.

Key takeaways:

  1. Worker pools for controlled concurrency
  2. Pipelines for multi-stage processing
  3. Context for cancellation
  4. WaitGroup for synchronization
  5. Avoid common pitfalls (leaks, races)

Master these patterns and your Go services will scale beautifully.