Supervisor 是一个进程管理工具,在 3.0 版本之后,新增了 Event 的高级特性, 主要用做(进程启动、退出、失败等)事件告警服务

Event 特性是将监听的服务(listener)注册到supervisord中,当supervisord监听到相应事件时,将事件信息推送给监听对应事件的listener。

事件类型

Event 可以设置 27 种事件类型,事件可以被单独监听,也可以一个listener 监听多种事件。具体可以分为如下几类:

  1. 监控进程状态转移事件;
  2. 监控进程状态日志变更事件;
  3. 进程组中进程添加删除事件;
  4. supervisord 进程本身日志变更事件;
  5. supervisord 进程本身状态变更的事件;
  6. 定时触发事件;

Listener 的实现

  • 与supervisord 的交互

由于supervisord 是 listener的父进程,所以交互方式采用最简单的标准输入输出的方式交互。

listener 通过stdin获取事件,通过stdout通知supervisord listener的事件处理结果和当前listener状态。

  • listener 的状态

listener 有三种状态:

  • ACKNOWLEDGED: listener 未就绪的状态。(发送READY之前的状态)
  • READY: 等待事件触发的状态。(发送READY 消息后,未收到消息的状态)
  • BUSY: 事件处理中的状态。(即输出 OK, FAIL 之前处理Event消息时的状态)

消息协议

消息包括supervisord 通知给listener 的事件消息和 listener 通知给supervisord 的状态变更消息。

  • listener 的状态变更消息
    • 状态OK的 “READY\n” 消息
    • 处理成功 “RESULT 2\nOK” 消息
    • 处理失败 “RESULT 4\nFAIL” 消息
  • supervisord 广播的事件消息:
    • 事件消息分为 header 和 payload 两部分。header 中采用kv的方式发送,header 中包含了 payload 的长度。

官网header示例:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

header 含义:

  • serial 为事件的序列号
  • pool 表示listener 的进程池名称(listener支持启动多个)
  • poolserial 表示listener的进程池序列号
  • eventname 事件名称
  • len body 的长度

golang 实现listener

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
  "bufio"
  "os"
  "strconv"
  "strings"
)

const RESP_OK = "RESULT 2\nOK"
const RESP_FAIL = "RESULT 4\nFAIL"

func main() {
  stdin := bufio.NewReader(os.Stdin)
  stdout := bufio.NewWriter(os.Stdout)
  stderr := bufio.NewWriter(os.Stderr)

  for {
    // 发送后等待接收event
    _, _ = stdout.WriteString("READY\n")
    _ = stdout.Flush()
    // 接收header
    line, _, _ := stdin.ReadLine()
    stderr.WriteString("read" + string(line))
    stderr.Flush()

    header, payloadSize := praseHeader(line)

    // 接收payload
    payload := make([]byte, payloadSize)
    stdin.Read(payload)
    stderr.WriteString("read : " + string(payload))
    stderr.Flush()

    result := alarm(header, payload)

    if result { // 发送处理结果
      stdout.WriteString(RESP_OK)
    } else {
      stdout.WriteString(RESP_FAIL)
    }
    stdout.Flush()
  }
}

func praseHeader(data []byte) (header map[string]string,
  payloadSize int) {
  pairs := strings.Split(string(data), " ")
  header = make(map[string]string, len(pairs))

  for _, pair := range pairs {
    token := strings.Split(pair, ":")
    header[token[0]] = token[1]
  }

  payloadSize, _ = strconv.Atoi(header["len"])
  return header, payloadSize
}

// 这里设置报警即可
func alarm(header map[string]string, payload []byte) bool {
  // send mail
  return true
}

在supervisor 中添加配置,监听服务:

1
2
3
4
[eventlistener:listener]
command=/root/listener
events=PROCESS_STATE,TICK_5
stderr_logfile=/var/log/tmp/listener_stderr.log

监听了服务的处理状态,以及每5s的心跳消息

启动服务:

supervisorctl start listener

参考