Idiomatic variable-size worker pool in Go

Hut8 picture Hut8 · May 23, 2014 · Viewed 7.2k times · Source

I'm trying to implement a pool of workers in Go. The go-wiki (and Effective Go in the Channels section) feature excellent examples of bounding resource use. Simply make a channel with a buffer that's as large as the worker pool. Then fill that channel with workers, and send them back into the channel when they're done. Receiving from the channel blocks until a worker is available. So the channel and a loop is the entire implementation -- very cool!

Alternatively one could block on sending into the channel, but same idea.

My question is about changing the size of the worker pool while it's running. I don't believe there's a way to change the size of a channel. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for "golang semaphore".

Answer

tux21b picture tux21b · May 23, 2014

I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and use a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}

It might be a good idea to create a separate WorkerPool type with some convenient methods. Also, instead of type Task string it is quite common to use a struct that also contains a done channel that is used to signal that the task had been executed successfully.

Edit: I've played around a bit more and came up with the following: http://play.golang.org/p/VlEirPRk8V. It's basically the same example, with a nicer API.