NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型,使用Golang语言开发。
Software applications and services need to exchange data. NATS is an infrastructure that allows such
data exchange, segmented in the form of messages. We call this a “message oriented middleware”.
With NATS, application developers can:
- 
Effortlessly build distributed and scalable client-server applications. 
- 
Store and distribute data in realtime in a general manner. This can flexibly be achieved across various
environments, languages, cloud providers and on-premises systems. 
消息传递模型
特性
- 纯净的pub-sub (Pure pub-sub)
- 集群模式的server (Cluster mode server)
- 订阅者的自动裁剪 (Auto-pruning of subscribers)
- 基于文本的协议 (Text-based protocol)
- 多种服务质量 (Multiple qualities of service - QoS)
- 持久
- 缓存
适用场景
- 高吞吐量的消息分散 —— 少数的生产者需要将数据发送给很多的消费者。
- 寻址和发现 —— 将数据发送给特定的应用实例,设备或者用户,也可用于发现并连接到基础架构中的实例,设备或用户。
- 命令和控制(控制面板)—— 向程序或设备发送指令,并从程序/设备中接收状态,如SCADA,卫星遥感,物联网等。
- 负载均衡 —— 主要应用于程序会生成大量的请求,且可动态伸缩程序实例。
- N路可扩展性 —— 通信基础架构能够充分利用go的高效并发/调度机制,以增强水平和垂直的扩展性。
- 位置透明 —— 程序在各个地理位置上分布者大量实例,且你无法了解到程序之间的端点配置详情,及他们所生产或消费的数据。
- 容错
启动
下载最新版本Release v2.9.22
默认启动
| 1
2
3
4
5
6
7
8
 | .\nats-server.exe
[11716] 2023/09/07 11:46:54.035391 [INF] Starting nats-server
[11716] 2023/09/07 11:46:54.048015 [INF]   Version:  2.9.22
[11716] 2023/09/07 11:46:54.048015 [INF]   Git:      [528ee14]
[11716] 2023/09/07 11:46:54.048015 [INF]   Name:     NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.048015 [INF]   ID:       NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.049079 [INF] Listening for client connections on 0.0.0.0:4222
[11716] 2023/09/07 11:46:54.075664 [INF] Server is ready
 | 
 
JetStream
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 | cat js.conf
// 启用jetstream,空模块将启用并使用默认值 的jetstream
jetstream {
    // jetstream数据存放位置:/data/nats-server/jetstream
    store_dir: "./data/nats-server"
    // 1GB
    max_memory_store: 1073741824
    // 10GB
    max_file_store: 10737418240
}
.\nats-server.exe -c .\js.conf
[22500] 2023/09/07 14:52:39.716147 [INF] Starting nats-server
[22500] 2023/09/07 14:52:39.729121 [INF]   Version:  2.9.22
[22500] 2023/09/07 14:52:39.729121 [INF]   Git:      [528ee14]
[22500] 2023/09/07 14:52:39.729121 [INF]   Name:     NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF]   Node:     YWK2TKml
[22500] 2023/09/07 14:52:39.729121 [INF]   ID:       NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF] Using configuration file: .\js.conf
[22500] 2023/09/07 14:52:39.729656 [INF] Starting JetStream
[22500] 2023/09/07 14:52:39.731221 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[22500] 2023/09/07 14:52:39.731221 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[22500] 2023/09/07 14:52:39.731221 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[22500] 2023/09/07 14:52:39.731221 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF]          https://docs.nats.io/jetstream
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF] ---------------- JETSTREAM ----------------
[22500] 2023/09/07 14:52:39.731221 [INF]   Max Memory:      1.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF]   Max Storage:     10.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF]   Store Directory: "data\nats-server\jetstream"
[22500] 2023/09/07 14:52:39.731221 [INF] -------------------------------------------
[22500] 2023/09/07 14:52:39.734361 [INF] Listening for client connections on 0.0.0.0:4222
[22500] 2023/09/07 14:52:39.765224 [INF] Server is ready
 | 
 
