Openresty 实现 Websocket 连接,并将消息发到 Redis 队列中,方便其他服务消费。

docker 安装 openresty

在宿主机上创建挂载的目录

1
2
3
mkdir -p /data/openresty/nginx/
cd /data/openresty/nginx
mkdir -p conf.d html logs lua conf

下载 openresty 镜像

1
docker pull openresty/openresty:1.25.3.2-alpine-aarch64

启动容器,并将容器中的配置文件复制到宿主机上

1
2
3
4
5
docker run -p 80:80 --name openresty -d openresty/openresty:1.25.3.2-alpine-aarch64

docker cp openresty:/usr/local/openresty/nginx/conf/nginx.conf ./conf
docker cp openresty:/etc/nginx/conf.d/default.conf ./conf.d/default.conf
docker cp openresty:/usr/local/openresty/nginx/html/index.html ./html/index.html

配置文件也可以直接下载:

挂载宿主机配置目录,并重新启动容器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
docker stop openresty
docker rm openresty

docker run -it --name openresty -p 80:80 \
--privileged=true \
--restart=always \
-v ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
-v ./conf.d:/etc/nginx/conf.d \
-v ./html:/usr/local/openresty/nginx/html \
-v ./logs:/usr/local/openresty/nginx/logs \
-v ./lua:/usr/local/openresty/nginx/lua \
-d openresty/openresty:1.25.3.2-alpine-aarch64

启动成功,则可在浏览器访问: http://127.0.0.1

docker 安装 redis

下载 redis 镜像

1
docker pull redis:alpine3.22

启动 redis 容器

1
docker run -it --name redis -p 6379:6379 -d redis:alpine3.22

查看 redis 容器ip地址

1
2
3
4
docker inspect redis | grep IPAddress
            "SecondaryIPAddresses": null,
            "IPAddress": "172.17.0.3",
                    "IPAddress": "172.17.0.3",

openresty 配置

修改nginx.conf文件,添加 lua 模块路径 和 c 模块路径

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
http {
    include       mime.types;
    default_type  application/octet-stream;

    # lua 模块
    lua_package_path "/usr/local/openresty/nginx/lua/?.lua;;";
    # c 模块
    lua_package_cpath "/usr/local/openresty/nginx/lua/?.os;;";

}

openresty lua 配置示例

添加 lua/item.lua 文件

1
ngx.say('{"id":"1","name":"demo"}')

修改default.conf文件,添加item接口配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;
    #access_log  /var/log/nginx/host.access.log  main;

    location /api/item {
        # 默认的响应类型
        default_type application/json;
        # 响应结果有lua/item.lua文件来决定
        content_by_lua_file lua/item.lua;
    }
}

重新加载配置

1
docker exec -it openresty nginx -s reload

然后访问接口:http://127.0.0.1/api/item

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
curl -i '127.0.0.1/api/item'
HTTP/1.1 200 OK
Server: openresty/1.25.3.2
Date: Sat, 16 Aug 2025 11:05:24 GMT
Content-Type: application/json
Transfer-Encoding: chunked
Connection: keep-alive

{"id":"1","name":"demo"}

openresty websocket 服务

创建lua/websocket.lua文件,实现websocket服务,并接收连接消息,

将消息发送到redis消息队列中,并订阅队列,将消息发送给创建连接的websocket客户端。

实现内容如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
-- 简易聊天室
local server = require "resty.websocket.server"
local redis = require "resty.redis"

local channel_name = "chat"
local uname = "网友" .. tostring(math.random(10,99)) .. ": "

-- 创建 websocket 连接
local wb, err = server:new{
  timeout = 10000,
  max_payload_len = 65535
}

if not wb then
  ngx.log(ngx.ERR, "failed to create new websocket: ", err)
  return ngx.exit(444)
end


