Gathering detailed insights and metrics for websocket-cross-server-adapter
Gathering detailed insights and metrics for websocket-cross-server-adapter
Gathering detailed insights and metrics for websocket-cross-server-adapter
Gathering detailed insights and metrics for websocket-cross-server-adapter
A Node.js based WebSocket distributed communication framework that enables seamless multi-server collaboration for real-time communication, cross-server event handling, and scalable applications.
npm install websocket-cross-server-adapter
Typescript
Module System
Min. Node Version
Node Version
NPM Version
JavaScript (100%)
Total Downloads
0
Last Day
0
Last Week
0
Last Month
0
Last Year
0
NOASSERTION License
12 Stars
52 Commits
2 Forks
1 Watchers
1 Branches
1 Contributors
Updated on Jun 04, 2025
Latest Version
1.0.7
Package Id
websocket-cross-server-adapter@1.0.7
Unpacked Size
239.04 kB
Size
56.46 kB
File Count
8
NPM Version
10.5.0
Node Version
18.20.2
Published on
May 26, 2025
Cumulative downloads
Total Downloads
Last Day
0%
NaN
Compared to previous day
Last Week
0%
NaN
Compared to previous week
Last Month
0%
NaN
Compared to previous month
Last Year
0%
NaN
Compared to previous year
4
1
原生 ws
只是通信基础,心跳、重连、消息回调和房间路由都得自己实现。
Node.js 单线程和内存限制,让它难以应付大量连接和复杂业务。要支持多进程或者分布式的多服务器协同和房间管理,必须采取分布式的架构,这就是做这个框架的原因。
该适配器基于 Redis 的发布订阅机制,实现跨服务器的消息广播与事件同步,支持多节点去中心化通信,具备健康监测和自动恢复,保障高可用性。支持单服务器多进程及跨物理服务器部署,便于弹性扩展。
主要功能:
消息发送支持:
支持房间命名空间管理和跨节点统计(在线用户、房间人数等)。所有事件处理器可任意节点注册,跨节点事件可直接回调客户端,无需中转。
一个轻量级、简洁的 WebSocket 客户端类,适用于任何基于标准 WebSocket 协议的平台,例如浏览器、Node.js、Electron、React Native、移动 App、小程序、Cocos Creator 等环境。内置心跳机制、断线重连、事件回调、延迟反馈等功能,逻辑清晰、易于集成,压缩后体积仅约 5KB,适合各类实时通信场景的前端接入。
支持功能:
1npm install websocket-cross-server-adapter
如果你的项目仅需传统的单 WebSocket 服务器模式,则无需使用 Redis,也无需进行任何额外的分布式配置。
你只需像使用原生 ws 模块那样传入配置信息即可。框架会自动以单服务器模式运行。
传入的 ws 配置应使用对象形式,并遵循 ws 模块官方文档 中的配置说明。
server.js:
1 // server.js: 2 // 如果你不是在示例文件夹下运行,请将 require 地址换成包名: 3 // const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 4 const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 5 6 // 如果你用的是 ES Module,可以这样写: 7 // import { WebSocketCrossServerAdapter } from 'websocket-cross-server-adapter'; 8 // 默认的配置端口 9 let port = 9000; 10 11 // 解析命令行参数,可以在node命令行加上以下参数,动态配置prot,例如:node server --port=9001 12 13 const args = process.argv.slice(2); 14 args.forEach(arg => { 15 if (arg.startsWith('--port=')) { 16 port = parseInt(arg.split('=')[1], 10); 17 } 18 }); 19 20 console.log(`Using configured values - port: ${port}`); 21 22 const wsServer = new WebSocketCrossServerAdapter({ 23 wsOptions: { 24 port 25 } 26 }); 27 28 wsServer.onWebSocketEvent('connection', (socket, req) => { 29 console.log('Client connection'); 30 31 // 使用 WebSocketCrossServerAdapter 的辅助方法 parseWsRequestParams 解析 req 对象, 32 // 获取客户端通过 WebSocketConnector 类创建连接时配置的参数信息,比如 token、自定义参数等等。 33 const data = wsServer.parseWsRequestParams(req); 34 35 console.log('Connection params:', data); 36 37 // ✅ 使用客户端传来的 id 建立映射。实际业务中应在此处进行完整的身份验证(如 token 鉴权)。 38 // 例如可使用 jsonwebtoken 模块校验 data.token,并根据验证结果决定是否继续。 39 // 然而,我们更推荐在 noServer 模式下,在 WebSocket 协议升级阶段就完成鉴权逻辑,效率更高、也更安全。 40 // ws 官方虽然提供了 verifyClient 参数用于连接时鉴权,但该 API 已不推荐使用,并可能在未来版本中移除。 41 // 👉 建议查阅 ws 官方文档中的 noServer 模式以及 `server.on('upgrade')` 相关用法,了解推荐的鉴权方式。 42 43 // 此处为了演示方便,仅直接使用客户端传来的 id。 44 if (data.params.id) { 45 const playerId = String(data.params.id); 46 console.log('The client’s ID is:' + playerId); 47 // 把 id 存储到 socket.playerId 中。具体存法请根据自身业务决定, 48 // 比如 socket.player = { playerId, name } 等等。 49 // 总之需确保能从 socket 上获取到该连接的唯一身份标识。 50 socket.playerId = playerId; 51 52 // 必须建立 id(必须为字符串类型)与 socket 实例的映射, 53 // 后续房间广播、单点、多点推送才能找到对应实例。 54 wsServer.setUserSocket(playerId, socket); 55 56 } else { 57 // 模拟鉴权失败,使用自定义关闭码(4011)关闭连接。 58 // 这里的代码应根据自身业务逻辑定义。 59 // 详细查看 API 客户端关于 close 事件部分解释。 60 socket.close(4011, 'Auth failure'); 61 } 62 }); 63 64 wsServer.onWebSocketEvent('close', (socket, req) => { 65 console.log('Client disconnected,id:' + socket.playerId); 66 67 if (socket.playerId) { 68 69 // 客户端断开连接时,请务必删除 ID 和 socket 实例的映射, 70 // 否则 socket 实例可能无法被释放,导致内存泄漏。 71 wsServer.removeUserSocket(socket.playerId); 72 } 73 }); 74 75 wsServer.onWebSocketEvent('say', (socket, data, callback) => { 76 console.log(`Received 'say' event from client ${socket.playerId}:`, data); 77 78 if (callback) { 79 80 // 如果客户端使用 emit 的时候带有回调,或者使用 emitWithPromise 发送消息, 81 // 此时 callback 会为有效函数,此处可调用 callback 回传结果给客户端。 82 callback({ msg: 'I am a callback for your say event' }); 83 } 84 }); 85 86 wsServer.onWebSocketEvent('joinRoom', (socket, data, callback) => { 87 console.log(`Received 'joinRoom' event from client ${socket.playerId}:`, data); 88 if (socket.playerId) { 89 90 // 模拟加入testRoom,id为1000的房间 91 wsServer.joinRoom('testRoom', '1000', socket.playerId); 92 } 93 if (callback) { 94 callback({ msg: 'JoinRoom successfully' }); 95 } 96 }); 97 98 // 模拟定时发送广播 99 setInterval(() => { 100 wsServer.broadcast('serverSay', { msg: 'I’m sending this message to everyone' }); 101 }, 15_000) 102 103 // 模拟定时向测试房间发送消息 104 setInterval(() => { 105 wsServer.broadcastToRoom('testRoom', '1000', 'roomSay', { msg: 'This is a message sent to the test room' }); 106 },10_000) 107
client.js:
1 2 // client.js: 3 // const { WebSocketConnector } = require('websocket-cross-server-adapter'); 4 const WebSocketConnector = require('../../src/WebSocketConnector'); 5 6 // 默认的配置端口和客户端id 7 let port = 9000; 8 let id = 1; 9 10 // 解析命令行参数,可以在node命令行加上以下参数,动态配置prot和id,例如:node client --id=16 --port=9001 11 12 const args = process.argv.slice(2); 13 args.forEach(arg => { 14 if (arg.startsWith('--port=')) { 15 port = parseInt(arg.split('=')[1], 10); 16 } else if (arg.startsWith('--id=')) { 17 id = arg.split('=')[1]; 18 } 19 }); 20 21 console.log(`Using configured values - port: ${port}, id: ${id}`); 22 23 const client = new WebSocketConnector({ 24 url: `ws://localhost:${port}`, 25 customParams: { 26 name: 'Sam', 27 id 28 }, 29 // 可以通过关闭服务器端来测试以下不同参数设置时候的重连效果 30 //repeatLimit: 5, 31 //fastReconnectThreshold: 1, 32 33 }); 34 35 client.on('open', () => { 36 console.log('Connect success') 37 }) 38 39 client.on('close', (event) => { 40 console.log('onClose event:', event.code, event.reason); 41 if (event.code === 4001 || 42 event.code === 4010 || 43 event.code === 4011 || 44 event.code === 4012 45 ) { 46 // 手动断开连接或服务器在特定情况下强制注销 — 不应尝试重连 47 console.log('Connection closed manually or by forced logout/auth failure. No reconnection.'); 48 // 虽然连接已关闭,但仍需禁止自动重连,并清理所有计时器和 WebSocket 实例等资源。 49 client.manualClose(); 50 } else { 51 // 其他情况下,应手动触发重连 52 client.reconnect(); 53 } 54 }) 55 56 client.on('error', (event) => { 57 console.log('Connect on error'); 58 }); 59 60 client.on('reconnect', ({ repeat, timeout }) => { 61 console.log('Preparing for reconnection attempt #' + repeat + ', actual reconnection will occur in ' + timeout + ' ms'); 62 }) 63 64 client.on('repeat-limit', (repeatLimit) => { 65 console.log('Reached maximum reconnection attempts: ' + repeatLimit); 66 }) 67 68 69 client.on('serverSay', (data) => { 70 console.log('Received serverSay event:'); 71 console.log(data) 72 }) 73 74 client.on('roomSay', (data) => { 75 console.log('Received roomSay event:'); 76 console.log(data) 77 }) 78 79 80 client.on('ping', () => { 81 console.log('Go to ping....') 82 }) 83 84 client.on('pong', (speed) => { 85 // 在pong事件中可以测得当前网络延迟 86 console.log(`Network latency: ${speed} ms`); 87 }) 88 89 90 setTimeout(async () => { 91 // 使用 Promise 方式发送带有回调的事件 92 let data = await client.emitWithPromise('say', { msg: 'I am a client with ID: ' + id + ', and I need your promise callback.' }, { 93 onPending: () => { 94 console.log('requesting...') 95 } 96 }); 97 console.log('Received promise response:'); 98 console.log(data); 99 }, 2000); 100 101 102 setTimeout(() => { 103 // 使用 callback 方式发送带有回调的事件 104 client.emit('say', { msg: 'I am a client with ID: ' + id + ', and I need your callback.' }, (err, data) => { 105 if (err) { 106 console.log('Callback error occurred'); 107 console.log(err) 108 } else { 109 console.log('Received callback response:'); 110 console.log(data) 111 } 112 }, { 113 onPending: () => { 114 console.log('requesting...') 115 }, 116 callbackTimeout: 1000 117 }) 118 }, 4000); 119 120 setTimeout(() => { 121 // 模拟加入测试房间 122 client.emit('joinRoom', { msg: 'I want to join the test room' }, (err, data) => { 123 if (err) { 124 console.log('JoinRoon Callback error occurred'); 125 console.log(err) 126 } else { 127 console.log('JoinRoon Received callback response:'); 128 console.log(data) 129 } 130 }) 131 }, 6000); 132
请在项目主目录下执行以下命令,安装所需依赖:
1npm install
进入 examples/single-ws-server 目录:
1cd examples/single-ws-server
默认端口启动:
1node server
或者自定义端口启动:
1node server --port=9001 2
默认配置:
1node client 2
或者指定客户端 ID 和端口启动:
1node client --id=16 --port=9001 2
⚠️ 注意:每个客户端的 id 必须唯一,不能重复,否则将导致连接冲突。
你可以通过使用不同的 id 启动多个客户端,以观察各种事件情况。 还可以通过关闭服务器来测试断线场景,观察客户端的重连事件信息,然后再重启服务器,以模拟以下流程: 断线 → 重连中 → 成功重新连接
如果你想测试单点定向发送消息或多点定向发送消息的能力,
请参考 API 文档中关于以下函数的说明并自行测试:
它们支持向特定的客户端 Socket 连接发送事件消息。
除了默认监听端口启动外,WebSocket 服务器还支持以下两种方式启动:
当使用已有的 HTTP 或 HTTPS 服务器启动 WebSocket 服务时,WebSocket 将会与 HTTP(S) 共用同一个端口。
这是通过 HTTP 协议的“协议升级”(Protocol Upgrade)机制实现的。
Upgrade: websocket
字段;ws.Server
实例接管连接处理逻辑;这种方式特别适用于你希望 Web 应用(如网页、API)和 WebSocket 服务共用同一个端口 的场景,可以避免占用多个端口,方便部署与管理。
详情请查看官方文档:ws GitHub - External HTTPS Server
你可以传入已有的 HTTP Server 实例启动 WebSocket 服务:
1 const http = require('http'); 2 // const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 3 const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 4 const server = http.createServer(); 5 const wsServer = new WebSocketCrossServerAdapter({ 6 wsOptions: { 7 server 8 } 9 }); 10 11 server.listen(9000, () => { 12 console.log('Server is running on port 9000'); 13 }); 14 15 wsServer.onWebSocketEvent('connection', (socket, req) => { 16 console.log('Client connection'); 17 }) 18 19// ............................其他逻辑相同 20
你可以通过 noServer 模式手动处理 HTTP 升级请求。这种方式适用于你希望完全控制 HTTP 服务和升级流程的场景,例如在一个服务器上同时处理 HTTP 请求和 WebSocket 连接。 适用于: 与现有 HTTP(S) 服务共用端口 需要自定义认证、权限验证等逻辑 更精细地控制连接行为
📚 详情请查看官方文档:
ws GitHub - noServer Mode
1 const http = require('http'); 2 const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 3 // const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 4 const server = http.createServer(); 5 const wsServer = new WebSocketCrossServerAdapter({ 6 wsOptions: { 7 noServer: true 8 } 9 }); 10 11 server.listen(9000, () => { 12 console.log('Server is running on port 9000'); 13 }); 14 15 server.on('upgrade', (req, socket, head) => { 16 // 1. 检查 Upgrade 头必须是 websocket 17 if (req.headers['upgrade']?.toLowerCase() !== 'websocket') { 18 socket.write('HTTP/1.1 400 Bad Request\r\n\r\n'); 19 socket.destroy(); 20 return; 21 } 22 23 const data = wsServer.parseWsRequestParams(req); 24 console.log('传递参数是:') 25 console.log(data) 26 27 const id = data.params.id; 28 console.log("连接的客户端id:" + id); 29 30 if (id) { 31 // 获取 wsServer 中的 WebSocket.Server 实例,并处理 WebSocket 协议升级请求 32 wsServer.getWss()?.handleUpgrade(req, socket, head, (ws) => { 33 // 模拟完成鉴权,绑定 playerId 到该 WebSocket 实例上 34 ws.playerId = String(id); 35 // 手动触发 'connection' 事件,使该连接走统一的连接处理逻辑 36 wsServer.getWss()?.emit('connection', ws, req); 37 }) 38 } else { 39 // 模拟鉴权失败,返回 401 错误并关闭连接 40 socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); // 发送拒绝连接的 HTTP 响应 41 socket.destroy(); // 销毁连接 42 } 43 }); 44 45 46 wsServer.onWebSocketEvent('connection', (socket, req) => { 47 console.log('Client connection'); 48 console.log('客户端id:' + socket.playerId); 49 //....................其他逻辑相同 50 }) 51 52 // ............................其他逻辑相同
在真实业务场景中,建议在客户端发起连接请求时即完成用户身份认证,服务器在接收到连接请求时验证身份信息。 不要等连接建立成功后再进行鉴权然后断开,这样会导致服务器资源被不必要地占用,增加安全风险。如果必须在连接成功后进行鉴权,请务必实现认证超时关闭机制,或者定期检查并清理无效连接,防止服务器资源被恶意或无效连接耗尽。
推荐使用如 jsonwebtoken
等模块,对请求中携带的 token 进行验证。
同时,建议在正式发起 WebSocket 连接之前,先通过 HTTP 接口进行身份验证。
这是因为在 WebSocket 协议升级过程中,服务器返回的鉴权失败信息在不同平台和客户端的表现不一致,
很多情况下客户端无法准确接收到具体的错误状态和原因,导致重连或错误处理复杂且不可靠。
通过预先的 HTTP 鉴权,可以避免这些问题,提高客户端的用户体验和连接稳定性。
上述示例完整展示了在 非分布式架构下,使用单 WebSocket 服务器 进行通信的典型场景与关键能力
在完成了第一章节中单 WebSocket 服务器的通信逻辑后,我们将进入服务端之间的通信范式 —— 跨服务器通信模块(CrossServer)。
该示例不依赖 WebSocket,仅聚焦于分布式环境中服务节点之间如何稳定、高效地进行消息传递与回调处理。
该模块涵盖以下关键能力:
适用场景:
适用于不同进程或跨物理机器的服务之间通信,例如:
这为解耦系统架构、构建微服务体系提供了通用的通信机制。
💡 该模块是构建
WebSocketCrossServerAdapter
的基础部分,理解此机制将帮助你深入掌握后续跨服通信的底层逻辑。
在使用本项目之前,你需要提前安装好 Redis 服务。 安装教程或相关资源:
安装完成 Redis 后,启动 redis 服务:
1redis-server
或者指定配置文件启动(windows平台)
1redis-server redis.windows.conf
你可以通过以下方式测试是否启动成功:
1redis-cli ping
如果返回:
1PONG
说明 Redis 服务已成功启动并正常运行。
你可以通过复制并修改 Redis 配置文件来启动多个实例,每个实例监听不同的端口。
示例步骤:
1cp /etc/redis/redis.conf /etc/redis/redis-6380.conf 2cp /etc/redis/redis.conf /etc/redis/redis-6381.conf
1port 6380
1redis-server /etc/redis/redis-6380.conf 2redis-server /etc/redis/redis-6381.conf
1redis-server --port 6380 2redis-server --port 6381
Windows 同样,通过复制并修改配置文件中的端口,运行多个 redis-server 进程:
1redis-server redis-6380.conf 2redis-server redis-6381.conf
或者直接开启多个:
1redis-server --port 6380 2redis-server --port 6381
bind
配置项以允许对应主机连接。本框架底层使用 ioredis 作为 Redis 客户端,所有 Redis 相关配置参数均直接传递给 ioredis。 具体的配置选项和使用方法,请参考 ioredis 官方文档 以获取详细说明和最佳实践。
cserver.js:
1// cserver.js 2// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 3const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 4 5// 填入你的 Redis 配置信息,支持多个实例,请确保 Redis 服务已启动 6// 支持多个 Redis 节点,如果使用多个节点,则每次发布会根据设置的策略选择其中的一个节点进行发布, 7// 从而实现“负载均衡”。不同的策略含义请参考 API 文档。 8// 内部会维护各个节点的健康状态。 9// 重要提示:至少需要提供一个 Redis 节点,跨服务通信才能正常工作。 10const redisConfig = [ 11 { port: 6379, host: '127.0.0.1' }, 12 //{ port: 6380, host: '127.0.0.1' }, 13 // 可以添加更多节点 14]; 15 16// 请务必确保启动多个服务器时,每个服务器的名称都唯一,避免冲突 17let serverName = 'serverA'; 18 19// 解析命令行参数,可以在node命令行加上以下参数,动态配置serverName,例如:node cserver --name=serverA 20const args = process.argv.slice(2); 21args.forEach(arg => { 22 if (arg.startsWith('--name=')) { 23 serverName = arg.split('=')[1]; 24 } 25}); 26 27console.log(`Using configured values - serverName: ${serverName}`); 28 29const crossServer = new WebSocketCrossServerAdapter({ 30 redisConfig, 31 serverName, 32 // 注册监听redis节点的健康状态的事件函数,当健康状态发生变化的时候将触发 33 onRedisHealthChange: (health, info) => { 34 console.log(`Node health status changed:${health}`, info); 35 }, 36 // 当频道订阅发生错误的时候触发,info对象包含: 37 onRedisSubscriptionError: (info) => { 38 console.log('onRedisSubscriptionError:', info); 39 } 40}); 41 42 43// 注册跨服务器事件监听 44crossServer.onCrossServerEvent('say', (data, callback) => { 45 46 console.log('Received "say" event from another server:', data); 47 48 // 如果发送方通过 callback 或 Promise 方式发送消息,则此时 callback 为有效函数,可以直接调用以回调响应结果 49 if (callback) { 50 callback({ msg: `Hi, this is server ${crossServer.getServerName()} responding to you` }) 51 } 52}) 53
sender.js:
1// sender.js 2// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 3const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 4 5const redisConfig = [ 6 { port: 6379, host: '127.0.0.1' }, 7 //{ port: 6380, host: '127.0.0.1' }, 8 9]; 10 11let serverName = 'senderA'; 12 13const args = process.argv.slice(2); 14args.forEach(arg => { 15 if (arg.startsWith('--name=')) { 16 serverName = arg.split('=')[1]; 17 } 18}); 19 20console.log(`Using configured values - serverName: ${serverName}`); 21 22const crossServer = new WebSocketCrossServerAdapter({ 23 redisConfig, 24 serverName, 25 onRedisHealthChange: (health, info) => { 26 console.log(`Node health status changed:${health}`, info); 27 }, 28 onRedisSubscriptionError: (info) => { 29 console.log('onRedisSubscriptionError:', info); 30 } 31}); 32 33// 注册跨服务器事件监听 34// 如果发送目标 targetServer 包含了自己(即全局广播时没有排除自己,或者 targetServer 中包含了自己的 serverName), 35// 那么本服务器也会响应自己发送的该事件。 36// 该事件响应会直接在本地上下文中执行,不经过 Redis 频道中转。 37// 因此,开发者无需为本地事件做额外处理,所有优化均由内部机制自动完成。 38crossServer.onCrossServerEvent('say', (data, callback) => { 39 console.log('Received "say" event from another server:'); 40 console.log(data); 41 if (callback) { 42 callback({ msg: `Hi, this is server ${crossServer.getServerName()} responding to you` }) 43 } 44}) 45 46// 发送say事件消息,不需要任何回调 47setTimeout(() => { 48 crossServer.emitCrossServer('say', { 49 content: `Hi everyone, I am ${crossServer.getServerName()}` 50 },null, { 51 targetServer: [], 52 }) 53}, 3000); 54 55// 以callback的方式发送say事件消息,需要目标服务器回调 56setTimeout(() => { 57 58 // 每当接收到一个服务器响应都会执行一次回调函数 59 crossServer.emitCrossServer('say', { 60 content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback.` 61 }, (result) => { 62 console.log('Callback response result:', result); 63 64 if (result.success) { 65 console.log('Received server callback:', result.data); 66 console.log('Number of servers yet to respond:', result.remainingResponses); 67 } else { 68 // Timed out before collecting all responses 69 console.log('Error message:', result.error); 70 console.log('Number of servers that did not respond:', result.unrespondedCount); 71 } 72 }, { 73 targetServer: [], 74 expectedResponses: 3, 75 // exceptSelf: false, 76 // timeout: 2000, 77 }) 78}, 6000); 79 80// 以Promise的方式发送say事件消息,需要目标服务器回调 81setTimeout(async () => { 82 83 // Promise 会在所有预期响应(expectedResponses)全部完成后解决(resolved),如果超时未收到全部响应,也会解决,但此时 result.success 为 false。 84 let result = await crossServer.emitCrossServerWithPromise('say', { 85 content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback for the promise.` 86 }, { 87 targetServer: [], 88 expectedResponses: 3, 89 // exceptSelf: true, 90 // timeout: 2000, 91 }) 92 console.log('Promise response result:', result); 93 94 if (result.success) { 95 console.log('All expected nodes responded:', result.responses); 96 } else { 97 console.log('Nodes that have responded so far:', result.responses); 98 console.log('Number of servers that did not respond: ' + result.unrespondedCount); 99 } 100 101 // 也可以使用then的方式 102 // crossServer.emitCrossServerWithPromise('say', { 103 // content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback for the promise.` 104 // }, { 105 // targetServer: [], 106 // expectedResponses: 3, 107 // // exceptSelf: true, 108 // // timeout: 2000, 109 // }).then((result) => { 110 111 // }) 112}, 15_000); 113
请务必确保 Redis 服务已启动,且监听端口为 6379,本示例运行前需满足此条件。
1npm install
1cd examples/cross-server
concurrently是一个工具,可以一条命令同时启动多个服务器实例:
1npx concurrently "node cserver --name=serverA" "node cserver --name=serverB" "node cserver --name=serverC" "node cserver --name=serverD" "node cserver --name=serverE"
📌 注意:虽然它们共用一个终端窗口输出日志,但每个服务器仍然是独立的 Node.js 进程,彼此之间完全隔离,只是 concurrently 将它们的控制台输出集中显示,便于观察。
如果你希望每个服务器运行在自己的独立终端窗口中,便于查看日志或调试,可以分别手动启动: 启动一个默认服务器:
1node cserver 2
或者启动一个带自定义名称的服务器:
1node cserver --name=serverB 2
⚠️ 每个服务器名称必须唯一,这是保证分布式系统正常运行的前提,否则可能导致节点识别冲突或消息路由错误。
用于测试跨服务器通信的发送端:
1node sender 2
或使用自定义名称:
1node sender --name=senderB 2
成功启动后,你可以看到多个服务器之间的事件通信、回调响应等输出结果,验证系统的分布式通信能力。
你可以使用不同的参数配置来进行测试,例如:
排除自己不接收消息
指定目标服务器发送消息
设置超时时间
设置预期响应服务器个数
targetServer: []
空数组表示广播模式,所有服务器都将接收到消息。此时可配合 exceptSelf: true
来排除当前服务器自身不接收消息。
targetServer: ['serverA', 'serverB']
指定目标服务器名称(支持多个),可实现定向发送消息,仅目标服务器会接收到事件。
更多细节请参考 API 文档中
emitCrossServer
与
emitCrossServerWithPromise
。
通过使用 WebSocketCrossServerAdapter 的跨服务器通信功能,你可以轻松实现多进程或分布式环境下各个服务器节点之间的高效通信。 无论是定向消息发送、广播消息、回调机制,还是多节点响应统计等多种场景需求,都能被很好地支持,助力构建稳定且灵活的分布式系统。
在前两个章节中,我们已经实现了以下功能:
单 WebSocket 服务器模式(非分布式)
展示了如何使用 WebSocket 在单一服务实例中进行客户端通信,包括事件监听、消息发送和回调响应等基本操作。
跨服务器通信模块(纯服务端通信)
展示了不同服务节点之间如何通过 Redis 实现事件广播、定向发送和异步回调处理。
接下来我们将进入更高级的场景:将 WebSocket 与 CrossServer 模块结合,实现真正意义上的WebSocket 分布式通信。
wsserver.js:
1// wsserver.js 2// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter'); 3const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter'); 4 5const redisConfig = [ 6 { port: 6379, host: '127.0.0.1' }, 7 //{ port: 6380, host: '127.0.0.1' }, 8]; 9 10const args = process.argv.slice(2); 11let port = 9000; 12let serverName = 'serverA'; 13 14args.forEach(arg => { 15 if (arg.startsWith('--port=')) { 16 port = parseInt(arg.split('=')[1], 10); 17 } else if (arg.startsWith('--name=')) { 18 serverName = arg.split('=')[1]; 19 } 20}); 21 22console.log(`Using configured values - serverName: ${serverName},port: ${port}`); 23 24if (!(port && serverName)) { 25 throw new Error("Invalid port or server name"); 26} 27 28const wsCrossServer = new WebSocketCrossServerAdapter({ 29 redisConfig, 30 serverName, 31 wsOptions: { 32 port 33 } 34}); 35 36 37wsCrossServer.onWebSocketEvent('connection', async (socket, req) => { 38 39 const data = wsCrossServer.parseWsRequestParams(req); 40 console.log(`[${wsCrossServer.getServerName()}] Client Connection params:`, data); 41 if (data.params.id) { 42 const playerId = String(data.params.id); 43 socket.playerId = playerId; 44 wsCrossServer.setUserSocket(playerId, socket); 45 } else { 46 socket.close(4011, 'Auth failure'); 47 } 48}) 49 50wsCrossServer.onWebSocketEvent('close', async (socket, req) => { 51 console.log(`[${wsCrossServer.getServerName()}] Client ${socket.playerId} disconnected`); 52 if (socket.playerId) { 53 wsCrossServer.removeUserSocket(socket.playerId); 54 } 55}) 56 57 58wsCrossServer.onWebSocketEvent('joinRoom', (socket, data, callback) => { 59 60 if (socket.playerId && data && data.roomId) { 61 console.log(`[${wsCrossServer.getServerName()}] Client ${socket.playerId} wants to join room ${data.roomId}`); 62 wsCrossServer.joinRoom('chat', String(data.roomId), socket.playerId); 63 callback?.({ msg: 'Successfully joined the roomId:' + data.roomId }); 64 } else { 65 callback?.({ msg: 'Failed to join the room' }); 66 } 67 68}); 69 70wsCrossServer.onWebSocketEvent('command', (socket, data, callback) => { 71 console.log(`[${wsCrossServer.getServerName()}] Received 'command' event from client ${socket.playerId}:`, data); 72 73 if (!data || typeof data.action !== 'string') { 74 callback?.({ msg: 'Failed to send message' }); 75 return; 76 } 77 const { action, msg, toPlayerId, toPlayerIds, roomId } = data; 78 switch (action) { 79 case 'broadcast': 80 wsCrossServer.broadcast('say', { action, msg }); 81 break; 82 83 case 'toPlayer': 84 if (toPlayerId) { 85 wsCrossServer.toSocketId(String(toPlayerId), 'say', { action, msg }); 86 } 87 break; 88 89 case 'toPlayers': 90 if (Array.isArray(toPlayerIds)) { 91 wsCrossServer.toSocketIds(toPlayerIds, 'say', { action, msg }); 92 } 93 break; 94 95 case 'toRoom': 96 if (roomId) { 97 wsCrossServer.broadcastToRoom('chat', String(roomId), 'say', { action, msg }); 98 } 99 break; 100 101 default: 102 callback?.({ msg: 'Unknown action type' }); 103 return; 104 } 105 106 callback?.({ msg: `Message sent successfully [action:${action}] ` }); 107}); 108
clients.js:
1// clients.js 2// const { WebSocketConnector } = require('websocket-cross-server-adapter'); 3const WebSocketConnector = require('../../src/WebSocketConnector'); 4 5const totalClients = 50; 6 7const basePort = 9000; 8const portRange = 5; 9 10// 随机决定要发送加入房间消息的客户端数量 11const joinRoomCount = 10; 12const joinRoomClientIds = new Set(); 13 14// 随机挑选10个客户端id 15while (joinRoomClientIds.size < joinRoomCount) { 16 joinRoomClientIds.add(Math.floor(Math.random() * totalClients) + 1); 17} 18 19// 预定义一些房间ID 20const roomIds = ['1000', '1001', '1002']; 21 22// 模拟多个客户端加入不同的ws服务器 23for (let i = 0; i < totalClients; i++) { 24 const port = basePort + (i % portRange); 25 const id = i + 1; 26 27 const client = new WebSocketConnector({ 28 url: `ws://localhost:${port}`, 29 customParams: { 30 id: id 31 } 32 }); 33 34 client.on('open', () => { 35 console.log(`[Client ${id},port:${port}] Connect success`) 36 // 如果这个客户端在随机加入列表里,发送加入房间消息 37 if (joinRoomClientIds.has(id)) { 38 client.emit('joinRoom', { roomId: roomIds[Math.floor(Math.random() * roomIds.length)] }, (err, data) => { 39 if (err) { 40 console.log(`[Client ${id},port:${port}] JoinRoom Callback error occurred`); 41 console.log(err) 42 } else { 43 console.log(`[Client ${id},port:${port}] Received joinRoom callback response:`); 44 console.log(data) 45 } 46 }) 47 } 48 }) 49 50 client.on('close', (event) => { 51 console.log(`[Client ${id},port:${port}] onClose event:`, event.code, event.reason); 52 }) 53 54 client.on('say', (data) => { 55 console.log(`[Client ${id},port:${port}] Received say event:`, data); 56 }); 57 58} 59
boss.js:
1// boss.js 2// const { WebSocketConnector } = require('websocket-cross-server-adapter'); 3const WebSocketConnector = require('../../src/WebSocketConnector'); 4 5let port = 9000; 6let id = 555; 7 8const args = process.argv.slice(2); 9args.forEach(arg => { 10 if (arg.startsWith('--port=')) { 11 port = parseInt(arg.split('=')[1], 10); 12 } else if (arg.startsWith('--id=')) { 13 id = arg.split('=')[1]; 14 } 15}); 16 17console.log(`Using configured values - port: ${port}, id: ${id}`); 18 19const client = new WebSocketConnector({ 20 url: `ws://localhost:${port}`, 21 customParams: { 22 id 23 } 24}); 25 26client.on('open', () => { 27 console.log('Connect success') 28}) 29 30client.on('close', (event) => { 31 console.log('onClose event:', event.code, event.reason); 32 33}) 34 35client.on('say', (data) => { 36 console.log(`Received say event:`, data); 37}); 38 39setTimeout(async () => { 40 client.emit('command', { action: 'broadcast', msg: 'Hello every one' }, (err, data) => { 41 if (err) { 42 console.log('Callback error occurred'); 43 console.log(err) 44 } else { 45 console.log('Received callback response:'); 46 console.log(data) 47 } 48 }) 49}, 6_000); 50 51setTimeout(async () => { 52 client.emit('command', { action: 'toPlayer', msg: 'Hello player 13 ', toPlayerId: '13' }, (err, data) => { 53 if (err) { 54 console.log('Callback error occurred'); 55 console.log(err) 56 } else { 57 console.log('Received callback response:'); 58 console.log(data) 59 } 60 }) 61}, 9_000); 62 63setTimeout(async () => { 64 client.emit('command', { action: 'toPlayers', msg: 'Hello group players ', toPlayerIds: ['3','10','25','37'] }, (err, data) => { 65 if (err) { 66 console.log('Callback error occurred'); 67 console.log(err) 68 } else { 69 console.log('Received callback response:'); 70 console.log(data) 71 } 72 }) 73}, 12_000); 74 75setTimeout(async () => { 76 client.emit('command', { action: 'toRoom', msg: 'Hello room players ', roomId: '1000' }, (err, data) => { 77 if (err) { 78 console.log('Callback error occurred'); 79 console.log(err) 80 } else { 81 console.log('Received callback response:'); 82 console.log(data) 83 } 84 }) 85}, 15_000); 86
请务必确保 Redis 服务已启动,且监听端口为 6379,本示例运行前需满足此条件。
进入 examples/ws-cross-server 目录,使用以下命令通过 concurrently 同时启动五个不同名称和端口的 WebSocket 服务实例:
1npx concurrently "node wsserver --name=serverA --port=9000" "node wsserver --name=serverB --port=9001" "node wsserver --name=serverC --port=9002" "node wsserver --name=serverD --port=9003" "node wsserver --name=serverE --port=9004"
注意:concurrently 会将多个服务器日志集中显示在一个终端窗口中。如果你更喜欢每个服务器拥有独立窗口,可以手动分别执行下面的命令启动:
1node wsserver --name=serverA --port=9000 2node wsserver --name=serverB --port=9001 3node wsserver --name=serverC --port=9002 4node wsserver --name=serverD --port=9003 5node wsserver --name=serverE --port=9004
请确保每个服务器的名称唯一,避免节点名称冲突。
执行以下命令启动 50 个模拟客户端,这些客户端会随机连接到上述任意一个服务器,并随机把一部分客户端加入到测试房间。
1node clients
使用下面命令启动控制端客户端,用于模拟发送广播、点对点、群发、房间消息等多种指令:
1node boss
运行上述示例后,你将观察到以下分布式通信特性生效:
broadcast
)toPlayer
指定玩家)toPlayers
)toRoom
)这些特性表明:
- 各 WebSocket 服务器节点之间通过 Redis 实现了事件同步与消息路由。
- 分布式环境下,消息发送逻辑与单服务器模式几乎保持一致,开发者无需额外关注服务器部署细节。
- 整个系统具备了真正意义上的 WebSocket 分布式通信能力。
通过以上三个章节的示例,你可以循序渐进地从 单机 WebSocket 通信,到 服务器之间的跨节点通信,再到 WebSocket 客户端与跨服务器系统协同通信,完整了解整个分布式通信的工作流程与核心机制。每一阶段都紧扣实际场景,帮助你逐步建立起对 WebSocket 分布式架构的整体认知。
如果你在使用过程中有任何问题或建议,欢迎随时与我联系交流。 你也可以通过 GitHub 仓库的 Issues 反馈问题或提出建议。
为了防止邮件被误归类到垃圾邮件,请在邮件主题或正文前面加上 [WebSocketCrossServerAdapter]。 邮箱:349233775@qq.com
本项目基于 MIT 协议开源,具体内容请查看 LICENSE 文件。
No vulnerabilities found.
No security vulnerabilities found.