一个线上 AI 推理服务的性能问题排查,最棘手的莫过于请求链路的黑盒化。当一个请求从网关进入,经过业务编排层,最终抵达 Python 模型服务时,任何一环的延迟都可能导致用户体验的下降。最近我们面临一个典型场景:一个通过 Tyk API Gateway 暴露的图像分类服务,其后端是一个 Node.js 应用,负责接收请求、预处理并调用一个 Keras 模型进行推理,整个 P95 响应时间居高不下。问题究竟出在 Tyk 的插件处理、Node.js 的 I/O 等待,还是 Keras 模型的推理本身?在没有全链路追踪的情况下,这几乎等同于猜谜。
我们的目标非常明确:为这个 Tyk -> Node.js -> Python(Keras)
的异构技术栈,构建一个无缝的、端到端的分布式追踪系统。我们选择了 Apache SkyWalking,因为它对多语言生态的支持相对完善,并且其 sw8
协议头的设计在必要时也允许我们进行手动上下文传递。这篇不是一篇入门教程,而是一次将这三个看似无关的系统捏合在一起,并实现深度可观测性的实战复盘。
第一步:建立可被追踪的靶点 - Keras 模型服务
一切追踪的基础是让每个服务都具备被追踪的能力。我们从链路的最后一环,也就是 Python Keras 模型服务开始。在真实项目中,这通常是一个由 TensorFlow Serving、Triton 或 TorchServe 部署的服务,但为了清晰地展示追踪上下文的传递,我们使用一个简单的 Flask 应用来包装 Keras 模型。这里的关键不是模型本身多复杂,而是服务如何正确地接入 SkyWalking。
一个常见的错误是,认为只要安装了 SkyWalking Python Agent,一切就会自动完成。对于主流框架如 Django 或 Flask 的标准路由,这确实可行。但当涉及到服务间的 RPC 调用时,特别是从一个非 Python 客户端(我们的 Node.js 服务)调用过来时,追踪上下文的传递往往需要手动介入。
首先,这是我们的模型服务 model_server.py
的骨架。我们使用一个预训练的 MobileNetV2 模型,这在生产环境中很常见。
# model_server.py
import os
import numpy as np
import logging
from flask import Flask, request, jsonify
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
from PIL import Image
import io
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 初始化 Flask 应用
app = Flask(__name__)
# 加载预训练的 Keras 模型
# 在生产环境中,模型应该在应用启动时加载一次,而不是在每次请求时加载
try:
model = MobileNetV2(weights='imagenet')
logger.info("MobileNetV2 model loaded successfully.")
except Exception as e:
logger.error(f"Failed to load Keras model: {e}")
model = None # 标记模型加载失败
# 定义推理端点
@app.route('/predict', methods=['POST'])
def predict():
if not model:
return jsonify({"error": "Model is not available"}), 500
if 'file' not in request.files:
return jsonify({"error": "Missing file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
try:
# 1. 图像预处理
# 使用 PIL.Image 从内存中的文件字节流加载图像
img_bytes = file.read()
pil_img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
# Keras 模型需要特定的输入尺寸
pil_img = pil_img.resize((224, 224))
# 将 PIL 图像转换为 numpy 数组
img_array = image.img_to_array(pil_img)
# 扩展维度以匹配模型输入形状 (1, 224, 224, 3)
img_array = np.expand_dims(img_array, axis=0)
# 应用 MobileNetV2 特定的预处理
processed_img = preprocess_input(img_array)
# 2. 模型推理
predictions = model.predict(processed_img)
# 3. 解码预测结果
decoded = decode_predictions(predictions, top=3)[0]
result = [{"label": label, "description": desc, "probability": float(prob)} for label, desc, prob in decoded]
logger.info(f"Prediction successful for {file.filename}")
return jsonify(result)
except Exception as e:
logger.error(f"An error occurred during prediction: {e}", exc_info=True)
return jsonify({"error": "Internal server error during prediction"}), 500
if __name__ == '__main__':
# 从环境变量获取端口,默认为 5001
port = int(os.environ.get("PORT", 5001))
app.run(host='0.0.0.0', port=port)
现在,我们引入 SkyWalking。首先安装依赖:
pip install "apache-skywalking<1.0.0" Pillow tensorflow
为了让 SkyWalking Agent 生效,我们不能直接 python model_server.py
启动,而是需要使用 sw-python
命令。
# 环境变量配置
export SW_AGENT_NAME=keras-model-service
export SW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
# 启动服务
sw-python run model_server.py
此时,如果直接调用 /predict
接口,你会在 SkyWalking UI 上看到一个追踪记录。但这只是链路的终点。当 Node.js 调用它时,它会生成一个新的 Trace,与上游的 Trace 是断开的。这里的坑在于,SkyWalking Python Agent 默认不会自动从一个通用的 HTTP Header(如 sw8
)中提取上游的 Trace Context。我们需要显式地处理它。
我们将修改 predict
函数,手动从请求头中提取 sw8
,并创建一个 “Entry Span”。
# model_server.py (修改后的 predict 函数)
from sw_django.core.trace.context import SpanContext, Span, Carrier
from sw_django.core.trace.propagation import get_context_from_sw8, set_context_in_sw8
# ... (其他代码保持不变) ...
@app.route('/predict', methods=['POST'])
def predict():
# 关键步骤:手动处理追踪上下文
sw8_header = request.headers.get('sw8')
# 从 sw8 header 创建入口 Span 上下文
context = get_context_from_sw8(sw8_header) if sw8_header else SpanContext()
# 创建一个 Entry Span,代表这个请求的入口
# operation_name 应该是有意义的,比如 API 路径
with Span(op="/predict", peer=request.remote_addr, context=context) as entry_span:
# 将当前 Span 类型设置为 Entry,这是必须的
entry_span.kind = 'Entry'
# 可以在这里添加自定义的 tags
entry_span.tag(key='model.name', val='MobileNetV2')
# --- 原有的业务逻辑开始 ---
if not model:
entry_span.tag(key='error', val='true')
entry_span.log(ex=Exception("Model is not available"))
return jsonify({"error": "Model is not available"}), 500
if 'file' not in request.files:
entry_span.tag(key='error', val='true')
return jsonify({"error": "Missing file part"}), 400
file = request.files['file']
entry_span.tag(key='image.filename', val=file.filename)
# ... (此处省略了完整的图像处理和预测逻辑,与上面版本相同) ...
# ... 假设 result 已经被计算出来 ...
try:
# ... (图像处理和预测代码) ...
img_bytes = file.read()
pil_img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
pil_img = pil_img.resize((224, 224))
img_array = image.img_to_array(pil_img)
img_array = np.expand_dims(img_array, axis=0)
processed_img = preprocess_input(img_array)
# 为模型推理本身创建一个子 Span,以精确测量其耗时
with Span(op="Keras.predict", context=context) as inference_span:
inference_span.kind = 'Local' # 本地 Span
predictions = model.predict(processed_img)
decoded = decode_predictions(predictions, top=3)[0]
result = [{"label": label, "description": desc, "probability": float(prob)} for label, desc, prob in decoded]
logger.info(f"Prediction successful for {file.filename}")
return jsonify(result)
except Exception as e:
logger.error(f"An error occurred during prediction: {e}", exc_info=True)
entry_span.tag(key='error', val='true')
entry_span.log(ex=e)
return jsonify({"error": "Internal server error during prediction"}), 500
这个改动是连接断裂链路的关键。get_context_from_sw8
会解析上游(Node.js)传来的 sw8
头,并用它来初始化当前的 Span 上下文。这样,SkyWalking OAP Server 就能将这个 Python 服务的 Span 与上游的 Span 正确地关联起来。我们还额外为 model.predict
调用创建了一个本地 Span,这样就能在追踪详情中精确看到模型推理的耗时。
第二步:构建承上启下的 Node.js 编排服务
Node.js 服务在这里扮演着 BFF (Backend for Frontend) 的角色。它接收来自网关的请求,可能会进行一些权限校验、参数转换,然后以 multipart/form-data
的形式将文件转发给 Python 模型服务。
对于 Node.js,我们使用官方的 skywalking-backend-js
agent。
npm install express axios form-data skywalking-backend-js
启动脚本 start.js
需要在应用代码之前加载 agent。
// start.js
// 必须在所有其他模块之前引入
require('skywalking-backend-js').start({
serviceName: 'nodejs-bff-service',
collectorAddress: '127.0.0.1:11800',
// 开启日志,方便调试
log: {
level: 'DEBUG'
}
});
require('./server.js');
然后是我们的核心业务逻辑 server.js
。
// server.js
const express = require('express');
const axios = require('axios');
const multer = require('multer');
const FormData = require('form-data');
const fs = require('fs');
const path = require('path');
const app = express();
const port = process.env.PORT || 3000;
const modelServiceUrl = process.env.MODEL_SERVICE_URL || 'http://localhost:5001/predict';
// 使用 multer 处理文件上传,存储在内存中
const storage = multer.memoryStorage();
const upload = multer({ storage: storage });
app.post('/api/v1/classify', upload.single('image'), async (req, res) => {
if (!req.file) {
return res.status(400).send({ error: 'No image file provided.' });
}
// 在真实项目中,这里可能会有复杂的业务逻辑:
// 1. 用户身份验证
// 2. 图像元数据记录
// 3. 根据用户等级选择不同的模型服务
// ...
try {
const form = new FormData();
// 将内存中的 buffer 附加到 form-data
form.append('file', req.file.buffer, {
filename: req.file.originalname,
contentType: req.file.mimetype,
});
console.log(`Forwarding request to model service at ${modelServiceUrl}`);
// skywalking-backend-js 会自动拦截 axios 请求
// 并注入 sw8 header
const response = await axios.post(modelServiceUrl, form, {
headers: {
...form.getHeaders(),
},
// 设置一个合理的超时时间
timeout: 10000,
});
res.status(response.status).json(response.data);
} catch (error) {
console.error('Error calling model service:', error.message);
// 错误处理:区分是网络问题还是下游服务返回的错误
if (error.response) {
// 下游服务返回了非 2xx 状态码
res.status(error.response.status).json({
error: 'Model service returned an error.',
details: error.response.data
});
} else if (error.request) {
// 请求已发出但没有收到响应
res.status(504).json({ error: 'Gateway timeout - No response from model service.' });
} else {
// 其他未知错误
res.status(500).json({ error: 'Internal server error.' });
}
}
});
app.listen(port, () => {
console.log(`BFF service listening at http://localhost:${port}`);
});
启动服务:
node start.js
skywalking-backend-js
agent 的一个优点是它能自动为支持的库(如 Express, axios)创建 Span。当我们的 Node.js 服务收到请求时,agent 会创建一个 Entry Span。当它使用 axios
调用 Python 服务时,agent 会创建一个 Exit Span,并将当前的 Trace Context 序列化后注入到 sw8
请求头中。这正是我们 Python 服务中所需要接收和解析的那个头。
第三步:配置 API 网关 Tyk 的可观测性
链路的最前端是 Tyk API Gateway。如果忽略了网关本身,我们就无法得知请求在网关层消耗了多少时间,例如身份验证、速率限制等中间件的处理耗时。Tyk 自身支持通过 OpenTelemetry 协议(OTLP)导出追踪数据,而 SkyWalking OAP Server 正好可以接收 OTLP 格式的数据。
这是一个双赢的局面:我们不需要为 Tyk 开发自定义的 SkyWalking 插件,只需进行正确的配置即可。
首先,修改 Tyk Gateway 的配置文件 tyk.conf
,启用 OpenTelemetry 导出。
// tyk.conf
{
// ... other configurations
"tracing": {
"enabled": true,
"name": "opentelemetry",
"options": {
"endpoint": "skywalking-oap:4317", // OAP OTLP gRPC 接收地址
"service_name": "tyk-api-gateway",
"is_secure": false, // 如果 OAP 开启了 TLS,这里要设为 true
"resource_attributes": {
"deployment.environment": "production"
}
}
},
// ...
}
这里 skywalking-oap:4317
是 SkyWalking OAP 服务暴露的 OTLP/gRPC 端口。service_name
必须设置,这样在 SkyWalking UI 中才能清晰地看到名为 tyk-api-gateway
的服务。
接下来,定义一个 API,将流量代理到我们的 Node.js 服务。这是 Tyk API 定义的核心部分 api_definition.json
:
{
"name": "AI-Inference-API",
"api_id": "ai-inference-api-1",
"org_id": "1",
"use_keyless": true, // 为简化示例,禁用 API Key
"proxy": {
"listen_path": "/inference/",
"target_url": "http://nodejs-bff-service:3000/", // 目标是 Node.js 服务
"strip_listen_path": true
},
"version_data": {
"not_versioned": true,
"versions": {
"Default": {
"name": "Default",
"use_extended_paths": true
}
}
},
"active": true
}
将这个 API 定义加载到 Tyk。现在,整个请求流程已经就绪。
第四步:整合与验证:端到端追踪的可视化
我们用一个 docker-compose.yml
来编排所有服务,这样可以模拟一个真实的微服务环境。
# docker-compose.yml
version: '3.8'
services:
oap:
image: apache/skywalking-oap-server:9.5.0
container_name: oap
ports:
- "11800:11800"
- "12800:12800"
- "4317:4317" # OTLP gRPC Port
healthcheck:
test: ["CMD", "/skywalking/bin/swctl", "check", "health"]
interval: 30s
timeout: 10s
retries: 3
ui:
image: apache/skywalking-ui:9.5.0
container_name: ui
depends_on:
oap:
condition: service_healthy
ports:
- "8080:8080"
environment:
SW_OAP_ADDRESS: http://oap:12800
model-service:
build:
context: ./python-service
container_name: model-service
environment:
- SW_AGENT_NAME=keras-model-service
- SW_AGENT_COLLECTOR_BACKEND_SERVICES=oap:11800
command: sw-python run model_server.py
bff-service:
build:
context: ./nodejs-service
container_name: nodejs-bff-service
ports:
- "3000:3000"
environment:
- SKYWALKING_COLLECTOR_BACKEND_SERVICES=oap:11800
- SKYWALKING_AGENT_SERVICE_NAME=nodejs-bff-service
- MODEL_SERVICE_URL=http://model-service:5001/predict
depends_on:
- model-service
tyk-gateway:
image: tykio/tyk-gateway:v4.0.1
container_name: tyk-gateway
ports:
- "8081:8080"
volumes:
- ./tyk/tyk.conf:/opt/tyk-gateway/tyk.conf
- ./tyk/apps:/opt/tyk-gateway/apps
depends_on:
- bff-service
启动整个栈:docker-compose up
。然后,我们发起一个测试请求:
curl -X POST http://localhost:8081/inference/api/v1/classify \
-F "image=@/path/to/your/image.jpg"
现在,打开 SkyWalking UI,我们应该能看到一条完整的追踪链。
graph TD A[Client] -->|HTTP POST| B(Tyk Gateway); B -->|OTLP gRPC| D(SkyWalking OAP); B -->|HTTP POST with sw8| C(Node.js BFF); C -->|sw8-js agent| D; C -->|HTTP POST with sw8| E(Python/Flask Service); E -->|sw-python agent| D;
在追踪详情页面,你会看到一个瀑布图,清晰地展示了:
- 一个根 Span,来自
tyk-api-gateway
,显示了请求在网关的总耗时。 - 一个子 Span,来自
nodejs-bff-service
,操作名为/api/v1/classify
,展示了 Node.js 服务的处理时间。 - 在这个 Node.js Span 下,还有一个 Exit Span,代表对 Python 服务的
POST
调用。 - 最后,一个子 Span,来自
keras-model-service
,操作名为/predict
。这个 Span 的父 Span 正是 Node.js 的 Exit Span,证明了链路的成功连接。 - 在
/predict
Span 内部,还有一个Keras.predict
的本地 Span,精确度量了模型推理的耗时。
通过这个视图,我们可以一目了然地定位性能瓶颈。如果 Keras.predict
Span 耗时很长,说明模型推理是瓶颈;如果 Node.js 的 Exit Span 和 Python 的 Entry Span 之间有显著的时间差,那可能是网络延迟;如果 Tyk 的 Span 自身就很长,那么需要检查网关的插件或配置。
遗留问题与未来迭代
这个方案虽然实现了核心目标,但在生产环境中仍有几个值得优化的地方。
首先,Python 服务中手动处理 sw8
header 的方式虽然有效,但具有一定的侵入性。如果下游服务是 gRPC,利用 gRPC 的 metadata
机制传递追踪上下文会更优雅和标准化。对于更复杂的 Python 应用,直接使用 OpenTelemetry SDK 并配置 OTLP Exporter 发送到 SkyWalking OAP,可能是更灵活、更符合云原生趋势的做法。
其次,当前的追踪主要关注服务间的调用延迟。对于 AI 服务,模型推理内部的细节(例如数据从 CPU 到 GPU 的拷贝时间、特定层的计算时间)是不可见的。要实现更深度的模型性能分析,需要在 Python 代码中,使用 TensorFlow Profiler 或类似的工具,并将关键阶段的耗时作为自定义 Span 的 Tag 或 Log 上报,这需要更精细的埋点。
最后,一个完整的可观测性体系不应只有追踪(Tracing),还应包括指标(Metrics)和日志(Logging)。下一步应该是将服务的关键业务指标(如 QPS、推理成功率)和结构化日志也接入统一的平台,实现 Traces、Metrics、Logs 的自动关联,从而在出现问题时,能够从一个 Trace ID 快速下钻到相关的日志和指标异常,这才是可观测性驱动开发的最终形态。