构建一个基于CRDT的实时协同状态引擎:从WebRTC数据同步到键值存储快照


要构建一个真正意义上的P2P实时协同系统,绕不开的核心问题是状态同步。传统的基于操作转换(Operational Transformation, OT)的方案强依赖一个中心化服务器来协调和转换操作,这不仅是性能瓶颈,也是一个明显的单点故障。我们的目标是移除这个中心化的瓶颈,让客户端之间可以直接通信,自我解决状态冲突。这直接将我们引向了无冲突复制数据类型(CRDT)。

我们的技术栈选择并非偶然,而是围绕着去中心化和韧性这一核心目标层层递进的结果:

  1. 核心数据结构 (CRDT): 我们将实现一个简化的序列CRDT(Log-structured sequence),它允许在不同节点独立插入和删除,并通过数学保证最终一致性。
  2. 通信协议 (WebRTC Data Channels): 替代WebSocket,我们选择WebRTC的数据通道进行点对点的操作广播。这极大地降低了服务器的流量压力,使其退化为一个轻量的信令与NAT穿透协调者。
  3. 状态持久化 (Key-Value NoSQL): CRDT的完整操作日志可能无限增长。对于新加入的协作者,重放整个历史是不现实的。因此,我们需要一个高速的键值存储(如Redis)来定期对文档的CRDT状态进行快照,新用户只需加载最新快照,然后订阅实时操作即可。
  4. 架构模式 (Headless UI): 协同逻辑本身是与UI无关的。我们将把整个协同引擎封装成一个独立的、无UI的Node.js模块。这个模块只负责状态计算、网络同步和持久化交互,可以被任何前端框架(Web、桌面甚至终端)集成。

我们从最核心的CRDT数据结构开始。我们将实现一个基于Logoot思想的简化版序列CRDT。每个字符不仅有其本身的值,还有一个唯一的、有序的位置标识符。

// src/crdt/Identifier.ts
// 每一个字符在文档中的唯一位置标识符
export interface Identifier {
  digit: number;   // 当前位置的数字
  siteId: string;  // 生成该标识符的节点ID
}

// src/crdt/Char.ts
// CRDT中的字符对象,包含了值和唯一位置
export interface Char {
  value: string;
  id: Identifier[]; // 位置标识符数组,构成了树状的位置结构
  timestamp: number;
}

// src/crdt/Operation.ts
// 定义了在CRDT上执行的操作,可以是插入或删除
export type Operation = {
  type: 'insert';
  char: Char;
} | {
  type: 'delete';
  charId: Identifier[];
};

这里的核心是Identifier[]。它不是一个简单的索引,而是一个路径。例如,[{digit: 1, siteId: 'A'}, {digit: 5, siteId: 'B'}]表示一个位置。这种结构允许我们在任意两个现有字符之间无限地生成新的位置,而无需重新计算所有索引。

接下来是CRDT引擎的核心实现。它需要维护一个字符列表,并提供应用本地和远程操作的方法。

// src/crdt/CRDTEngine.ts
import { Char, Identifier, Operation } from './types';
import { v4 as uuidv4 } from 'uuid';

export class CRDTEngine {
  private structure: Char[] = [];
  public readonly siteId: string;

  constructor(siteId?: string) {
    // 在真实项目中,siteId应该是持久且唯一的,比如用户的session ID
    this.siteId = siteId || uuidv4();
    console.log(`CRDT Engine initialized with siteId: ${this.siteId}`);
  }

  // 比较两个Identifier数组的顺序
  private compareIds(id1: Identifier[], id2: Identifier[]): number {
    const minLength = Math.min(id1.length, id2.length);
    for (let i = 0; i < minLength; i++) {
      const p1 = id1[i];
      const p2 = id2[i];
      if (p1.digit < p2.digit) return -1;
      if (p1.digit > p2.digit) return 1;
      if (p1.siteId < p2.siteId) return -1;
      if (p1.siteId > p2.siteId) return 1;
    }
    if (id1.length < id2.length) return -1;
    if (id1.length > id2.length) return 1;
    return 0;
  }

  // 根据远程或本地操作更新内部状态
  // 这是 CRDT 的核心:保证操作的幂等性和交换性
  public applyOperation(op: Operation): void {
    if (op.type === 'insert') {
      this.applyInsert(op.char);
    } else {
      this.applyDelete(op.charId);
    }
  }

