使用 NATS 处理 WebSocket 连接

在 Golang 中使用 NATS 来处理 WebSocket 连接并实现消息的发布和订阅,是一种构建分布式实时应用的常见模式。

下面示例将先展示如何使用 github.com/gorilla/websocket 处理 WebSocket 连接,并使用 github.com/nats-io/nats.go 作为消息代理来广播消息。

然后介绍如何使用 NATS 构建 WebSocket 服务器。

使用 NATS 处理 WebSocket 消息

使用 NATS 来处理 WebSocket 连接并实现消息的发布和订阅

服务端代码

服务端将处理 WebSocket 升级,接收客户端消息并通过 NATS 发布,同时订阅 NATS 主题并将消息写回 WebSocket 客户端。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main

import (
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/nats-io/nats.go"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true // 在生产环境中应检查来源
	},
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// Client 代表一个 WebSocket 连接
type Client struct {
	conn *websocket.Conn
	mu   sync.Mutex
}

func (c *Client) writeMessage(message []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.conn.WriteMessage(websocket.TextMessage, message)
}

func main() {
	// 连接到 NATS 服务器
    // 使用 nats.DefaultURL 连接到本地 NATS 服务器(nats://localhost:4222)
    // 你可以根据你的 NATS 服务器地址进行修改,例如:nats.Connect("nats://your-nats-server:4222")
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal("NATS 连接失败: ", err)
	}
	defer nc.Close()
	log.Println("已连接到 NATS 服务器")

	// 用于保存所有连接的客户端
	clients := make(map[*Client]bool)
	var mu sync.Mutex

	// 订阅 NATS 主题,接收来自其他服务的消息并广播给所有 WebSocket 客户端
	_, err = nc.Subscribe("chat.broadcast", func(m *nats.Msg) {
		log.Printf("从 NATS 收到广播消息: %s\n", string(m.Data))
		mu.Lock()
		defer mu.Unlock()
		for client := range clients {
			if err := client.writeMessage(m.Data); err != nil {
				log.Println("写消息错误: ", err)
				// 处理错误连接,这里简化处理
			}
		}
	})
	if err != nil {
		log.Fatal("NATS 订阅失败: ", err)
	}

	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Println("WebSocket 升级失败: ", err)
			return
		}
		defer conn.Close()

		client := &Client{conn: conn}
		mu.Lock()
		clients[client] = true
		mu.Unlock()
		log.Println("新的 WebSocket 客户端连接")

		// 确保在连接关闭时清理客户端
		defer func() {
			mu.Lock()
			delete(clients, client)
			mu.Unlock()
			log.Println("WebSocket 客户端断开连接")
		}()

		// 处理来自 WebSocket 客户端的消息
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
					log.Printf("读取错误: %v", err)
				}
				break
			}
			log.Printf("从 WebSocket 收到: %s\n", string(message))

			// 将收到的消息发布到 NATS 主题
			err = nc.Publish("chat.message", message)
			if err != nil {
				log.Println("发布到 NATS 错误: ", err)
			} else {
				log.Printf("消息已发布到 NATS 主题 'chat.message'")
			}

			// 可选:立即回复客户端一个确认消息
			replyMsg := "服务器已收到消息: " + string(message)
			if err := client.writeMessage([]byte(replyMsg)); err != nil {
				log.Println("写回复错误: ", err)
				break
			}
		}
	})

	// 启动一个 goroutine 来定期广播服务器时间(可选,演示用)
	go func() {
		for {
			time.Sleep(10 * time.Second)
			message := "服务器时间: " + time.Now().Format("2006-01-02 15:04:05")
			nc.Publish("chat.broadcast", []byte(message))
		}
	}()

	log.Println("WebSocket 服务器在 :8080 端口启动")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

客户端测试代码 (Golang)

这个 Golang 客户端测试程序将连接到 WebSocket 服务器,发送一条消息,并接收来自服务器的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/gorilla/websocket"
)

func main() {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	// 连接到 WebSocket 服务器
	url := "ws://localhost:8080/ws"
	c, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		log.Fatal("连接失败:", err)
	}
	defer c.Close()
	log.Printf("已连接到服务器 %s", url)

	done := make(chan struct{})

	// 接收消息的 goroutine
	go func() {
		defer close(done)
		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				log.Println("读取错误:", err)
				return
			}
			log.Printf("收到: %s", message)
		}
	}()

	// 发送一条测试消息
	testMsg := "Hello, NATS over WebSocket!"
	err = c.WriteMessage(websocket.TextMessage, []byte(testMsg))
	if err != nil {
		log.Println("写入错误:", err)
		return
	}
	log.Printf("已发送: %s", testMsg)

	// 等待中断信号或接收完成
	select {
	case <-done:
	case <-interrupt:
		log.Println("收到中断信号")
	}

	// 清理关闭
	err = c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
	if err != nil {
		log.Println("发送关闭消息错误:", err)
		return
	}
	select {
	case <-done:
	case <-time.After(time.Second):
	}
}

运行步骤

  1. 安装依赖:
1
2
3
go mod init your-module-name
go get github.com/gorilla/websocket
go get github.com/nats-io/nats.go
  1. 启动 NATS 服务器:确保有一个 NATS 服务器运行。你可以从 NATS.io 下载并运行,或者使用 Docker:
