使用 NATS 处理 WebSocket 连接
在 Golang 中使用 NATS 来处理 WebSocket 连接并实现消息的发布和订阅,是一种构建分布式实时应用的常见模式。
下面示例将先展示如何使用 github.com/gorilla/websocket 处理 WebSocket 连接,并使用 github.com/nats-io/nats.go 作为消息代理来广播消息。
然后介绍如何使用 NATS 构建 WebSocket 服务器。
使用 NATS 处理 WebSocket 消息
使用 NATS 来处理 WebSocket 连接并实现消息的发布和订阅
服务端代码
服务端将处理 WebSocket 升级,接收客户端消息并通过 NATS 发布,同时订阅 NATS 主题并将消息写回 WebSocket 客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
package main
import (
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 在生产环境中应检查来源
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Client 代表一个 WebSocket 连接
type Client struct {
conn *websocket.Conn
mu sync.Mutex
}
func (c *Client) writeMessage(message []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn.WriteMessage(websocket.TextMessage, message)
}
func main() {
// 连接到 NATS 服务器
// 使用 nats.DefaultURL 连接到本地 NATS 服务器(nats://localhost:4222)
// 你可以根据你的 NATS 服务器地址进行修改,例如:nats.Connect("nats://your-nats-server:4222")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal("NATS 连接失败: ", err)
}
defer nc.Close()
log.Println("已连接到 NATS 服务器")
// 用于保存所有连接的客户端
clients := make(map[*Client]bool)
var mu sync.Mutex
// 订阅 NATS 主题,接收来自其他服务的消息并广播给所有 WebSocket 客户端
_, err = nc.Subscribe("chat.broadcast", func(m *nats.Msg) {
log.Printf("从 NATS 收到广播消息: %s\n", string(m.Data))
mu.Lock()
defer mu.Unlock()
for client := range clients {
if err := client.writeMessage(m.Data); err != nil {
log.Println("写消息错误: ", err)
// 处理错误连接,这里简化处理
}
}
})
if err != nil {
log.Fatal("NATS 订阅失败: ", err)
}
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("WebSocket 升级失败: ", err)
return
}
defer conn.Close()
client := &Client{conn: conn}
mu.Lock()
clients[client] = true
mu.Unlock()
log.Println("新的 WebSocket 客户端连接")
// 确保在连接关闭时清理客户端
defer func() {
mu.Lock()
delete(clients, client)
mu.Unlock()
log.Println("WebSocket 客户端断开连接")
}()
// 处理来自 WebSocket 客户端的消息
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("读取错误: %v", err)
}
break
}
log.Printf("从 WebSocket 收到: %s\n", string(message))
// 将收到的消息发布到 NATS 主题
err = nc.Publish("chat.message", message)
if err != nil {
log.Println("发布到 NATS 错误: ", err)
} else {
log.Printf("消息已发布到 NATS 主题 'chat.message'")
}
// 可选:立即回复客户端一个确认消息
replyMsg := "服务器已收到消息: " + string(message)
if err := client.writeMessage([]byte(replyMsg)); err != nil {
log.Println("写回复错误: ", err)
break
}
}
})
// 启动一个 goroutine 来定期广播服务器时间(可选,演示用)
go func() {
for {
time.Sleep(10 * time.Second)
message := "服务器时间: " + time.Now().Format("2006-01-02 15:04:05")
nc.Publish("chat.broadcast", []byte(message))
}
}()
log.Println("WebSocket 服务器在 :8080 端口启动")
log.Fatal(http.ListenAndServe(":8080", nil))
}
|
客户端测试代码 (Golang)
这个 Golang 客户端测试程序将连接到 WebSocket 服务器,发送一条消息,并接收来自服务器的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package main
import (
"log"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// 连接到 WebSocket 服务器
url := "ws://localhost:8080/ws"
c, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Fatal("连接失败:", err)
}
defer c.Close()
log.Printf("已连接到服务器 %s", url)
done := make(chan struct{})
// 接收消息的 goroutine
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Println("读取错误:", err)
return
}
log.Printf("收到: %s", message)
}
}()
// 发送一条测试消息
testMsg := "Hello, NATS over WebSocket!"
err = c.WriteMessage(websocket.TextMessage, []byte(testMsg))
if err != nil {
log.Println("写入错误:", err)
return
}
log.Printf("已发送: %s", testMsg)
// 等待中断信号或接收完成
select {
case <-done:
case <-interrupt:
log.Println("收到中断信号")
}
// 清理关闭
err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("发送关闭消息错误:", err)
return
}
select {
case <-done:
case <-time.After(time.Second):
}
}
|
运行步骤
- 安装依赖:
1
2
3
|
go mod init your-module-name
go get github.com/gorilla/websocket
go get github.com/nats-io/nats.go
|
- 启动 NATS 服务器:确保有一个 NATS 服务器运行。你可以从 NATS.io 下载并运行,或者使用 Docker:
1
2
3
4
5
6
|
docker run -d \
--name nats-server \
-p 4222:4222 \ # 映射客户端端口
-p 8222:8222 \ # 映射监控端口(如果配置中启用了)
-p 6222:6222 \ # 映射集群端口(如果配置了集群)
nats:latest
|
默认情况下,NATS 服务器会在 nats://localhost:4222
监听。
- 运行服务端程序:保存服务端代码(例如 server.go)并运行:
- 运行客户端测试程序:保存客户端代码(例如 client.go)并运行:
关键点说明
-
WebSocket 升级:服务端使用 gorilla/websocket 的 Upgrader 将 HTTP 连接升级为 WebSocket 连接1。
-
NATS 集成:服务端连接到 NATS 服务器,订阅一个主题(chat.broadcast)来接收广播消息,并将从 WebSocket 客户端收到的消息发布到另一个主题(chat.message)。
-
客户端管理:服务端使用一个 Map (clients) 来跟踪所有连接的 WebSocket 客户端,并使用互斥锁 (sync.Mutex) 来保证并发安全。
-
广播机制:通过 NATS 订阅,服务端可以将收到的消息(来自其他服务或内部定时器)广播给所有连接的 WebSocket 客户端。
-
错误处理:代码中包含了一些基本的错误处理,在生产环境中可能需要更完善的机制。
使用 NATS 构建 WebSocket 服务器
可以将Nginx的WebSocket连接直接指向NATS服务。NATS服务器本身支持WebSocket协议1,而Nginx可以通过反向代理配置,将客户端的WebSocket请求转发到NATS服务的WebSocket端口上。
NATS 配置
确保你的NATS服务器配置中启用了WebSocket支持。在NATS的配置文件(例如 nats-server.conf)中,需要添加类似以下的 websocket 配置块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# Client port of 4222 on all interfaces
port: 4222
# HTTP monitoring port
monitor_port: 8222
# This is for clustering multiple servers together.
cluster {
# Route connections to be received on any interface on port 6222
port: 6222
# Authorization for route connections
authorization {
user: ruser
password: T0pS3cr3t
timeout: 2
}
# Routes are actively solicited and connected to from this server.
routes = []
}
# Maximum payload size, default is 1MB
max_payload: 20MB
# Write deadline, after which NATS will automatically stop this publication
write_deadline: 60s
websocket {
port: 4223 # WebSocket监听的端口,可根据需要修改
no_tls: true # 如果不在NATS层面直接使用TLS,设置为true(通常由Nginx或前置负载均衡器处理TLS)
# allowed_origins: ["*"] # 允许的源,生产环境应设置为具体的域名以提高安全性
}
|
websocket 配置项示例,WebSocket Configuration Example
使用 Docker 启动 NATS 并挂载配置文件
启动 NATS 容器时,使用 -v 或 –mount 参数将宿主机的配置文件挂载到容器内相应的路径。
1
2
3
4
5
6
7
8
|
docker run -d \
--name nats-server \
-p 4222:4222 \ # 映射客户端端口
-p 8222:8222 \ # 映射监控端口(如果配置中启用了)
-p 6222:6222 \ # 映射集群端口(如果配置了集群)
-v /path/on/host/nats-server.conf:/etc/nats/nats-server.conf \ # 挂载配置文件
nats:latest \
-c /etc/nats/nats-server.conf # 指定容器内的配置文件路径
|
Nginx配置
配置Nginx的反向代理,将到达特定路径(例如 /nats)的WebSocket请求转发到NATS服务的WebSocket端口(例如上面配置的 4223 端口)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
server {
listen 443 ssl; # 监听HTTPS端口
server_name your-domain.com; # 你的域名
# SSL证书配置
ssl_certificate /path/to/your/cert.pem;
ssl_certificate_key /path/to/your/private.key;
# WebSocket 代理配置
location /nats { # 代理的路径,客户端连接时使用
proxy_pass http://localhost:4223; # 转发到NATS的WebSocket端口,根据实际情况修改IP和端口
proxy_http_version 1.1; # 使用HTTP/1.1
# 以下两行是支持WebSocket协议升级的关键
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 可选:传递一些客户端信息
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置(可选,根据需要调整)
proxy_read_timeout 86400s; # WebSocket连接是长连接,设置较长的超时时间
proxy_send_timeout 86400s;
}
# 其他location配置...
}
|
修改配置后,使用 sudo nginx -s reload 重新加载Nginx配置使其生效。
在Nginx层面还可以实施额外的安全措施,如身份验证(例如使用JWT Token)、限制访问IP等。
客户端连接方式
配置好Nginx和NATS后,客户端(例如使用JavaScript)就可以通过Nginx代理的地址来连接NATS服务了。
1
2
3
4
5
|
// 假设Nginx域名为 your-domain.com,配置的location是 /nats
const nc = await connect({
servers: "wss://your-domain.com/nats", // 使用wss协议(WebSocket Secure)
// 其他NATS连接选项,如token、user、pass等(如果需要认证)
});
|
通过在NATS服务器中启用WebSocket支持,并在Nginx中配置反向代理,
将特定的WebSocket请求路径(如 /nats)转发到NATS的WebSocket端口,
就可以实现客户端通过Nginx代理连接NATS服务。这种方式既利用了Nginx处理TLS/SSL、负载均衡和反向代理的优势,又利用了NATS强大的消息传递能力,是一种常见且实用的部署方式。
参考