  private applyInsert(char: Char): void {
    // 幂等性检查:如果该字符已存在,则不执行任何操作
    if (this.structure.some(c => this.compareIds(c.id, char.id) === 0)) {
      return;
    }

    // 使用二分查找找到正确的插入位置
    let left = 0;
    let right = this.structure.length - 1;
    let insertIndex = this.structure.length;

    while (left <= right) {
        const mid = Math.floor((left + right) / 2);
        const compare = this.compareIds(this.structure[mid].id, char.id);
        if (compare > 0) {
            insertIndex = mid;
            right = mid - 1;
        } else {
            left = mid + 1;
        }
    }

    this.structure.splice(insertIndex, 0, char);
  }

  private applyDelete(charId: Identifier[]): void {
    const index = this.structure.findIndex(c => this.compareIds(c.id, charId) === 0);
    if (index !== -1) {
      this.structure.splice(index, 1);
    }
  }

  // 生成本地插入操作
  public generateInsert(index: number, value: string): Operation {
    const prevId = this.structure[index - 1]?.id || [];
    const nextId = this.structure[index]?.id || [];
    
    // 这里的坑在于,位置生成策略需要非常健壮,以避免冲突和不必要的深度
    // 一个简单的策略是在前一个和后一个ID之间选择一个中间位置
    // 生产级的实现会更复杂,包含边界检查和随机化策略
    const newId = this.generateIdBetween(prevId, nextId);

    const char: Char = {
      value,
      id: newId,
      timestamp: Date.now(),
    };
    
    return { type: 'insert', char };
  }
  
  // 生成本地删除操作
  public generateDelete(index: number): Operation | null {
    if (index < 0 || index >= this.structure.length) {
      // 这里的错误处理至关重要,防止无效操作污染整个系统
      console.error(`Invalid index for deletion: ${index}`);
      return null;
    }
    const charToDelete = this.structure[index];
    return { type: 'delete', charId: charToDelete.id };
  }

  // 简化版的位置生成算法
  private generateIdBetween(prevId: Identifier[], nextId: Identifier[]): Identifier[] {
    // 此处仅为演示,真实项目的算法要处理更多边界情况
    const newDigit = (prevId[0]?.digit || 0) + 1;
    return [{ digit: newDigit, siteId: this.siteId }];
  }

  public getText(): string {
    return this.structure.map(c => c.value).join('');
  }

  public getState(): Char[] {
    return JSON.parse(JSON.stringify(this.structure)); // 返回深拷贝
  }
  
  public setState(state: Char[]): void {
    this.structure = state;
  }
}

这个引擎是完全“无头”的。它不知道UI,也不知道网络。它只负责维护一个有序的Char数组。

接下来,我们引入WebRTC来连接各个CRDTEngine实例。我们需要一个信令服务器来帮助客户端发现彼此并交换建立P2P连接所需的信息(SDP和ICE候选者)。

// src/signaling/server.js
// 一个极简的WebSocket信令服务器
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

// 这里的房间管理非常粗糙,真实项目需要更健壮的数据结构
const rooms = {};

console.log('Signaling server started on ws://localhost:8080');

wss.on('connection', ws => {
  console.log('Client connected');

  ws.on('message', message => {
    let data;
    try {
      data = JSON.parse(message);
    } catch (e) {
      console.error('Invalid JSON received:', message);
      return;
    }

    const { type, room, payload } = data;

    switch (type) {
      case 'join':
        console.log(`Client wants to join room: ${room}`);
        if (!rooms[room]) {
          rooms[room] = new Set();
        }
        rooms[room].add(ws);
        ws.room = room; // 附加房间信息到ws实例上
        
        // 通知房间内其他成员有新人加入
        rooms[room].forEach(client => {
          if (client !== ws && client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify({ type: 'peer-connect', peerId: 'some_unique_id' })); // 实际应传递唯一ID
          }
        });
        break;
      
      // SDP offers/answers and ICE candidates are simply forwarded
      case 'signal':
        console.log(`Forwarding signal in room: ${room}`);
        if (rooms[room]) {
          rooms[room].forEach(client => {
            if (client !== ws && client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify({ type: 'signal', payload }));
            }
          });
        }
        break;
    }
  });

  ws.on('close', () => {
    console.log('Client disconnected');
    const room = ws.room;
    if (room && rooms[room]) {
      rooms[room].delete(ws);
      if (rooms[room].size === 0) {
        delete rooms[room];
      }
      // 可以在此通知其他人某成员已离开
    }
  });

  ws.on('error', (err) => {
    console.error('WebSocket error:', err);
  });
});

