
Dynamic Worker Pool Implementation in Go

Last updated: November 27, 2024

A worker pool lets you run tasks concurrently while controlling how many goroutines work at the same time. This guide walks you through building one from scratch in Go, using a dispatcher that hands jobs to a configurable number of workers.

Understanding the Worker Pool

A worker pool consists of a fixed or variable number of goroutines (workers) that process jobs from a queue. Capping the number of workers bounds CPU and memory use no matter how many jobs are waiting.
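Before adding a dispatcher, it can help to see the idea in its simplest form. The following is a minimal sketch, not the implementation built later in this guide: a few goroutines read from one shared channel until it is closed. The function name squareAll and the squaring task are assumptions chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper: it starts numWorkers goroutines that
// read jobs from one shared channel and store each result in its own slot.
func squareAll(inputs []int, numWorkers int) []int {
	type job struct{ index, value int }

	jobs := make(chan job, len(inputs)) // the job queue
	results := make([]int, len(inputs))

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls jobs until the queue is closed and drained.
			for j := range jobs {
				// Every job writes to a distinct index, so no lock is needed.
				results[j.index] = j.value * j.value
			}
		}()
	}

	for i, v := range inputs {
		jobs <- job{index: i, value: v}
	}
	close(jobs) // no more jobs: lets the workers' range loops end
	wg.Wait()   // wait until every worker has exited
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```

Here the number of workers is fixed for the lifetime of the call. The dispatcher design in the steps below instead gives each worker its own channel.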

Core Components of a Dynamic Worker Pool

  • Workers: Goroutines that perform the given tasks.
  • Job Queue: A channel that delivers jobs to a worker. In this guide each worker has its own unbuffered queue.
  • Dispatcher: Starts the workers and hands each incoming job to an available worker.

Step-by-Step Implementation

1. Define Job and Worker Structures

Start by defining a structure for the job and the worker.


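// Job is a unit of work handed to a worker.
type Job struct {
    ID      int
    Name    string
    Payload any // arbitrary job data; `any` requires Go 1.18 or later
}

// Worker processes jobs from its own queue until told to quit.
type Worker struct {
    ID       int
    JobQueue chan Job  // jobs for this worker arrive here
    QuitChan chan bool // a value sent here stops the worker
}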

2. Create a New Worker

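Create a function that returns a worker with its own unbuffered job queue and quit channel.


// NewWorker creates a worker with the given ID. It does not start it.
func NewWorker(id int) Worker {
    return Worker{
        ID:       id,
        JobQueue: make(chan Job),
        QuitChan: make(chan bool),
    }
}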

3. Implement Worker Start Method

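The Start method launches a goroutine that waits on both the worker's job queue and its quit channel, processing jobs until a stop signal arrives.


// Start launches the worker's processing loop in its own goroutine.
// It uses fmt, so the file needs: import "fmt"
func (w Worker) Start() {
    go func() {
        for {
            select {
            case job := <-w.JobQueue:
                // Process the job (here we only log it).
                fmt.Printf("Worker %d: Processing job %d\n", w.ID, job.ID)
            case <-w.QuitChan:
                // Stop signal received: exit the goroutine.
                fmt.Printf("Worker %d stopping\n", w.ID)
                return
            }
        }
    }()
}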
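To see the job and quit channels working together, here is a small usage sketch. It is a variant of the worker above, not the original code: Start takes a hypothetical handle callback and returns a done channel so the caller can wait for the goroutine to exit, and runWorker is an illustrative helper.

```go
package main

import "fmt"

type Job struct {
	ID int
}

type Worker struct {
	ID       int
	JobQueue chan Job
	QuitChan chan bool
}

// Start launches the worker loop and returns a channel that is closed once
// the loop has exited (an addition for this sketch, so callers can wait).
func (w Worker) Start(handle func(Job)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case job := <-w.JobQueue:
				handle(job)
			case <-w.QuitChan:
				return
			}
		}
	}()
	return done
}

// runWorker feeds n jobs to one worker, stops it, and reports how many
// jobs were handled.
func runWorker(n int) int {
	w := Worker{ID: 1, JobQueue: make(chan Job), QuitChan: make(chan bool)}

	handled := 0
	// Only the worker goroutine touches handled until done is closed.
	done := w.Start(func(j Job) { handled++ })

	for i := 1; i <= n; i++ {
		w.JobQueue <- Job{ID: i} // unbuffered: returns once the worker took the job
	}
	w.QuitChan <- true // blocks until the worker is idle and back in select
	<-done             // wait for the goroutine to exit
	return handled
}

func main() {
	fmt.Println("jobs handled:", runWorker(3)) // prints "jobs handled: 3"
}
```

Because both channels are unbuffered, every send is a handshake with the worker. That is why the quit signal can only be received after the last job has been handled.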

4. Create the Dispatcher

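The dispatcher manages a pool of workers. Its WorkerPool is a channel of job channels: each value in it is the job queue of a worker that can be given work.


// Dispatcher hands jobs to workers through WorkerPool.
type Dispatcher struct {
    WorkerPool chan chan Job // job queues of workers available for work
    maxWorkers int           // number of workers started by Run
}

// NewDispatcher creates a dispatcher whose pool can hold maxWorkers queues.
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}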
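A channel of channels can look odd at first. The sketch below isolates the handshake with a single worker and string jobs. The names handOff, pool, and myJobs are assumptions for illustration only.

```go
package main

import "fmt"

// handOff shows the channel-of-channels handshake with one worker:
// the worker advertises its job channel, the caller picks it up and
// sends a job through it.
func handOff(payload string) string {
	pool := make(chan chan string, 1) // holds job channels of idle workers
	result := make(chan string)

	go func() {
		myJobs := make(chan string)
		pool <- myJobs  // "I am idle, send work here"
		job := <-myJobs // wait for the job
		result <- "processed " + job
	}()

	idle := <-pool  // take an idle worker's job channel
	idle <- payload // hand the job to that specific worker
	return <-result
}

func main() {
	fmt.Println(handOff("job-1")) // prints "processed job-1"
}
```

The pool channel never carries jobs itself. It only carries the address at which a worker can be reached, which is what lets a dispatcher choose a worker before sending the job.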

5. Launch the Workers

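Start the workers and register each worker's job queue in the pool so that the dispatcher can find it.


// Run starts maxWorkers workers and registers their job queues.
func (d *Dispatcher) Run() {
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(i + 1)
        worker.Start()
        // WorkerPool has capacity maxWorkers, so this send never blocks.
        d.WorkerPool <- worker.JobQueue
    }
}

6. Scheduling Jobs to Workers

Dispatch takes a worker's job queue from the pool, sends the job to it, and returns the queue to the pool so the worker can be picked again.


// Dispatch hands the job to the next worker without blocking the caller.
func (d *Dispatcher) Dispatch(job Job) {
    go func() {
        // Take the next worker's queue from the pool.
        workerChannel := <-d.WorkerPool
        // Blocks until that worker is free to receive the job.
        workerChannel <- job
        // Put the queue back so the worker can be picked again.
        d.WorkerPool <- workerChannel
    }()
}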
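The pool built in the steps above has a fixed number of workers. As a rough sketch of how resizing at runtime could look, the example below keeps one quit channel per worker so that individual workers can be stopped. Pool, NewPool, Resize, Size, Submit, and Stop are hypothetical names, not part of the code above, and this version uses a single shared job channel rather than the dispatcher's channel of channels.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical resizable pool. Workers share one job channel and
// each worker gets its own quit channel so it can be stopped individually.
type Pool struct {
	mu    sync.Mutex
	jobs  chan func()
	quits []chan struct{} // one entry per running worker
	wg    sync.WaitGroup
}

func NewPool(queueSize int) *Pool {
	return &Pool{jobs: make(chan func(), queueSize)}
}

// Resize grows or shrinks the pool to exactly n workers.
func (p *Pool) Resize(n int) {
	if n < 0 {
		n = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.quits) < n { // grow: start a new worker
		quit := make(chan struct{})
		p.quits = append(p.quits, quit)
		p.wg.Add(1)
		go p.work(quit)
	}
	for len(p.quits) > n { // shrink: stop the most recently added worker
		last := len(p.quits) - 1
		close(p.quits[last])
		p.quits = p.quits[:last]
	}
}

// work runs jobs until this worker's quit channel is closed.
func (p *Pool) work(quit <-chan struct{}) {
	defer p.wg.Done()
	for {
		select {
		case job := <-p.jobs:
			job()
		case <-quit:
			return
		}
	}
}

// Size reports the current number of workers.
func (p *Pool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.quits)
}

// Submit queues a job and blocks while the queue is full.
func (p *Pool) Submit(job func()) { p.jobs <- job }

// Stop stops every worker and waits for them to exit.
// Jobs still waiting in the queue are not run.
func (p *Pool) Stop() {
	p.Resize(0)
	p.wg.Wait()
}

func main() {
	p := NewPool(16)
	p.Resize(2)

	results := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		n := i
		p.Submit(func() { results <- n })
	}
	p.Resize(4) // scale up while jobs are in flight

	total := 0
	for i := 0; i < 10; i++ {
		total += <-results
	}
	fmt.Println(total, p.Size()) // prints "55 4"
	p.Stop()
}
```

A worker that is in the middle of a job finishes it before it notices its quit channel, so shrinking never interrupts running work. This sketch assumes Resize is not called concurrently with Stop. A production pool would also need a policy for deciding when to grow or shrink, for example based on queue length.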

Conclusion

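By following this guide, you have a basic worker pool in Go that distributes jobs across a set of workers whose number is fixed by maxWorkers when the dispatcher is created. To make the pool resize at runtime, keep a reference to each worker so the dispatcher can start new workers under load and stop idle ones through QuitChan.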
