跳至主要内容

如何构建一个基本的 Socket.IO 客户端

在本指南中,我们将使用 JavaScript 实现一个基本的 Socket.IO 客户端,以便更好地理解 Socket.IO 协议。

我们将实现以下功能

  • 创建 WebSocket 连接
  • 管理重新连接
  • 发送事件
  • 接收事件
  • 手动断开连接

官方客户端显然包含更多功能

但这应该足以让您对库在幕后的工作原理有一个很好的概述。

我们的目标是实现类似于以下内容

import { io } from "./basic-client.js";

const socket = io();

// connection
socket.on("connect", () => {
// ...
});

// receiving an event
socket.on("foo", (value) => {
// ...
});

// sending an event
socket.emit("bar", "abc");

准备好了吗?让我们开始吧!

事件发射器

Socket.IO API 很大程度上受到 Node.js EventEmitter 类的启发。

import { EventEmitter } from "node:events";

const myEmitter = new EventEmitter();

myEmitter.on("foo", () => {
console.log("foo!");
});

myEmitter.emit("foo");

该库提供了类似的 API,但在服务器和客户端之间

  • 服务器
io.on("connection", (socket) => {
// send a "foo" event to the client
socket.emit("foo");

// receive a "bar" event from the client
socket.on("bar", () => {
// ...
});
});
  • 客户端
import { io } from "socket.io-client";

const socket = io();

// receive a "foo" event from the server
socket.on("foo", () => {
// ...
});

// send a "bar" event to the server
socket.emit("bar");

服务器和客户端之间的底层连接 (WebSocket 或 HTTP 长轮询) 被抽象化并由库管理。

让我们创建一个最小的 EventEmitter

class EventEmitter {
#listeners = new Map();

on(event, listener) {
let listeners = this.#listeners.get(event);
if (!listeners) {
this.#listeners.set(event, listeners = []);
}
listeners.push(listener);
}

emit(event, ...args) {
const listeners = this.#listeners.get(event);
if (listeners) {
for (const listener of listeners) {
listener.apply(null, args);
}
}
}
}

然后,我们的 Socket 类将扩展此类,以公开 on()emit() 方法

class Socket extends EventEmitter {
constructor(uri, opts) {
super();
}
}

在我们的构造函数中,uri 参数是

  • 由用户提供
const socket = io("https://example.com");
const socket = io();

让我们创建一个入口点

export function io(uri, opts) {
if (typeof uri !== "string") {
opts = uri;
uri = location.origin;
}
return new Socket(uri, opts);
}

好的,这是一个良好的开端!

WebSocket 连接

现在,让我们创建与服务器的 WebSocket 连接

class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;

constructor(uri, opts) {
super();
+ this.#uri = uri;
+ this.#opts = Object.assign({
+ path: "/socket.io/"
+ }, opts);
+ this.#open();
}

+ #open() {
+ this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+ const uri = this.#uri.replace(/^http/, "ws");
+ const queryParams = "?EIO=4&transport=websocket";
+ return `${uri}${this.#opts.path}${queryParams}`;
+ }
}

参考:https://mdn.org.cn/en-US/docs/Web/API/WebSocket

关于 createUrl() 方法的一些解释

  • WebSocket URL 以 ws://wss:// 开头,因此我们在 replace() 调用中处理它
  • Socket.IO URL 始终包含特定的请求路径,默认值为 /socket.io/
  • 有两个强制性查询参数
    • EIO=4:Engine.IO 协议的版本
    • transport=websocket:使用的传输

因此,最终的 URL 将类似于:wss://example.com/socket.io/?EIO=4&transport=websocket

Engine.IO 协议

Socket.IO 代码库分为两个不同的层

  • 低级管道:我们称之为 Engine.IO,即 Socket.IO 内部的引擎
  • 高级 API:Socket.IO 本身

另请参阅

使用 WebSocket 时,通过网络发送的消息格式很简单:<packet type><payload>

以下是协议第 4 版 (因此上面的 EIO=4) 中的不同数据包类型

名称表示描述
OPEN0在握手期间使用。
CLOSE1用于指示传输可以关闭。
PING2在心跳机制中使用。
PONG3在心跳机制中使用。
MESSAGE4用于将有效负载发送到另一方。
UPGRADE5在升级过程中使用(此处未使用)。
NOOP6在升级过程中使用(此处未使用)。

示例

4hello

with:

4 => MESSAGE packet type
hello => message payload (UTF-8 encoded)