客户端的WebRTC逻辑则要复杂得多,它负责管理多个P2P连接,并为每个连接设置数据通道。我们将其封装成一个P2PNetworkManager

// src/network/P2PNetworkManager.ts
import { Operation } from '../crdt/types';

// 这是一个示意实现,生产级代码需要使用`wrtc`库在Node.js中运行
// 或者在浏览器环境中使用原生的RTCPeerConnection
// 我们用一个EventEmitter来模拟事件
import { EventEmitter } from 'events';

export class P2PNetworkManager extends EventEmitter {
  private peers: Map<string, any> = new Map(); // key: peerId, value: RTCPeerConnection
  private signalingSocket: WebSocket;

  constructor(private signalingServerUrl: string, private roomId: string) {
    super();
    this.signalingSocket = new WebSocket(this.signalingServerUrl);
    this.setupSignaling();
  }
  
  private setupSignaling() {
    this.signalingSocket.onopen = () => {
      console.log('Connected to signaling server.');
      this.signalingSocket.send(JSON.stringify({ type: 'join', room: this.roomId }));
    };

    this.signalingSocket.onmessage = async (event) => {
      const message = JSON.parse(event.data);
      if (message.type === 'peer-connect') {
        // 一个新对等点加入,作为发起方创建连接
        this.createPeerConnection(message.peerId, true);
      } else if (message.type === 'signal') {
        const { sdp, candidate, from } = message.payload;
        let pc = this.peers.get(from);
        if (!pc) {
          pc = this.createPeerConnection(from, false);
        }

        if (sdp) {
          await pc.setRemoteDescription(new RTCSessionDescription(sdp));
          if (sdp.type === 'offer') {
            const answer = await pc.createAnswer();
            await pc.setLocalDescription(answer);
            this.sendSignal({ sdp: answer, to: from });
          }
        } else if (candidate) {
          await pc.addIceCandidate(new RTCIceCandidate(candidate));
        }
      }
    };

    this.signalingSocket.onerror = (error) => {
      console.error('Signaling socket error:', error);
      this.emit('error', new Error('Signaling connection failed'));
    };
  }
  
  private createPeerConnection(peerId: string, isInitiator: boolean) {
    // 这里的配置对性能和NAT穿透至关重要
    const pc = new RTCPeerConnection({
      iceServers: [{ urls: 'stun:stun.l.google.com:19302' }],
    });
    this.peers.set(peerId, pc);

    pc.onicecandidate = (event) => {
      if (event.candidate) {
        this.sendSignal({ candidate: event.candidate, to: peerId });
      }
    };
    
    // 数据通道是核心
    if (isInitiator) {
      const dataChannel = pc.createDataChannel('crdt-ops');
      this.setupDataChannel(dataChannel, peerId);
    } else {
      pc.ondatachannel = (event) => {
        this.setupDataChannel(event.channel, peerId);
      };
    }

    if (isInitiator) {
      const offer = await pc.createOffer();
      await pc.setLocalDescription(offer);
      this.sendSignal({ sdp: offer, to: peerId });
    }
    
    return pc;
  }
  
  private setupDataChannel(channel: RTCDataChannel, peerId: string) {
    channel.onopen = () => console.log(`Data channel with ${peerId} is open.`);
    channel.onclose = () => {
      console.log(`Data channel with ${peerId} is closed.`);
      this.peers.delete(peerId);
    };
    channel.onmessage = (event) => {
      try {
        const op: Operation = JSON.parse(event.data);
        // 收到操作后,通过事件发射出去,由上层逻辑处理
        this.emit('operation', op);
      } catch (e) {
        console.error('Failed to parse operation from peer:', event.data);
      }
    };
  }