| 1
2
3
4
5
6
7
8
9
 | cat server.conf
authorization: {
    users: [
        {user: a, password: b},
        {user: b, password: a}
    ]
}
nats-server -c server.conf
 | 
 
Golang客户端多种连接
NATS客户端连接断开,客户端SDK会尝试重新连接服务器。
Golang客户端库,连接函数提供多个控制连接的nats.Option
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
 | // Connect will attempt to connect to the NATS system.
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
// Options start with the defaults but can be overridden.
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
func Connect(url string, options ...Option) (*Conn, error) {
	opts := GetDefaultOptions()
	opts.Servers = processUrlString(url)
	for _, opt := range options {
		if opt != nil {
			if err := opt(&opts); err != nil {
				return nil, err
			}
		}
	}
	return opts.Connect()
}
 | 
 
最大重新连接次数
限制重新连接总次数,默认为60,如果设置的次数小于nats服务节点数,那么重新连接将不会尝试到全部节点。
| 1
2
3
4
5
6
7
8
 | // MaxReconnects is an Option to set the maximum number of reconnect attempts.
// Defaults to 60.
func MaxReconnects(max int) Option {
	return func(o *Options) error {
		o.MaxReconnect = max
		return nil
	}
}
 | 
 
重新连接间隔等待时间
多次重新连接到同一nats服务节点的间隔等待时间。
| 1
2
3
4
5
6
7
8
 | // ReconnectWait is an Option to set the wait time between reconnect attempts.
// Defaults to 2s.
func ReconnectWait(t time.Duration) Option {
	return func(o *Options) error {
		o.ReconnectWait = t
		return nil
	}
}
 | 
 
避免 Thundering Herd
禁用重新连接时的随机化过程,让重新连接始终按相同的顺序检查所有nats节点,避免单个节点同时承受过多连接创建,出现惊群效应(Thundering Herd)。
| 1
2
3
4
5
6
7
 | // DontRandomize is an Option to turn off randomizing the server pool.
func DontRandomize() Option {
	return func(o *Options) error {
		o.NoRandomize = true
		return nil
	}
}
 | 
 
事件监听
可以监听连接创建事件,在连接创建和重新连接,或连接失败时回调处理。
| 1
2
3
4
5
6
7
 | // ReconnectHandler is an Option to set the reconnected handler.
func ReconnectHandler(cb ConnHandler) Option {
	return func(o *Options) error {
		o.ReconnectedCB = cb
		return nil
	}
}
 | 
 
更多事件处理选项见官方文档Option
使用示例
Learn NATS by Example
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 | package main
import (
	"fmt"
	"os"
	"time"
	"github.com/nats-io/nats.go"
)
func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}
	nc, _ := nats.Connect(url)
	defer nc.Drain()
	nc.Publish("greet.joe", []byte("hello"))
	sub, _ := nc.SubscribeSync("greet.*")
	msg, _ := sub.NextMsg(10 * time.Millisecond)
	fmt.Println("subscribed after a publish...")
	fmt.Printf("msg is nil? %v\n", msg == nil)
	nc.Publish("greet.joe", []byte("hello"))
	nc.Publish("greet.pam", []byte("hello"))
	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
	nc.Publish("greet.bob", []byte("hello"))
	msg, _ = sub.NextMsg(10 * time.Millisecond)
	fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
}
 | 
 
运行示例:
| 1
2
3
4
5
6
 |  NATS_URL="${NATS_URL:-nats://localhost:4222}" go run pub-sub.go
subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.joe"
msg data: "hello" on subject "greet.pam"
msg data: "hello" on subject "greet.bob"
 | 
 
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
 | package main
import (
	"fmt"
	"os"
	"time"
	"github.com/nats-io/nats.go"
)
func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}
	nc, _ := nats.Connect(url)
	defer nc.Drain()
	sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {
		name := msg.Subject[6:]
		msg.Respond([]byte("hello, " + name))
	})
	rep, _ := nc.Request("greet.joe", nil, time.Second)
	fmt.Println(string(rep.Data))
	rep, _ = nc.Request("greet.sue", nil, time.Second)
	fmt.Println(string(rep.Data))
	rep, _ = nc.Request("greet.bob", nil, time.Second)
	fmt.Println(string(rep.Data))
	sub.Unsubscribe()
	_, err := nc.Request("greet.joe", nil, time.Second)
	fmt.Println(err)
}
 | 
 
