NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型,使用Golang语言开发。

What is NATS

Software applications and services need to exchange data. NATS is an infrastructure that allows such

data exchange, segmented in the form of messages. We call this a “message oriented middleware”.

With NATS, application developers can:

  • Effortlessly build distributed and scalable client-server applications.

  • Store and distribute data in realtime in a general manner. This can flexibly be achieved across various environments, languages, cloud providers and on-premises systems.

消息传递模型

特性

  • 纯净的pub-sub (Pure pub-sub)
  • 集群模式的server (Cluster mode server)
  • 订阅者的自动裁剪 (Auto-pruning of subscribers)
  • 基于文本的协议 (Text-based protocol)
  • 多种服务质量 (Multiple qualities of service - QoS)
    • 最多一次投递
    • 至少一次投递
  • 持久
  • 缓存

适用场景

  • 高吞吐量的消息分散 —— 少数的生产者需要将数据发送给很多的消费者。
  • 寻址和发现 —— 将数据发送给特定的应用实例,设备或者用户,也可用于发现并连接到基础架构中的实例,设备或用户。
  • 命令和控制(控制面板)—— 向程序或设备发送指令,并从程序/设备中接收状态,如SCADA,卫星遥感,物联网等。
  • 负载均衡 —— 主要应用于程序会生成大量的请求,且可动态伸缩程序实例。
  • N路可扩展性 —— 通信基础架构能够充分利用go的高效并发/调度机制,以增强水平和垂直的扩展性。
  • 位置透明 —— 程序在各个地理位置上分布者大量实例,且你无法了解到程序之间的端点配置详情,及他们所生产或消费的数据。
  • 容错

启动

下载最新版本Release v2.9.22

默认启动

1
2
3
4
5
6
7
8
.\nats-server.exe
[11716] 2023/09/07 11:46:54.035391 [INF] Starting nats-server
[11716] 2023/09/07 11:46:54.048015 [INF]   Version:  2.9.22
[11716] 2023/09/07 11:46:54.048015 [INF]   Git:      [528ee14]
[11716] 2023/09/07 11:46:54.048015 [INF]   Name:     NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.048015 [INF]   ID:       NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.049079 [INF] Listening for client connections on 0.0.0.0:4222
[11716] 2023/09/07 11:46:54.075664 [INF] Server is ready

JetStream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
cat js.conf
// 启用jetstream,空模块将启用并使用默认值 的jetstream
jetstream {
    // jetstream数据存放位置:/data/nats-server/jetstream
    store_dir: "./data/nats-server"
    // 1GB
    max_memory_store: 1073741824
    // 10GB
    max_file_store: 10737418240
}

.\nats-server.exe -c .\js.conf
[22500] 2023/09/07 14:52:39.716147 [INF] Starting nats-server
[22500] 2023/09/07 14:52:39.729121 [INF]   Version:  2.9.22
[22500] 2023/09/07 14:52:39.729121 [INF]   Git:      [528ee14]
[22500] 2023/09/07 14:52:39.729121 [INF]   Name:     NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF]   Node:     YWK2TKml
[22500] 2023/09/07 14:52:39.729121 [INF]   ID:       NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF] Using configuration file: .\js.conf
[22500] 2023/09/07 14:52:39.729656 [INF] Starting JetStream
[22500] 2023/09/07 14:52:39.731221 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[22500] 2023/09/07 14:52:39.731221 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[22500] 2023/09/07 14:52:39.731221 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[22500] 2023/09/07 14:52:39.731221 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF]          https://docs.nats.io/jetstream
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF] ---------------- JETSTREAM ----------------
[22500] 2023/09/07 14:52:39.731221 [INF]   Max Memory:      1.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF]   Max Storage:     10.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF]   Store Directory: "data\nats-server\jetstream"
[22500] 2023/09/07 14:52:39.731221 [INF] -------------------------------------------
[22500] 2023/09/07 14:52:39.734361 [INF] Listening for client connections on 0.0.0.0:4222
[22500] 2023/09/07 14:52:39.765224 [INF] Server is ready

Username/Password

1
2
3
4
5
6
7
8
9
cat server.conf
authorization: {
    users: [
        {user: a, password: b},
        {user: b, password: a}
    ]
}

nats-server -c server.conf

Golang客户端多种连接

NATS客户端连接断开,客户端SDK会尝试重新连接服务器。

Golang客户端库,连接函数提供多个控制连接的nats.Option

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Connect will attempt to connect to the NATS system.
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
// Options start with the defaults but can be overridden.
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
func Connect(url string, options ...Option) (*Conn, error) {
	opts := GetDefaultOptions()
	opts.Servers = processUrlString(url)
	for _, opt := range options {
		if opt != nil {
			if err := opt(&opts); err != nil {
				return nil, err
			}
		}
	}
	return opts.Connect()
}

最大重新连接次数

限制重新连接总次数,默认为60,如果设置的次数小于nats服务节点数,那么重新连接将不会尝试到全部节点。

1
2
3
4
5
6
7
8
// MaxReconnects is an Option to set the maximum number of reconnect attempts.
// Defaults to 60.
func MaxReconnects(max int) Option {
	return func(o *Options) error {
		o.MaxReconnect = max
		return nil
	}
}

重新连接间隔等待时间

多次重新连接到同一nats服务节点的间隔等待时间。

1
2
3
4
5
6
7
8
// ReconnectWait is an Option to set the wait time between reconnect attempts.
// Defaults to 2s.
func ReconnectWait(t time.Duration) Option {
	return func(o *Options) error {
		o.ReconnectWait = t
		return nil
	}
}

避免 Thundering Herd

