构建基于 Tyk 与 Node.js 的 Keras 模型服务并集成 SkyWalking 全链路追踪


一个线上 AI 推理服务的性能问题排查,最棘手的莫过于请求链路的黑盒化。当一个请求从网关进入,经过业务编排层,最终抵达 Python 模型服务时,任何一环的延迟都可能导致用户体验的下降。最近我们面临一个典型场景:一个通过 Tyk API Gateway 暴露的图像分类服务,其后端是一个 Node.js 应用,负责接收请求、预处理并调用一个 Keras 模型进行推理,整个 P95 响应时间居高不下。问题究竟出在 Tyk 的插件处理、Node.js 的 I/O 等待,还是 Keras 模型的推理本身?在没有全链路追踪的情况下,这几乎等同于猜谜。

我们的目标非常明确:为这个 Tyk -> Node.js -> Python(Keras) 的异构技术栈,构建一个无缝的、端到端的分布式追踪系统。我们选择了 Apache SkyWalking,因为它对多语言生态的支持相对完善,并且其 sw8 协议头的设计在必要时也允许我们进行手动上下文传递。这篇不是一篇入门教程,而是一次将这三个看似无关的系统捏合在一起,并实现深度可观测性的实战复盘。

第一步:建立可被追踪的靶点 - Keras 模型服务

一切追踪的基础是让每个服务都具备被追踪的能力。我们从链路的最后一环,也就是 Python Keras 模型服务开始。在真实项目中,这通常是一个由 TensorFlow Serving、Triton 或 TorchServe 部署的服务,但为了清晰地展示追踪上下文的传递,我们使用一个简单的 Flask 应用来包装 Keras 模型。这里的关键不是模型本身多复杂,而是服务如何正确地接入 SkyWalking。

一个常见的错误是,认为只要安装了 SkyWalking Python Agent,一切就会自动完成。对于主流框架如 Django 或 Flask 的标准路由,这确实可行。但当涉及到服务间的 RPC 调用时,特别是从一个非 Python 客户端(我们的 Node.js 服务)调用过来时,追踪上下文的传递往往需要手动介入。

首先,这是我们的模型服务 model_server.py 的骨架。我们使用一个预训练的 MobileNetV2 模型,这在生产环境中很常见。

# model_server.py
import os
import numpy as np
import logging
from flask import Flask, request, jsonify
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
from PIL import Image
import io

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 初始化 Flask 应用
app = Flask(__name__)

# 加载预训练的 Keras 模型
# 在生产环境中,模型应该在应用启动时加载一次,而不是在每次请求时加载
try:
    model = MobileNetV2(weights='imagenet')
    logger.info("MobileNetV2 model loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load Keras model: {e}")
    model = None # 标记模型加载失败

