私信 - 第四部分
本指南分为四个不同的部分
这是我们在 第三部分 结束时的位置
现在我们将了解如何扩展到多个 Socket.IO 服务器,以实现高可用性/负载均衡的目的。
安装
让我们签出第四部分的分支
git checkout examples/private-messaging-part-4
以下是您在当前目录中应该看到的内容
├── babel.config.js
├── package.json
├── public
│ ├── favicon.ico
│ ├── fonts
│ │ └── Lato-Regular.ttf
│ └── index.html
├── README.md
├── server
│ ├── cluster.js (created)
│ ├── docker-compose.yml (created)
│ ├── index.js (updated)
│ ├── messageStore.js (updated)
│ ├── package.json (updated)
│ └── sessionStore.js (updated)
└── src
├── App.vue
├── components
│ ├── Chat.vue
│ ├── MessagePanel.vue
│ ├── SelectUsername.vue
│ ├── StatusIcon.vue
│ └── User.vue
├── main.js
└── socket.js
完整的差异可以在 这里 找到。
更新服务器
对于这最后一步,我们需要在服务器端添加 3 个额外的依赖项
ioredis
: 一个很棒的 Redis 客户端socket.io-redis
: 一个基于 Redis 发布/订阅机制 的 Socket.IO 适配器@socket.io/sticky
: 一个用于在 Node.js 集群 中运行 Socket.IO 的模块
我们还需要一个 Redis 实例。为了方便起见,提供了一个 docker-compose.yml
文件
cd server
docker-compose up -d
npm install
npm start
这将创建 4 个 Node.js 工作进程,每个工作进程都运行相同的 index.js
文件。
在客户端,不需要进行任何更改,我们将重点关注服务器端。
工作原理
创建多个服务器
在创建多个 Socket.IO 服务器时,需要做两件事
- 您需要启用粘性会话(请参阅 此处 以获取完整说明)
- 您需要用 Redis 适配器(或其他兼容适配器)替换默认的内存中适配器
在我们的示例中,@socket.io/sticky
模块用于确保来自特定客户端的请求始终路由到同一个 Socket.IO 服务器。这就是所谓的“粘性会话”。
注意:我们也可以创建多个进程监听不同的端口(或使用多个主机),并在它们前面添加一个反向代理。在 文档 中介绍了为 NginX 或 HAProxy 等常见反向代理解决方案启用粘性会话。
集群是在 server/cluster.js
文件中创建的
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}
在我们现有的 server/index.js
文件中,只有一个更改:由工作进程创建的 HTTP 服务器实际上不监听任何特定端口,请求将由主进程处理,然后转发到正确的工作进程。
之前
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
之后
setupWorker(io);
@socket.io/sticky
提供的 setupWorker
方法将负责主进程和工作进程之间的同步。
会话和消息
现在粘性会话已启用,我们需要在 Socket.IO 服务器之间共享会话和消息。
我们基于 Redis 创建了一个新的 SessionStore。我们将使用 HSET 命令将每个会话存储在 Redis 哈希中
class RedisSessionStore extends SessionStore {
// ...
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
// ...
}
我们还为键设置了过期时间,以便清理旧会话。
使用 HMGET 命令获取会话非常简单
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
// ...
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
// ...
}
获取所有会话稍微复杂一些
class RedisSessionStore extends SessionStore {
// ...
async findAllSessions() {
// first, we fetch all the keys with the SCAN command
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
// and then we retrieve the session details with multiple HMGET commands
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
类似地,我们基于 Redis 创建了一个新的 MessageStore。我们将使用 RPUSH 命令将与特定用户关联的所有消息存储在 Redis 列表中
class RedisMessageStore extends MessageStore {
// ...
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
// ...
}
使用 LRANGE 命令检索消息
class RedisMessageStore extends MessageStore {
// ...
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
转发消息
还需要进行最后一个修改:我们需要确保消息实际上到达接收者,即使该接收者未连接到同一个 Socket.IO 服务器
这是 Redis 适配器的职责,它依赖于 Redis 发布/订阅机制在 Socket.IO 服务器之间广播消息,并最终到达所有客户端。
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
就是这样!如果您在机器上有一个 Redis CLI,您可以检查在网络上传输的消息
$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."
文档
注意:使用 Redis 适配器,allSockets()
方法(用于“断开连接”处理程序)会自动返回所有 Socket.IO 服务器上的 Socket ID,因此无需进行任何更新。
回顾
好的,让我们总结一下:我们创建了一个功能齐全的聊天(是的,再次!),健壮,可以水平扩展,这使我们能够引入一些有用的 Socket.IO 功能
感谢您的阅读!