让我们处理 WebSocket 消息

+const EIOPacketType = {
+ OPEN: "0",
+ CLOSE: "1",
+ PING: "2",
+ PONG: "3",
+ MESSAGE: "4",
+};

+function noop() {}

class Socket extends EventEmitter {
[...]

#open() {
this.#ws = new WebSocket(this.#createUrl());
+ this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+ this.#ws.onclose = () => this.#onClose("transport close");
}

+ #onMessage(data) {
+ if (typeof data !== "string") {
+ // TODO handle binary payloads
+ return;
+ }
+
+ switch (data[0]) {
+ case EIOPacketType.CLOSE:
+ this.#onClose("transport close");
+ break;
+
+ default:
+ this.#onClose("parse error");
+ break;
+ }
+ }
+
+ #onClose(reason) {
+ if (this.#ws) {
+ this.#ws.onclose = noop;
+ this.#ws.close();
+ }
+ }
+}

心跳

心跳机制用于确保服务器和客户端之间的连接正常。

服务器在初始握手期间发送两个值:pingIntervalpingTimeout

然后它将每隔 pingInterval 毫秒发送一个 PING 数据包,并期望从客户端收到一个 PONG 数据包。让我们这样做

class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;

[...]

#onMessage(data) {
if (typeof data !== "string") {
// TODO handle binary payloads
return;
}

switch (data[0]) {
+ case EIOPacketType.OPEN:
+ this.#onOpen(data);
+ break;
+
case EIOPacketType.CLOSE:
this.#onClose("transport close");
break;

+ case EIOPacketType.PING:
+ this.#resetPingTimeout();
+ this.#send(EIOPacketType.PONG);
+ break;

default:
this.#onClose("parse error");
break;
}
}

+ #onOpen(data) {
+ let handshake;
+ try {
+ handshake = JSON.parse(data.substring(1));
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+ this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+ clearTimeout(this.#pingTimeoutTimer);
+ this.#pingTimeoutTimer = setTimeout(() => {
+ this.#onClose("ping timeout");
+ }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+ if (this.#ws.readyState === WebSocket.OPEN) {
+ this.#ws.send(data);
+ }
+ }

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

+ clearTimeout(this.#pingTimeoutTimer);
}
}

重新连接

趁此机会,我们还将处理重新连接。WebSockets 非常棒,但它们可能会(并且会在现实生活中)断开连接,因此我们必须注意这一点

class Socket extends EventEmitter {
[...]

constructor(uri, opts) {
super();
this.#uri = uri;
this.#opts = Object.assign(
{
path: "/socket.io/",
+ reconnectionDelay: 2000,
},
opts
);
this.#open();
}

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

clearTimeout(this.#pingTimeoutTimer);

+ setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
}
信息

官方 Socket.IO 客户端使用了一种带有随机性的奇特的指数延迟,以防止大量客户端同时重新连接时出现负载峰值,但这里我们将保持简单,并使用一个常量值。

好的,让我们总结一下,我们现在有一个可以

  • 打开与服务器的 WebSocket 连接
  • 通过响应 PING 数据包来遵守心跳机制
  • 在失败时自动重新连接

这就是 Engine.IO 协议!现在让我们深入了解 Socket.IO 协议。

Socket.IO 协议

Socket.IO 协议构建在前面描述的 Engine.IO 协议 之上,这意味着每个 Socket.IO 数据包在通过网络发送时都将以“4”(Engine.IO MESSAGE 数据包类型)为前缀。

参考:Socket.IO 协议

在没有二进制元素的情况下,格式如下

<packet type>[JSON-stringified payload]

以下是可用数据包类型的列表

类型ID用法
CONNECT0在连接到命名空间期间使用。
DISCONNECT1在从命名空间断开连接时使用。
EVENT2用于将数据发送到另一方。
ACK3用于确认事件(此处未使用)。
CONNECT_ERROR4在连接到命名空间期间使用(此处未使用)。
BINARY_EVENT5用于将二进制数据发送到另一方(此处未使用)。
BINARY_ACK6用于确认事件(响应包含二进制数据)(此处未使用)。

示例

2["hello","world"]

with:

2 => EVENT packet type
["hello","world"] => JSON.stringified() payload

连接

客户端必须在 Socket.IO 会话开始时发送一个 CONNECT 数据包

+const SIOPacketType = {
+ CONNECT: 0,
+ DISCONNECT: 1,
+ EVENT: 2,
+};

