基于 NestJS 构建一个集成了 Puppeteer 网页抓取与 Milvus 向量化的实时 WebSocket 数据管道


在处理一个需要即时响应用户输入,并执行长耗时、多阶段后台任务的场景时,传统的 HTTP 请求-响应模型显得力不从心。设想这样一个需求:用户提交一个 URL,系统需要立刻开始抓取该网页的动态渲染内容,对内容进行分块和向量化,最后存入 Milvus 向量数据库以供语义检索。整个过程可能耗时数十秒甚至几分钟,而用户需要实时了解进度:抓取成功了没有?向量化到哪一步了?什么时候可以开始搜索?

直接使用一个 POST 接口处理这个流程是不可行的。HTTP 连接会因超时而中断,客户端也无法获取中间状态。显而易见的解决方案是采用异步任务队列加状态轮询,但这会增加客户端的复杂性和服务器的轮询压力。一个更优雅的架构是使用 WebSocket,在客户端与服务端之间建立一条持久化通信管道,用于任务提交和状态的实时推送。

本文将复盘如何使用 NestJS 作为核心框架,整合 Puppeteer 进行网页内容抓取,调用(模拟的)模型服务进行文本向量化,将结果存入 Milvus,并通过 WebSocket 将整个流程的状态实时反馈给客户端。我们将重点关注如何组织代码,处理异步流程,以及保证系统的健壮性。

架构设计与数据流

在动手编码前,我们先明确整个系统的组件和数据流。一个清晰的架构图是后续所有实现的基础。

graph TD
    subgraph Client
        A[用户浏览器]
    end

    subgraph "NestJS Backend"
        B(WebSocket Gateway)
        C{任务分发与状态管理器}
        D[Crawler Service - Puppeteer]
        E[Vector Service - Milvus]
        F(Embedding Service - Mock)
    end

    subgraph "External Services"
        G[Milvus Server]
        H[Target Website]
    end

    A -- "1. scrapeUrl(url) via WebSocket" --> B
    B -- "2. 提交任务" --> C
    C -- "3. 分发抓取任务" --> D
    C -- "8. 分发向量化与索引任务" --> E
    D -- "4. 启动 Puppeteer 抓取" --> H
    D -- "5. 返回页面内容" --> C
    C -- "6. 推送抓取成功状态" --> B
    B -- "7. status: 'scraped'" --> A
    C -- "7. 请求文本向量化" --> F
    F -- "8. 返回向量" --> C
    E -- "9. 连接 Milvus" --> G
    E -- "10. 插入向量" --> G
    C -- "11. 推送索引完成状态" --> B
    B -- "12. status: 'indexed'" --> A

这个流程的核心在于 任务分发与状态管理器 (在我们的实现中由一个 TaskService 扮演)。它解耦了 WebSocket 的连接层和具体的业务执行层。WebSocket Gateway 只负责接收指令和推送消息,而 TaskService 则负责调用 CrawlerService 和 VectorService,并在每个关键步骤完成后,通过事件或回调通知 Gateway 将状态更新给正确的客户端。

项目初始化与模块划分

我们使用 NestJS CLI 创建一个新项目,并规划出清晰的模块结构。在真实项目中,合理的模块化是可维护性的关键。

nest new real-time-vector-pipeline
cd real-time-vector-pipeline

# 安装所需依赖
npm install --save @nestjs/websockets @nestjs/platform-socket.io socket.io
npm install --save puppeteer @zilliz/milvus2-sdk-node class-validator class-transformer
npm install --save-dev @types/puppeteer

我们将创建以下几个核心模块:

  • TaskModule: 核心业务逻辑,协调抓取和向量化。
  • WebsocketModule: 处理 WebSocket 连接与通信。
  • CrawlerModule: 封装 Puppeteer 的抓取逻辑。
  • MilvusModule: 封装与 Milvus 的交互逻辑。

WebSocket 网关的实现

网关是系统的入口。它需要处理客户端连接、断开以及消息订阅。一个常见的错误是把大量的业务逻辑直接写在 Gateway 里,这会导致它与业务逻辑紧密耦合,难以测试和维护。我们的 Gateway 将保持轻量,只做消息转发。

src/websocket/events.gateway.ts:

import {
  WebSocketGateway,
  SubscribeMessage,
  WebSocketServer,
  OnGatewayConnection,
  OnGatewayDisconnect,
  MessageBody,
  ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { TaskService } from '../task/task.service';
import { ScrapeUrlDto } from './dto/scrape-url.dto';

// 生产环境中应配置具体的 origin
@WebSocketGateway({ cors: { origin: '*' } })
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger(EventsGateway.name);

  constructor(private readonly taskService: TaskService) {}

  handleConnection(client: Socket) {
    this.logger.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
    // 在真实项目中,这里可能需要清理与该 client 相关的任务或资源
  }

  @SubscribeMessage('scrapeUrl')
  async handleScrapeUrl(
    @MessageBody() data: ScrapeUrlDto,
    @ConnectedSocket() client: Socket,
  ): Promise<void> {
    this.logger.log(`Received scrapeUrl event from ${client.id} for url: ${data.url}`);

    // 定义一个回调函数,让 TaskService 在状态更新时调用
    // 这样做可以避免 TaskService 直接依赖 WebSocket Server 实例
    const statusUpdateCallback = (status: string, message: string, data?: any) => {
      client.emit('taskStatus', { status, message, data });
    };

    try {
      client.emit('taskStatus', { status: 'starting', message: 'Task received, starting process...' });
      // 异步执行任务,不阻塞 WebSocket 消息循环
      this.taskService.processUrl(data.url, statusUpdateCallback);
    } catch (error) {
      this.logger.error(`Error processing URL for client ${client.id}`, error.stack);
      client.emit('taskError', { message: 'Failed to start task.', error: error.message });
    }
  }
}

注意 scrapeUrl 方法的设计。它没有直接 await 长时间的 taskService.processUrl 调用,而是将一个回调函数 statusUpdateCallback 传递给 TaskService。这使得业务逻辑层可以在任何时候通过这个回调来更新客户端状态,而无需了解 WebSocket 的具体实现。这是实现关注点分离(Separation of Concerns)的重要实践。

DTO (ScrapeUrlDto) 用于验证输入数据,这是 NestJS 的最佳实践之一。

src/websocket/dto/scrape-url.dto.ts:

import { IsUrl } from 'class-validator';

export class ScrapeUrlDto {
  @IsUrl({}, { message: 'Invalid URL provided.' })
  url: string;
}

CrawlerService:封装 Puppeteer 的复杂性

抓取服务是第一个耗时步骤。直接在主服务中调用 Puppeteer 会带来很多问题:资源管理混乱、错误处理困难。我们需要一个专用的服务来封装这些细节。

src/crawler/crawler.service.ts:

import { Injectable, Logger } from '@nestjs/common';
import puppeteer, { Browser, Page } from 'puppeteer';

@Injectable()
export class CrawlerService {
  private readonly logger = new Logger(CrawlerService.name);

  // 生产环境中,Browser 实例应该被池化和复用,而不是每次都创建
  // 这里为了简化示例,每次都创建一个新的实例
  async scrapeUrl(url: string): Promise<string> {
    let browser: Browser | null = null;
    try {
      this.logger.log(`Launching Puppeteer to scrape ${url}`);
      browser = await puppeteer.launch({
        headless: true,
        // 在 Docker 容器中运行时,常常需要这些参数
        args: ['--no-sandbox', '--disable-setuid-sandbox'],
      });
      const page: Page = await browser.newPage();
      
      // 设置合理的超时,防止页面加载时间过长导致进程卡死
      await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });

      // 提取页面主要文本内容,可以根据实际需求定制更复杂的提取逻辑
      const content = await page.evaluate(() => {
        // 移除脚本和样式,减少噪音
        document.querySelectorAll('script, style').forEach(elem => elem.remove());
        // 返回 body 的文本内容,这是一种比较粗糙但通用的方式
        return document.body.innerText;
      });

      this.logger.log(`Successfully scraped content from ${url}. Length: ${content.length}`);
      return content;

    } catch (error) {
      this.logger.error(`Failed to scrape ${url}`, error.stack);
      // 抛出自定义错误或重新包装错误,以便上层服务处理
      throw new Error(`Puppeteer failed to scrape the URL: ${error.message}`);
    } finally {
      if (browser) {
        this.logger.log('Closing Puppeteer browser instance.');
        await browser.close();
      }
    }
  }
}

这段代码有几个关键点:

  1. 资源管理: try...finally 结构确保了无论抓取成功与否,browser.close() 都会被调用,防止僵尸进程的产生。这是一个在生产环境中至关重要的细节。
  2. 错误处理: 捕获具体的 Puppeteer 异常,并包装成一个业务层面的错误向上抛出。
  3. 超时设置: page.goto 中的 timeout 参数是必不可少的,它可以防止因为目标网站响应慢而导致整个服务被卡住。
  4. 内容提取: 使用 page.evaluate 在浏览器上下文中执行脚本来获取数据,这种方式比直接获取 page.content() 更灵活,可以预处理掉不必要的 HTML 标签。

