sync.ErrGroup

sync.WaitGroup可以阻塞等待多个goroutine执行完成,但是WaitGroup无法知道goroutine出错的原因;sync.ErrGroup(即golang.org/x/sync/errgroup包中的errgroup.Group)在sync.WaitGroup的基础上增加了错误的传递;当有goroutine返回错误时,可以通过context取消整个goroutine集合;

sync.WaitGroup控制多个并发任务例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// wg tracks the outstanding fetch goroutines so the caller can block until
// every one of them has finished.
var wg sync.WaitGroup
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for _, url := range urls {
    // Increment the WaitGroup counter before the goroutine starts, so Wait
    // cannot observe a zero counter while fetches are still being launched.
    wg.Add(1)
    // Launch a goroutine to fetch the URL.
    go func(url string) {
        // Decrement the counter when the goroutine completes.
        defer wg.Done()
        // Fetch the URL. A WaitGroup offers no way to hand the error back to
        // the waiter, so a failed fetch is deliberately dropped here.
        resp, err := http.Get(url)
        if err != nil {
            return
        }
        // Close the body so the underlying connection is released.
        resp.Body.Close()
    }(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()

sync.ErrGroup控制多个并发任务例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
g := new(errgroup.Group)
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}
for _, url := range urls {
    // Launch a goroutine to fetch the URL.
    url := url // https://golang.org/doc/faq#closures_and_goroutines
    g.Go(func() error {
        // Fetch the URL.
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

g.Go(func() error)函数传入一个匿名函数,能通过g.Wait()捕获第一个非nil的错误信息

gogrep.go递归搜索目录中的go文件,添加超时设置功能,超时取消所有搜索的goroutine任务

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
package main

import (
    "bytes"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "time"

    "golang.org/x/net/context"
    "golang.org/x/sync/errgroup"
)

// main parses the command line, runs search under a timeout and prints every
// matching file followed by the number of hits.
//
// Usage: gogrep [flags] path pattern. Exactly two positional arguments are
// required; otherwise the usage text is printed and the process exits.
func main() {
    // flag.Duration accepts any Go duration string ("500ms", "2s", ...).
    duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
    flag.Usage = func() {
        fmt.Printf("%s by Brian Ketelsen\n", os.Args[0])
        fmt.Println("Usage:")
        fmt.Printf("    gogrep [flags] path pattern \n")
        fmt.Println("Flags:")
        flag.PrintDefaults()
    }
    flag.Parse()
    if flag.NArg() != 2 {
        flag.Usage()
        os.Exit(-1)
    }
    path := flag.Arg(0)
    pattern := flag.Arg(1)

    ctx, cancel := context.WithTimeout(context.Background(), *duration)
    m, err := search(ctx, path, pattern)
    // Release the timeout's resources as soon as the search is over. This is
    // called explicitly rather than deferred because log.Fatal below exits
    // without running deferred functions.
    cancel()
    if err != nil {
        log.Fatal(err)
    }
    for _, name := range m {
        fmt.Println(name)
    }
    fmt.Println(len(m), "hits")
}

// search walks the tree rooted at root and returns the paths of all regular
// files whose name ends in ".go" and whose contents contain pattern.
//
// One goroutine walks the tree and publishes candidate paths; every candidate
// is then scanned in its own goroutine. All of them belong to a single
// errgroup whose context is derived from ctx, so the first failure, or the
// cancellation/expiry of ctx, stops the remaining work.
//
// The returned error is the first error recorded by the group. The returned
// slice holds the matches collected up to that point, in no particular order.
func search(ctx context.Context, root string, pattern string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)

    // Stage 1: walk the tree and publish every candidate file.
    paths := make(chan string, 100)
    g.Go(func() error {
        // Closing paths ends the dispatch loop below, whether the walk
        // finished normally or was aborted.
        defer close(paths)

        return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            // Check for cancellation on every entry, not only when a
            // candidate is about to be sent; otherwise a large tree with few
            // .go files would be walked to the end after the deadline.
            if ctx.Err() != nil {
                return ctx.Err()
            }
            // Directories are not regular files: they are skipped as
            // candidates, but Walk still descends into them.
            if !info.Mode().IsRegular() || !strings.HasSuffix(info.Name(), ".go") {
                return nil
            }

            select {
            case paths <- path:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    })

    // Stage 2: scan each candidate in its own goroutine.
    needle := []byte(pattern) // converted once; only read by the workers
    c := make(chan string, 100)
    for path := range paths {
        p := path // per-iteration copy captured by the closure
        g.Go(func() error {
            // Do not read the file at all once the search was cancelled.
            if ctx.Err() != nil {
                return ctx.Err()
            }
            data, err := ioutil.ReadFile(p)
            if err != nil {
                return err
            }
            if !bytes.Contains(data, needle) {
                return nil
            }
            select {
            case c <- p:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }

    // Close the result channel once every goroutine is done, which ends the
    // collection loop below. The error is picked up by the final Wait.
    go func() {
        g.Wait()
        close(c)
    }()

    var m []string
    for r := range c {
        m = append(m, r)
    }
    return m, g.Wait()
}

使用帮助

gogrep by Brian Ketelsen
Usage:
    gogrep [flags] path pattern
Flags:
-timeout duration
    timeout in milliseconds (default 500ms)

运行结果

gogrep -timeout 1000ms . fmt
gogrep.go
1 hits

参考