NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型,使用Golang语言开发。
Software applications and services need to exchange data. NATS is an infrastructure that allows such
data exchange, segmented in the form of messages. We call this a “message oriented middleware”.
With NATS, application developers can:
-
Effortlessly build distributed and scalable client-server applications.
-
Store and distribute data in realtime in a general manner. This can flexibly be achieved across various
environments, languages, cloud providers and on-premises systems.
消息传递模型
特性
- 纯净的pub-sub (Pure pub-sub)
- 集群模式的server (Cluster mode server)
- 订阅者的自动裁剪 (Auto-pruning of subscribers)
- 基于文本的协议 (Text-based protocol)
- 多种服务质量 (Multiple qualities of service - QoS)
- 持久
- 缓存
适用场景
- 高吞吐量的消息分散 —— 少数的生产者需要将数据发送给很多的消费者。
- 寻址和发现 —— 将数据发送给特定的应用实例,设备或者用户,也可用于发现并连接到基础架构中的实例,设备或用户。
- 命令和控制(控制面板)—— 向程序或设备发送指令,并从程序/设备中接收状态,如SCADA,卫星遥感,物联网等。
- 负载均衡 —— 主要应用于程序会生成大量的请求,且可动态伸缩程序实例。
- N路可扩展性 —— 通信基础架构能够充分利用go的高效并发/调度机制,以增强水平和垂直的扩展性。
- 位置透明 —— 程序在各个地理位置上分布者大量实例,且你无法了解到程序之间的端点配置详情,及他们所生产或消费的数据。
- 容错
启动
下载最新版本Release v2.9.22
默认启动
1
2
3
4
5
6
7
8
|
.\nats-server.exe
[11716] 2023/09/07 11:46:54.035391 [INF] Starting nats-server
[11716] 2023/09/07 11:46:54.048015 [INF] Version: 2.9.22
[11716] 2023/09/07 11:46:54.048015 [INF] Git: [528ee14]
[11716] 2023/09/07 11:46:54.048015 [INF] Name: NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.048015 [INF] ID: NBZIZO2JTPMZ6EW43LNAG62HUO27XATM7RHX7DHUAEB3XBMTONZGMKWJ
[11716] 2023/09/07 11:46:54.049079 [INF] Listening for client connections on 0.0.0.0:4222
[11716] 2023/09/07 11:46:54.075664 [INF] Server is ready
|
JetStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
cat js.conf
// 启用jetstream,空模块将启用并使用默认值 的jetstream
jetstream {
// jetstream数据存放位置:/data/nats-server/jetstream
store_dir: "./data/nats-server"
// 1GB
max_memory_store: 1073741824
// 10GB
max_file_store: 10737418240
}
.\nats-server.exe -c .\js.conf
[22500] 2023/09/07 14:52:39.716147 [INF] Starting nats-server
[22500] 2023/09/07 14:52:39.729121 [INF] Version: 2.9.22
[22500] 2023/09/07 14:52:39.729121 [INF] Git: [528ee14]
[22500] 2023/09/07 14:52:39.729121 [INF] Name: NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF] Node: YWK2TKml
[22500] 2023/09/07 14:52:39.729121 [INF] ID: NCLVWQUCCL3W736H36FWPW46YWZZGLWDCNSNZOU5B4H6THQ3PKGZY6G6
[22500] 2023/09/07 14:52:39.729121 [INF] Using configuration file: .\js.conf
[22500] 2023/09/07 14:52:39.729656 [INF] Starting JetStream
[22500] 2023/09/07 14:52:39.731221 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[22500] 2023/09/07 14:52:39.731221 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[22500] 2023/09/07 14:52:39.731221 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[22500] 2023/09/07 14:52:39.731221 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF] https://docs.nats.io/jetstream
[22500] 2023/09/07 14:52:39.731221 [INF]
[22500] 2023/09/07 14:52:39.731221 [INF] ---------------- JETSTREAM ----------------
[22500] 2023/09/07 14:52:39.731221 [INF] Max Memory: 1.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF] Max Storage: 10.00 GB
[22500] 2023/09/07 14:52:39.731221 [INF] Store Directory: "data\nats-server\jetstream"
[22500] 2023/09/07 14:52:39.731221 [INF] -------------------------------------------
[22500] 2023/09/07 14:52:39.734361 [INF] Listening for client connections on 0.0.0.0:4222
[22500] 2023/09/07 14:52:39.765224 [INF] Server is ready
|
1
2
3
4
5
6
7
8
9
|
cat server.conf
authorization: {
users: [
{user: a, password: b},
{user: b, password: a}
]
}
nats-server -c server.conf
|
Golang客户端多种连接
NATS客户端连接断开,客户端SDK会尝试重新连接服务器。
Golang客户端库,连接函数提供多个控制连接的nats.Option
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// Connect will attempt to connect to the NATS system.
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
// Options start with the defaults but can be overridden.
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
func Connect(url string, options ...Option) (*Conn, error) {
opts := GetDefaultOptions()
opts.Servers = processUrlString(url)
for _, opt := range options {
if opt != nil {
if err := opt(&opts); err != nil {
return nil, err
}
}
}
return opts.Connect()
}
|
最大重新连接次数
限制重新连接总次数,默认为60,如果设置的次数小于nats服务节点数,那么重新连接将不会尝试到全部节点。
1
2
3
4
5
6
7
8
|
// MaxReconnects is an Option to set the maximum number of reconnect attempts.
// Defaults to 60.
func MaxReconnects(max int) Option {
return func(o *Options) error {
o.MaxReconnect = max
return nil
}
}
|
重新连接间隔等待时间
多次重新连接到同一nats服务节点的间隔等待时间。
1
2
3
4
5
6
7
8
|
// ReconnectWait is an Option to set the wait time between reconnect attempts.
// Defaults to 2s.
func ReconnectWait(t time.Duration) Option {
return func(o *Options) error {
o.ReconnectWait = t
return nil
}
}
|
避免 Thundering Herd
禁用重新连接时的随机化过程,让重新连接始终按相同的顺序检查所有nats节点,避免单个节点同时承受过多连接创建,出现惊群效应(Thundering Herd)。
1
2
3
4
5
6
7
|
// DontRandomize is an Option to turn off randomizing the server pool.
func DontRandomize() Option {
return func(o *Options) error {
o.NoRandomize = true
return nil
}
}
|
事件监听
可以监听连接创建事件,在连接创建和重新连接,或连接失败时回调处理。
1
2
3
4
5
6
7
|
// ReconnectHandler is an Option to set the reconnected handler.
func ReconnectHandler(cb ConnHandler) Option {
return func(o *Options) error {
o.ReconnectedCB = cb
return nil
}
}
|
更多事件处理选项见官方文档Option
使用示例
Learn NATS by Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
nc.Publish("greet.joe", []byte("hello"))
sub, _ := nc.SubscribeSync("greet.*")
msg, _ := sub.NextMsg(10 * time.Millisecond)
fmt.Println("subscribed after a publish...")
fmt.Printf("msg is nil? %v\n", msg == nil)
nc.Publish("greet.joe", []byte("hello"))
nc.Publish("greet.pam", []byte("hello"))
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
nc.Publish("greet.bob", []byte("hello"))
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
}
|
运行示例:
1
2
3
4
5
6
|
NATS_URL="${NATS_URL:-nats://localhost:4222}" go run pub-sub.go
subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.joe"
msg data: "hello" on subject "greet.pam"
msg data: "hello" on subject "greet.bob"
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {
name := msg.Subject[6:]
msg.Respond([]byte("hello, " + name))
})
rep, _ := nc.Request("greet.joe", nil, time.Second)
fmt.Println(string(rep.Data))
rep, _ = nc.Request("greet.sue", nil, time.Second)
fmt.Println(string(rep.Data))
rep, _ = nc.Request("greet.bob", nil, time.Second)
fmt.Println(string(rep.Data))
sub.Unsubscribe()
_, err := nc.Request("greet.joe", nil, time.Second)
fmt.Println(err)
}
|
运行示例:
1
2
3
4
5
|
$ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run request-reply.go
hello, joe
hello, sue
hello, bob
nats: no responders available for request
|
protobuf协议定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
syntax = "proto3";
option go_package = ".;main";
package main;
message GreetRequest {
string name = 1;
}
message GreetReply {
string text = 1;
}
|
生成go协议文件:
1
|
protoc.exe --go_out=paths=source_relative:. greet.proto
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
nc.Subscribe("greet", func(msg *nats.Msg) {
var req GreetRequest
proto.Unmarshal(msg.Data, &req)
rep := GreetReply{
Text: fmt.Sprintf("hello %q!", req.Name),
}
data, _ := proto.Marshal(&rep)
msg.Respond(data)
})
req := GreetRequest{
Name: "joe",
}
data, _ := proto.Marshal(&req)
msg, _ := nc.Request("greet", data, time.Second)
var rep GreetReply
proto.Unmarshal(msg.Data, &rep)
fmt.Printf("reply: %s\n", rep.Text)
}
|
运行示例:
1
2
|
$ NATS_URL="${NATS_URL:-nats://localhost:4222}" go run nats_proto.go greet.pb.go
reply: hello "joe"!
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import asyncio
import os
import random
import nats
async def main():
nats_url = os.getenv("NATS_URL", "nats://localhost:4222")
client = await nats.connect(nats_url)
messages = (await client.subscribe("greet.*", max_msgs=50)).messages
for i in range(50):
await client.publish("greet.joe", f"hello {i}".encode())
semaphore = asyncio.Semaphore(25)
async def process_message(message):
async with semaphore:
await asyncio.sleep(random.uniform(0, 0.5))
print(f"received message: {message.data.decode()!r}")
await asyncio.gather(*[process_message(msg) async for msg in messages])
if __name__ == "__main__":
asyncio.run(main())
|
运行示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
NATS_URL="${NATS_URL:-nats://localhost:4222}" python concurrent_example.py
received message: 'hello 12'
received message: 'hello 7'
received message: 'hello 16'
received message: 'hello 24'
received message: 'hello 21'
received message: 'hello 11'
received message: 'hello 27'
received message: 'hello 14'
received message: 'hello 10'
received message: 'hello 18'
received message: 'hello 2'
received message: 'hello 19'
received message: 'hello 22'
received message: 'hello 31'
received message: 'hello 1'
received message: 'hello 23'
received message: 'hello 9'
received message: 'hello 20'
received message: 'hello 33'
received message: 'hello 35'
received message: 'hello 32'
received message: 'hello 25'
received message: 'hello 36'
received message: 'hello 4'
received message: 'hello 5'
received message: 'hello 28'
received message: 'hello 15'
received message: 'hello 41'
received message: 'hello 8'
received message: 'hello 0'
received message: 'hello 38'
received message: 'hello 17'
received message: 'hello 6'
received message: 'hello 3'
received message: 'hello 40'
received message: 'hello 43'
received message: 'hello 45'
received message: 'hello 13'
received message: 'hello 26'
received message: 'hello 34'
received message: 'hello 37'
received message: 'hello 47'
received message: 'hello 42'
received message: 'hello 44'
received message: 'hello 30'
received message: 'hello 29'
received message: 'hello 39'
received message: 'hello 48'
received message: 'hello 46'
received message: 'hello 49'
|
参考