如何构建一个基本的 Socket.IO 客户端
在本指南中,我们将使用 JavaScript 实现一个基本的 Socket.IO 客户端,以便更好地理解 Socket.IO 协议。
我们将实现以下功能
- 创建 WebSocket 连接
- 管理重新连接
- 发送事件
- 接收事件
- 手动断开连接
官方客户端显然包含更多功能
但这应该足以让您对库在幕后的工作原理有一个很好的概述。
我们的目标是实现类似于以下内容
import { io } from "./basic-client.js";
const socket = io();
// connection
socket.on("connect", () => {
// ...
});
// receiving an event
socket.on("foo", (value) => {
// ...
});
// sending an event
socket.emit("bar", "abc");
准备好了吗?让我们开始吧!
事件发射器
Socket.IO API 很大程度上受到 Node.js EventEmitter 类的启发。
import { EventEmitter } from "node:events";
const myEmitter = new EventEmitter();
myEmitter.on("foo", () => {
console.log("foo!");
});
myEmitter.emit("foo");
该库提供了类似的 API,但在服务器和客户端之间
- 服务器
io.on("connection", (socket) => {
// send a "foo" event to the client
socket.emit("foo");
// receive a "bar" event from the client
socket.on("bar", () => {
// ...
});
});
- 客户端
import { io } from "socket.io-client";
const socket = io();
// receive a "foo" event from the server
socket.on("foo", () => {
// ...
});
// send a "bar" event to the server
socket.emit("bar");
服务器和客户端之间的底层连接 (WebSocket 或 HTTP 长轮询) 被抽象化并由库管理。
让我们创建一个最小的 EventEmitter
类
class EventEmitter {
#listeners = new Map();
on(event, listener) {
let listeners = this.#listeners.get(event);
if (!listeners) {
this.#listeners.set(event, listeners = []);
}
listeners.push(listener);
}
emit(event, ...args) {
const listeners = this.#listeners.get(event);
if (listeners) {
for (const listener of listeners) {
listener.apply(null, args);
}
}
}
}
然后,我们的 Socket
类将扩展此类,以公开 on()
和 emit()
方法
class Socket extends EventEmitter {
constructor(uri, opts) {
super();
}
}
在我们的构造函数中,uri
参数是
- 由用户提供
const socket = io("https://example.com");
- 或从
window.location
对象推断
const socket = io();
让我们创建一个入口点
export function io(uri, opts) {
if (typeof uri !== "string") {
opts = uri;
uri = location.origin;
}
return new Socket(uri, opts);
}
好的,这是一个良好的开端!
WebSocket 连接
现在,让我们创建与服务器的 WebSocket 连接
class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;
constructor(uri, opts) {
super();
+ this.#uri = uri;
+ this.#opts = Object.assign({
+ path: "/socket.io/"
+ }, opts);
+ this.#open();
}
+ #open() {
+ this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+ const uri = this.#uri.replace(/^http/, "ws");
+ const queryParams = "?EIO=4&transport=websocket";
+ return `${uri}${this.#opts.path}${queryParams}`;
+ }
}
参考:https://mdn.org.cn/en-US/docs/Web/API/WebSocket
关于 createUrl()
方法的一些解释
- WebSocket URL 以
ws://
或wss://
开头,因此我们在replace()
调用中处理它 - Socket.IO URL 始终包含特定的请求路径,默认值为
/socket.io/
- 有两个强制性查询参数
EIO=4
:Engine.IO 协议的版本transport=websocket
:使用的传输
因此,最终的 URL 将类似于:wss://example.com/socket.io/?EIO=4&transport=websocket
Engine.IO 协议
Socket.IO 代码库分为两个不同的层
- 低级管道:我们称之为 Engine.IO,即 Socket.IO 内部的引擎
- 高级 API:Socket.IO 本身
另请参阅
使用 WebSocket 时,通过网络发送的消息格式很简单:<packet type><payload>
以下是协议第 4 版 (因此上面的 EIO=4
) 中的不同数据包类型
名称 | 表示 | 描述 |
---|---|---|
OPEN | 0 | 在握手期间使用。 |
CLOSE | 1 | 用于指示传输可以关闭。 |
PING | 2 | 在心跳机制中使用。 |
PONG | 3 | 在心跳机制中使用。 |
MESSAGE | 4 | 用于将有效负载发送到另一方。 |
UPGRADE | 5 | 在升级过程中使用(此处未使用)。 |
NOOP | 6 | 在升级过程中使用(此处未使用)。 |
示例
4hello
with:
4 => MESSAGE packet type
hello => message payload (UTF-8 encoded)
让我们处理 WebSocket 消息
+const EIOPacketType = {
+ OPEN: "0",
+ CLOSE: "1",
+ PING: "2",
+ PONG: "3",
+ MESSAGE: "4",
+};
+function noop() {}
class Socket extends EventEmitter {
[...]
#open() {
this.#ws = new WebSocket(this.#createUrl());
+ this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+ this.#ws.onclose = () => this.#onClose("transport close");
}
+ #onMessage(data) {
+ if (typeof data !== "string") {
+ // TODO handle binary payloads
+ return;
+ }
+
+ switch (data[0]) {
+ case EIOPacketType.CLOSE:
+ this.#onClose("transport close");
+ break;
+
+ default:
+ this.#onClose("parse error");
+ break;
+ }
+ }
+
+ #onClose(reason) {
+ if (this.#ws) {
+ this.#ws.onclose = noop;
+ this.#ws.close();
+ }
+ }
+}
心跳
心跳机制用于确保服务器和客户端之间的连接正常。
服务器在初始握手期间发送两个值:pingInterval
和 pingTimeout
然后它将每隔 pingInterval
毫秒发送一个 PING 数据包,并期望从客户端收到一个 PONG 数据包。让我们这样做
class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;
[...]
#onMessage(data) {
if (typeof data !== "string") {
// TODO handle binary payloads
return;
}
switch (data[0]) {
+ case EIOPacketType.OPEN:
+ this.#onOpen(data);
+ break;
+
case EIOPacketType.CLOSE:
this.#onClose("transport close");
break;
+ case EIOPacketType.PING:
+ this.#resetPingTimeout();
+ this.#send(EIOPacketType.PONG);
+ break;
default:
this.#onClose("parse error");
break;
}
}
+ #onOpen(data) {
+ let handshake;
+ try {
+ handshake = JSON.parse(data.substring(1));
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+ this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+ clearTimeout(this.#pingTimeoutTimer);
+ this.#pingTimeoutTimer = setTimeout(() => {
+ this.#onClose("ping timeout");
+ }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+ if (this.#ws.readyState === WebSocket.OPEN) {
+ this.#ws.send(data);
+ }
+ }
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
+ clearTimeout(this.#pingTimeoutTimer);
}
}
重新连接
趁此机会,我们还将处理重新连接。WebSockets 非常棒,但它们可能会(并且会在现实生活中)断开连接,因此我们必须注意这一点
class Socket extends EventEmitter {
[...]
constructor(uri, opts) {
super();
this.#uri = uri;
this.#opts = Object.assign(
{
path: "/socket.io/",
+ reconnectionDelay: 2000,
},
opts
);
this.#open();
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
}
官方 Socket.IO 客户端使用了一种带有随机性的奇特的指数延迟,以防止大量客户端同时重新连接时出现负载峰值,但这里我们将保持简单,并使用一个常量值。
好的,让我们总结一下,我们现在有一个可以
- 打开与服务器的 WebSocket 连接
- 通过响应 PING 数据包来遵守心跳机制
- 在失败时自动重新连接
这就是 Engine.IO 协议!现在让我们深入了解 Socket.IO 协议。
Socket.IO 协议
Socket.IO 协议构建在前面描述的 Engine.IO 协议 之上,这意味着每个 Socket.IO 数据包在通过网络发送时都将以“4”(Engine.IO MESSAGE 数据包类型)为前缀。
参考:Socket.IO 协议
在没有二进制元素的情况下,格式如下
<packet type>[JSON-stringified payload]
以下是可用数据包类型的列表
类型 | ID | 用法 |
---|---|---|
CONNECT | 0 | 在连接到命名空间期间使用。 |
DISCONNECT | 1 | 在从命名空间断开连接时使用。 |
EVENT | 2 | 用于将数据发送到另一方。 |
ACK | 3 | 用于确认事件(此处未使用)。 |
CONNECT_ERROR | 4 | 在连接到命名空间期间使用(此处未使用)。 |
BINARY_EVENT | 5 | 用于将二进制数据发送到另一方(此处未使用)。 |
BINARY_ACK | 6 | 用于确认事件(响应包含二进制数据)(此处未使用)。 |
示例
2["hello","world"]
with:
2 => EVENT packet type
["hello","world"] => JSON.stringified() payload
连接
客户端必须在 Socket.IO 会话开始时发送一个 CONNECT 数据包
+const SIOPacketType = {
+ CONNECT: 0,
+ DISCONNECT: 1,
+ EVENT: 2,
+};
class Socket extends EventEmitter {
[...]
#onOpen(data) {
let handshake;
try {
handshake = JSON.parse(data.substring(1));
} catch (e) {
return this.#onClose("parse error");
}
this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
this.#resetPingTimeout();
+ this.#doConnect();
}
+ #doConnect() {
+ this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+ this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}
+function encode(packet) {
+ let output = "" + packet.type;
+
+ return output;
+}
如果允许连接,则服务器将发送一个 CONNECT 数据包作为回复
class Socket extends EventEmitter {
+ id;
[...]
#onMessage(data) {
switch (data[0]) {
[...]
+ case EIOPacketType.MESSAGE:
+ let packet;
+ try {
+ packet = decode(data);
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#onPacket(packet);
+ break;
}
}
+ #onPacket(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ this.#onConnect(packet);
+ break;
+ }
+ }
+ #onConnect(packet) {
+ this.id = packet.data.sid;
+
+ super.emit("connect");
+ }
}
+function decode(data) {
+ let i = 1; // skip "4" prefix
+
+ const packet = {
+ type: parseInt(data.charAt(i++), 10),
+ };
+
+ if (!isPacketValid(packet)) {
+ throw new Error("invalid format");
+ }
+
+ return packet;
+}
+
+function isPacketValid(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ return typeof packet.data === "object";
+ default:
+ return false;
+ }
+}
我们使用 super.emit(...)
,这样我们就可以稍后覆盖 emit()
方法来发送事件。
发送事件
让我们向服务器发送一些数据。我们需要跟踪底层连接的状态,并在连接就绪之前缓冲数据包
class Socket extends EventEmitter {
+ connected = false;
+ #sendBuffer = [];
[...]
+ emit(...args) {
+ const packet = {
+ type: SIOPacketType.EVENT,
+ data: args,
+ };
+
+ if (this.connected) {
+ this.#sendPacket(packet);
+ } else {
+ this.#sendBuffer.push(packet);
+ }
+ }
#onConnect(packet) {
this.id = packet.data.sid;
+ this.connected = true;
+ this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+ this.#sendBuffer.slice(0);
super.emit("connect");
}
}
function encode(packet) {
let output = "" + packet.type;
+ if (packet.data) {
+ output += JSON.stringify(packet.data);
+ }
return output;
}
接收事件
相反,让我们处理服务器发送的 EVENT 数据包
class Socket extends EventEmitter {
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.EVENT:
+ super.emit.apply(this, packet.data);
+ break;
}
}
}
function decode(data) {
let i = 1; // skip "4" prefix
const packet = {
type: parseInt(data.charAt(i++), 10),
};
+ if (data.charAt(i)) {
+ packet.data = JSON.parse(data.substring(i));
+ }
if (!isPacketValid(packet)) {
throw new Error("invalid format");
}
return packet;
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.EVENT: {
+ const args = packet.data;
+ return (
+ Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+ );
+ }
default:
return false;
}
}
手动断开连接
最后,让我们处理套接字不应该尝试重新连接的几种情况
- 当客户端调用
socket.disconnect()
时 - 当服务器调用
socket.disconnect()
时
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.DISCONNECT:
+ this.#shouldReconnect = false;
+ this.#onClose("io server disconnect");
+ break;
case SIOPacketType.EVENT:
super.emit.apply(this, packet.data);
break;
}
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ clearTimeout(this.#reconnectTimer);
+
+ if (this.#shouldReconnect) {
+ this.#reconnectTimer = setTimeout(
+ () => this.#open(),
+ this.#opts.reconnectionDelay
+ );
+ }
- setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
+ disconnect() {
+ this.#shouldReconnect = false;
+ this.#onClose("io client disconnect");
+ }
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.DISCONNECT:
+ return packet.data === undefined;
case SIOPacketType.EVENT: {
const args = packet.data;
return (
Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
);
}
default:
return false;
}
}
结束语
这就是我们的基本 Socket.IO 客户端!让我们回顾一下。
我们已经实现了以下功能
- 创建 WebSocket 连接
- 管理重新连接
- 发送事件
- 接收事件
- 手动断开连接
希望您现在对库在幕后的工作原理有了更好的了解。
完整的源代码可以在 此处 找到。
感谢您的阅读!