# 定义推理端点
@app.route('/predict', methods=['POST'])
def predict():
    if not model:
        return jsonify({"error": "Model is not available"}), 500
    
    if 'file' not in request.files:
        return jsonify({"error": "Missing file part"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400

    try:
        # 1. 图像预处理
        # 使用 PIL.Image 从内存中的文件字节流加载图像
        img_bytes = file.read()
        pil_img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
        # Keras 模型需要特定的输入尺寸
        pil_img = pil_img.resize((224, 224))
        
        # 将 PIL 图像转换为 numpy 数组
        img_array = image.img_to_array(pil_img)
        # 扩展维度以匹配模型输入形状 (1, 224, 224, 3)
        img_array = np.expand_dims(img_array, axis=0)
        # 应用 MobileNetV2 特定的预处理
        processed_img = preprocess_input(img_array)

        # 2. 模型推理
        predictions = model.predict(processed_img)
        
        # 3. 解码预测结果
        decoded = decode_predictions(predictions, top=3)[0]
        result = [{"label": label, "description": desc, "probability": float(prob)} for label, desc, prob in decoded]
        
        logger.info(f"Prediction successful for {file.filename}")
        return jsonify(result)

    except Exception as e:
        logger.error(f"An error occurred during prediction: {e}", exc_info=True)
        return jsonify({"error": "Internal server error during prediction"}), 500

if __name__ == '__main__':
    # 从环境变量获取端口,默认为 5001
    port = int(os.environ.get("PORT", 5001))
    app.run(host='0.0.0.0', port=port)

现在,我们引入 SkyWalking。首先安装依赖:

pip install "apache-skywalking<1.0.0" Pillow tensorflow

为了让 SkyWalking Agent 生效,我们不能直接 python model_server.py 启动,而是需要使用 sw-python 命令。

# 环境变量配置
export SW_AGENT_NAME=keras-model-service
export SW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
# 启动服务
sw-python run model_server.py

此时,如果直接调用 /predict 接口,你会在 SkyWalking UI 上看到一个追踪记录。但这只是链路的终点。当 Node.js 调用它时,它会生成一个新的 Trace,与上游的 Trace 是断开的。这里的坑在于,SkyWalking Python Agent 默认不会自动从一个通用的 HTTP Header(如 sw8)中提取上游的 Trace Context。我们需要显式地处理它。

我们将修改 predict 函数,手动从请求头中提取 sw8,并创建一个 “Entry Span”。

# model_server.py (修改后的 predict 函数)
from sw_django.core.trace.context import SpanContext, Span, Carrier
from sw_django.core.trace.propagation import get_context_from_sw8, set_context_in_sw8

# ... (其他代码保持不变) ...

@app.route('/predict', methods=['POST'])
def predict():
    # 关键步骤:手动处理追踪上下文
    sw8_header = request.headers.get('sw8')
    # 从 sw8 header 创建入口 Span 上下文
    context = get_context_from_sw8(sw8_header) if sw8_header else SpanContext()

    # 创建一个 Entry Span,代表这个请求的入口
    # operation_name 应该是有意义的,比如 API 路径
    with Span(op="/predict", peer=request.remote_addr, context=context) as entry_span:
        # 将当前 Span 类型设置为 Entry,这是必须的
        entry_span.kind = 'Entry'
        # 可以在这里添加自定义的 tags
        entry_span.tag(key='model.name', val='MobileNetV2')
        
        # --- 原有的业务逻辑开始 ---
        if not model:
            entry_span.tag(key='error', val='true')
            entry_span.log(ex=Exception("Model is not available"))
            return jsonify({"error": "Model is not available"}), 500
        
        if 'file' not in request.files:
            entry_span.tag(key='error', val='true')
            return jsonify({"error": "Missing file part"}), 400

        file = request.files['file']
        entry_span.tag(key='image.filename', val=file.filename)
        
        # ... (此处省略了完整的图像处理和预测逻辑,与上面版本相同) ...
        # ... 假设 result 已经被计算出来 ...
        
        try:
            # ... (图像处理和预测代码) ...
            img_bytes = file.read()
            pil_img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
            pil_img = pil_img.resize((224, 224))
            img_array = image.img_to_array(pil_img)
            img_array = np.expand_dims(img_array, axis=0)
            processed_img = preprocess_input(img_array)

            # 为模型推理本身创建一个子 Span,以精确测量其耗时
            with Span(op="Keras.predict", context=context) as inference_span:
                inference_span.kind = 'Local' # 本地 Span
                predictions = model.predict(processed_img)
            
            decoded = decode_predictions(predictions, top=3)[0]
            result = [{"label": label, "description": desc, "probability": float(prob)} for label, desc, prob in decoded]
            
            logger.info(f"Prediction successful for {file.filename}")
            return jsonify(result)
        
        except Exception as e:
            logger.error(f"An error occurred during prediction: {e}", exc_info=True)
            entry_span.tag(key='error', val='true')
            entry_span.log(ex=e)
            return jsonify({"error": "Internal server error during prediction"}), 500

这个改动是连接断裂链路的关键。get_context_from_sw8 会解析上游(Node.js)传来的 sw8 头,并用它来初始化当前的 Span 上下文。这样,SkyWalking OAP Server 就能将这个 Python 服务的 Span 与上游的 Span 正确地关联起来。我们还额外为 model.predict 调用创建了一个本地 Span,这样就能在追踪详情中精确看到模型推理的耗时。

第二步:构建承上启下的 Node.js 编排服务

Node.js 服务在这里扮演着 BFF (Backend for Frontend) 的角色。它接收来自网关的请求,可能会进行一些权限校验、参数转换,然后以 multipart/form-data 的形式将文件转发给 Python 模型服务。

对于 Node.js,我们使用官方的 skywalking-backend-js agent。

npm install express axios form-data skywalking-backend-js

启动脚本 start.js 需要在应用代码之前加载 agent。

// start.js
// 必须在所有其他模块之前引入
require('skywalking-backend-js').start({
    serviceName: 'nodejs-bff-service',
    collectorAddress: '127.0.0.1:11800',
    // 开启日志,方便调试
    log: {
        level: 'DEBUG'
    }
});

require('./server.js');

然后是我们的核心业务逻辑 server.js

// server.js
const express = require('express');
const axios = require('axios');
const multer = require('multer');
const FormData = require('form-data');
const fs = require('fs');
const path = require('path');

const app = express();
const port = process.env.PORT || 3000;
const modelServiceUrl = process.env.MODEL_SERVICE_URL || 'http://localhost:5001/predict';

// 使用 multer 处理文件上传,存储在内存中
const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

app.post('/api/v1/classify', upload.single('image'), async (req, res) => {
    if (!req.file) {
        return res.status(400).send({ error: 'No image file provided.' });
    }

    // 在真实项目中,这里可能会有复杂的业务逻辑:
    // 1. 用户身份验证
    // 2. 图像元数据记录
    // 3. 根据用户等级选择不同的模型服务
    // ...

    try {
        const form = new FormData();
        // 将内存中的 buffer 附加到 form-data
        form.append('file', req.file.buffer, {
            filename: req.file.originalname,
            contentType: req.file.mimetype,
        });

        console.log(`Forwarding request to model service at ${modelServiceUrl}`);
        
        // skywalking-backend-js 会自动拦截 axios 请求
        // 并注入 sw8 header
        const response = await axios.post(modelServiceUrl, form, {
            headers: {
                ...form.getHeaders(),
            },
            // 设置一个合理的超时时间
            timeout: 10000, 
        });

        res.status(response.status).json(response.data);

    } catch (error) {
        console.error('Error calling model service:', error.message);
        
        // 错误处理:区分是网络问题还是下游服务返回的错误
        if (error.response) {
            // 下游服务返回了非 2xx 状态码
            res.status(error.response.status).json({ 
                error: 'Model service returned an error.',
                details: error.response.data 
            });
        } else if (error.request) {
            // 请求已发出但没有收到响应
            res.status(504).json({ error: 'Gateway timeout - No response from model service.' });
        } else {
            // 其他未知错误
            res.status(500).json({ error: 'Internal server error.' });
        }
    }
});

app.listen(port, () => {
    console.log(`BFF service listening at http://localhost:${port}`);
});

启动服务:

node start.js

skywalking-backend-js agent 的一个优点是它能自动为支持的库(如 Express, axios)创建 Span。当我们的 Node.js 服务收到请求时,agent 会创建一个 Entry Span。当它使用 axios 调用 Python 服务时,agent 会创建一个 Exit Span,并将当前的 Trace Context 序列化后注入到 sw8 请求头中。这正是我们 Python 服务中所需要接收和解析的那个头。

第三步:配置 API 网关 Tyk 的可观测性

链路的最前端是 Tyk API Gateway。如果忽略了网关本身,我们就无法得知请求在网关层消耗了多少时间,例如身份验证、速率限制等中间件的处理耗时。Tyk 自身支持通过 OpenTelemetry 协议(OTLP)导出追踪数据,而 SkyWalking OAP Server 正好可以接收 OTLP 格式的数据。

这是一个双赢的局面:我们不需要为 Tyk 开发自定义的 SkyWalking 插件,只需进行正确的配置即可。

首先,修改 Tyk Gateway 的配置文件 tyk.conf,启用 OpenTelemetry 导出。

// tyk.conf
{
  // ... other configurations
  "tracing": {
    "enabled": true,
    "name": "opentelemetry",
    "options": {
      "endpoint": "skywalking-oap:4317", // OAP OTLP gRPC 接收地址
      "service_name": "tyk-api-gateway",
      "is_secure": false, // 如果 OAP 开启了 TLS,这里要设为 true
      "resource_attributes": {
        "deployment.environment": "production"
      }
    }
  },
  // ...
}

这里 skywalking-oap:4317 是 SkyWalking OAP 服务暴露的 OTLP/gRPC 端口。service_name 必须设置,这样在 SkyWalking UI 中才能清晰地看到名为 tyk-api-gateway 的服务。

接下来,定义一个 API,将流量代理到我们的 Node.js 服务。这是 Tyk API 定义的核心部分 api_definition.json

{
    "name": "AI-Inference-API",
    "api_id": "ai-inference-api-1",
    "org_id": "1",
    "use_keyless": true, // 为简化示例,禁用 API Key
    "proxy": {
        "listen_path": "/inference/",
        "target_url": "http://nodejs-bff-service:3000/", // 目标是 Node.js 服务
        "strip_listen_path": true
    },
    "version_data": {
        "not_versioned": true,
        "versions": {
            "Default": {
                "name": "Default",
                "use_extended_paths": true
            }
        }
    },
    "active": true
}

将这个 API 定义加载到 Tyk。现在,整个请求流程已经就绪。

第四步:整合与验证:端到端追踪的可视化

我们用一个 docker-compose.yml 来编排所有服务,这样可以模拟一个真实的微服务环境。

# docker-compose.yml
version: '3.8'

services:
  oap:
    image: apache/skywalking-oap-server:9.5.0
    container_name: oap
    ports:
      - "11800:11800"
      - "12800:12800"
      - "4317:4317" # OTLP gRPC Port
    healthcheck:
      test: ["CMD", "/skywalking/bin/swctl", "check", "health"]
      interval: 30s
      timeout: 10s
      retries: 3

  ui:
    image: apache/skywalking-ui:9.5.0
    container_name: ui
    depends_on:
      oap:
        condition: service_healthy
    ports:
      - "8080:8080"
    environment:
      SW_OAP_ADDRESS: http://oap:12800

  model-service:
    build:
      context: ./python-service
    container_name: model-service
    environment:
      - SW_AGENT_NAME=keras-model-service
      - SW_AGENT_COLLECTOR_BACKEND_SERVICES=oap:11800
    command: sw-python run model_server.py

  bff-service:
    build:
      context: ./nodejs-service
    container_name: nodejs-bff-service
    ports:
      - "3000:3000"
    environment:
      - SKYWALKING_COLLECTOR_BACKEND_SERVICES=oap:11800
      - SKYWALKING_AGENT_SERVICE_NAME=nodejs-bff-service
      - MODEL_SERVICE_URL=http://model-service:5001/predict
    depends_on:
      - model-service

  tyk-gateway:
    image: tykio/tyk-gateway:v4.0.1
    container_name: tyk-gateway
    ports:
      - "8081:8080"
    volumes:
      - ./tyk/tyk.conf:/opt/tyk-gateway/tyk.conf
      - ./tyk/apps:/opt/tyk-gateway/apps
    depends_on:
      - bff-service

启动整个栈:docker-compose up。然后,我们发起一个测试请求:

curl -X POST http://localhost:8081/inference/api/v1/classify \
  -F "image=@/path/to/your/image.jpg"

现在,打开 SkyWalking UI,我们应该能看到一条完整的追踪链。

graph TD
    A[Client] -->|HTTP POST| B(Tyk Gateway);
    B -->|OTLP gRPC| D(SkyWalking OAP);
    B -->|HTTP POST with sw8| C(Node.js BFF);
    C -->|sw8-js agent| D;
    C -->|HTTP POST with sw8| E(Python/Flask Service);
    E -->|sw-python agent| D;

在追踪详情页面,你会看到一个瀑布图,清晰地展示了:

  1. 一个根 Span,来自 tyk-api-gateway,显示了请求在网关的总耗时。
  2. 一个子 Span,来自 nodejs-bff-service,操作名为 /api/v1/classify,展示了 Node.js 服务的处理时间。
  3. 在这个 Node.js Span 下,还有一个 Exit Span,代表对 Python 服务的 POST 调用。
  4. 最后,一个子 Span,来自 keras-model-service,操作名为 /predict。这个 Span 的父 Span 正是 Node.js 的 Exit Span,证明了链路的成功连接。
  5. /predict Span 内部,还有一个 Keras.predict 的本地 Span,精确度量了模型推理的耗时。

通过这个视图,我们可以一目了然地定位性能瓶颈。如果 Keras.predict Span 耗时很长,说明模型推理是瓶颈;如果 Node.js 的 Exit Span 和 Python 的 Entry Span 之间有显著的时间差,那可能是网络延迟;如果 Tyk 的 Span 自身就很长,那么需要检查网关的插件或配置。

遗留问题与未来迭代

这个方案虽然实现了核心目标,但在生产环境中仍有几个值得优化的地方。

首先,Python 服务中手动处理 sw8 header 的方式虽然有效,但具有一定的侵入性。如果下游服务是 gRPC,利用 gRPC 的 metadata 机制传递追踪上下文会更优雅和标准化。对于更复杂的 Python 应用,直接使用 OpenTelemetry SDK 并配置 OTLP Exporter 发送到 SkyWalking OAP,可能是更灵活、更符合云原生趋势的做法。

其次,当前的追踪主要关注服务间的调用延迟。对于 AI 服务,模型推理内部的细节(例如数据从 CPU 到 GPU 的拷贝时间、特定层的计算时间)是不可见的。要实现更深度的模型性能分析,需要在 Python 代码中,使用 TensorFlow Profiler 或类似的工具,并将关键阶段的耗时作为自定义 Span 的 Tag 或 Log 上报,这需要更精细的埋点。

最后,一个完整的可观测性体系不应只有追踪(Tracing),还应包括指标(Metrics)和日志(Logging)。下一步应该是将服务的关键业务指标(如 QPS、推理成功率)和结构化日志也接入统一的平台,实现 Traces、Metrics、Logs 的自动关联,从而在出现问题时,能够从一个 Trace ID 快速下钻到相关的日志和指标异常,这才是可观测性驱动开发的最终形态。


  目录