禁用重新连接时的随机化过程,让重新连接始终按相同的顺序检查所有nats节点,避免单个节点同时承受过多连接创建,出现惊群效应(Thundering Herd)。

1
2
3
4
5
6
7
// DontRandomize is an Option to turn off randomizing the server pool.
func DontRandomize() Option {
	return func(o *Options) error {
		o.NoRandomize = true
		return nil
	}
}

事件监听

可以监听连接创建事件,在连接创建和重新连接,或连接失败时回调处理。

1
2
3
4
5
6
7
// ReconnectHandler is an Option to set the reconnected handler.
func ReconnectHandler(cb ConnHandler) Option {
	return func(o *Options) error {
		o.ReconnectedCB = cb
		return nil
	}
}

更多事件处理选项见官方文档Option

使用示例

Learn NATS by Example

Core Publish-Subscribe

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, _ := nats.Connect(url)

	defer nc.Drain()

	nc.Publish("greet.joe", []byte("hello"))

	sub, _ := nc.SubscribeSync("greet.*")

	msg, _ := sub.NextMsg(10 * time.Millisecond)
	fmt.Println("subscribed after a publish...")
	fmt.Printf("msg is nil? %v\n", msg == nil)

	nc.Publish("greet.joe", []byte("hello"))
	nc.Publish("greet.pam", []byte("hello"))

	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)

	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)

	nc.Publish("greet.bob", []byte("hello"))

	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
}

运行示例:

1
2
3
4
5
6
 NATS_URL="${NATS_URL:-nats://localhost:4222}" go run pub-sub.go
subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.joe"
msg data: "hello" on subject "greet.pam"
msg data: "hello" on subject "greet.bob"

Request-Reply

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {

		name := msg.Subject[6:]
		msg.Respond([]byte("hello, " + name))
	})

	rep, _ := nc.Request("greet.joe", nil, time.Second)
	fmt.Println(string(rep.Data))

	rep, _ = nc.Request("greet.sue", nil, time.Second)
	fmt.Println(string(rep.Data))

	rep, _ = nc.Request("greet.bob", nil, time.Second)
	fmt.Println(string(rep.Data))

	sub.Unsubscribe()

	_, err := nc.Request("greet.joe", nil, time.Second)
	fmt.Println(err)
}

运行示例:

1
2
3
4
5
$ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run request-reply.go
hello, joe
hello, sue
hello, bob
nats: no responders available for request

Protobuf for Message Payloads

protobuf协议定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
syntax = "proto3";

option go_package = ".;main";

package main;

message GreetRequest {
    string name = 1;
}

message GreetReply {
    string text = 1;
}

生成go协议文件:

1
protoc.exe --go_out=paths=source_relative:. greet.proto
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
)

func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	nc.Subscribe("greet", func(msg *nats.Msg) {
		var req GreetRequest
		proto.Unmarshal(msg.Data, &req)

		rep := GreetReply{
			Text: fmt.Sprintf("hello %q!", req.Name),
		}
		data, _ := proto.Marshal(&rep)
		msg.Respond(data)
	})

	req := GreetRequest{
		Name: "joe",
	}
	data, _ := proto.Marshal(&req)

	msg, _ := nc.Request("greet", data, time.Second)

	var rep GreetReply
	proto.Unmarshal(msg.Data, &rep)

	fmt.Printf("reply: %s\n", rep.Text)
}

运行示例:

1
2
$ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run nats_proto.go greet.pb.go
reply: hello "joe"!

Concurrent Message Processing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import os
import random

import nats


async def main():
    nats_url = os.getenv("NATS_URL", "nats://localhost:4222")

    client = await nats.connect(nats_url)

    messages = (await client.subscribe("greet.*", max_msgs=50)).messages

    for i in range(50):
        await client.publish("greet.joe", f"hello {i}".encode())

    semaphore = asyncio.Semaphore(25)

    async def process_message(message):
        async with semaphore:
            await asyncio.sleep(random.uniform(0, 0.5))
            print(f"received message: {message.data.decode()!r}")

    await asyncio.gather(*[process_message(msg) async for msg in messages])


if __name__ == "__main__":
    asyncio.run(main())

运行示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
NATS_URL="${NATS_URL:-nats://localhost:4222}" python concurrent_example.py
received message: 'hello 12'
received message: 'hello 7'
received message: 'hello 16'
received message: 'hello 24'
received message: 'hello 21'
received message: 'hello 11'
received message: 'hello 27'
received message: 'hello 14'
received message: 'hello 10'
received message: 'hello 18'
received message: 'hello 2'
received message: 'hello 19'
received message: 'hello 22'
received message: 'hello 31'
received message: 'hello 1'
received message: 'hello 23'
received message: 'hello 9'
received message: 'hello 20'
received message: 'hello 33'
received message: 'hello 35'
received message: 'hello 32'
received message: 'hello 25'
received message: 'hello 36'
received message: 'hello 4'
received message: 'hello 5'
received message: 'hello 28'
received message: 'hello 15'
received message: 'hello 41'
received message: 'hello 8'
received message: 'hello 0'
received message: 'hello 38'
received message: 'hello 17'
received message: 'hello 6'
received message: 'hello 3'
received message: 'hello 40'
received message: 'hello 43'
received message: 'hello 45'
received message: 'hello 13'
received message: 'hello 26'
received message: 'hello 34'
received message: 'hello 37'
received message: 'hello 47'
received message: 'hello 42'
received message: 'hello 44'
received message: 'hello 30'
received message: 'hello 29'
received message: 'hello 39'
received message: 'hello 48'
received message: 'hello 46'
received message: 'hello 49'

参考