Golang实现一个工作池处理并发任务

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main

import (
    "log"
    "time"
)

// Worker 工作者
type Worker struct {
    dataCh chan interface{} // worker channel
    stopCh chan struct{}    // stop channel
}

// NewWorker 新建一个工作者
func NewWorker(lenght int) *Worker {
    return &Worker{
        dataCh: make(chan interface{}, lenght),
        stopCh: make(chan struct{}),
    }
}

// Run 运行一个工作任务
func (w *Worker) Run() {
    for {
        select {
        case msg := <-w.dataCh:
            w.handler(msg)
        case <-w.stopCh:
            return
        }
    }
}

func (w *Worker) stop() {
    select {
    case <-w.stopCh:
        return
    default:
    }
    close(w.stopCh)
}

func (w *Worker) handler(message interface{}) {
    switch msg := message.(type) {
    case stopEvent: // 停止工作任务
        log.Println("worker exit")
        w.stop()
    default:
        //TODO 处理工作任务
        log.Printf("unkown msg %#v", msg)
    }
}

type stopEvent int

// Dispatcher 工作调度器
type Dispatcher struct {
    maxWorkers   int                   // 最大worker数量
    workerLength int                   // worker缓冲长度
    queue        chan interface{}      // 任务调度队列
    workerPool   chan chan interface{} // worker channel pool
    stopCh       chan struct{}         // stop channel
}

// NewDispatcher 创建一个调度器
func NewDispatcher(maxQueue, maxWorkers, workerLength int) *Dispatcher {
    pool := make(chan chan interface{}, maxWorkers) // 创建最大数量
    return &Dispatcher{
        workerPool:   pool,
        maxWorkers:   maxWorkers,
        workerLength: workerLength,
        stopCh:       make(chan struct{}),
        queue:        make(chan interface{}, maxQueue),
    }
}

func (d *Dispatcher) spawnWorker() {
    worker := NewWorker(d.workerLength)
    go worker.Run()
    d.workerPool <- worker.dataCh
}

// Run 运行调度器
func (d *Dispatcher) Run() {
    for i := 0; i < d.maxWorkers; i++ {
        d.spawnWorker()
    }
    go d.dispatch()
}

func (d *Dispatcher) stop() {
    select {
    case <-d.stopCh:
        return
    default:
    }
    close(d.stopCh)
}

// 任务分派器
func (d *Dispatcher) dispatch() {
    for {
        select {
        case msg := <-d.queue:
            d.handler(msg)
        case <-d.stopCh:
            return
        }
    }
}

func (d *Dispatcher) handler(msg interface{}) {
    switch msg.(type) {
    case stopEvent: // 停止分派任务
        d.stop()
        log.Println("dispatcher closed")
        return
    }
    for workerCh := range d.workerPool {
        if (len(workerCh) + 1) == cap(workerCh) {
            workerCh <- stopEvent(1)
            continue
        }
        workerCh <- msg
        d.workerPool <- workerCh
        break
    }
    if len(d.workerPool) < d.maxWorkers {
        d.spawnWorker()
    }
}

func main() {
    defaultDispatch := NewDispatcher(4, 5, 100)
    defaultDispatch.Run()
    for i := 0; i < 100; i++ {
        select {
        case <-defaultDispatch.stopCh:
            return
        default:
        }
        defaultDispatch.queue <- i
    }
    time.Sleep(5 * time.Second)
    //defaultDispatch.queue <- stopEvent(1)
}