MilvusService:与向量数据库交互

这个服务负责处理所有与 Milvus 的通信,包括连接、检查集合、创建集合和插入向量。

src/milvus/milvus.service.ts:

import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { MilvusClient, DataType, IndexType } from '@zilliz/milvus2-sdk-node';
import { ConfigService } from '@nestjs/config';

// 这里的配置值应该来自环境变量
const COLLECTION_NAME = 'web_pages';
const VECTOR_DIM = 768; // 示例维度,取决于你使用的 embedding 模型

@Injectable()
export class MilvusService implements OnModuleInit {
  private readonly logger = new Logger(MilvusService.name);
  private client: MilvusClient;

  constructor(private configService: ConfigService) {
    this.client = new MilvusClient({
        address: this.configService.get<string>('MILVUS_ADDRESS'),
        // 如果有鉴权,需要配置 username/password
    });
  }
  
  async onModuleInit() {
    await this.ensureCollection();
  }

  private async ensureCollection(): Promise<void> {
    try {
      const res = await this.client.hasCollection({ collection_name: COLLECTION_NAME });
      if (res.value) {
        this.logger.log(`Collection '${COLLECTION_NAME}' already exists.`);
        // 确保集合被加载到内存中以供搜索
        await this.client.loadCollectionSync({ collection_name: COLLECTION_NAME });
        return;
      }

      this.logger.log(`Collection '${COLLECTION_NAME}' does not exist. Creating...`);
      await this.client.createCollection({
        collection_name: COLLECTION_NAME,
        fields: [
          { name: 'id', data_type: DataType.Int64, is_primary_key: true, autoID: true },
          { name: 'url', data_type: DataType.VarChar, max_length: 512 },
          { name: 'chunk', data_type: DataType.VarChar, max_length: 4096 }, // 存储原始文本块
          { name: 'vector', data_type: DataType.FloatVector, dim: VECTOR_DIM },
        ],
      });
      
      this.logger.log(`Creating index for collection '${COLLECTION_NAME}'...`);
      // HNSW 是常用的高性能近似最近邻搜索索引
      await this.client.createIndex({
        collection_name: COLLECTION_NAME,
        field_name: 'vector',
        index_type: IndexType.HNSW,
        metric_type: 'L2', // 或 'IP',取决于你的模型
        params: { M: 16, efConstruction: 200 },
      });
      
      await this.client.loadCollectionSync({ collection_name: COLLECTION_NAME });
      this.logger.log('Collection created and loaded successfully.');

    } catch (error) {
      this.logger.error('Failed to ensure Milvus collection exists', error);
      throw error; // 启动时失败,让应用崩溃重启
    }
  }

  async insertVectors(data: { url: string; chunk: string; vector: number[] }[]): Promise<any> {
    if (!data || data.length === 0) {
      return;
    }
    
    this.logger.log(`Inserting ${data.length} vectors into Milvus.`);
    try {
      const res = await this.client.insert({
        collection_name: COLLECTION_NAME,
        fields_data: data,
      });

      if (res.status.error_code !== 'Success') {
        throw new Error(`Milvus insertion failed: ${res.status.reason}`);
      }
      
      // 生产环境中,flush 操作应该谨慎使用,或者依赖 Milvus 的自动 flush
      // 这里为了即时可见性,手动调用
      await this.client.flushSync({ collection_names: [COLLECTION_NAME] });
      
      this.logger.log(`Successfully inserted ${res.succ_index.length} vectors.`);
      return res;
    } catch (error) {
      this.logger.error('Failed to insert vectors into Milvus', error);
      throw error;
    }
  }
}

Milvus 服务的 onModuleInit 生命周期钩子是关键。它确保了在应用开始接受请求之前,所需的 Milvus 集合和索引已经准备就绪。如果初始化失败,应用应该直接退出,因为后续的服务都无法正常工作。代码中还包含了创建索引的参数 MefConstruction,这些参数对搜索性能和召回率有直接影响,需要在真实项目中根据数据量和硬件进行调优。

TaskService: 流程编排

这是所有逻辑的粘合剂。它接收来自 Gateway 的任务,调用其他服务,并使用传入的回调函数报告进度。

src/task/task.service.ts:

import { Injectable, Logger } from '@nestjs/common';
import { CrawlerService } from '../crawler/crawler.service';
import { MilvusService } from '../milvus/milvus.service';

type StatusCallback = (status: string, message: string, data?: any) => void;

@Injectable()
export class TaskService {
  private readonly logger = new Logger(TaskService.name);