  // 广播操作到所有已连接的对等点
  public broadcast(op: Operation) {
    const message = JSON.stringify(op);
    this.peers.forEach(pc => {
      // 检查数据通道是否存在且已打开
      const channel = pc.getSenders()[0]?.transport?.iceTransport?.role === 'controlling' 
        ? pc.getSenders()[0]?.transport?._dataChannel 
        : pc.getReceivers()[0]?.transport?._dataChannel;

      if (channel && channel.readyState === 'open') {
        channel.send(message);
      }
    });
  }

  private sendSignal(payload: any) {
    this.signalingSocket.send(JSON.stringify({
      type: 'signal',
      room: this.roomId,
      payload
    }));
  }
}

现在,我们将CRDT引擎和P2P网络管理器组合起来,并引入快照持久化。

// src/persistence/SnapshotManager.ts
import { createClient, RedisClientType } from 'redis';
import { Char } from '../crdt/types';

export class SnapshotManager {
  private redisClient: RedisClientType;
  private docId: string;

  constructor(docId: string, redisUrl: string) {
    this.docId = `doc_snapshot:${docId}`;
    this.redisClient = createClient({ url: redisUrl });
    this.redisClient.on('error', (err) => console.error('Redis Client Error', err));
    // 立即连接,在真实项目中应该有更好的连接管理和重试逻辑
    this.redisClient.connect();
  }

  async saveSnapshot(state: Char[]): Promise<void> {
    if (!this.redisClient.isOpen) {
        console.warn('Redis client not connected. Skipping snapshot save.');
        return;
    }
    try {
      const serializedState = JSON.stringify(state);
      // 使用 SET 命令覆盖旧快照。对于大型文档,这可能是一个耗时操作。
      await this.redisClient.set(this.docId, serializedState);
      console.log(`Snapshot saved for doc ${this.docId}`);
    } catch (err) {
      console.error('Failed to save snapshot to Redis:', err);
      // 这里的失败处理很关键,可能需要触发告警
    }
  }

  async loadSnapshot(): Promise<Char[] | null> {
    if (!this.redisClient.isOpen) {
        console.warn('Redis client not connected. Skipping snapshot load.');
        return null;
    }
    try {
      const serializedState = await this.redisClient.get(this.docId);
      if (serializedState) {
        console.log(`Snapshot loaded for doc ${this.docId}`);
        return JSON.parse(serializedState);
      }
      return null;
    } catch (err) {
      console.error('Failed to load snapshot from Redis:', err);
      return null;
    }
  }

  async disconnect(): Promise<void> {
    if (this.redisClient.isOpen) {
      await this.redisClient.quit();
    }
  }
}

最后,我们创建一个顶层的CollaborativeSession类来编排所有组件。这就是我们Headless引擎的最终API。

// src/CollaborativeSession.ts
import { CRDTEngine } from './crdt/CRDTEngine';
import { P2PNetworkManager } from './network/P2PNetworkManager';
import { SnapshotManager } from './persistence/SnapshotManager';
import { Operation } from './crdt/types';
import { EventEmitter } from 'events';
import { debounce } from 'lodash';

// 配置应从外部注入
const REDIS_URL = 'redis://localhost:6379';
const SIGNALING_URL = 'ws://localhost:8080';
const SNAPSHOT_DEBOUNCE_MS = 5000; // 5秒内无操作则保存快照

export class CollaborativeSession extends EventEmitter {
  private crdtEngine: CRDTEngine;
  private networkManager: P2PNetworkManager;
  private snapshotManager: SnapshotManager;
  private debouncedSave: () => void;

  constructor(private documentId: string, siteId?: string) {
    super();
    this.crdtEngine = new CRDTEngine(siteId);
    this.networkManager = new P2PNetworkManager(SIGNALING_URL, documentId);
    this.snapshotManager = new SnapshotManager(documentId, REDIS_URL);

    // 防抖的快照保存策略,避免频繁写入Redis
    this.debouncedSave = debounce(
      () => this.snapshotManager.saveSnapshot(this.crdtEngine.getState()),
      SNAPSHOT_DEBOUNCE_MS,
      { leading: false, trailing: true }
    );
  }

