构建基于tRPC、NestJS与PyTorch的多阶段RAG推理管道并集成Tyk网关与Linkerd服务网格


将一个检索增强生成(RAG)原型投入生产环境,挑战远不止于优化模型本身。真正的复杂性在于构建一个可观测、安全且可独立伸缩的服务体系。一个常见的错误是将API网关逻辑、业务编排和模型推理全部耦合在单个Python应用中。这种单体架构在初期看似高效,但在流量增长和业务逻辑迭代时会迅速成为瓶颈,运维成本急剧上升。

我们的目标是构建一个解耦的、生产级的RAG推理服务。它必须满足以下要求:

  1. 关注点分离: API管理、业务编排和AI模型计算必须是独立的服务,可以独立部署、扩缩容和迭代。
  2. 类型安全: 服务间的通信契约必须是强类型的,以减少运行时错误。
  3. 流量管控: 对外暴露的API必须有统一的认证、鉴权和速率限制。
  4. 内部通信韧性: 服务间的内部调用(东西向流量)必须具备自动重试、超时控制、流量加密和细粒度的可观测性。

基于这些考量,我们最终确定的技术栈决策是:

  • API网关 (Tyk): 处理所有入口流量(南北向流量),负责认证、路由和策略执行。
  • 业务编排层 (NestJS + tRPC): 作为一个IO密集型的中间服务,负责接收请求,协调对下游数据源和模型服务的调用。tRPC提供与客户端之间端到端的类型安全。
  • 模型推理层 (PyTorch + Flask): 作为一个计算密集型服务,专门负责运行Embedding和Reranking模型,并通过简单的HTTP接口暴露。
  • 服务网格 (Linkerd): 以Sidecar模式注入,透明地为服务间通信提供mTLS、重试、超时和遥测数据,而无需修改任何应用代码。

架构概览

整个请求生命周期遵循一个明确的路径。

graph TD
    subgraph "Public Network"
        Client[客户端]
    end

    subgraph "DMZ / API Gateway Layer"
        Tyk[Tyk API Gateway]
    end

    subgraph "Kubernetes Cluster / Internal Network"
        subgraph "Linkerd Service Mesh"
            subgraph "Orchestration Service Pod"
                L_NestJS[Linkerd Proxy]
                NestJS[NestJS Orchestrator Service]
            end
            subgraph "Inference Service Pod"
                L_PyTorch[Linkerd Proxy]
                PyTorch[PyTorch Inference Service]
            end
        end
        VectorDB[(Vector Database)]
    end

    Client -- HTTPS Request --> Tyk
    Tyk -- 1. Authenticate & Route --> L_NestJS
    L_NestJS -- 2. Forward Request --> NestJS
    NestJS -- 3. tRPC Procedure Call --> NestJS
    NestJS -- 4. Query Vector DB --> VectorDB
    NestJS -- 5. Inference Request (HTTP) --> L_NestJS
    L_NestJS -- 6. mTLS, Retry, Timeout --> L_PyTorch
    L_PyTorch -- 7. Forward Localhost Request --> PyTorch
    PyTorch -- 8. Return Inference Result --> L_PyTorch
    L_PyTorch -- 9. mTLS --> L_NestJS
    L_NestJS -- 10. Forward to NestJS --> NestJS
    NestJS -- 11. Process & Return tRPC Response --> L_NestJS
    L_NestJS -- 12. Forward --> Tyk
    Tyk -- 13. HTTPS Response --> Client

这个架构的核心在于职责的清晰划分。Tyk处理边界安全,NestJS处理业务逻辑流,PyTorch专注于数学计算,而Linkerd则处理所有棘手的网络问题。

步骤一:模型推理服务的容器化

推理服务的目标是简单和专注。它接收文本数据,返回向量或分数。我们使用Flask作为Web服务器,因为它足够轻量。在真实项目中,可能会选择性能更高的替代品,如Uvicorn。

inference_server/app.py

import os
from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer, CrossEncoder
import torch
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)

# 确定设备,优先使用CUDA
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logging.info(f"Using device: {device}")

# 从环境变量加载模型名称,提供默认值
# 这是一个好的实践,便于在不同环境中使用不同大小的模型
EMBED_MODEL_NAME = os.getenv('EMBED_MODEL_NAME', 'all-MiniLM-L6-v2')
RERANK_MODEL_NAME = os.getenv('RERANK_MODEL_NAME', 'cross-encoder/ms-marco-MiniLM-L-6-v2')

