跳至主要内容
版本: 4.x

客户端交付

让我们看看如何确保服务器始终接收客户端发送的消息。

信息

默认情况下,Socket.IO 提供“最多一次”的交付保证(也称为“开火即忘”,这意味着如果消息未到达服务器,将不会重试。

缓冲事件

当客户端断开连接时,任何对 socket.emit() 的调用都会被缓冲,直到重新连接。

在上面的视频中,“实时”消息被缓冲,直到连接重新建立。

这种行为可能完全适合您的应用程序。但是,在某些情况下,消息可能会丢失

  • 连接在发送事件时断开
  • 服务器在处理事件时崩溃或重新启动
  • 数据库暂时不可用

至少一次

我们可以实现“至少一次”的保证

  • 使用确认手动实现
function emit(socket, event, arg) {
socket.timeout(5000).emit(event, arg, (err) => {
if (err) {
// no ack from the server, let's retry
emit(socket, event, arg);
}
});
}

emit(socket, 'hello', 'world');
  • 或使用 retries 选项
const socket = io({
ackTimeout: 10000,
retries: 3
});

socket.emit('hello', 'world');

在这两种情况下,客户端都会重试发送消息,直到从服务器收到确认。

io.on('connection', (socket) => {
socket.on('hello', (value, callback) => {
// once the event is successfully handled
callback();
});
})
提示

使用 retries 选项,可以保证消息的顺序,因为消息会排队并逐个发送。第一个选项并非如此。

恰好一次

重试的问题是,服务器现在可能会收到相同的消息多次,因此它需要一种方法来唯一标识每条消息,并且只在数据库中存储一次。

让我们看看如何在我们的聊天应用程序中实现“恰好一次”的保证。

我们将从在客户端为每条消息分配一个唯一标识符开始

index.html
<script>
let counter = 0;

const socket = io({
auth: {
serverOffset: 0
},
// enable retries
ackTimeout: 10000,
retries: 3,
});

const form = document.getElementById('form');
const input = document.getElementById('input');
const messages = document.getElementById('messages');

form.addEventListener('submit', (e) => {
e.preventDefault();
if (input.value) {
// compute a unique offset
const clientOffset = `${socket.id}-${counter++}`;
socket.emit('chat message', input.value, clientOffset);
input.value = '';
}
});

socket.on('chat message', (msg, serverOffset) => {
const item = document.createElement('li');
item.textContent = msg;
messages.appendChild(item);
window.scrollTo(0, document.body.scrollHeight);
socket.auth.serverOffset = serverOffset;
});
</script>
注意

socket.id 属性是一个随机的 20 个字符的标识符,分配给每个连接。

我们也可以使用 getRandomValues() 来生成一个唯一的偏移量。

然后,我们将此偏移量与服务器端的消息一起存储

index.js
// [...]

io.on('connection', async (socket) => {
socket.on('chat message', async (msg, clientOffset, callback) => {
let result;
try {
result = await db.run('INSERT INTO messages (content, client_offset) VALUES (?, ?)', msg, clientOffset);
} catch (e) {
if (e.errno === 19 /* SQLITE_CONSTRAINT */ ) {
// the message was already inserted, so we notify the client
callback();
} else {
// nothing to do, just let the client retry
}
return;
}
io.emit('chat message', msg, result.lastID);
// acknowledge the event
callback();
});

if (!socket.recovered) {
try {
await db.each('SELECT id, content FROM messages WHERE id > ?',
[socket.handshake.auth.serverOffset || 0],
(_err, row) => {
socket.emit('chat message', row.content, row.id);
}
)
} catch (e) {
// something went wrong
}
}
});

// [...]

这样,client_offset 列上的 UNIQUE 约束可以防止消息重复。

注意

不要忘记确认事件,否则客户端会一直重试(最多 retries 次)。

socket.on('chat message', async (msg, clientOffset, callback) => {
// ... and finally
callback();
});
信息

同样,默认保证(“最多一次”)可能足以满足您的应用程序,但现在您知道如何使其更可靠。

在下一步中,我们将看到如何水平扩展我们的应用程序。

信息

您可以在浏览器中直接运行此示例