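To build a genuinely peer-to-peer real-time collaboration system, the one problem you cannot avoid is state synchronization. Traditional approaches based on Operational Transformation (OT) depend heavily on a central server to coordinate and transform operations, which makes that server both a performance bottleneck and an obvious single point of failure. Our goal is to remove that central bottleneck so that clients communicate directly and resolve state conflicts among themselves. That leads straight to Conflict-free Replicated Data Types (CRDTs).
Our technology choices are not accidental; each layer follows from the core goals of decentralization and resilience: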
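- Core data structure (CRDT): We will implement a simplified, Logoot-style sequence CRDT. It lets different nodes insert and delete independently, and its ordering rules are designed so that replicas converge to the same state (eventual consistency).
- Communication protocol (WebRTC Data Channels): Instead of WebSocket, we broadcast operations peer to peer over WebRTC data channels. This greatly reduces server traffic and shrinks the server's role to a lightweight coordinator for signaling and NAT traversal.
- State persistence (key-value NoSQL): The full CRDT operation log can grow without bound, and replaying the entire history for every new collaborator is impractical. We therefore use a fast key-value store (such as Redis) to snapshot the document's CRDT state periodically; a new user loads the latest snapshot and then subscribes to live operations.
- Architecture pattern (Headless UI): The collaboration logic itself is independent of the UI. We package the whole collaboration engine as a standalone, UI-less Node.js module that handles only state computation, network synchronization, and persistence, so any front end (web, desktop, even a terminal) can integrate it.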
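We start with the core CRDT data structure: a simplified sequence CRDT based on the ideas of Logoot. Each character carries not only its value but also a unique, ordered position identifier.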
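// src/crdt/types.ts
// (Identifier, Char, and Operation live in one file so the './types' imports used below resolve.)

// One element of a character's unique position identifier
export interface Identifier {
  digit: number; // the digit at this level of the path
  siteId: string; // ID of the node that generated this element
}

// A character object in the CRDT: its value plus a unique position
export interface Char {
  value: string;
  id: Identifier[]; // array of position identifiers; together the paths form a tree-shaped ordering
  timestamp: number;
}

// An operation applied to the CRDT: either an insert or a delete
export type Operation = {
  type: 'insert';
  char: Char;
} | {
  type: 'delete';
  charId: Identifier[];
};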
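The key piece here is `Identifier[]`. It is not a simple index but a path. For example, `[{digit: 1, siteId: 'A'}, {digit: 5, siteId: 'B'}]` denotes one position. Because a path can always be extended by another level, we can keep generating new positions between any two existing characters without recomputing every index.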
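To make the ordering of such paths concrete, here is a minimal sketch. The names `PathElement`, `PositionPath`, and `comparePaths` are hypothetical stand-ins for illustration; the comparison rule (digit first, then `siteId`, then path length) is an assumption that mirrors the engine's comparison described in this article.

```typescript
// Hypothetical standalone copy of the article's Identifier shape.
interface PathElement { digit: number; siteId: string; }
type PositionPath = PathElement[];

// Lexicographic comparison: digit first, then siteId, then path length.
function comparePaths(a: PositionPath, b: PositionPath): number {
  const shared = Math.min(a.length, b.length);
  for (let i = 0; i < shared; i++) {
    if (a[i].digit !== b[i].digit) return a[i].digit < b[i].digit ? -1 : 1;
    if (a[i].siteId !== b[i].siteId) return a[i].siteId < b[i].siteId ? -1 : 1;
  }
  return a.length - b.length; // a strict prefix sorts before its extensions
}

const first: PositionPath = [{ digit: 1, siteId: 'A' }];
const second: PositionPath = [{ digit: 2, siteId: 'A' }];
// No integer lies strictly between 1 and 2, so the new position
// extends the path of `first` by one level instead.
const between: PositionPath = [{ digit: 1, siteId: 'A' }, { digit: 5, siteId: 'B' }];

const sortedPaths = [second, between, first].sort(comparePaths);
console.log(sortedPaths.map(p => p.map(e => `${e.digit}${e.siteId}`).join('.')));
// prints [ '1A', '1A.5B', '2A' ]
```

The extended path sorts after its prefix and before the next sibling, which is what lets a new character slot in between two neighbors.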
Next comes the core of the CRDT engine. It maintains a list of characters and provides methods for applying local and remote operations.
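// src/crdt/CRDTEngine.ts
import { Char, Identifier, Operation } from './types';
import { v4 as uuidv4 } from 'uuid';

export class CRDTEngine {
  private structure: Char[] = [];
  public readonly siteId: string;

  constructor(siteId?: string) {
    // In a real project, siteId should be persistent and unique, e.g. the user's session ID
    this.siteId = siteId || uuidv4();
    console.log(`CRDT Engine initialized with siteId: ${this.siteId}`);
  }

  // Compare the order of two Identifier arrays: by digit, then siteId, then length
  private compareIds(id1: Identifier[], id2: Identifier[]): number {
    const minLength = Math.min(id1.length, id2.length);
    for (let i = 0; i < minLength; i++) {
      const p1 = id1[i];
      const p2 = id2[i];
      if (p1.digit < p2.digit) return -1;
      if (p1.digit > p2.digit) return 1;
      if (p1.siteId < p2.siteId) return -1;
      if (p1.siteId > p2.siteId) return 1;
    }
    if (id1.length < id2.length) return -1;
    if (id1.length > id2.length) return 1;
    return 0;
  }

  // Update internal state from a remote or local operation.
  // This is the heart of the CRDT: applying operations must be idempotent and commutative.
  // Without tombstones that only holds if a delete arrives after its insert.
  public applyOperation(op: Operation): void {
    if (op.type === 'insert') {
      this.applyInsert(op.char);
    } else {
      this.applyDelete(op.charId);
    }
  }

  private applyInsert(char: Char): void {
    // Idempotency check: if the character already exists, do nothing
    if (this.structure.some(c => this.compareIds(c.id, char.id) === 0)) {
      return;
    }
    // Binary search for the first element that sorts after the new character
    let left = 0;
    let right = this.structure.length - 1;
    let insertIndex = this.structure.length;
    while (left <= right) {
      const mid = Math.floor((left + right) / 2);
      const compare = this.compareIds(this.structure[mid].id, char.id);
      if (compare > 0) {
        insertIndex = mid;
        right = mid - 1;
      } else {
        left = mid + 1;
      }
    }
    this.structure.splice(insertIndex, 0, char);
  }

  private applyDelete(charId: Identifier[]): void {
    const index = this.structure.findIndex(c => this.compareIds(c.id, charId) === 0);
    if (index !== -1) {
      this.structure.splice(index, 1);
    }
  }

  // Generate a local insert operation
  public generateInsert(index: number, value: string): Operation {
    // Clamp the index so out-of-range values prepend or append
    const at = Math.max(0, Math.min(index, this.structure.length));
    const prevId = this.structure[at - 1]?.id || [];
    const nextId = this.structure[at]?.id || [];
    // The pitfall here: position generation must be robust enough to avoid
    // collisions and unnecessary depth. We pick a position between the
    // previous and the next ID; production implementations are more involved.
    const newId = this.generateIdBetween(prevId, nextId);
    const char: Char = {
      value,
      id: newId,
      timestamp: Date.now(),
    };
    return { type: 'insert', char };
  }

  // Generate a local delete operation
  public generateDelete(index: number): Operation | null {
    if (index < 0 || index >= this.structure.length) {
      // Error handling matters here: an invalid operation must not pollute the system
      console.error(`Invalid index for deletion: ${index}`);
      return null;
    }
    const charToDelete = this.structure[index];
    return { type: 'delete', charId: charToDelete.id };
  }

  // Simplified position allocation. Requires prevId < nextId; an empty
  // array means "start of document" (prevId) or "end of document" (nextId).
  private generateIdBetween(prevId: Identifier[], nextId: Identifier[]): Identifier[] {
    const BASE = 2 ** 16; // digits available per level (demo value)
    const id: Identifier[] = [];
    let boundedAbove = nextId.length > 0;
    for (let depth = 0; ; depth++) {
      const prev = prevId[depth];
      const next = boundedAbove ? nextId[depth] : undefined;
      const low = prev?.digit ?? 0;
      const high = next?.digit ?? BASE;
      if (high - low > 1) {
        // Room at this level: take a small random step above the lower bound
        const digit = low + 1 + Math.floor(Math.random() * Math.min(high - low - 1, 10));
        id.push({ digit, siteId: this.siteId });
        return id;
      }
      // No room at this level: copy one element and descend a level
      if (prev) {
        id.push(prev);
        if (!next || this.compareIds([prev], [next]) < 0) boundedAbove = false;
      } else if (next && next.digit === 0) {
        id.push(next);
      } else {
        id.push({ digit: 0, siteId: this.siteId });
        boundedAbove = false;
      }
    }
  }

  public getText(): string {
    return this.structure.map(c => c.value).join('');
  }

  public getState(): Char[] {
    return JSON.parse(JSON.stringify(this.structure)); // return a deep copy
  }

  public setState(state: Char[]): void {
    // Copy so later mutations of the caller's array cannot corrupt the engine
    this.structure = JSON.parse(JSON.stringify(state));
  }
}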
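This engine is completely "headless". It knows nothing about the UI or the network; its only job is to maintain an ordered array of `Char` objects.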
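Because the engine only keeps characters sorted by position, two replicas that receive the same inserts in different orders, or receive one twice, should end up with the same text. The sketch below illustrates that property with a deliberately tiny stand-in replica. `MiniReplica`, `Glyph`, `Pos`, and `EditOp` are hypothetical names, not part of the engine above, and the demo uses inserts only because deletes without tombstones do not commute with their inserts.

```typescript
interface Pos { digit: number; siteId: string; }
interface Glyph { value: string; id: Pos[]; }
type EditOp = { type: 'insert'; char: Glyph } | { type: 'delete'; charId: Pos[] };

function cmp(a: Pos[], b: Pos[]): number {
  const shared = Math.min(a.length, b.length);
  for (let i = 0; i < shared; i++) {
    if (a[i].digit !== b[i].digit) return a[i].digit < b[i].digit ? -1 : 1;
    if (a[i].siteId !== b[i].siteId) return a[i].siteId < b[i].siteId ? -1 : 1;
  }
  return a.length - b.length;
}

class MiniReplica {
  private chars: Glyph[] = [];

  apply(op: EditOp): void {
    if (op.type === 'insert') {
      const incoming = op.char;
      // Idempotent: a character that is already present is ignored.
      if (this.chars.some(c => cmp(c.id, incoming.id) === 0)) return;
      this.chars.push(incoming);
      // Commutative: the final order depends only on the ids, not on arrival order.
      this.chars.sort((x, y) => cmp(x.id, y.id));
    } else {
      const target = op.charId;
      this.chars = this.chars.filter(c => cmp(c.id, target) !== 0);
    }
  }

  text(): string {
    return this.chars.map(c => c.value).join('');
  }
}

const demoOps: EditOp[] = [
  { type: 'insert', char: { value: 'H', id: [{ digit: 1, siteId: 'A' }] } },
  { type: 'insert', char: { value: 'i', id: [{ digit: 2, siteId: 'B' }] } },
  { type: 'insert', char: { value: '!', id: [{ digit: 3, siteId: 'A' }] } },
];

const replicaA = new MiniReplica();
demoOps.forEach(op => replicaA.apply(op));

const replicaB = new MiniReplica();
// Reverse order, with the first operation delivered a second time.
[...demoOps].reverse().concat(demoOps[0]).forEach(op => replicaB.apply(op));

console.log(replicaA.text(), replicaB.text()); // prints "Hi! Hi!"
```

Re-sorting on every insert keeps the sketch short; the binary search in the engine avoids that cost.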
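Next, we bring in WebRTC to connect the individual `CRDTEngine` instances. We need a signaling server that helps clients discover each other and exchange the information required to establish a P2P connection (SDP and ICE candidates).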
// src/signaling/server.js
// A minimal WebSocket signaling server
const { randomUUID } = require('crypto');
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });
// Room management here is very crude; a real project needs a sturdier data structure
const rooms = {};
console.log('Signaling server started on ws://localhost:8080');

wss.on('connection', ws => {
  // Give every connection its own ID so signals can be addressed to a single peer
  ws.peerId = randomUUID();
  console.log('Client connected');

  ws.on('message', message => {
    let data;
    try {
      data = JSON.parse(message.toString());
    } catch (e) {
      console.error('Invalid JSON received:', message);
      return;
    }
    const { type, room, payload } = data;
    switch (type) {
      case 'join':
        console.log(`Client wants to join room: ${room}`);
        if (!rooms[room]) {
          rooms[room] = new Set();
        }
        rooms[room].add(ws);
        ws.room = room; // attach the room to the ws instance
        // Tell the other members of the room that a new peer has joined
        rooms[room].forEach(client => {
          if (client !== ws && client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify({ type: 'peer-connect', peerId: ws.peerId }));
          }
        });
        break;
      // SDP offers/answers and ICE candidates are forwarded to the addressed
      // peer only, tagged with the sender's ID so the receiver can reply
      case 'signal':
        console.log(`Forwarding signal in room: ${room}`);
        if (rooms[room] && payload) {
          rooms[room].forEach(client => {
            if (client !== ws && client.peerId === payload.to && client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify({ type: 'signal', payload: { ...payload, from: ws.peerId } }));
            }
          });
        }
        break;
    }
  });

  ws.on('close', () => {
    console.log('Client disconnected');
    const room = ws.room;
    if (room && rooms[room]) {
      rooms[room].delete(ws);
      if (rooms[room].size === 0) {
        delete rooms[room];
      }
      // This is the place to notify the others that a member has left
    }
  });

  ws.on('error', (err) => {
    console.error('WebSocket error:', err);
  });
});
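The signaling server above exchanges three kinds of JSON messages: `join`, `peer-connect`, and `signal`. As a rough sketch, a client could type and validate them before acting on them. The type names and the `parseSignalingMessage` helper are hypothetical, and the `to`/`from` fields in the payload are an assumption about how signals are addressed to a single peer.

```typescript
// Hypothetical wire format for the signaling channel.
type SignalPayload = {
  sdp?: { type: string; sdp?: string };
  candidate?: unknown;
  to?: string;   // assumed: ID of the addressed peer
  from?: string; // assumed: ID of the sending peer
};

type SignalingMessage =
  | { type: 'join'; room: string }
  | { type: 'peer-connect'; peerId: string }
  | { type: 'signal'; room?: string; payload: SignalPayload };

// Returns a typed message, or null when the input is not a known message.
function parseSignalingMessage(raw: string): SignalingMessage | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // not JSON at all
  }
  if (typeof data !== 'object' || data === null) return null;
  const msg = data as Record<string, unknown>;
  switch (msg.type) {
    case 'join':
      return typeof msg.room === 'string' ? { type: 'join', room: msg.room } : null;
    case 'peer-connect':
      return typeof msg.peerId === 'string' ? { type: 'peer-connect', peerId: msg.peerId } : null;
    case 'signal':
      if (typeof msg.payload !== 'object' || msg.payload === null) return null;
      return {
        type: 'signal',
        room: typeof msg.room === 'string' ? msg.room : undefined,
        payload: msg.payload as SignalPayload,
      };
    default:
      return null;
  }
}

console.log(parseSignalingMessage('{"type":"join","room":"doc-1"}'));
console.log(parseSignalingMessage('not json')); // prints null
```

Validating at the boundary keeps malformed or unexpected messages from reaching the WebRTC negotiation code.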
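The client-side WebRTC logic is considerably more involved: it manages multiple P2P connections and sets up a data channel for each one. We wrap it in a `P2PNetworkManager`.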
// src/network/P2PNetworkManager.ts
import { Operation } from '../crdt/types';
// This is an illustrative implementation. Production code needs the `wrtc`
// library to run in Node.js, or the native RTCPeerConnection in a browser.
// An EventEmitter is used to surface events to the layer above.
import { EventEmitter } from 'events';

export class P2PNetworkManager extends EventEmitter {
  private peers: Map<string, RTCPeerConnection> = new Map(); // key: peerId
  private channels: Map<string, RTCDataChannel> = new Map(); // key: peerId
  private signalingSocket: WebSocket;

  constructor(private signalingServerUrl: string, private roomId: string) {
    super();
    this.signalingSocket = new WebSocket(this.signalingServerUrl);
    this.setupSignaling();
  }

  private setupSignaling() {
    this.signalingSocket.onopen = () => {
      console.log('Connected to signaling server.');
      this.signalingSocket.send(JSON.stringify({ type: 'join', room: this.roomId }));
    };

    this.signalingSocket.onmessage = async (event) => {
      try {
        const message = JSON.parse(event.data);
        if (message.type === 'peer-connect') {
          // A new peer joined: create the connection as the initiator
          await this.createPeerConnection(message.peerId, true);
        } else if (message.type === 'signal') {
          const { sdp, candidate, from } = message.payload;
          const pc = this.peers.get(from) ?? await this.createPeerConnection(from, false);
          if (sdp) {
            await pc.setRemoteDescription(new RTCSessionDescription(sdp));
            if (sdp.type === 'offer') {
              const answer = await pc.createAnswer();
              await pc.setLocalDescription(answer);
              this.sendSignal({ sdp: answer, to: from });
            }
          } else if (candidate) {
            await pc.addIceCandidate(new RTCIceCandidate(candidate));
          }
        }
      } catch (err) {
        console.error('Failed to handle signaling message:', err);
      }
    };

    this.signalingSocket.onerror = (error) => {
      console.error('Signaling socket error:', error);
      this.emit('error', new Error('Signaling connection failed'));
    };
  }

  private async createPeerConnection(peerId: string, isInitiator: boolean): Promise<RTCPeerConnection> {
    // This configuration is critical for performance and NAT traversal
    const pc = new RTCPeerConnection({
      iceServers: [{ urls: 'stun:stun.l.google.com:19302' }],
    });
    this.peers.set(peerId, pc);

    pc.onicecandidate = (event) => {
      if (event.candidate) {
        this.sendSignal({ candidate: event.candidate, to: peerId });
      }
    };

    // The data channel is the core
    if (isInitiator) {
      // Create the channel before the offer so it is part of the negotiated SDP
      const dataChannel = pc.createDataChannel('crdt-ops');
      this.setupDataChannel(dataChannel, peerId);
      const offer = await pc.createOffer();
      await pc.setLocalDescription(offer);
      this.sendSignal({ sdp: offer, to: peerId });
    } else {
      pc.ondatachannel = (event) => {
        this.setupDataChannel(event.channel, peerId);
      };
    }
    return pc;
  }

  private setupDataChannel(channel: RTCDataChannel, peerId: string) {
    // Keep a reference to the channel so broadcast() can reach it
    this.channels.set(peerId, channel);
    channel.onopen = () => console.log(`Data channel with ${peerId} is open.`);
    channel.onclose = () => {
      console.log(`Data channel with ${peerId} is closed.`);
      this.channels.delete(peerId);
      this.peers.get(peerId)?.close();
      this.peers.delete(peerId);
    };
    channel.onmessage = (event) => {
      try {
        const op: Operation = JSON.parse(event.data);
        // Emit the received operation as an event; the upper layer handles it
        this.emit('operation', op);
      } catch (e) {
        console.error('Failed to parse operation from peer:', event.data);
      }
    };
  }

  // Broadcast an operation to every connected peer
  public broadcast(op: Operation) {
    const message = JSON.stringify(op);
    this.channels.forEach(channel => {
      // Only send on channels that are already open
      if (channel.readyState === 'open') {
        channel.send(message);
      }
    });
  }

  private sendSignal(payload: any) {
    this.signalingSocket.send(JSON.stringify({
      type: 'signal',
      room: this.roomId,
      payload
    }));
  }
}
Now we combine the CRDT engine with the P2P network manager and add snapshot persistence.
// src/persistence/SnapshotManager.ts
import { createClient } from 'redis';
import { Char } from '../crdt/types';

export class SnapshotManager {
  private redisClient: ReturnType<typeof createClient>;
  private docId: string;
  private ready: Promise<unknown>;

  constructor(docId: string, redisUrl: string) {
    this.docId = `doc_snapshot:${docId}`;
    this.redisClient = createClient({ url: redisUrl });
    this.redisClient.on('error', (err) => console.error('Redis Client Error', err));
    // Connect immediately and remember the promise so save/load can wait for it.
    // A real project should have better connection management and retry logic.
    this.ready = this.redisClient.connect().catch((err) => {
      console.error('Failed to connect to Redis:', err);
    });
  }

  async saveSnapshot(state: Char[]): Promise<void> {
    await this.ready;
    if (!this.redisClient.isOpen) {
      console.warn('Redis client not connected. Skipping snapshot save.');
      return;
    }
    try {
      const serializedState = JSON.stringify(state);
      // SET overwrites the previous snapshot. For large documents this can be slow.
      await this.redisClient.set(this.docId, serializedState);
      console.log(`Snapshot saved for doc ${this.docId}`);
    } catch (err) {
      console.error('Failed to save snapshot to Redis:', err);
      // Failure handling matters here; it may need to raise an alert
    }
  }

  async loadSnapshot(): Promise<Char[] | null> {
    await this.ready;
    if (!this.redisClient.isOpen) {
      console.warn('Redis client not connected. Skipping snapshot load.');
      return null;
    }
    try {
      const serializedState = await this.redisClient.get(this.docId);
      if (serializedState) {
        console.log(`Snapshot loaded for doc ${this.docId}`);
        return JSON.parse(serializedState);
      }
      return null;
    } catch (err) {
      console.error('Failed to load snapshot from Redis:', err);
      return null;
    }
  }

  async disconnect(): Promise<void> {
    await this.ready;
    if (this.redisClient.isOpen) {
      await this.redisClient.quit();
    }
  }
}
Finally, we create a top-level `CollaborativeSession` class to orchestrate all the components. This is the final API of our headless engine.
// src/CollaborativeSession.ts
import { CRDTEngine } from './crdt/CRDTEngine';
import { P2PNetworkManager } from './network/P2PNetworkManager';
import { SnapshotManager } from './persistence/SnapshotManager';
import { Operation } from './crdt/types';
import { EventEmitter } from 'events';
import { debounce } from 'lodash';
import type { DebouncedFunc } from 'lodash';

// Configuration should be injected from outside
const REDIS_URL = 'redis://localhost:6379';
const SIGNALING_URL = 'ws://localhost:8080';
const SNAPSHOT_DEBOUNCE_MS = 5000; // save a snapshot after 5 seconds without edits

export class CollaborativeSession extends EventEmitter {
  private crdtEngine: CRDTEngine;
  private networkManager: P2PNetworkManager;
  private snapshotManager: SnapshotManager;
  private debouncedSave: DebouncedFunc<() => Promise<void>>;

  constructor(private documentId: string, siteId?: string) {
    super();
    this.crdtEngine = new CRDTEngine(siteId);
    this.networkManager = new P2PNetworkManager(SIGNALING_URL, documentId);
    this.snapshotManager = new SnapshotManager(documentId, REDIS_URL);
    // Debounced snapshot saving avoids writing to Redis too often
    this.debouncedSave = debounce(
      () => this.snapshotManager.saveSnapshot(this.crdtEngine.getState()),
      SNAPSHOT_DEBOUNCE_MS,
      { leading: false, trailing: true }
    );
  }

  async start() {
    console.log(`Starting collaborative session for document: ${this.documentId}`);
    const snapshot = await this.snapshotManager.loadSnapshot();
    if (snapshot) {
      this.crdtEngine.setState(snapshot);
      this.emit('update', this.crdtEngine.getText());
    }

    this.networkManager.on('operation', (op: Operation) => {
      console.log('Received remote operation:', op.type);
      this.crdtEngine.applyOperation(op);
      this.emit('update', this.crdtEngine.getText());
    });

    this.networkManager.on('error', (err) => {
      console.error("Network manager encountered an error:", err);
      this.emit('error', err);
    });
    // Connecting to the P2P network:
    // there is no explicit `start` call here because the P2PNetworkManager
    // constructor has already begun connecting
  }

  // The interface exposed to the outside (the UI layer)
  public localInsert(index: number, value: string) {
    const op = this.crdtEngine.generateInsert(index, value);
    this.crdtEngine.applyOperation(op);
    this.networkManager.broadcast(op);
    this.emit('update', this.crdtEngine.getText());
    this.debouncedSave(); // every local change schedules a debounced save
  }

  public localDelete(index: number) {
    const op = this.crdtEngine.generateDelete(index);
    if (op) {
      this.crdtEngine.applyOperation(op);
      this.networkManager.broadcast(op);
      this.emit('update', this.crdtEngine.getText());
      this.debouncedSave();
    }
  }

  public getText(): string {
    return this.crdtEngine.getText();
  }

  public async stop() {
    // Make sure the last changes are saved: flush() runs any pending save
    // and returns its promise, which must settle before Redis is closed
    await this.debouncedSave.flush();
    await this.snapshotManager.disconnect();
    // Closing network connections and other cleanup goes here
    console.log(`Session for document ${this.documentId} stopped.`);
  }
}
The overall flow of this architecture can be represented by the following diagram:
graph TD
    subgraph Client A
        UI_A[UI Layer] -- localInsert/localDelete --> Session_A[CollaborativeSession]
        Session_A -- update event --> UI_A
        Session_A -- interacts with --> Engine_A[CRDTEngine]
        Session_A -- interacts with --> Network_A[P2PNetworkManager]
        Session_A -- interacts with --> SnapshotMgr_A[SnapshotManager]
    end
    subgraph Client B
        UI_B[UI Layer] -- localInsert/localDelete --> Session_B[CollaborativeSession]
        Session_B -- update event --> UI_B
        Session_B -- interacts with --> Engine_B[CRDTEngine]
        Session_B -- interacts with --> Network_B[P2PNetworkManager]
        Session_B -- interacts with --> SnapshotMgr_B[SnapshotManager]
    end
    subgraph Backend
        Signaling[Signaling Server]
        Redis[(Redis KV Store)]
    end
    Network_A -- SDP/ICE via WebSocket --> Signaling
    Network_B -- SDP/ICE via WebSocket --> Signaling
    Network_A <-. WebRTC Data Channel .-> Network_B
    SnapshotMgr_A -- save/load snapshot --> Redis
    SnapshotMgr_B -- save/load snapshot --> Redis
    style Signaling fill:#f9f,stroke:#333,stroke-width:2px
    style Redis fill:#f96,stroke:#333,stroke-width:2px
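This design is not without its challenges. One key limitation is that the CRDT algorithm we implemented is very basic: its IDs can quickly become long and complex, which makes the metadata overhead substantial. Production-grade CRDTs such as Yjs or Automerge use more advanced ID allocation and compression strategies to mitigate this. In addition, we did not implement a tombstone mechanism for deletions, so a deleted character is removed from the data structure for good. That can go wrong under concurrent editing: in a mesh, one peer's insert and another peer's delete of the same character can reach a third peer in either order, and if the delete arrives first it finds nothing to remove, after which the late insert brings the character back.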
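One way to picture a tombstone mechanism is a replica that remembers the IDs of deleted characters and refuses to insert them later. The following is only a sketch under that assumption. `TombstoneReplica`, `TChar`, `TPos`, `posKey`, and `comparePos` are hypothetical names, and production CRDTs handle deletions in more compact ways.

```typescript
interface TPos { digit: number; siteId: string; }
interface TChar { value: string; id: TPos[]; }

// Serialize a position path into a map key (assumes siteIds contain no ':' or '/').
function posKey(id: TPos[]): string {
  return id.map(p => `${p.digit}:${p.siteId}`).join('/');
}

function comparePos(a: TPos[], b: TPos[]): number {
  const shared = Math.min(a.length, b.length);
  for (let i = 0; i < shared; i++) {
    if (a[i].digit !== b[i].digit) return a[i].digit < b[i].digit ? -1 : 1;
    if (a[i].siteId !== b[i].siteId) return a[i].siteId < b[i].siteId ? -1 : 1;
  }
  return a.length - b.length;
}

class TombstoneReplica {
  private live = new Map<string, TChar>();
  private tombstones = new Set<string>();

  applyInsert(char: TChar): void {
    const key = posKey(char.id);
    if (this.tombstones.has(key)) return; // the delete was seen first; stay deleted
    this.live.set(key, char);
  }

  applyDelete(id: TPos[]): void {
    const key = posKey(id);
    this.tombstones.add(key); // remember the delete even if the insert has not arrived yet
    this.live.delete(key);
  }

  text(): string {
    return Array.from(this.live.values())
      .sort((x, y) => comparePos(x.id, y.id))
      .map(c => c.value)
      .join('');
  }
}

const charA: TChar = { value: 'a', id: [{ digit: 1, siteId: 'A' }] };
const charB: TChar = { value: 'b', id: [{ digit: 2, siteId: 'A' }] };

const inOrder = new TombstoneReplica();
inOrder.applyInsert(charA);
inOrder.applyInsert(charB);
inOrder.applyDelete(charA.id);

const reordered = new TombstoneReplica();
reordered.applyDelete(charA.id); // the delete overtakes its insert
reordered.applyInsert(charB);
reordered.applyInsert(charA);

console.log(inOrder.text(), reordered.text()); // prints "b b"
```

The cost is that the tombstone set only grows, so a real system also needs a policy for when old tombstones can safely be discarded.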
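Another architectural trade-off is the P2P network topology. We use a full-mesh model in which every client connects directly to every other client. With n participants that means n - 1 connections per client and n(n - 1)/2 in total, so once a session grows past roughly 10 to 15 people the quadratic growth quickly exhausts client CPU and network resources. For larger sessions, a more practical option is to introduce an SFU (Selective Forwarding Unit): each client sends its data only to the SFU, which forwards it to the others. This does reintroduce a central component, but its job is far lighter than that of an OT server. It only forwards packets and computes no state, so it is easier to scale.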
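The arithmetic behind that growth is easy to check. The helper names below are hypothetical, and the SFU figure assumes one connection per client to a single forwarding node.

```typescript
// Total connections in a full mesh of `peers` participants: n(n - 1) / 2.
function meshConnectionCount(peers: number): number {
  return (peers * (peers - 1)) / 2;
}

// With a single forwarding node, each client holds exactly one connection.
function sfuConnectionCount(peers: number): number {
  return peers;
}

for (const n of [5, 10, 15, 50]) {
  console.log(`${n} peers: mesh=${meshConnectionCount(n)}, sfu=${sfuConnectionCount(n)}`);
}
// prints:
// 5 peers: mesh=10, sfu=5
// 10 peers: mesh=45, sfu=10
// 15 peers: mesh=105, sfu=15
// 50 peers: mesh=1225, sfu=50
```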
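Finally, the snapshot strategy deserves careful thought. A fixed debounce interval is a starting point, but a document with heavy concurrent writes may need a smarter policy, for example one that adjusts snapshot frequency based on the number of operations, changes in document size, or the number of active users, in order to balance Redis load against how quickly new users can join.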
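As one possible starting point, a policy could trigger a snapshot when either an operation count or a maximum interval is exceeded. This is a sketch only. `SnapshotPolicy`, `maxOps`, and `maxIntervalMs` are hypothetical names and thresholds, and the clock is passed in explicitly so the behavior is easy to test.

```typescript
interface SnapshotPolicyOptions {
  maxOps: number;        // snapshot after this many operations
  maxIntervalMs: number; // or after this much time with unsaved operations
}

class SnapshotPolicy {
  private options: SnapshotPolicyOptions;
  private opsSinceSnapshot = 0;
  private lastSnapshotAt: number;

  constructor(options: SnapshotPolicyOptions, now: number = Date.now()) {
    this.options = options;
    this.lastSnapshotAt = now;
  }

  // Call once for every applied operation, local or remote.
  recordOperation(): void {
    this.opsSinceSnapshot++;
  }

  shouldSnapshot(now: number = Date.now()): boolean {
    if (this.opsSinceSnapshot === 0) return false; // nothing new to save
    return (
      this.opsSinceSnapshot >= this.options.maxOps ||
      now - this.lastSnapshotAt >= this.options.maxIntervalMs
    );
  }

  // Call after a snapshot has been written successfully.
  markSnapshot(now: number = Date.now()): void {
    this.opsSinceSnapshot = 0;
    this.lastSnapshotAt = now;
  }
}

const policy = new SnapshotPolicy({ maxOps: 3, maxIntervalMs: 1000 }, 0);
policy.recordOperation();
policy.recordOperation();
console.log(policy.shouldSnapshot(10)); // prints false
policy.recordOperation();
console.log(policy.shouldSnapshot(10)); // prints true
```

A session could call `recordOperation()` wherever it applies an operation and check `shouldSnapshot()` before writing to Redis; the thresholds themselves could then be tuned by document size or the number of active users.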