try:
    # 预加载模型到指定设备
    # 这里的关键是在服务启动时加载模型,而不是在每次请求时。
    # 这避免了巨大的延迟。
    embed_model = SentenceTransformer(EMBED_MODEL_NAME, device=device)
    rerank_model = CrossEncoder(RERANK_MODEL_NAME, device=device)
    logging.info(f"Successfully loaded models: {EMBED_MODEL_NAME}, {RERANK_MODEL_NAME}")
except Exception as e:
    logging.error(f"Failed to load models: {e}")
    # 如果模型加载失败,服务启动就是失败的,这会触发K8s的重启策略
    raise e

@app.route('/health', methods=['GET'])
def health_check():
    """Kubernetes liveness/readiness probe endpoint."""
    return jsonify({"status": "ok"}), 200

@app.route('/embed', methods=['POST'])
def embed():
    try:
        data = request.get_json()
        if not data or 'texts' not in data or not isinstance(data['texts'], list):
            return jsonify({"error": "Invalid input. 'texts' field must be a list of strings."}), 400

        # SentenceTransformer的encode方法是线程安全的
        embeddings = embed_model.encode(data['texts'], convert_to_tensor=True)
        return jsonify(embeddings.cpu().numpy().tolist())

    except Exception as e:
        logging.error(f"Error during embedding: {e}")
        return jsonify({"error": "Internal server error"}), 500

@app.route('/rerank', methods=['POST'])
def rerank():
    try:
        data = request.get_json()
        if not data or 'query' not in data or 'documents' not in data:
            return jsonify({"error": "Invalid input. 'query' and 'documents' fields are required."}), 400

        query = data['query']
        documents = data['documents']
        
        # CrossEncoder需要一个(query, document)对的列表
        model_input = [[query, doc] for doc in documents]
        
        # predict方法也是线程安全的
        scores = rerank_model.predict(model_input)
        
        return jsonify(scores.tolist())

    except Exception as e:
        logging.error(f"Error during reranking: {e}")
        return jsonify({"error": "Internal server error"}), 500

if __name__ == '__main__':
    # 在生产中,我们会用Gunicorn或Uvicorn来运行
    app.run(host='0.0.0.0', port=5001)

inference_server/Dockerfile

FROM python:3.9-slim

WORKDIR /app

# 单独复制requirements.txt以利用Docker的层缓存
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# 暴露端口
EXPOSE 5001

# 设置环境变量,允许在部署时覆盖
ENV EMBED_MODEL_NAME="all-MiniLM-L6-v2"
ENV RERANK_MODEL_NAME="cross-encoder/ms-marco-MiniLM-L-6-v2"

# 在生产环境中,应使用Gunicorn等多进程WSGI服务器
# CMD ["gunicorn", "--workers", "4", "--bind", "0.0.0.0:5001", "app:app"]
CMD ["python", "app.py"]

此服务非常纯粹:它只做计算。健康检查端点对于Kubernetes的探针至关重要。

步骤二:使用NestJS和tRPC构建业务编排层

NestJS提供了强大的依赖注入和模块化系统,是构建复杂业务逻辑的理想选择。tRPC则通过代码生成消除了API契约管理的负担。

orchestrator-service/src/trpc/trpc.router.ts

import { Injectable } from '@nestjs/common';
import { z } from 'zod';
import { TrpcService } from './trpc.service';
import { RAGService } from '../rag/rag.service';

@Injectable()
export class TrpcRouter {
  constructor(
    private readonly trpc: TrpcService,
    private readonly ragService: RAGService,
  ) {}

  // 定义 tRPC 路由
  appRouter = this.trpc.router({
    // 定义一个名为 'query' 的 procedure
    query: this.trpc.procedure
      // 使用 Zod 定义输入校验,这是 tRPC 的核心优势之一
      .input(
        z.object({
          query: z.string().min(1, 'Query cannot be empty.'),
          topK: z.number().int().positive().default(5),
        }),
      )
      .query(async ({ input }) => {
        // 这里的 'query' 对应于 GraphQL 的 query 或 REST 的 GET
        // 它调用底层的 RAG 服务来执行业务逻辑
        const results = await this.ragService.processQuery(input.query, input.topK);
        return results;
      }),
  });
}

