将一个检索增强生成(RAG)原型投入生产环境,挑战远不止于优化模型本身。真正的复杂性在于构建一个可观测、安全且可独立伸缩的服务体系。一个常见的错误是将API网关逻辑、业务编排和模型推理全部耦合在单个Python应用中。这种单体架构在初期看似高效,但在流量增长和业务逻辑迭代时会迅速成为瓶颈,运维成本急剧上升。
我们的目标是构建一个解耦的、生产级的RAG推理服务。它必须满足以下要求:
- 关注点分离: API管理、业务编排和AI模型计算必须是独立的服务,可以独立部署、扩缩容和迭代。
- 类型安全: 服务间的通信契约必须是强类型的,以减少运行时错误。
- 流量管控: 对外暴露的API必须有统一的认证、鉴权和速率限制。
- 内部通信韧性: 服务间的内部调用(东西向流量)必须具备自动重试、超时控制、流量加密和细粒度的可观测性。
基于这些考量,我们最终确定的技术栈决策是:
- API网关 (Tyk): 处理所有入口流量(南北向流量),负责认证、路由和策略执行。
- 业务编排层 (NestJS + tRPC): 作为一个IO密集型的中间服务,负责接收请求,协调对下游数据源和模型服务的调用。tRPC提供与客户端之间端到端的类型安全。
- 模型推理层 (PyTorch + Flask): 作为一个计算密集型服务,专门负责运行Embedding和Reranking模型,并通过简单的HTTP接口暴露。
- 服务网格 (Linkerd): 以Sidecar模式注入,透明地为服务间通信提供mTLS、重试、超时和遥测数据,而无需修改任何应用代码。
架构概览
整个请求生命周期遵循一个明确的路径。
graph TD subgraph "Public Network" Client[客户端] end subgraph "DMZ / API Gateway Layer" Tyk[Tyk API Gateway] end subgraph "Kubernetes Cluster / Internal Network" subgraph "Linkerd Service Mesh" subgraph "Orchestration Service Pod" L_NestJS[Linkerd Proxy] NestJS[NestJS Orchestrator Service] end subgraph "Inference Service Pod" L_PyTorch[Linkerd Proxy] PyTorch[PyTorch Inference Service] end end VectorDB[(Vector Database)] end Client -- HTTPS Request --> Tyk Tyk -- 1. Authenticate & Route --> L_NestJS L_NestJS -- 2. Forward Request --> NestJS NestJS -- 3. tRPC Procedure Call --> NestJS NestJS -- 4. Query Vector DB --> VectorDB NestJS -- 5. Inference Request (HTTP) --> L_NestJS L_NestJS -- 6. mTLS, Retry, Timeout --> L_PyTorch L_PyTorch -- 7. Forward Localhost Request --> PyTorch PyTorch -- 8. Return Inference Result --> L_PyTorch L_PyTorch -- 9. mTLS --> L_NestJS L_NestJS -- 10. Forward to NestJS --> NestJS NestJS -- 11. Process & Return tRPC Response --> L_NestJS L_NestJS -- 12. Forward --> Tyk Tyk -- 13. HTTPS Response --> Client
这个架构的核心在于职责的清晰划分。Tyk处理边界安全,NestJS处理业务逻辑流,PyTorch专注于数学计算,而Linkerd则处理所有棘手的网络问题。
步骤一:模型推理服务的容器化
推理服务的目标是简单和专注。它接收文本数据,返回向量或分数。我们使用Flask作为Web服务器,因为它足够轻量。在真实项目中,可能会选择性能更高的替代品,如Uvicorn。
inference_server/app.py
import os
from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer, CrossEncoder
import torch
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__)
# 确定设备,优先使用CUDA
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logging.info(f"Using device: {device}")
# 从环境变量加载模型名称,提供默认值
# 这是一个好的实践,便于在不同环境中使用不同大小的模型
EMBED_MODEL_NAME = os.getenv('EMBED_MODEL_NAME', 'all-MiniLM-L6-v2')
RERANK_MODEL_NAME = os.getenv('RERANK_MODEL_NAME', 'cross-encoder/ms-marco-MiniLM-L-6-v2')
try:
# 预加载模型到指定设备
# 这里的关键是在服务启动时加载模型,而不是在每次请求时。
# 这避免了巨大的延迟。
embed_model = SentenceTransformer(EMBED_MODEL_NAME, device=device)
rerank_model = CrossEncoder(RERANK_MODEL_NAME, device=device)
logging.info(f"Successfully loaded models: {EMBED_MODEL_NAME}, {RERANK_MODEL_NAME}")
except Exception as e:
logging.error(f"Failed to load models: {e}")
# 如果模型加载失败,服务启动就是失败的,这会触发K8s的重启策略
raise e
@app.route('/health', methods=['GET'])
def health_check():
"""Kubernetes liveness/readiness probe endpoint."""
return jsonify({"status": "ok"}), 200
@app.route('/embed', methods=['POST'])
def embed():
try:
data = request.get_json()
if not data or 'texts' not in data or not isinstance(data['texts'], list):
return jsonify({"error": "Invalid input. 'texts' field must be a list of strings."}), 400
# SentenceTransformer的encode方法是线程安全的
embeddings = embed_model.encode(data['texts'], convert_to_tensor=True)
return jsonify(embeddings.cpu().numpy().tolist())
except Exception as e:
logging.error(f"Error during embedding: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route('/rerank', methods=['POST'])
def rerank():
try:
data = request.get_json()
if not data or 'query' not in data or 'documents' not in data:
return jsonify({"error": "Invalid input. 'query' and 'documents' fields are required."}), 400
query = data['query']
documents = data['documents']
# CrossEncoder需要一个(query, document)对的列表
model_input = [[query, doc] for doc in documents]
# predict方法也是线程安全的
scores = rerank_model.predict(model_input)
return jsonify(scores.tolist())
except Exception as e:
logging.error(f"Error during reranking: {e}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
# 在生产中,我们会用Gunicorn或Uvicorn来运行
app.run(host='0.0.0.0', port=5001)
inference_server/Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 单独复制requirements.txt以利用Docker的层缓存
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 暴露端口
EXPOSE 5001
# 设置环境变量,允许在部署时覆盖
ENV EMBED_MODEL_NAME="all-MiniLM-L6-v2"
ENV RERANK_MODEL_NAME="cross-encoder/ms-marco-MiniLM-L-6-v2"
# 在生产环境中,应使用Gunicorn等多进程WSGI服务器
# CMD ["gunicorn", "--workers", "4", "--bind", "0.0.0.0:5001", "app:app"]
CMD ["python", "app.py"]
此服务非常纯粹:它只做计算。健康检查端点对于Kubernetes的探针至关重要。
步骤二:使用NestJS和tRPC构建业务编排层
NestJS提供了强大的依赖注入和模块化系统,是构建复杂业务逻辑的理想选择。tRPC则通过代码生成消除了API契约管理的负担。
orchestrator-service/src/trpc/trpc.router.ts
import { Injectable } from '@nestjs/common';
import { z } from 'zod';
import { TrpcService } from './trpc.service';
import { RAGService } from '../rag/rag.service';
@Injectable()
export class TrpcRouter {
constructor(
private readonly trpc: TrpcService,
private readonly ragService: RAGService,
) {}
// 定义 tRPC 路由
appRouter = this.trpc.router({
// 定义一个名为 'query' 的 procedure
query: this.trpc.procedure
// 使用 Zod 定义输入校验,这是 tRPC 的核心优势之一
.input(
z.object({
query: z.string().min(1, 'Query cannot be empty.'),
topK: z.number().int().positive().default(5),
}),
)
.query(async ({ input }) => {
// 这里的 'query' 对应于 GraphQL 的 query 或 REST 的 GET
// 它调用底层的 RAG 服务来执行业务逻辑
const results = await this.ragService.processQuery(input.query, input.topK);
return results;
}),
});
}
// 导出路由类型,用于客户端代码生成
export type AppRouter = TrpcRouter['appRouter'];
orchestrator-service/src/rag/rag.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import axios from 'axios';
// 定义一个假设的向量数据库客户端返回的类型
interface VectorSearchResult {
id: string;
content: string;
score: number;
}
@Injectable()
export class RAGService {
private readonly logger = new Logger(RAGService.name);
private readonly inferenceServiceUrl: string;
constructor(private readonly configService: ConfigService) {
// 从配置中获取下游服务的地址,这是生产级应用的基本要求
this.inferenceServiceUrl = this.configService.get<string>('INFERENCE_SERVICE_URL');
if (!this.inferenceServiceUrl) {
throw new Error('INFERENCE_SERVICE_URL is not defined in environment variables.');
}
}
async processQuery(query: string, topK: number) {
this.logger.log(`Processing query: "${query}" with topK=${topK}`);
// 步骤 1: (模拟) 从向量数据库检索初始文档
// 在真实项目中,这里会调用 Pinecone, Weaviate, or Qdrant 等
const initialDocuments = this.mockVectorSearch(query, topK * 2); // 取回更多文档以供重排
// 步骤 2: 调用推理服务进行重排
const rerankedDocuments = await this.rerankDocuments(
query,
initialDocuments.map(d => d.content),
);
// 步骤 3: 组合结果并返回
return rerankedDocuments.slice(0, topK);
}
private async rerankDocuments(query: string, documents: string[]) {
try {
const response = await axios.post<{ data: number[] }>(
`${this.inferenceServiceUrl}/rerank`,
{ query, documents },
{
// 这里的超时设置非常关键。Linkerd可以提供更智能的超时和重试,
// 但应用层面的超时是一个重要的兜底策略。
timeout: 5000
},
);
const scores = response.data;
const ranked = documents
.map((doc, index) => ({ document: doc, score: scores[index] }))
.sort((a, b) => b.score - a.score); // 按分数降序排列
return ranked;
} catch (error) {
this.logger.error(`Failed to call rerank service at ${this.inferenceServiceUrl}`, error.stack);
// 如果下游服务失败,必须抛出异常,让上层(或服务网格)来处理
throw new Error('Reranking process failed.');
}
}
private mockVectorSearch(query: string, count: number): VectorSearchResult[] {
// 模拟返回结果
this.logger.log(`Mocking vector search for query: "${query}"`);
return Array.from({ length: count }, (_, i) => ({
id: `doc_${i + 1}`,
content: `This is mock document number ${i + 1} related to the query.`,
score: Math.random(),
}));
}
}
这个服务清晰地展示了编排逻辑:获取数据 -> 调用模型 -> 整理结果。对下游服务的调用是通过一个简单的HTTP客户端完成的。我们不需要在这里实现复杂的重试或断路器逻辑,因为Linkerd会为我们处理。
步骤三:配置Tyk API网关
Tyk作为入口,我们需要定义一个API,将流量代理到我们的NestJS服务。这通常通过Tyk的Dashboard或API来完成。
tyk_api_definition.json
{
"name": "RAG Service API",
"api_id": "rag-api",
"org_id": "default",
"use_keyless": false, // 我们要求使用API Key
"auth": {
"auth_header_name": "Authorization"
},
"version_data": {
"not_versioned": true,
"versions": {
"Default": {
"name": "Default",
"expires": "",
"paths": {
"white_list": [],
"black_list": []
},
"use_extended_paths": true,
"extended_paths": {}
}
}
},
"proxy": {
"listen_path": "/rag/",
"target_url": "http://orchestrator-service.default.svc.cluster.local:3000/", // Kubernetes内部服务地址
"strip_listen_path": true
},
"active": true
}
这份定义做了几件关键的事情:
-
"use_keyless": false
: 强制所有请求必须携带API Key。 -
"listen_path": "/rag/"
: Tyk会监听/rag/
路径。 -
"target_url"
: 指向Kubernetes集群内部的NestJS服务地址。 -
"strip_listen_path": true
: 在将请求转发给上游服务时,会移除/rag/
前缀,这样NestJS服务收到的就是干净的/trpc/...
路径。
步骤四:Kubernetes部署与Linkerd注入
这是将所有部分粘合在一起的关键。我们将为每个服务创建Deployment和Service资源,并通过一个简单的注解来启用Linkerd的sidecar注入。
k8s/deployment.yaml
apiVersion: v1
kind: Service
metadata:
name: orchestrator-service
spec:
selector:
app: orchestrator
ports:
- protocol: TCP
port: 3000
targetPort: 3000
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: orchestrator-deployment
spec:
replicas: 2
selector:
matchLabels:
app: orchestrator
template:
metadata:
labels:
app: orchestrator
annotations:
linkerd.io/inject: enabled # <-- Linkerd注入的关键注解
spec:
containers:
- name: orchestrator
image: your-repo/orchestrator-service:latest
ports:
- containerPort: 3000
env:
- name: INFERENCE_SERVICE_URL
value: "http://inference-service:5001" # 通过K8s服务名通信
---
apiVersion: v1
kind: Service
metadata:
name: inference-service
spec:
selector:
app: inference
ports:
- protocol: TCP
port: 5001
targetPort: 5001
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
spec:
replicas: 1 # 模型服务通常受限于GPU资源,需要更复杂的伸缩策略
selector:
matchLabels:
app: inference
template:
metadata:
labels:
app: inference
annotations:
linkerd.io/inject: enabled # <-- Linkerd注入
spec:
containers:
- name: inference
image: your-repo/inference-server:latest
ports:
- containerPort: 5001
resources: # 在真实项目中,必须为模型服务请求GPU资源
limits:
nvidia.com/gpu: 1
部署这些清单后,Linkerd的控制平面会自动检测到linkerd.io/inject: enabled
注解,并在每个Pod创建时注入一个linkerd-proxy
sidecar容器。从这时起,orchestrator-service
对inference-service
的所有出站HTTP调用都会被其sidecar拦截,加密后发送到inference-service
的sidecar,后者再解密并将请求转发给本地的Python应用。
这一切对应用程序是完全透明的。rag.service.ts
中的axios
调用代码无需任何更改。但现在,我们可以通过Linkerd的CLI或仪表盘观察到这两个服务之间的黄金指标(成功率、请求速率、延迟),并可以配置ServiceProfile
来实现自动重试和超时。
例如,创建一个ServiceProfile
来为rerank
调用配置重试:k8s/service-profile.yaml
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
name: inference-service.default.svc.cluster.local
spec:
routes:
- name: '/rerank'
condition:
method: POST
pathRegex: /rerank
isRetryable: true # 将此路由标记为可重试
timeout: 10s # 设置一个比应用层更长的超时
应用这个配置后,如果orchestrator-service
对/rerank
的调用失败(例如,网络抖动或Pod重启),Linkerd的代理会自动重试,这极大地增强了系统的韧性。
架构的局限性与未来路径
这个架构虽然健壮,但并非没有权衡。
首先,同步HTTP调用在编排层和推理层之间引入了延迟耦合。如果模型推理时间很长(超过几秒),NestJS的Node.js事件循环可能会因为等待HTTP响应而承受压力。一个更具弹性的演进方向是采用异步模式,例如,NestJS将推理任务放入消息队列(如RabbitMQ或Redis Streams),推理服务作为消费者处理任务,并通过WebSocket或回调将结果推回。
其次,tRPC非常适合于Node.js生态系统内部或与Web前端的通信。但在一个多语言的微服务环境中,如果需要与其他非TypeScript/JavaScript服务进行类型安全的通信,gRPC和Protobuf会是更通用的选择。幸运的是,Linkerd对gRPC的支持同样出色,迁移成本主要在应用层。
最后,对模型服务的伸缩管理是一个复杂的课题。简单的副本数伸缩(HPA)可能不足以应对突发流量和高昂的GPU成本。KEDA(Kubernetes-based Event-Driven Autoscaling)结合自定义指标(如队列长度),可以实现更精细化的、基于负载的伸缩策略,甚至可以缩容至零以节约成本。