class Socket extends EventEmitter {
[...]

#onOpen(data) {
let handshake;
try {
handshake = JSON.parse(data.substring(1));
} catch (e) {
return this.#onClose("parse error");
}
this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
this.#resetPingTimeout();
+ this.#doConnect();
}

+ #doConnect() {
+ this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+ this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}

+function encode(packet) {
+ let output = "" + packet.type;
+
+ return output;
+}

如果允许连接,则服务器将发送一个 CONNECT 数据包作为回复

class Socket extends EventEmitter {
+ id;

[...]

#onMessage(data) {
switch (data[0]) {
[...]

+ case EIOPacketType.MESSAGE:
+ let packet;
+ try {
+ packet = decode(data);
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#onPacket(packet);
+ break;
}
}

+ #onPacket(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ this.#onConnect(packet);
+ break;
+ }
+ }

+ #onConnect(packet) {
+ this.id = packet.data.sid;
+
+ super.emit("connect");
+ }
}

+function decode(data) {
+ let i = 1; // skip "4" prefix
+
+ const packet = {
+ type: parseInt(data.charAt(i++), 10),
+ };
+
+ if (!isPacketValid(packet)) {
+ throw new Error("invalid format");
+ }
+
+ return packet;
+}
+
+function isPacketValid(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ return typeof packet.data === "object";
+ default:
+ return false;
+ }
+}
注意

我们使用 super.emit(...),这样我们就可以稍后覆盖 emit() 方法来发送事件。

发送事件

让我们向服务器发送一些数据。我们需要跟踪底层连接的状态,并在连接就绪之前缓冲数据包

class Socket extends EventEmitter {
+ connected = false;

+ #sendBuffer = [];

[...]

+ emit(...args) {
+ const packet = {
+ type: SIOPacketType.EVENT,
+ data: args,
+ };
+
+ if (this.connected) {
+ this.#sendPacket(packet);
+ } else {
+ this.#sendBuffer.push(packet);
+ }
+ }

#onConnect(packet) {
this.id = packet.data.sid;
+ this.connected = true;

+ this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+ this.#sendBuffer.slice(0);

super.emit("connect");
}
}

function encode(packet) {
let output = "" + packet.type;

+ if (packet.data) {
+ output += JSON.stringify(packet.data);
+ }

return output;
}

接收事件

相反,让我们处理服务器发送的 EVENT 数据包

class Socket extends EventEmitter {
[...]

#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;

+ case SIOPacketType.EVENT:
+ super.emit.apply(this, packet.data);
+ break;
}
}
}

function decode(data) {
let i = 1; // skip "4" prefix

const packet = {
type: parseInt(data.charAt(i++), 10),
};

+ if (data.charAt(i)) {
+ packet.data = JSON.parse(data.substring(i));
+ }

if (!isPacketValid(packet)) {
throw new Error("invalid format");
}

return packet;
}

function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.EVENT: {
+ const args = packet.data;
+ return (
+ Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+ );
+ }
default:
return false;
}
}

手动断开连接

最后,让我们处理套接字不应该尝试重新连接的几种情况

  • 当客户端调用 socket.disconnect()
  • 当服务器调用 socket.disconnect()
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;

[...]

#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;

+ case SIOPacketType.DISCONNECT:
+ this.#shouldReconnect = false;
+ this.#onClose("io server disconnect");
+ break;

case SIOPacketType.EVENT:
super.emit.apply(this, packet.data);
break;
}
}

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

clearTimeout(this.#pingTimeoutTimer);
+ clearTimeout(this.#reconnectTimer);
+
+ if (this.#shouldReconnect) {
+ this.#reconnectTimer = setTimeout(
+ () => this.#open(),
+ this.#opts.reconnectionDelay
+ );
+ }
- setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}

+ disconnect() {
+ this.#shouldReconnect = false;
+ this.#onClose("io client disconnect");
+ }
}

function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.DISCONNECT:
+ return packet.data === undefined;
case SIOPacketType.EVENT: {
const args = packet.data;
return (
Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
);
}
default:
return false;
}
}

结束语

这就是我们的基本 Socket.IO 客户端!让我们回顾一下。

我们已经实现了以下功能

  • 创建 WebSocket 连接
  • 管理重新连接
  • 发送事件
  • 接收事件
  • 手动断开连接

希望您现在对库在幕后的工作原理有了更好的了解。

完整的源代码可以在 此处 找到。

感谢您的阅读!