Advanced Go Concurrency Patterns: Worker Pools and Pipelines
Our Go service was processing 1000 requests/sec, but CPU usage was only 20%. We weren’t using Go’s concurrency properly.
I refactored using worker pools and pipelines. Now we process 5000 requests/sec with 80% CPU usage. 5x throughput improvement.
The Problem
Sequential processing:
func ProcessOrders(orders []Order) {
    for _, order := range orders {
        ProcessOrder(order) // Takes 100ms
    }
}
1000 orders × 100ms each = 100 seconds!
Basic Goroutines
First attempt:
func ProcessOrders(orders []Order) {
    for _, order := range orders {
        go ProcessOrder(order)
    }
}
Problems:
- No limit on goroutines (could spawn 10,000+)
- No error handling
- No way to wait for completion
- Memory explosion
Worker Pool Pattern
Fixed number of workers:
func ProcessOrders(orders []Order, numWorkers int) {
    jobs := make(chan Order, len(orders))
    results := make(chan Result, len(orders))

    // Start workers
    for w := 0; w < numWorkers; w++ {
        go worker(jobs, results)
    }

    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)

    // Collect results
    for i := 0; i < len(orders); i++ {
        result := <-results
        if result.Error != nil {
            log.Printf("Error: %v", result.Error)
        }
    }
}

func worker(jobs <-chan Order, results chan<- Result) {
    for order := range jobs {
        result := ProcessOrder(order)
        results <- result
    }
}
Now we control concurrency!
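How many workers? For CPU-bound work, a sensible default is one per core via runtime.NumCPU(); for I/O-bound work you can oversubscribe. A minimal sketch (numWorkersFor is a hypothetical helper, and the 4x multiplier is an assumption; benchmark against your own workload):

import "runtime"

// numWorkersFor is a starting heuristic, not a tuned value.
func numWorkersFor(ioBound bool) int {
    n := runtime.NumCPU() // one worker per core for CPU-bound work
    if ioBound {
        n *= 4 // I/O-bound workers spend most of their time blocked (assumed multiplier)
    }
    return n
}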
WaitGroup for Synchronization
A cleaner approach: sync.WaitGroup lets the caller block until every worker has drained the jobs channel:
func ProcessOrders(orders []Order, numWorkers int) error {
    jobs := make(chan Order, 100)
    var wg sync.WaitGroup

    // Start workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for order := range jobs {
                if err := ProcessOrder(order); err != nil {
                    log.Printf("Error processing order %d: %v", order.ID, err)
                }
            }
        }()
    }

    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)

    // Wait for completion
    wg.Wait()
    return nil
}
Error Handling
Logging errors inside workers loses them; collect them through a channel instead:
func ProcessOrders(orders []Order, numWorkers int) []error {
    jobs := make(chan Order, 100)
    errors := make(chan error, len(orders))
    var wg sync.WaitGroup

    // Start workers
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for order := range jobs {
                if err := ProcessOrder(order); err != nil {
                    errors <- fmt.Errorf("order %d: %w", order.ID, err)
                }
            }
        }()
    }

    // Send jobs
    for _, order := range orders {
        jobs <- order
    }
    close(jobs)

    // Wait and close errors channel
    go func() {
        wg.Wait()
        close(errors)
    }()

    // Collect errors
    var errs []error
    for err := range errors {
        errs = append(errs, err)
    }
    return errs
}
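If you only need the first error rather than all of them, golang.org/x/sync/errgroup expresses the same bounded pool more compactly. A minimal sketch, assuming ProcessOrder returns an error as above and a version of the package that includes SetLimit:

import "golang.org/x/sync/errgroup"

func ProcessOrdersGroup(orders []Order, numWorkers int) error {
    var g errgroup.Group
    g.SetLimit(numWorkers) // at most numWorkers goroutines run at once
    for _, order := range orders {
        order := order // capture the loop variable (required before Go 1.22)
        g.Go(func() error {
            return ProcessOrder(order)
        })
    }
    return g.Wait() // returns the first non-nil error, if any
}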
Pipeline Pattern
Multi-stage processing. Each stage owns its output channel, closes it when done, and selects on ctx.Done() so a cancelled pipeline doesn't leak goroutines:
// Stage 1: Fetch orders
func fetchOrders(ctx context.Context) <-chan Order {
    out := make(chan Order)
    go func() {
        defer close(out)
        orders := GetOrdersFromDB()
        for _, order := range orders {
            select {
            case out <- order:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Stage 2: Validate orders
func validateOrders(ctx context.Context, in <-chan Order) <-chan Order {
    out := make(chan Order)
    go func() {
        defer close(out)
        for order := range in {
            if ValidateOrder(order) {
                select {
                case out <- order:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// Stage 3: Process orders
func processOrders(ctx context.Context, in <-chan Order) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for order := range in {
            result := ProcessOrder(order)
            select {
            case out <- result:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Pipeline
func OrderPipeline(ctx context.Context) {
    orders := fetchOrders(ctx)
    validated := validateOrders(ctx, orders)
    results := processOrders(ctx, validated)

    for result := range results {
        log.Printf("Processed order %d", result.OrderID)
    }
}
Fan-Out, Fan-In
Run several copies of a stage on the same input channel, then merge their outputs into one stream:
func fanOut(ctx context.Context, in <-chan Order, numWorkers int) []<-chan Result {
    channels := make([]<-chan Result, numWorkers)
    for i := 0; i < numWorkers; i++ {
        channels[i] = processOrders(ctx, in) // each copy pulls from the same input
    }
    return channels
}

func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for result := range c {
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// Usage
func ProcessWithFanOut(ctx context.Context) {
    orders := fetchOrders(ctx)
    workers := fanOut(ctx, orders, 10)
    results := fanIn(ctx, workers...)

    for result := range results {
        log.Printf("Result: %v", result)
    }
}
Context for Cancellation
Graceful shutdown:
func ProcessOrders(ctx context.Context, orders []Order) error {
    jobs := make(chan Order, 100)
    var wg sync.WaitGroup

    // Start workers
    for w := 0; w < 10; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case order, ok := <-jobs:
                    if !ok {
                        return
                    }
                    ProcessOrder(order)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    // Send jobs
    for _, order := range orders {
        select {
        case jobs <- order:
        case <-ctx.Done():
            close(jobs)
            wg.Wait()
            return ctx.Err()
        }
    }
    close(jobs)

    wg.Wait()
    return nil
}

// Usage
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := ProcessOrders(ctx, orders); err != nil {
    log.Printf("Processing cancelled: %v", err)
}
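In a long-running service, the timeout context above is often replaced by one tied to OS signals, so an interrupt drains the pool instead of killing it. A minimal sketch using the standard library's signal.NotifyContext (loadOrders is a hypothetical stand-in for however you fetch work):

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // ctx is cancelled on SIGINT/SIGTERM; workers then exit via ctx.Done()
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    if err := ProcessOrders(ctx, loadOrders()); err != nil { // loadOrders: hypothetical helper
        log.Printf("shutting down: %v", err)
    }
}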
Rate Limiting
Control request rate:
func ProcessWithRateLimit(orders []Order, rps int) {
    limiter := time.NewTicker(time.Second / time.Duration(rps))
    defer limiter.Stop()

    for _, order := range orders {
        <-limiter.C // wait for the next tick (one per 1/rps seconds)
        go ProcessOrder(order) // note: still one goroutine per order; bound it with a pool if needed
    }
}
Or use golang.org/x/time/rate (the second argument to NewLimiter is the burst size, here one second's worth of requests):
import "golang.org/x/time/rate"

func ProcessWithRateLimit(ctx context.Context, orders []Order, rps int) error {
    limiter := rate.NewLimiter(rate.Limit(rps), rps)

    for _, order := range orders {
        if err := limiter.Wait(ctx); err != nil {
            return err
        }
        go ProcessOrder(order)
    }
    return nil
}
Semaphore Pattern
Limit concurrent operations:
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

func ProcessOrders(orders []Order, maxConcurrent int) {
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup

    for _, order := range orders {
        wg.Add(1)
        go func(o Order) {
            defer wg.Done()
            sem.Acquire()
            defer sem.Release()
            ProcessOrder(o)
        }(order)
    }
    wg.Wait()
}
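The hand-rolled version works, but golang.org/x/sync/semaphore gives you a context-aware Acquire, so a blocked caller can be cancelled. A minimal sketch:

import (
    "context"

    "golang.org/x/sync/semaphore"
)

func ProcessOrdersSem(ctx context.Context, orders []Order, maxConcurrent int64) error {
    sem := semaphore.NewWeighted(maxConcurrent)
    for _, order := range orders {
        // Blocks until a slot frees up or ctx is cancelled
        if err := sem.Acquire(ctx, 1); err != nil {
            return err
        }
        go func(o Order) {
            defer sem.Release(1)
            ProcessOrder(o)
        }(order)
    }
    // Acquiring every slot waits for all in-flight goroutines to release theirs
    return sem.Acquire(ctx, maxConcurrent)
}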
Common Pitfalls
1. Goroutine leaks:
// BAD - goroutine never exits
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch // Blocks forever if nothing sent
        fmt.Println(val)
    }()
}

// GOOD - use context
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}
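Leaks like this are easy to catch in tests. Uber's go.uber.org/goleak fails a test if goroutines are still alive when it ends; a sketch, assuming you add the dependency:

import (
    "context"
    "testing"

    "go.uber.org/goleak"
)

func TestNoLeak(t *testing.T) {
    defer goleak.VerifyNone(t) // fails if goroutines outlive the test

    ctx, cancel := context.WithCancel(context.Background())
    noLeak(ctx)
    cancel() // without this, goleak would report the goroutine inside noLeak
}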
2. Closing a closed channel:
// BAD
close(ch)
close(ch) // Panic!

// GOOD - use sync.Once
var once sync.Once
once.Do(func() {
    close(ch)
})
3. Race conditions:
// BAD
var counter int
for i := 0; i < 1000; i++ {
    go func() {
        counter++ // Race!
    }()
}

// GOOD - use mutex
var (
    counter int
    mu      sync.Mutex
)
for i := 0; i < 1000; i++ {
    go func() {
        mu.Lock()
        counter++
        mu.Unlock()
    }()
}

// BETTER - use atomic
var counter int64
for i := 0; i < 1000; i++ {
    go func() {
        atomic.AddInt64(&counter, 1)
    }()
}
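Since Go 1.19, sync/atomic also ships typed atomics that can't be mixed with plain reads by accident. The same counter, with the WaitGroup the snippets above leave out for brevity:

var (
    counter atomic.Int64
    wg      sync.WaitGroup
)
for i := 0; i < 1000; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        counter.Add(1)
    }()
}
wg.Wait()
fmt.Println(counter.Load()) // always 1000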
Real-World Example
Our order processing service:
type OrderProcessor struct {
    numWorkers int
    db         *sql.DB
    cache      *redis.Client
}

func (p *OrderProcessor) Process(ctx context.Context, orders []Order) error {
    jobs := make(chan Order, 100)
    errors := make(chan error, len(orders))
    var wg sync.WaitGroup

    // Start workers
    for w := 0; w < p.numWorkers; w++ {
        wg.Add(1)
        go p.worker(ctx, jobs, errors, &wg)
    }

    // Send jobs
    go func() {
        for _, order := range orders {
            select {
            case jobs <- order:
            case <-ctx.Done():
                close(jobs)
                return
            }
        }
        close(jobs)
    }()

    // Close the errors channel once all workers finish
    go func() {
        wg.Wait()
        close(errors)
    }()

    // Collect errors
    var errs []error
    for err := range errors {
        errs = append(errs, err)
    }
    if len(errs) > 0 {
        return fmt.Errorf("processed with %d errors", len(errs))
    }
    return nil
}

func (p *OrderProcessor) worker(ctx context.Context, jobs <-chan Order, errors chan<- error, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case order, ok := <-jobs:
            if !ok {
                return
            }
            if err := p.processOrder(ctx, order); err != nil {
                errors <- err
            }
        case <-ctx.Done():
            return
        }
    }
}

func (p *OrderProcessor) processOrder(ctx context.Context, order Order) error {
    // Already processed? Check the cache first
    if _, err := p.cache.Get(ctx, fmt.Sprintf("order:%d", order.ID)).Result(); err == nil {
        return nil
    }

    // Process order
    if err := ValidateOrder(order); err != nil {
        return err
    }
    if err := SaveOrder(p.db, order); err != nil {
        return err
    }

    // Cache result
    p.cache.Set(ctx, fmt.Sprintf("order:%d", order.ID), "processed", time.Hour)
    return nil
}
Results
Before:
- Sequential processing
- 1000 requests/sec
- 20% CPU usage
- 100ms per order
After:
- Worker pool (50 workers)
- 5000 requests/sec
- 80% CPU usage
- 20ms per order (parallel)
Lessons Learned
- Use worker pools - Control concurrency
- Always use context - Enable cancellation
- Handle errors properly - Don’t ignore them
- Avoid goroutine leaks - Always have exit condition
- Test with race detector - go test -race
Conclusion
Go’s concurrency is powerful but requires careful design. Worker pools and pipelines are essential patterns.
Key takeaways:
- Worker pools for controlled concurrency
- Pipelines for multi-stage processing
- Context for cancellation
- WaitGroup for synchronization
- Avoid common pitfalls (leaks, races)
Master these patterns and your Go services will scale beautifully.