Golang Context

Context 接口

1
2
3
4
5
6
7
8
9
type Context interface {
    Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct{}

    Err() error

    Value(key interface{}) interface{}
}

context包提供暴露Context接口可以在多个Goroutine共享数据,实现多Goroutine管理机制,Context是协程安全的;

Context接口定义了四个需要实现的方法:

  1. Deadline 方法返回第一个参数是设置的截止时间,到时间Context会自动发起取消请求; 第二个参数是bool值,为false时表示没有设置截止时间,如果要取消需要调用取消函数;
  2. Done 方法返回一个只读 Channel,类型为struct{}, 这个 Channel 会在当前工作完成或者上下文被取消之后关闭, 多次调用 Done 方法会返回同一个 Channel;
  3. Err 方法会返回当前 Context 结束的原因,它只会在 Done 返回的 Channel 被关闭时才会返回非空的值;
    1. 如果当前 Context 被取消就会返回 Canceled 错误;
    2. 如果当前 Context 超时就会返回 DeadlineExceeded 错误;
  4. Value 方法会从 Context 中返回键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,这个功能可以用来传递请求特定的数据;获取Value值时是线程安全的; key 必须为非空,且可比较;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

Go内置2个Context实现,通常用来做顶层的parent context; TODO一般在不知道使用什么Context时使用;

todo 和 background 两者本质上只有名字区别,在按 string 输出的时候会有区别; emptyCtx 是一个不可取消,没有设置截止时间,没有携带任何值的Context;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (e *emptyCtx) String() string {
   switch e {
   case background:
      return "context.Background"
   case todo:
      return "context.TODO"
   }
   return "unknown empty Context"
}

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

常见的几种Context

  • emptyCtx,所有 ctx 类型的根,用 context.TODO(),或 context.Background() 来生成。
  • valueCtx,主要就是为了在 ctx 中嵌入上下文数据,一个简单的 k 和 v 结构,同一个 ctx 内只支持一对 kv,需要更多的 kv 的话,会形成一棵树形结构。
  • cancelCtx,用来取消程序的执行树,一般用 WithCancel,WithTimeout,WithDeadline 返回的取消函数本质上都是对应了 cancelCtx。
  • timerCtx,在 cancelCtx 上包了一层,支持基于时间的 cancel。

Context的继承衍生

1
2
3
4
5
6
type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

CancelFunc是取消函数,可以取消一个Context,以及这个Context下所有的Context

使用原则

  1. 不要把Context放在结构体中,要以参数的方式传递
  2. 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
  3. 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
  4. Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
  5. Context是线程安全的,可以放心的在多个goroutine中传递
  6. 在子Context被传递到的goroutine中,应该对该子Context的Done(channel)进行监控

Context使用

Context控制多个goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    valueCtx1 := context.WithValue(ctx, "key", "goroutine 1")
    valueCtx2 := context.WithValue(ctx, "key", "goroutine 2")
    valueCtx3 := context.WithValue(ctx, "key", "goroutine 3")
    go watch(valueCtx1)
    go watch(valueCtx2)
    go watch(valueCtx3)

    time.Sleep(10 * time.Second)
    fmt.Println("cancel ", ctx.String())
    cancel()
    time.Sleep(5 * time.Second)
}

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println(ctx.String() ," done", ctx.Err())
            return
        default:
            fmt.Println(ctx.Value("key"), "wait...")
            time.Sleep(2 * time.Second)
        }
    }
}

Context http请求超时

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
    // 创建一个超时时间为100毫秒的上下文
    ctx := context.Background()
    ctx, _ = context.WithTimeout(ctx, 100*time.Millisecond)
    
    // 创建一个访问Google主页的请求
    req, _ := http.NewRequest(http.MethodGet, "http://google.com", nil)
    // 将超时上下文关联到创建的请求上
    req = req.WithContext(ctx)
    
    // 创建一个HTTP客户端并执行请求
    client := &http.Client{}
    res, err := client.Do(req)
    // 如果请求失败了,记录到STDOUT
    if err != nil {
        fmt.Println("Request failed:", err)
        return
    }
    // 请求成功后打印状态码
    fmt.Println("Response received, status code:", res.StatusCode)
}

Context http服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func main() {
    // 创建一个监听8000端口的服务器
    http.ListenAndServe(":8000", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    // 输出到STDOUT展示处理已经开始
    fmt.Fprint(os.Stdout, "processing request\n")
    // 通过select监听多个channel
    select {
    case <-time.After(2 * time.Second):
        // 如果两秒后接受到了一个消息后,意味请求已经处理完成
        // 我们写入"request processed"作为响应
        w.Write([]byte("request processed"))
    case <-ctx.Done():
        // 如果处理完成前取消了,在STDERR中记录请求被取消的消息
        fmt.Fprint(os.Stderr, "request cancelled\n")
    }
    }))
}

Context 超时控制多个goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func worker(ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
        select {
        case <-ctx.Done():
            fmt.Println("Timeout ", ctx.Err())
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(ctx, &wg)
    }

    time.Sleep(time.Second)
    cancel()

    wg.Wait()
}

Context 取消控制多个goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func work1(ctx context.Context) error {
    // 假设这个操作会因为某种原因失败
    // 使用time.Sleep来模拟一个资源密集型操作
    time.Sleep(100 * time.Millisecond)
    return errors.New("failed")
}

func work2(ctx context.Context) {
    select {
    case <-time.After(500 * time.Millisecond):
        fmt.Println("done")
    case <-ctx.Done():
        fmt.Println("halted work2")
    }
}

func main() {
    ctx := context.Background()
    // cancel context
    ctx, cancel := context.WithCancel(ctx)
    // 在不同的goroutine中运行work2
    go func() {
        work2(ctx)
    }()
    go func() {
        work2(ctx)
    }()

    err := work1(ctx)
    // 如果这个操作返回错误,取消所有使用相同Context的操作
    if err != nil {
        //发出取消事件
        cancel()
    }
    time.Sleep(100 * time.Millisecond)
}

Context 控制后台goroutine 生成素数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
    "context"
    "fmt"
)

// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            select {
            case <-ctx.Done(): // 结束生成自然数
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                select {
                case <-ctx.Done(): // 结束过滤器
                    return
                case out <- i:
                }
            }
        }
    }()
    return out
}

func main() {
    // 通过 Context 控制后台Goroutine状态
    ctx, cancel := context.WithCancel(context.Background())

    ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
    for i := 0; i < 10; i++ {
        prime := <-ch // 新出现的素数
        fmt.Printf("%v: %v\n", i+1, prime)
        ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器
    }
    cancel()
}