  async start() {
    console.log(`Starting collaborative session for document: ${this.documentId}`);
    const snapshot = await this.snapshotManager.loadSnapshot();
    if (snapshot) {
      this.crdtEngine.setState(snapshot);
      this.emit('update', this.crdtEngine.getText());
    }

    this.networkManager.on('operation', (op: Operation) => {
      console.log('Received remote operation:', op.type);
      this.crdtEngine.applyOperation(op);
      this.emit('update', this.crdtEngine.getText());
    });

    this.networkManager.on('error', (err) => {
      console.error("Network manager encountered an error:", err);
      this.emit('error', err);
    });

    // 连接到 P2P 网络
    // 注意:这里的`start`方法被省略了,因为P2PNetworkManager的构造函数中已开始连接
  }

  // 暴露给外部(UI层)的接口
  public localInsert(index: number, value: string) {
    const op = this.crdtEngine.generateInsert(index, value);
    this.crdtEngine.applyOperation(op);
    this.networkManager.broadcast(op);
    this.emit('update', this.crdtEngine.getText());
    this.debouncedSave(); // 任何本地修改都触发一次防抖保存
  }

  public localDelete(index: number) {
    const op = this.crdtEngine.generateDelete(index);
    if (op) {
      this.crdtEngine.applyOperation(op);
      this.networkManager.broadcast(op);
      this.emit('update', this.crdtEngine.getText());
      this.debouncedSave();
    }
  }

  public getText(): string {
    return this.crdtEngine.getText();
  }

  public async stop() {
    // 确保最后的更改被保存
    this.debouncedSave.flush(); 
    await this.snapshotManager.disconnect();
    // 关闭网络连接等清理工作
    console.log(`Session for document ${this.documentId} stopped.`);
  }
}

这个架构的整体流程可以用下面的图来表示:

graph TD
    subgraph Client A
        UI_A[UI Layer] -- localInsert/localDelete --> Session_A[CollaborativeSession]
        Session_A -- update event --> UI_A
        Session_A -- interacts with --> Engine_A[CRDTEngine]
        Session_A -- interacts with --> Network_A[P2PNetworkManager]
        Session_A -- interacts with --> SnapshotMgr_A[SnapshotManager]
    end

    subgraph Client B
        UI_B[UI Layer] -- localInsert/localDelete --> Session_B[CollaborativeSession]
        Session_B -- update event --> UI_B
        Session_B -- interacts with --> Engine_B[CRDTEngine]
        Session_B -- interacts with --> Network_B[P2PNetworkManager]
        Session_B -- interacts with --> SnapshotMgr_B[SnapshotManager]
    end

    subgraph Backend
        Signaling[Signaling Server]
        Redis[(Redis KV Store)]
    end

    Network_A -- SDP/ICE via WebSocket --> Signaling
    Network_B -- SDP/ICE via WebSocket --> Signaling
    
    Network_A <-. WebRTC Data Channel .-> Network_B
    
    SnapshotMgr_A -- save/load snapshot --> Redis
    SnapshotMgr_B -- save/load snapshot --> Redis

    style Signaling fill:#f9f,stroke:#333,stroke-width:2px
    style Redis fill:#f96,stroke:#333,stroke-width:2px

这个方案的实现并非没有挑战。一个关键的局限性在于,我们实现的CRDT算法非常基础,其生成的ID可能会迅速变得复杂和冗长,导致元数据开销巨大。生产级CRDT(如Yjs或Automerge)使用了更高级的ID分配和压缩策略来缓解这个问题。此外,对删除操作的处理,我们没有实现“墓碑”(tombstone)机制,这意味着被删除的字符在数据结构中被永久移除,这可能会在某些并发编辑场景下导致问题。

另一个架构上的权衡是P2P网络拓扑。我们采用的是全连接(Full Mesh)模型,每个客户端都与其他所有客户端建立直接连接。当参与者数量超过10-15人时,连接数会呈平方级增长,迅速耗尽客户端的计算和网络资源。对于更大规模的会话,一个更实际的方案是引入SFU(Selective Forwarding Unit),客户端只向SFU发送数据,由SFU负责转发给其他客户端。这虽然引入了一个中心化组件,但它的职责远比OT服务器轻量,只做数据包转发,不做状态计算,因此更容易扩展。

最后,快照策略也值得深思。固定时间的防抖是一个起点,但在高并发写入的文档中,可能需要更智能的策略,例如基于操作数量、文档大小变化或活跃用户数来动态调整快照频率,以在Redis负载和新用户加入速度之间找到最佳平衡。


  目录