  constructor(
    private readonly crawlerService: CrawlerService,
    private readonly milvusService: MilvusService,
  ) {}

  public async processUrl(url: string, statusCallback: StatusCallback): Promise<void> {
    try {
      // Step 1: Scrape the URL
      statusCallback('scraping', `Starting to scrape content from ${url}...`);
      const content = await this.crawlerService.scrapeUrl(url);
      statusCallback('scraped', 'Successfully scraped content.', { contentLength: content.length });
      
      // Step 2: Chunk the content
      // 真实项目中,分块策略非常重要,影响检索效果。这里使用简单固定长度分块
      const chunks = this.chunkText(content, 1000, 200);
      statusCallback('chunking', `Content chunked into ${chunks.length} pieces.`);

      // Step 3: Vectorize each chunk (mocked)
      statusCallback('vectorizing', 'Starting vectorization for all chunks...');
      const vectors = await this.getEmbeddings(chunks);
      statusCallback('vectorized', 'All chunks have been vectorized.');

      // Step 4: Insert into Milvus
      const dataToInsert = chunks.map((chunk, index) => ({
          url,
          chunk,
          vector: vectors[index],
      }));
      statusCallback('indexing', `Indexing ${dataToInsert.length} vectors into Milvus...`);
      await this.milvusService.insertVectors(dataToInsert);
      statusCallback('indexed', 'Task completed successfully. Content is now searchable.');

    } catch (error) {
      this.logger.error(`Task failed for URL ${url}`, error.stack);
      statusCallback('error', 'An error occurred during processing.', { error: error.message });
    }
  }

  private chunkText(text: string, chunkSize: number, overlap: number): string[] {
    const chunks: string[] = [];
    let i = 0;
    while (i < text.length) {
      chunks.push(text.substring(i, i + chunkSize));
      i += chunkSize - overlap;
    }
    return chunks;
  }
  
  // 这是一个模拟的 embedding 函数
  // 在真实项目中,这里会调用一个机器学习模型服务 (e.g., HuggingFace, OpenAI)
  private async getEmbeddings(chunks: string[]): Promise<number[][]> {
    this.logger.log(`Mocking embedding generation for ${chunks.length} chunks.`);
    // 模拟网络延迟和计算时间
    await new Promise(resolve => setTimeout(resolve, 50 * chunks.length));
    
    // 生成随机向量作为替代
    const VECTOR_DIM = 768;
    return chunks.map(() => Array.from({ length: VECTOR_DIM }, () => Math.random()));
  }
}

processUrl 方法清晰地展示了整个业务流程。每一步完成后,都会调用 statusCallback 推送最新状态。注意,整个方法是 async 的,但 Gateway 并没有 await 它,这正是我们想要的——任务在后台执行,不阻塞主线程。

局限性与未来迭代方向

到此为止,我们已经构建了一个功能完备的原型。然而,在将其投入生产环境之前,还有几个方面需要严肃考虑:

  1. 任务队列的持久化: 当前的任务分发逻辑是内存中的直接调用。如果服务重启,正在进行的任务会全部丢失。在生产环境中,必须引入一个真正的消息队列,如 RabbitMQ 或 Redis Streams。Gateway 负责将任务投递到队列,而一个或多个独立的 Worker 进程负责消费队列中的任务并执行。

  2. Puppeteer 实例的扩展性: 每个 Puppeteer 实例都相当消耗资源。随着并发请求的增加,在单机上启动大量浏览器实例会迅速耗尽服务器内存和 CPU。需要构建一个 Puppeteer Worker 池,甚至将其部署为独立的、可水平扩展的服务(例如使用 puppeteer-cluster 或在 Kubernetes 中管理)。

  3. 状态管理的健壮性: 如果客户端在任务执行过程中断线重连,它将丢失之前的状态更新。一个更完整的方案需要将任务状态持久化到数据库(如 Redis 或 PostgreSQL),并在客户端重连时,能够查询并恢复其任务的当前状态。

  4. Embedding 服务的解耦: 向量化是一个计算密集型任务,不应该和主应用服务部署在一起。它应该是一个独立的微服务,可以独立扩展,并且主应用通过 RPC 或 HTTP 调用它。我们代码中的 getEmbeddings 模拟正是为此留出的接口。

  5. 配置与安全: 硬编码的配置(如 Milvus 集合名、向量维度)都应该通过 ConfigService 从环境变量中读取。WebSocket 的 cors 配置需要收紧,只允许来自受信任域的连接。此外,对于需要认证的场景,应在 WebSocket 连接握手阶段集成身份验证逻辑。


  目录