idiomatic goroutine termination and error handling

gerad · Nov 25, 2016

I have a simple concurrency use case in Go, and it's driving me nuts that I can't figure out an elegant solution. Any help would be appreciated.

I want to write a method fetchAll that queries an unspecified number of resources from remote servers in parallel. If any of the fetches fails, I want to return that first error immediately.

My initial, naive implementation leaks goroutines:

package main

import (
  "fmt"
  "math/rand"
  "sync"
  "time"
)

func fetchAll() error {
  wg := sync.WaitGroup{}
  errs := make(chan error)
  leaks := make(map[int]struct{})
  defer fmt.Println("these goroutines leaked:", leaks)

  // run all the http requests in parallel
  for i := 0; i < 4; i++ {
    leaks[i] = struct{}{}
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      defer delete(leaks, i)

      // pretend this does an http request and returns an error
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
      errs <- fmt.Errorf("goroutine %d's error returned", i)
    }(i)
  }

  // wait until all the fetches are done and close the error
  // channel so the loop below terminates
  go func() {
    wg.Wait()
    close(errs)
  }()

  // return the first error
  for err := range errs {
    if err != nil {
      return err
    }
  }

  return nil
}

func main() {
  fmt.Println(fetchAll())
}

Playground: https://play.golang.org/p/Be93J514R5

I know from reading https://blog.golang.org/pipelines that I can create a signal channel to clean up the other goroutines. Alternatively, I could probably use context to accomplish it. But it seems like such a simple use case should have a simpler solution that I'm missing.
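For reference, the signal-channel approach mentioned above might look roughly like the following. This is a minimal sketch, not code from the pipelines article: the name fetchAllDone and the done channel are assumptions made for illustration, and the simulated fetch is the same as in the code above.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fetchAllDone is a hypothetical variant of fetchAll that uses a done
// channel to release the goroutines that are still running.
func fetchAllDone() error {
	var wg sync.WaitGroup
	errs := make(chan error)
	done := make(chan struct{})
	// closing done unblocks every goroutine still waiting to send
	defer close(done)

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			// pretend this does an http request and returns an error
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			err := fmt.Errorf("goroutine %d's error returned", i)

			select {
			case errs <- err: // the caller is still listening
			case <-done: // the caller has already returned
			}
		}(i)
	}

	// close errs once every fetch has finished so the range loop ends
	go func() {
		wg.Wait()
		close(errs)
	}()

	// return the first error
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(fetchAllDone())
}
```

Note that the remaining goroutines are released rather than interrupted: each one still finishes its simulated request before it notices that done is closed.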

Answer

joth · Mar 12, 2019

Using errgroup (golang.org/x/sync/errgroup) makes this even simpler. Its Wait method blocks until all the supplied goroutines have returned. If any of them returns an error, the group's context is canceled so that any remaining goroutines watching it can stop early, and that first error is the one that bubbles back up to the caller.

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
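        // ctx is canceled as soon as any goroutine in the group returns an error
        g, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
                g.Go(func() error {
                        // pretend this does an http request and returns an error,
                        // but give up early if another fetch has already failed
                        select {
                        case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
                                return fmt.Errorf("error in goroutine, bailing")
                        case <-ctx.Done():
                                return ctx.Err()
                        }
                })
        }

        // Wait for every goroutine to return, then report the first error (if any)
        return g.Wait()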
}

func main() {
        fmt.Println(fetchAll(context.Background()))
}