在处理一个需要即时响应用户输入,并执行长耗时、多阶段后台任务的场景时,传统的 HTTP 请求-响应模型显得力不从心。设想这样一个需求:用户提交一个 URL,系统需要立刻开始抓取该网页的动态渲染内容,对内容进行分块和向量化,最后存入 Milvus 向量数据库以供语义检索。整个过程可能耗时数十秒甚至几分钟,而用户需要实时了解进度:抓取成功了没有?向量化到哪一步了?什么时候可以开始搜索?
直接使用一个 POST 接口处理这个流程是不可行的。HTTP 连接会因超时而中断,客户端也无法获取中间状态。显而易见的解决方案是采用异步任务队列加状态轮询,但这会增加客户端的复杂性和服务器的轮询压力。一个更优雅的架构是使用 WebSocket,在客户端与服务端之间建立一条持久化通信管道,用于任务提交和状态的实时推送。
本文将复盘如何使用 NestJS 作为核心框架,整合 Puppeteer 进行网页内容抓取,调用(模拟的)模型服务进行文本向量化,将结果存入 Milvus,并通过 WebSocket 将整个流程的状态实时反馈给客户端。我们将重点关注如何组织代码,处理异步流程,以及保证系统的健壮性。
架构设计与数据流
在动手编码前,我们先明确整个系统的组件和数据流。一个清晰的架构图是后续所有实现的基础。
graph TD subgraph Client A[用户浏览器] end subgraph "NestJS Backend" B(WebSocket Gateway) C{任务分发与状态管理器} D[Crawler Service - Puppeteer] E[Vector Service - Milvus] F(Embedding Service - Mock) end subgraph "External Services" G[Milvus Server] H[Target Website] end A -- "1. scrapeUrl(url) via WebSocket" --> B B -- "2. 提交任务" --> C C -- "3. 分发抓取任务" --> D C -- "8. 分发向量化与索引任务" --> E D -- "4. 启动 Puppeteer 抓取" --> H D -- "5. 返回页面内容" --> C C -- "6. 推送抓取成功状态" --> B B -- "7. status: 'scraped'" --> A C -- "7. 请求文本向量化" --> F F -- "8. 返回向量" --> C E -- "9. 连接 Milvus" --> G E -- "10. 插入向量" --> G C -- "11. 推送索引完成状态" --> B B -- "12. status: 'indexed'" --> A
这个流程的核心在于 任务分发与状态管理器
(在我们的实现中由一个 TaskService
扮演)。它解耦了 WebSocket 的连接层和具体的业务执行层。WebSocket Gateway 只负责接收指令和推送消息,而 TaskService 则负责调用 CrawlerService 和 VectorService,并在每个关键步骤完成后,通过事件或回调通知 Gateway 将状态更新给正确的客户端。
项目初始化与模块划分
我们使用 NestJS CLI 创建一个新项目,并规划出清晰的模块结构。在真实项目中,合理的模块化是可维护性的关键。
nest new real-time-vector-pipeline
cd real-time-vector-pipeline
# 安装所需依赖
npm install --save @nestjs/websockets @nestjs/platform-socket.io socket.io
npm install --save puppeteer @zilliz/milvus2-sdk-node class-validator class-transformer
npm install --save-dev @types/puppeteer
我们将创建以下几个核心模块:
-
TaskModule
: 核心业务逻辑,协调抓取和向量化。 -
WebsocketModule
: 处理 WebSocket 连接与通信。 -
CrawlerModule
: 封装 Puppeteer 的抓取逻辑。 -
MilvusModule
: 封装与 Milvus 的交互逻辑。
WebSocket 网关的实现
网关是系统的入口。它需要处理客户端连接、断开以及消息订阅。一个常见的错误是把大量的业务逻辑直接写在 Gateway 里,这会导致它与业务逻辑紧密耦合,难以测试和维护。我们的 Gateway 将保持轻量,只做消息转发。
src/websocket/events.gateway.ts
:
import {
WebSocketGateway,
SubscribeMessage,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { TaskService } from '../task/task.service';
import { ScrapeUrlDto } from './dto/scrape-url.dto';
// 生产环境中应配置具体的 origin
@WebSocketGateway({ cors: { origin: '*' } })
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly logger = new Logger(EventsGateway.name);
constructor(private readonly taskService: TaskService) {}
handleConnection(client: Socket) {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
// 在真实项目中,这里可能需要清理与该 client 相关的任务或资源
}
@SubscribeMessage('scrapeUrl')
async handleScrapeUrl(
@MessageBody() data: ScrapeUrlDto,
@ConnectedSocket() client: Socket,
): Promise<void> {
this.logger.log(`Received scrapeUrl event from ${client.id} for url: ${data.url}`);
// 定义一个回调函数,让 TaskService 在状态更新时调用
// 这样做可以避免 TaskService 直接依赖 WebSocket Server 实例
const statusUpdateCallback = (status: string, message: string, data?: any) => {
client.emit('taskStatus', { status, message, data });
};
try {
client.emit('taskStatus', { status: 'starting', message: 'Task received, starting process...' });
// 异步执行任务,不阻塞 WebSocket 消息循环
this.taskService.processUrl(data.url, statusUpdateCallback);
} catch (error) {
this.logger.error(`Error processing URL for client ${client.id}`, error.stack);
client.emit('taskError', { message: 'Failed to start task.', error: error.message });
}
}
}
注意 scrapeUrl
方法的设计。它没有直接 await
长时间的 taskService.processUrl
调用,而是将一个回调函数 statusUpdateCallback
传递给 TaskService
。这使得业务逻辑层可以在任何时候通过这个回调来更新客户端状态,而无需了解 WebSocket 的具体实现。这是实现关注点分离(Separation of Concerns)的重要实践。
DTO (ScrapeUrlDto
) 用于验证输入数据,这是 NestJS 的最佳实践之一。
src/websocket/dto/scrape-url.dto.ts
:
import { IsUrl } from 'class-validator';
export class ScrapeUrlDto {
@IsUrl({}, { message: 'Invalid URL provided.' })
url: string;
}
CrawlerService:封装 Puppeteer 的复杂性
抓取服务是第一个耗时步骤。直接在主服务中调用 Puppeteer 会带来很多问题:资源管理混乱、错误处理困难。我们需要一个专用的服务来封装这些细节。
src/crawler/crawler.service.ts
:
import { Injectable, Logger } from '@nestjs/common';
import puppeteer, { Browser, Page } from 'puppeteer';
@Injectable()
export class CrawlerService {
private readonly logger = new Logger(CrawlerService.name);
// 生产环境中,Browser 实例应该被池化和复用,而不是每次都创建
// 这里为了简化示例,每次都创建一个新的实例
async scrapeUrl(url: string): Promise<string> {
let browser: Browser | null = null;
try {
this.logger.log(`Launching Puppeteer to scrape ${url}`);
browser = await puppeteer.launch({
headless: true,
// 在 Docker 容器中运行时,常常需要这些参数
args: ['--no-sandbox', '--disable-setuid-sandbox'],
});
const page: Page = await browser.newPage();
// 设置合理的超时,防止页面加载时间过长导致进程卡死
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
// 提取页面主要文本内容,可以根据实际需求定制更复杂的提取逻辑
const content = await page.evaluate(() => {
// 移除脚本和样式,减少噪音
document.querySelectorAll('script, style').forEach(elem => elem.remove());
// 返回 body 的文本内容,这是一种比较粗糙但通用的方式
return document.body.innerText;
});
this.logger.log(`Successfully scraped content from ${url}. Length: ${content.length}`);
return content;
} catch (error) {
this.logger.error(`Failed to scrape ${url}`, error.stack);
// 抛出自定义错误或重新包装错误,以便上层服务处理
throw new Error(`Puppeteer failed to scrape the URL: ${error.message}`);
} finally {
if (browser) {
this.logger.log('Closing Puppeteer browser instance.');
await browser.close();
}
}
}
}
这段代码有几个关键点:
- 资源管理:
try...finally
结构确保了无论抓取成功与否,browser.close()
都会被调用,防止僵尸进程的产生。这是一个在生产环境中至关重要的细节。 - 错误处理: 捕获具体的 Puppeteer 异常,并包装成一个业务层面的错误向上抛出。
- 超时设置:
page.goto
中的timeout
参数是必不可少的,它可以防止因为目标网站响应慢而导致整个服务被卡住。 - 内容提取: 使用
page.evaluate
在浏览器上下文中执行脚本来获取数据,这种方式比直接获取page.content()
更灵活,可以预处理掉不必要的 HTML 标签。
MilvusService:与向量数据库交互
这个服务负责处理所有与 Milvus 的通信,包括连接、检查集合、创建集合和插入向量。
src/milvus/milvus.service.ts
:
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { MilvusClient, DataType, IndexType } from '@zilliz/milvus2-sdk-node';
import { ConfigService } from '@nestjs/config';
// 这里的配置值应该来自环境变量
const COLLECTION_NAME = 'web_pages';
const VECTOR_DIM = 768; // 示例维度,取决于你使用的 embedding 模型
@Injectable()
export class MilvusService implements OnModuleInit {
private readonly logger = new Logger(MilvusService.name);
private client: MilvusClient;
constructor(private configService: ConfigService) {
this.client = new MilvusClient({
address: this.configService.get<string>('MILVUS_ADDRESS'),
// 如果有鉴权,需要配置 username/password
});
}
async onModuleInit() {
await this.ensureCollection();
}
private async ensureCollection(): Promise<void> {
try {
const res = await this.client.hasCollection({ collection_name: COLLECTION_NAME });
if (res.value) {
this.logger.log(`Collection '${COLLECTION_NAME}' already exists.`);
// 确保集合被加载到内存中以供搜索
await this.client.loadCollectionSync({ collection_name: COLLECTION_NAME });
return;
}
this.logger.log(`Collection '${COLLECTION_NAME}' does not exist. Creating...`);
await this.client.createCollection({
collection_name: COLLECTION_NAME,
fields: [
{ name: 'id', data_type: DataType.Int64, is_primary_key: true, autoID: true },
{ name: 'url', data_type: DataType.VarChar, max_length: 512 },
{ name: 'chunk', data_type: DataType.VarChar, max_length: 4096 }, // 存储原始文本块
{ name: 'vector', data_type: DataType.FloatVector, dim: VECTOR_DIM },
],
});
this.logger.log(`Creating index for collection '${COLLECTION_NAME}'...`);
// HNSW 是常用的高性能近似最近邻搜索索引
await this.client.createIndex({
collection_name: COLLECTION_NAME,
field_name: 'vector',
index_type: IndexType.HNSW,
metric_type: 'L2', // 或 'IP',取决于你的模型
params: { M: 16, efConstruction: 200 },
});
await this.client.loadCollectionSync({ collection_name: COLLECTION_NAME });
this.logger.log('Collection created and loaded successfully.');
} catch (error) {
this.logger.error('Failed to ensure Milvus collection exists', error);
throw error; // 启动时失败,让应用崩溃重启
}
}
async insertVectors(data: { url: string; chunk: string; vector: number[] }[]): Promise<any> {
if (!data || data.length === 0) {
return;
}
this.logger.log(`Inserting ${data.length} vectors into Milvus.`);
try {
const res = await this.client.insert({
collection_name: COLLECTION_NAME,
fields_data: data,
});
if (res.status.error_code !== 'Success') {
throw new Error(`Milvus insertion failed: ${res.status.reason}`);
}
// 生产环境中,flush 操作应该谨慎使用,或者依赖 Milvus 的自动 flush
// 这里为了即时可见性,手动调用
await this.client.flushSync({ collection_names: [COLLECTION_NAME] });
this.logger.log(`Successfully inserted ${res.succ_index.length} vectors.`);
return res;
} catch (error) {
this.logger.error('Failed to insert vectors into Milvus', error);
throw error;
}
}
}
Milvus 服务的 onModuleInit
生命周期钩子是关键。它确保了在应用开始接受请求之前,所需的 Milvus 集合和索引已经准备就绪。如果初始化失败,应用应该直接退出,因为后续的服务都无法正常工作。代码中还包含了创建索引的参数 M
和 efConstruction
,这些参数对搜索性能和召回率有直接影响,需要在真实项目中根据数据量和硬件进行调优。
TaskService: 流程编排
这是所有逻辑的粘合剂。它接收来自 Gateway 的任务,调用其他服务,并使用传入的回调函数报告进度。
src/task/task.service.ts
:
import { Injectable, Logger } from '@nestjs/common';
import { CrawlerService } from '../crawler/crawler.service';
import { MilvusService } from '../milvus/milvus.service';
type StatusCallback = (status: string, message: string, data?: any) => void;
@Injectable()
export class TaskService {
private readonly logger = new Logger(TaskService.name);
constructor(
private readonly crawlerService: CrawlerService,
private readonly milvusService: MilvusService,
) {}
public async processUrl(url: string, statusCallback: StatusCallback): Promise<void> {
try {
// Step 1: Scrape the URL
statusCallback('scraping', `Starting to scrape content from ${url}...`);
const content = await this.crawlerService.scrapeUrl(url);
statusCallback('scraped', 'Successfully scraped content.', { contentLength: content.length });
// Step 2: Chunk the content
// 真实项目中,分块策略非常重要,影响检索效果。这里使用简单固定长度分块
const chunks = this.chunkText(content, 1000, 200);
statusCallback('chunking', `Content chunked into ${chunks.length} pieces.`);
// Step 3: Vectorize each chunk (mocked)
statusCallback('vectorizing', 'Starting vectorization for all chunks...');
const vectors = await this.getEmbeddings(chunks);
statusCallback('vectorized', 'All chunks have been vectorized.');
// Step 4: Insert into Milvus
const dataToInsert = chunks.map((chunk, index) => ({
url,
chunk,
vector: vectors[index],
}));
statusCallback('indexing', `Indexing ${dataToInsert.length} vectors into Milvus...`);
await this.milvusService.insertVectors(dataToInsert);
statusCallback('indexed', 'Task completed successfully. Content is now searchable.');
} catch (error) {
this.logger.error(`Task failed for URL ${url}`, error.stack);
statusCallback('error', 'An error occurred during processing.', { error: error.message });
}
}
private chunkText(text: string, chunkSize: number, overlap: number): string[] {
const chunks: string[] = [];
let i = 0;
while (i < text.length) {
chunks.push(text.substring(i, i + chunkSize));
i += chunkSize - overlap;
}
return chunks;
}
// 这是一个模拟的 embedding 函数
// 在真实项目中,这里会调用一个机器学习模型服务 (e.g., HuggingFace, OpenAI)
private async getEmbeddings(chunks: string[]): Promise<number[][]> {
this.logger.log(`Mocking embedding generation for ${chunks.length} chunks.`);
// 模拟网络延迟和计算时间
await new Promise(resolve => setTimeout(resolve, 50 * chunks.length));
// 生成随机向量作为替代
const VECTOR_DIM = 768;
return chunks.map(() => Array.from({ length: VECTOR_DIM }, () => Math.random()));
}
}
processUrl
方法清晰地展示了整个业务流程。每一步完成后,都会调用 statusCallback
推送最新状态。注意,整个方法是 async
的,但 Gateway 并没有 await
它,这正是我们想要的——任务在后台执行,不阻塞主线程。
局限性与未来迭代方向
到此为止,我们已经构建了一个功能完备的原型。然而,在将其投入生产环境之前,还有几个方面需要严肃考虑:
任务队列的持久化: 当前的任务分发逻辑是内存中的直接调用。如果服务重启,正在进行的任务会全部丢失。在生产环境中,必须引入一个真正的消息队列,如 RabbitMQ 或 Redis Streams。Gateway 负责将任务投递到队列,而一个或多个独立的 Worker 进程负责消费队列中的任务并执行。
Puppeteer 实例的扩展性: 每个 Puppeteer 实例都相当消耗资源。随着并发请求的增加,在单机上启动大量浏览器实例会迅速耗尽服务器内存和 CPU。需要构建一个 Puppeteer Worker 池,甚至将其部署为独立的、可水平扩展的服务(例如使用
puppeteer-cluster
或在 Kubernetes 中管理)。状态管理的健壮性: 如果客户端在任务执行过程中断线重连,它将丢失之前的状态更新。一个更完整的方案需要将任务状态持久化到数据库(如 Redis 或 PostgreSQL),并在客户端重连时,能够查询并恢复其任务的当前状态。
Embedding 服务的解耦: 向量化是一个计算密集型任务,不应该和主应用服务部署在一起。它应该是一个独立的微服务,可以独立扩展,并且主应用通过 RPC 或 HTTP 调用它。我们代码中的
getEmbeddings
模拟正是为此留出的接口。配置与安全: 硬编码的配置(如 Milvus 集合名、向量维度)都应该通过
ConfigService
从环境变量中读取。WebSocket 的cors
配置需要收紧,只允许来自受信任域的连接。此外,对于需要认证的场景,应在 WebSocket 连接握手阶段集成身份验证逻辑。