// 导出路由类型,用于客户端代码生成
export type AppRouter = TrpcRouter['appRouter'];

orchestrator-service/src/rag/rag.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import axios from 'axios';

// 定义一个假设的向量数据库客户端返回的类型
interface VectorSearchResult {
  id: string;
  content: string;
  score: number;
}

@Injectable()
export class RAGService {
  private readonly logger = new Logger(RAGService.name);
  private readonly inferenceServiceUrl: string;

  constructor(private readonly configService: ConfigService) {
    // 从配置中获取下游服务的地址,这是生产级应用的基本要求
    this.inferenceServiceUrl = this.configService.get<string>('INFERENCE_SERVICE_URL');
    if (!this.inferenceServiceUrl) {
      throw new Error('INFERENCE_SERVICE_URL is not defined in environment variables.');
    }
  }

  async processQuery(query: string, topK: number) {
    this.logger.log(`Processing query: "${query}" with topK=${topK}`);

    // 步骤 1: (模拟) 从向量数据库检索初始文档
    // 在真实项目中,这里会调用 Pinecone, Weaviate, or Qdrant 等
    const initialDocuments = this.mockVectorSearch(query, topK * 2); // 取回更多文档以供重排

    // 步骤 2: 调用推理服务进行重排
    const rerankedDocuments = await this.rerankDocuments(
      query,
      initialDocuments.map(d => d.content),
    );

    // 步骤 3: 组合结果并返回
    return rerankedDocuments.slice(0, topK);
  }

  private async rerankDocuments(query: string, documents: string[]) {
    try {
      const response = await axios.post<{ data: number[] }>(
        `${this.inferenceServiceUrl}/rerank`,
        { query, documents },
        { 
          // 这里的超时设置非常关键。Linkerd可以提供更智能的超时和重试,
          // 但应用层面的超时是一个重要的兜底策略。
          timeout: 5000 
        },
      );
      
      const scores = response.data;

      const ranked = documents
        .map((doc, index) => ({ document: doc, score: scores[index] }))
        .sort((a, b) => b.score - a.score); // 按分数降序排列

      return ranked;
    } catch (error) {
      this.logger.error(`Failed to call rerank service at ${this.inferenceServiceUrl}`, error.stack);
      // 如果下游服务失败,必须抛出异常,让上层(或服务网格)来处理
      throw new Error('Reranking process failed.');
    }
  }

  private mockVectorSearch(query: string, count: number): VectorSearchResult[] {
    // 模拟返回结果
    this.logger.log(`Mocking vector search for query: "${query}"`);
    return Array.from({ length: count }, (_, i) => ({
      id: `doc_${i + 1}`,
      content: `This is mock document number ${i + 1} related to the query.`,
      score: Math.random(),
    }));
  }
}

这个服务清晰地展示了编排逻辑:获取数据 -> 调用模型 -> 整理结果。对下游服务的调用是通过一个简单的HTTP客户端完成的。我们不需要在这里实现复杂的重试或断路器逻辑,因为Linkerd会为我们处理。

步骤三:配置Tyk API网关

Tyk作为入口,我们需要定义一个API,将流量代理到我们的NestJS服务。这通常通过Tyk的Dashboard或API来完成。

tyk_api_definition.json

{
  "name": "RAG Service API",
  "api_id": "rag-api",
  "org_id": "default",
  "use_keyless": false, // 我们要求使用API Key
  "auth": {
    "auth_header_name": "Authorization"
  },
  "version_data": {
    "not_versioned": true,
    "versions": {
      "Default": {
        "name": "Default",
        "expires": "",
        "paths": {
          "white_list": [],
          "black_list": []
        },
        "use_extended_paths": true,
        "extended_paths": {}
      }
    }
  },
  "proxy": {
    "listen_path": "/rag/",
    "target_url": "http://orchestrator-service.default.svc.cluster.local:3000/", // Kubernetes内部服务地址
    "strip_listen_path": true
  },
  "active": true
}