运行示例:
| 1
2
3
4
5
 | $ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run request-reply.go
hello, joe
hello, sue
hello, bob
nats: no responders available for request
 | 
 
protobuf协议定义:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
 | syntax = "proto3";
option go_package = ".;main";
package main;
message GreetRequest {
    string name = 1;
}
message GreetReply {
    string text = 1;
}
 | 
 
生成go协议文件:
| 1
 | protoc.exe --go_out=paths=source_relative:. greet.proto
 | 
 
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 | package main
import (
	"fmt"
	"os"
	"time"
	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
)
func main() {
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}
	nc, _ := nats.Connect(url)
	defer nc.Drain()
	nc.Subscribe("greet", func(msg *nats.Msg) {
		var req GreetRequest
		proto.Unmarshal(msg.Data, &req)
		rep := GreetReply{
			Text: fmt.Sprintf("hello %q!", req.Name),
		}
		data, _ := proto.Marshal(&rep)
		msg.Respond(data)
	})
	req := GreetRequest{
		Name: "joe",
	}
	data, _ := proto.Marshal(&req)
	msg, _ := nc.Request("greet", data, time.Second)
	var rep GreetReply
	proto.Unmarshal(msg.Data, &rep)
	fmt.Printf("reply: %s\n", rep.Text)
}
 | 
 
运行示例:
| 1
2
 | $ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run nats_proto.go greet.pb.go
reply: hello "joe"!
 | 
 
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 | import asyncio
import os
import random
import nats
async def main():
    nats_url = os.getenv("NATS_URL", "nats://localhost:4222")
    client = await nats.connect(nats_url)
    messages = (await client.subscribe("greet.*", max_msgs=50)).messages
    for i in range(50):
        await client.publish("greet.joe", f"hello {i}".encode())
    semaphore = asyncio.Semaphore(25)
    async def process_message(message):
        async with semaphore:
            await asyncio.sleep(random.uniform(0, 0.5))
            print(f"received message: {message.data.decode()!r}")
    await asyncio.gather(*[process_message(msg) async for msg in messages])
if __name__ == "__main__":
    asyncio.run(main())
 | 
 
运行示例:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
 | NATS_URL="${NATS_URL:-nats://localhost:4222}" python concurrent_example.py
received message: 'hello 12'
received message: 'hello 7'
received message: 'hello 16'
received message: 'hello 24'
received message: 'hello 21'
received message: 'hello 11'
received message: 'hello 27'
received message: 'hello 14'
received message: 'hello 10'
received message: 'hello 18'
received message: 'hello 2'
received message: 'hello 19'
received message: 'hello 22'
received message: 'hello 31'
received message: 'hello 1'
received message: 'hello 23'
received message: 'hello 9'
received message: 'hello 20'
received message: 'hello 33'
received message: 'hello 35'
received message: 'hello 32'
received message: 'hello 25'
received message: 'hello 36'
received message: 'hello 4'
received message: 'hello 5'
received message: 'hello 28'
received message: 'hello 15'
received message: 'hello 41'
received message: 'hello 8'
received message: 'hello 0'
received message: 'hello 38'
received message: 'hello 17'
received message: 'hello 6'
received message: 'hello 3'
received message: 'hello 40'
received message: 'hello 43'
received message: 'hello 45'
received message: 'hello 13'
received message: 'hello 26'
received message: 'hello 34'
received message: 'hello 37'
received message: 'hello 47'
received message: 'hello 42'
received message: 'hello 44'
received message: 'hello 30'
received message: 'hello 29'
received message: 'hello 39'
received message: 'hello 48'
received message: 'hello 46'
received message: 'hello 49'
 | 
 
参考