1
2
3
4
5
6
docker run -d \
  --name nats-server \
  -p 4222:4222 \          # 映射客户端端口
  -p 8222:8222 \          # 映射监控端口(如果配置中启用了)
  -p 6222:6222 \          # 映射集群端口(如果配置了集群)
  nats:latest

默认情况下,NATS 服务器会在 nats://localhost:4222 监听。

  1. 运行服务端程序:保存服务端代码(例如 server.go)并运行:
1
go run server.go
  1. 运行客户端测试程序:保存客户端代码(例如 client.go)并运行:
1
go run client.go

关键点说明

  • WebSocket 升级:服务端使用 gorilla/websocket 的 Upgrader 将 HTTP 连接升级为 WebSocket 连接1。

  • NATS 集成:服务端连接到 NATS 服务器,订阅一个主题(chat.broadcast)来接收广播消息,并将从 WebSocket 客户端收到的消息发布到另一个主题(chat.message)。

  • 客户端管理:服务端使用一个 Map (clients) 来跟踪所有连接的 WebSocket 客户端,并使用互斥锁 (sync.Mutex) 来保证并发安全。

  • 广播机制:通过 NATS 订阅,服务端可以将收到的消息(来自其他服务或内部定时器)广播给所有连接的 WebSocket 客户端。

  • 错误处理:代码中包含了一些基本的错误处理,在生产环境中可能需要更完善的机制。

使用 NATS 构建 WebSocket 服务器

可以将Nginx的WebSocket连接直接指向NATS服务。NATS服务器本身支持WebSocket协议1,而Nginx可以通过反向代理配置,将客户端的WebSocket请求转发到NATS服务的WebSocket端口上。

NATS 配置

确保你的NATS服务器配置中启用了WebSocket支持。在NATS的配置文件(例如 nats-server.conf)中,需要添加类似以下的 websocket 配置块

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Client port of 4222 on all interfaces
port: 4222

# HTTP monitoring port
monitor_port: 8222

# This is for clustering multiple servers together.
cluster {
  # Route connections to be received on any interface on port 6222
  port: 6222

  # Authorization for route connections
  authorization {
    user: ruser
    password: T0pS3cr3t
    timeout: 2
  }

  # Routes are actively solicited and connected to from this server.
  routes = []
}

# Maximum payload size, default is 1MB
max_payload: 20MB
# Write deadline, after which NATS will automatically stop this publication
write_deadline: 60s

websocket {
  port: 4223  # WebSocket监听的端口,可根据需要修改
  no_tls: true  # 如果不在NATS层面直接使用TLS,设置为true(通常由Nginx或前置负载均衡器处理TLS)
  # allowed_origins: ["*"]  # 允许的源,生产环境应设置为具体的域名以提高安全性
}

websocket 配置项示例,WebSocket Configuration Example

使用 Docker 启动 NATS 并挂载配置文件

启动 NATS 容器时,使用 -v 或 –mount 参数将宿主机的配置文件挂载到容器内相应的路径。

1
2
3
4
5
6
7
8
docker run -d \
  --name nats-server \
  -p 4222:4222 \          # 映射客户端端口
  -p 8222:8222 \          # 映射监控端口(如果配置中启用了)
  -p 6222:6222 \          # 映射集群端口(如果配置了集群)
  -v /path/on/host/nats-server.conf:/etc/nats/nats-server.conf \  # 挂载配置文件
  nats:latest \
  -c /etc/nats/nats-server.conf  # 指定容器内的配置文件路径

Nginx配置

配置Nginx的反向代理,将到达特定路径(例如 /nats)的WebSocket请求转发到NATS服务的WebSocket端口(例如上面配置的 4223 端口)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server {
    listen 443 ssl; # 监听HTTPS端口
    server_name your-domain.com; # 你的域名

    # SSL证书配置
    ssl_certificate /path/to/your/cert.pem;
    ssl_certificate_key /path/to/your/private.key;

    # WebSocket 代理配置
    location /nats { # 代理的路径,客户端连接时使用
        proxy_pass http://localhost:4223; # 转发到NATS的WebSocket端口,根据实际情况修改IP和端口
        proxy_http_version 1.1; # 使用HTTP/1.1

        # 以下两行是支持WebSocket协议升级的关键
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # 可选:传递一些客户端信息
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 超时设置(可选,根据需要调整)
        proxy_read_timeout 86400s; # WebSocket连接是长连接,设置较长的超时时间
        proxy_send_timeout 86400s;
    }

    # 其他location配置...
}

修改配置后,使用 sudo nginx -s reload 重新加载Nginx配置使其生效。

在Nginx层面还可以实施额外的安全措施,如身份验证(例如使用JWT Token)、限制访问IP等。

客户端连接方式

配置好Nginx和NATS后,客户端(例如使用JavaScript)就可以通过Nginx代理的地址来连接NATS服务了。

1
2
3
4
5
// 假设Nginx域名为 your-domain.com,配置的location是 /nats
const nc = await connect({
  servers: "wss://your-domain.com/nats", // 使用wss协议(WebSocket Secure)
  // 其他NATS连接选项,如token、user、pass等(如果需要认证)
});

通过在NATS服务器中启用WebSocket支持,并在Nginx中配置反向代理

将特定的WebSocket请求路径(如 /nats)转发到NATS的WebSocket端口,

就可以实现客户端通过Nginx代理连接NATS服务。这种方式既利用了Nginx处理TLS/SSL、负载均衡和反向代理的优势,又利用了NATS强大的消息传递能力,是一种常见且实用的部署方式。

参考