这份定义做了几件关键的事情:

  1. "use_keyless": false: 强制所有请求必须携带API Key。
  2. "listen_path": "/rag/": Tyk会监听/rag/路径。
  3. "target_url": 指向Kubernetes集群内部的NestJS服务地址。
  4. "strip_listen_path": true: 在将请求转发给上游服务时,会移除/rag/前缀,这样NestJS服务收到的就是干净的/trpc/...路径。

步骤四:Kubernetes部署与Linkerd注入

这是将所有部分粘合在一起的关键。我们将为每个服务创建Deployment和Service资源,并通过一个简单的注解来启用Linkerd的sidecar注入。

k8s/deployment.yaml

apiVersion: v1
kind: Service
metadata:
  name: orchestrator-service
spec:
  selector:
    app: orchestrator
  ports:
    - protocol: TCP
      port: 3000
      targetPort: 3000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orchestrator-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: orchestrator
  template:
    metadata:
      labels:
        app: orchestrator
      annotations:
        linkerd.io/inject: enabled # <-- Linkerd注入的关键注解
    spec:
      containers:
      - name: orchestrator
        image: your-repo/orchestrator-service:latest
        ports:
        - containerPort: 3000
        env:
        - name: INFERENCE_SERVICE_URL
          value: "http://inference-service:5001" # 通过K8s服务名通信
---
apiVersion: v1
kind: Service
metadata:
  name: inference-service
spec:
  selector:
    app: inference
  ports:
    - protocol: TCP
      port: 5001
      targetPort: 5001
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
spec:
  replicas: 1 # 模型服务通常受限于GPU资源,需要更复杂的伸缩策略
  selector:
    matchLabels:
      app: inference
  template:
    metadata:
      labels:
        app: inference
      annotations:
        linkerd.io/inject: enabled # <-- Linkerd注入
    spec:
      containers:
      - name: inference
        image: your-repo/inference-server:latest
        ports:
        - containerPort: 5001
        resources: # 在真实项目中,必须为模型服务请求GPU资源
          limits:
            nvidia.com/gpu: 1

部署这些清单后,Linkerd的控制平面会自动检测到linkerd.io/inject: enabled注解,并在每个Pod创建时注入一个linkerd-proxy sidecar容器。从这时起,orchestrator-serviceinference-service的所有出站HTTP调用都会被其sidecar拦截,加密后发送到inference-service的sidecar,后者再解密并将请求转发给本地的Python应用。

这一切对应用程序是完全透明的。rag.service.ts中的axios调用代码无需任何更改。但现在,我们可以通过Linkerd的CLI或仪表盘观察到这两个服务之间的黄金指标(成功率、请求速率、延迟),并可以配置ServiceProfile来实现自动重试和超时。

例如,创建一个ServiceProfile来为rerank调用配置重试:
k8s/service-profile.yaml

apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
  name: inference-service.default.svc.cluster.local
spec:
  routes:
  - name: '/rerank'
    condition:
      method: POST
      pathRegex: /rerank
    isRetryable: true # 将此路由标记为可重试
    timeout: 10s # 设置一个比应用层更长的超时

应用这个配置后,如果orchestrator-service/rerank的调用失败(例如,网络抖动或Pod重启),Linkerd的代理会自动重试,这极大地增强了系统的韧性。

架构的局限性与未来路径

这个架构虽然健壮,但并非没有权衡。

首先,同步HTTP调用在编排层和推理层之间引入了延迟耦合。如果模型推理时间很长(超过几秒),NestJS的Node.js事件循环可能会因为等待HTTP响应而承受压力。一个更具弹性的演进方向是采用异步模式,例如,NestJS将推理任务放入消息队列(如RabbitMQ或Redis Streams),推理服务作为消费者处理任务,并通过WebSocket或回调将结果推回。

其次,tRPC非常适合于Node.js生态系统内部或与Web前端的通信。但在一个多语言的微服务环境中,如果需要与其他非TypeScript/JavaScript服务进行类型安全的通信,gRPC和Protobuf会是更通用的选择。幸运的是,Linkerd对gRPC的支持同样出色,迁移成本主要在应用层。

最后,对模型服务的伸缩管理是一个复杂的课题。简单的副本数伸缩(HPA)可能不足以应对突发流量和高昂的GPU成本。KEDA(Kubernetes-based Event-Driven Autoscaling)结合自定义指标(如队列长度),可以实现更精细化的、基于负载的伸缩策略,甚至可以缩容至零以节约成本。


  目录