local push = function()
    -- 创建redis连接
    local red = redis:new()
    red:set_timeout(5000) -- 1 sec
    local ok, err = red:connect("172.17.0.3", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect redis: ", err)
        wb:send_close()
        return
    end

    --订阅聊天频道
    local res, err = red:subscribe(channel_name)
    if not res then
        ngx.log(ngx.ERR, "failed to sub redis: ", err)
        wb:send_close()
        return
    end

    -- 死循环获取消息
    while true do
        local res, err = red:read_reply()
        if res then
            local item = res[3]
            local bytes, err = wb:send_text(item)
            if not bytes then
                -- 错误直接退出
                ngx.log(ngx.ERR, "failed to send text: ", err)
                return ngx.exit(444)
            end
        end
    end
end

-- 启用一个线程用来发送信息
local co = ngx.thread.spawn(push)

-- 主线程
while true do

    -- 如果连接损坏 退出
    if wb.fatal then
        ngx.log(ngx.ERR, "failed to receive frame: ", err)
        return ngx.exit(444)
    end

    local data, typ, err = wb:recv_frame()

    if not data then
        -- 空消息, 发送心跳
        local bytes, err = wb:send_ping()
        if not bytes then
          ngx.log(ngx.ERR, "failed to send ping: ", err)
          return ngx.exit(444)
        end
        ngx.log(ngx.ERR, "send ping: ", data)
    elseif typ == "close" then
        -- 关闭连接
        break
    elseif typ == "ping" then
        -- 回复心跳
        local bytes, err = wb:send_pong()
        if not bytes then
            ngx.log(ngx.ERR, "failed to send pong: ", err)
            return ngx.exit(444)
        end
    elseif typ == "pong" then
        -- 心跳回包
        ngx.log(ngx.ERR, "client ponged")
    elseif typ == "text" then
        -- 将消息发送到 redis 频道
        local red2 = redis:new()
        red2:set_timeout(1000) -- 1 sec
        local ok, err = red2:connect("172.17.0.3", 6379)
        if not ok then
            ngx.log(ngx.ERR, "failed to connect redis: ", err)
            break
        end
        local res, err = red2:publish(channel_name, uname .. data)
        if not res then
            ngx.log(ngx.ERR, "failed to publish redis: ", err)
        end
    end
end

wb:send_close()
ngx.thread.wait(co)

实现参考:基于 OpenResty 实现一个 WS 聊天室

openresty websocket 配置

conf.d/default.conf 中添加如下配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    location /ws {
        # WebSocket 支持
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 300s;
        
        content_by_lua_file lua/websocket.lua;
    }

重新加载配置,让配置生效

1
docker exec -it openresty nginx -s reload

websocket client html

创建 html/chat.html 文件中添加如下代码,来模拟客户端建立websocket连接,并发送消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<!DOCTYPE HTML>
<html>

<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=no">
    <style>
        p{margin:0;}
    </style>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
    <script type="text/javascript">
        var ws = null;

        function WebSocketConn() {
            if (ws != null && ws.readyState == 1) {
                log("已经在线");
                return
            }

            if ("WebSocket" in window) {
                // Let us open a web socket
                ws = new WebSocket("ws://127.0.0.1/ws");

                ws.onopen = function () {
                    log('成功进入聊天室');
                };

                ws.onmessage = function (event) {
                    log(event.data)
                };

                ws.onclose = function () {
                    // websocket is closed.
                    log("已经和服务器断开");
                };

                ws.onerror = function (event) {
                    console.log("error " + event.data);
                };
            } else {
                // The browser doesn't support WebSocket
                alert("WebSocket NOT supported by your Browser!");
            }
        }

        function SendMsg() {
            if (ws != null && ws.readyState == 1) {
                var msg = document.getElementById('msgtext').value;
                ws.send(msg);
            } else {
                log('请先进入聊天室');
            }
        }

        function WebSocketClose() {
            if (ws != null && ws.readyState == 1) {
                ws.close();
                log("发送断开服务器请求");
            } else {
                log("当前没有连接服务器")
            }
        }

        function log(text) {
            var li = document.createElement('p');
            li.appendChild(document.createTextNode(text));
            //document.getElementById('log').appendChild(li);
            $("#log").prepend(li);
            return false;
        }

        WebSocketConn();
    </script>
</head>

<body>
<div id="sse">
    <a href="javascript:WebSocketConn()">进入聊天室</a> &nbsp;
    <a href="javascript:WebSocketClose()">离开聊天室</a>
    <br>
    <br>
    <input id="msgtext" type="text">
    <br>
    <a href="javascript:SendMsg()">发送信息</a>
    <br>
    <br>
    <div id="log"></div>
</div>
</body>

</html>

在浏览器中打开 http://127.0.0.1/chat.html

可以打开多个标签访问,这样就可以模拟多个用户聊天了。

消费 redis 队列

在 go 语言中创建一个文件 main.go,来实现消费 redis 队列,并向队列中发送消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
ackage main

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 没有密码,默认值
		DB:       0,  // 默认DB 0
	})

	var ctx = context.Background()
	// 订阅聊天室消息
	pubsub := rdb.Subscribe(ctx, "chat")
	defer pubsub.Close()

	// 处理收到的消息
	go func() {
		for {
			msg, err := pubsub.ReceiveMessage(ctx)
			if err != nil {
				log.Println("接收错误:", err)
				continue

			}
			fmt.Println("收到消息:", msg.Payload)

			parts := strings.Split(msg.Payload, ":")
			if len(parts) != 2 {
				continue
			}

			sender, content := parts[0], parts[1]
			fmt.Printf("收到消息: [%s] %s\n", sender, content)
		}
	}()

	// 模拟服务主动推送
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
		rdb.Publish(ctx, "chat", "系统: 这是定时广播消息:"+time.Now().String())
	}
}

在实际业务中,相当于后端服务,消费客户端发送的消息,并给客户端推送消息。

参考