一个线上问题的排查请求打破了周五下午的平静。用户反馈:“提交数据分析任务后,等了快一分钟才在页面上看到结果,系统是不是变慢了?” 我查了API网关日志,对应接口的响应时间稳定在50ms以内。再看应用服务器日志,请求处理也一切正常。问题显然出在了我们通过Celery处理的那个异步分析任务上,但麻烦的是,从API服务器的日志里,我们完全看不到是哪个Celery worker、在哪台机器上、具体执行了什么数据库查询导致了缓慢。API请求的日志和Celery worker的日志是完全割裂的,我们丢失了上下文。
这个场景在真实项目中太常见了。HTTP请求触发一个异步任务,然后快速返回。后台的异步任务可能经过多个队列、多个worker,最终操作数据库。一旦出现性能问题或错误,想要把最初的用户操作和最终的底层异常关联起来,无异于大海捞捞针。解决方案很明确:引入分布式链路追踪。目标是,从用户在前端UI(使用Vue和Pinia)的点击开始,生成一个唯一的Trace ID,并让这个ID贯穿整个调用链:Frontend -> API Backend -> Celery Broker -> Celery Worker -> PostgreSQL
。
第一步:技术选型与基础设定
我们的技术栈是固定的:Python FastAPI作为后端,Celery处理异步任务,PostgreSQL作为主数据库,Vue 3 + Pinia构建前端。为了实现追踪,我们选择OpenTelemetry,它是CNCF的孵化项目,提供了厂商中立的API和SDK,避免了技术锁定。
整个追踪系统的核心在于“上下文传播”(Context Propagation)。OpenTelemetry遵循W3C Trace Context规范,通过一个名为 traceparent
的HTTP Header在服务间传递追踪信息。我们的任务就是确保这个 traceparent
Header能从Pinia发起的请求开始,一路传递到最终执行SQL查询的那个Celery Worker进程中。
这是我们将要构建的追踪流程图:
sequenceDiagram participant User as 用户 (浏览器) participant Pinia as Pinia状态管理 participant FastAPI as FastAPI应用 participant CeleryBroker as 消息队列 (Redis/RabbitMQ) participant CeleryWorker as Celery Worker participant PostgreSQL as 数据库 User->>Pinia: 点击“开始分析”按钮 Pinia->>FastAPI: 发起API请求 (携带 traceparent Header) FastAPI->>FastAPI: 创建Root Span FastAPI->>CeleryBroker: 推送任务 (在消息头中注入traceparent) FastAPI-->>Pinia: 立即返回任务ID CeleryBroker-->>CeleryWorker: 消费任务 CeleryWorker->>CeleryWorker: 从消息头中提取traceparent, 创建Child Span CeleryWorker->>PostgreSQL: 执行耗时查询 PostgreSQL-->>CeleryWorker: 返回查询结果 CeleryWorker->>CeleryWorker: 更新任务状态
第二步:后端深度埋点 - 让Celery感知Trace
后端的埋点是整个链路中最复杂,也是最关键的一环。我们需要对FastAPI、Celery和PostgreSQL驱动进行埋点。
2.1. OpenTelemetry基础配置
首先,在我们的Python项目中安装必要的库:
pip install "opentelemetry-distro[otlp]" \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-celery \
opentelemetry-instrumentation-psycopg2
然后,创建一个tracing.py
模块来集中管理OpenTelemetry的初始化。在真实项目中,配置项应该来自环境变量。
# file: my_project/tracing.py
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracer(service_name: str):
"""
为指定服务配置并初始化全局Tracer
"""
# 从环境变量获取OTLP导出端点,方便在不同环境(本地/CI/生产)中配置
# 比如在CircleCI中,我们可以启动一个Jaeger容器并设置此变量
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
# Resource用于标识当前服务的信息,会附加到所有Span上
resource = Resource.create(attributes={
"service.name": service_name,
"service.instance.id": os.uname().nodename,
})
# 设置全局的TracerProvider
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
# 使用OTLP gRPC Exporter将追踪数据发送到收集器(如Jaeger, OpenTelemetry Collector)
# 在生产环境中,推荐使用OTLP Collector作为中间层
exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
# BatchSpanProcessor会批量异步发送Span,性能更好
span_processor = BatchSpanProcessor(exporter)
provider.add_span_processor(span_processor)
print(f"Tracer configured for service '{service_name}' to endpoint '{otlp_endpoint}'")
return trace.get_tracer(service_name)
2.2. FastAPI应用埋点
对FastAPI的埋点相对简单,opentelemetry-instrumentation-fastapi
提供了中间件。
# file: my_project/main.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from .tracing import configure_tracer
from .tasks import run_data_analysis
# 在应用启动时配置Tracer
tracer = configure_tracer(service_name="api-service")
app = FastAPI()
@app.post("/analyze")
async def start_analysis(data: dict):
# FastAPIInstrumentor会自动创建一个Root Span
# 这里的业务逻辑是触发一个Celery任务
task = run_data_analysis.delay(data)
return {"task_id": task.id}
# 在应用启动后应用埋点中间件
FastAPIInstrumentor.instrument_app(app)
2.3. Celery上下文传播 - 核心难点
这是最容易出坑的地方。opentelemetry-instrumentation-celery
提供了基础的埋点能力,但要确保上下文正确传递,需要仔细配置。它通过Celery的信号机制(signals)在任务发布前注入traceparent
到消息头,在任务执行前再从中提取出来。
首先配置Celery实例,并应用埋点。
# file: my_project/celery_app.py
from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from .tracing import configure_tracer
# 在Celery worker启动时也需要配置Tracer
# 注意:service_name不同,以便在追踪系统中区分
configure_tracer(service_name="celery-worker-service")
celery_app = Celery(
"tasks",
broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
)
# 应用Celery埋点,这是实现自动上下文传播的关键
CeleryInstrumentor().instrument(celery_app=celery_app)
# 导入任务模块
celery_app.autodiscover_tasks(['my_project.tasks'])
接下来是任务本身的定义。为了让数据库操作也被追踪,我们需要在Celery worker进程中对psycopg2
进行埋点。
# file: my_project/tasks.py
import time
import psycopg2
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from .celery_app import celery_app
from opentelemetry import trace
# 在定义任务的模块中,确保数据库驱动已被埋点
# 这样,当Celery worker执行到数据库代码时,会自动创建DB Span
Psycopg2Instrumentor().instrument()
# 获取当前模块的tracer
tracer = trace.get_tracer(__name__)
def get_db_connection():
# 模拟从连接池获取连接
# 真实项目中应使用类似psycopg2.pool的库
return psycopg2.connect(
dbname="testdb",
user="testuser",
password="testpassword",
host="localhost"
)
@celery_app.task
def run_data_analysis(data: dict):
# CeleryInstrumentor会自动恢复上下文,所以这里的Span会成为FastAPI Span的子Span
with tracer.start_as_current_span("run_data_analysis_task") as task_span:
task_span.set_attribute("analysis.data.items", len(data))
# 1. 模拟数据预处理
time.sleep(1)
# 2. 执行数据库查询
# Psycopg2Instrumentor会自动为下面的数据库操作创建Span
conn = get_db_connection()
cursor = conn.cursor()
# 这里的SQL查询会作为一个独立的Span出现在链路中
cursor.execute("SELECT pg_sleep(0.5);") # 模拟慢查询
cursor.execute("INSERT INTO analysis_results (data) VALUES (%s);", (str(data),))
conn.commit()
cursor.close()
conn.close()
task_span.add_event("Database operations complete")
return {"status": "complete"}
这里的关键在于:
-
CeleryInstrumentor
负责在任务入队时将当前Tracer的上下文序列化到消息头,并在Worker消费时反序列化,恢复上下文。 -
Psycopg2Instrumentor
必须在Worker进程中被调用instrument()
,这样它才能劫持psycopg2
库的内部方法,自动为每个SQL查询创建Span,并将其作为当前Celery任务Span的子Span。
第三步:前端埋点 - 链路的起点
前端是追踪的源头。我们需要在用户交互时创建第一个Span,并将 traceparent
Header注入到发往后端的API请求中。
3.1. 前端OpenTelemetry配置
首先安装依赖:
npm install @opentelemetry/sdk-trace-web @opentelemetry/api @opentelemetry/context-zone \
@opentelemetry/exporter-trace-otlp-http @opentelemetry/instrumentation-fetch \
@opentelemetry/propagator-w3c
创建一个tracing.js
文件来初始化Web Tracer。
// file: src/tracing.js
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { W3CTraceContextPropagator } from '@opentelemetry/propagator-w3c';
const provider = new WebTracerProvider({
resource: {
'service.name': 'frontend-vue-app',
},
});
// 使用OTLP HTTP Exporter将数据发送到收集器
const exporter = new OTLPTraceExporter({
url: import.meta.env.VITE_OTEL_EXPORTER_OTLP_URL || 'http://localhost:4318/v1/traces',
});
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
// 设置全局的TracerProvider和ContextManager
provider.register({
contextManager: new ZoneContextManager(),
propagator: new W3CTraceContextPropagator(), // 关键:使用W3C传播器
});
// 自动埋点fetch请求
registerInstrumentations({
instrumentations: [
new FetchInstrumentation({
// 可以在这里配置哪些请求需要被追踪
// propagateTraceHeaderCorsUrls: [/.+/g] 允许向所有URL传播追踪头
propagateTraceHeaderCorsUrls: [new RegExp(import.meta.env.VITE_API_BASE_URL)],
}),
],
});
export const tracer = provider.getTracer('pinia-instrumentation');
在你的main.js
或main.ts
中引入并执行它:
// file: src/main.js
import './tracing'; // 确保在应用启动前初始化
// ... 其他vue app的创建逻辑
3.2. 结合Pinia和业务逻辑
现在,我们要在Pinia的action中触发API调用。由于FetchInstrumentation
已经自动工作,我们几乎不需要修改业务代码。但为了让链路更清晰,我们可以手动创建一个父Span来包裹整个用户操作。
// file: src/stores/analysis.js
import { defineStore } from 'pinia';
import { tracer } from '@/tracing';
import { trace, context } from '@opentelemetry/api';
export const useAnalysisStore = defineStore('analysis', {
state: () => ({
taskId: null,
status: 'idle',
error: null,
}),
actions: {
async startAnalysis(analysisData) {
// 手动创建一个Span来包裹整个用户操作
const span = tracer.startSpan('user-clicks-start-analysis-button');
// 在这个Span的上下文中执行后续操作
await context.with(trace.setSpan(context.active(), span), async () => {
try {
this.status = 'loading';
span.addEvent('Analysis process started from UI');
// FetchInstrumentation会自动从当前上下文中获取span信息
// 并将traceparent header注入到下面的fetch请求中
const response = await fetch(`${import.meta.env.VITE_API_BASE_URL}/analyze`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(analysisData),
});
if (!response.ok) {
throw new Error('API request failed');
}
const result = await response.json();
this.taskId = result.task_id;
this.status = 'submitted';
span.setAttribute('app.task.id', result.task_id);
} catch (e) {
this.error = e.message;
this.status = 'error';
span.recordException(e);
span.setStatus({ code: trace.SpanStatusCode.ERROR, message: e.message });
} finally {
span.end();
}
});
},
},
});
这个实现中,FetchInstrumentation
是魔法的关键。它会自动将user-clicks-start-analysis-button
这个Span的上下文信息(Trace ID, Span ID)打包成traceparent
Header,附加到发往/analyze
的请求上。后端FastAPIInstrumentor
收到后,会解析这个Header,创建一个子Span,从而将前后端链路连接起来。
第四步:用CircleCI验证链路完整性
代码写完,如何保证这套复杂的追踪系统在未来不会因为某次代码重构而失效?答案是自动化测试。我们要在CI/CD流程中加入一个集成测试,它能端到端地验证追踪链路的完整性。
我们将使用CircleCI和Docker Compose来搭建一个完整的测试环境,包括我们的应用、数据库、Celery,以及一个追踪数据收集器Jaeger。
4.1. Docker Compose配置测试环境
# file: docker-compose.ci.yml
version: '3.8'
services:
postgres:
image: postgres:14
environment:
- POSTGRES_USER=testuser
- POSTGRES_PASSWORD=testpassword
- POSTGRES_DB=testdb
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
jaeger:
image: jaegertracing/all-in-one:1.35
ports:
- "4317:4317" # OTLP gRPC endpoint
- "16686:16686" # Jaeger UI
api:
build: .
command: uvicorn my_project.main:app --host 0.0.0.0 --port 8000
depends_on:
- redis
- postgres
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
ports:
- "8000:8000"
worker:
build: .
command: celery -A my_project.celery_app worker --loglevel=info
depends_on:
- redis
- postgres
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
4.2. CircleCI配置
现在,我们来编写.circleci/config.yml
。核心是一个integration-test
作业,它会启动上述Docker Compose环境,运行一个测试脚本,该脚本会调用API,然后去查询Jaeger的API,断言追踪数据是否符合预期。
# file: .circleci/config.yml
version: 2.1
orbs:
python: circleci/[email protected]
jobs:
build-and-test:
docker:
- image: cimg/python:3.10
steps:
- checkout
- python/install-packages:
pkg-manager: pip
- run:
name: Run Unit Tests
command: pytest tests/unit
integration-test:
machine:
image: ubuntu-2004:202111-01
steps:
- checkout
- run:
name: Install Docker Compose
command: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
- run:
name: Start Environment
command: docker-compose -f docker-compose.ci.yml up -d
- run:
name: Wait for services to be ready
command: |
echo "Waiting for Jaeger..."
sleep 10 # 在真实CI中应使用更健壮的等待脚本
echo "Waiting for API..."
docker-compose -f docker-compose.ci.yml run --rm api /bin/bash -c "pip install requests && python tests/integration/wait_for_api.py"
- run:
name: Run Tracing E2E Test
command: docker-compose -f docker-compose.ci.yml run --rm api /bin/bash -c "pip install pytest requests && pytest tests/integration/test_tracing.py"
workflows:
main:
jobs:
- build-and-test
- integration-test:
requires:
- build-and-test
4.3. 编写集成测试脚本
这是验证工作的最后一步,也是最重要的一步。这个pytest
测试会:
- 向我们的API发送一个请求来触发Celery任务。
- 等待一段时间,确保任务已完成。
- 通过HTTP请求查询Jaeger的API,获取刚才生成的Trace。
- 断言Trace的结构是否正确:是否包含来自
api-service
、celery-worker-service
的Spans,以及数据库查询的Span。
# file: tests/integration/test_tracing.py
import requests
import time
import os
import pytest
API_URL = "http://api:8000"
JAEGER_API_URL = "http://jaeger:16686/api/traces"
def test_full_trace_propagation():
# 1. 触发API,它会启动一个Celery任务
response = requests.post(f"{API_URL}/analyze", json={"user": "ci-test"})
assert response.status_code == 200
task_id = response.json()["task_id"]
# 2. 等待足够的时间让Celery任务完成并上报trace
# 在生产级CI中,这里应该轮询任务结果,而不是固定等待
time.sleep(5)
# 3. 从Jaeger查询这个服务的traces
params = {'service': 'api-service', 'limit': 20}
response = requests.get(JAEGER_API_URL, params=params)
assert response.status_code == 200
traces_data = response.json()['data']
assert len(traces_data) > 0, "No traces found in Jaeger for api-service"
# 寻找与我们任务相关的trace
target_trace = None
for trace in traces_data:
for span in trace['spans']:
for tag in span['tags']:
if tag['key'] == 'http.target' and tag['value'] == '/analyze':
target_trace = trace
break
if target_trace:
break
assert target_trace is not None, "Could not find trace for the /analyze request"
# 4. 断言trace的结构
processes = target_trace['processes']
spans = target_trace['spans']
# 检查是否包含来自API和Celery Worker的span
services_found = {p['serviceName'] for p in processes.values()}
assert "api-service" in services_found
assert "celery-worker-service" in services_found, f"Services found: {services_found}"
# 检查关键Span是否存在
span_names = {s['operationName'] for s in spans}
assert "POST /analyze" in span_names # FastAPI Span
assert "run_data_analysis" in span_names # Celery Task Span
assert "run_data_analysis_task" in span_names # Custom Span within task
# 检查是否有数据库Span,它的名字通常是SQL命令
db_span_found = any(s['operationName'].startswith("INSERT") or s['operationName'].startswith("SELECT") for s in spans)
assert db_span_found, f"No DB span found. Spans available: {span_names}"
print("Trace validation successful!")
现在,每次代码提交到CircleCI,我们都会自动验证这套复杂的分布式追踪系统是否依然健壮,极大地增强了我们对系统可观测性的信心。
局限与未来路径
这套方案虽然解决了核心的上下文传递问题,但在生产环境中还需考虑更多。首先是采样率,全量追踪在高流量下成本过高,需要配置动态采样策略,比如只对错误链路或慢链路进行全量采样。其次,当前的方案依赖于OpenTelemetry提供的自动埋点库,如果项目中使用了某些冷门的、没有现成埋点库的组件,就需要手动创建和传递上下文,工作量会增加。
未来的优化方向可以是在CircleCI的集成测试中引入对Span属性(tags/attributes)的更精细断言,比如检查数据库Span中是否包含了正确的db.statement
。此外,还可以将追踪数据与Metrics(如Prometheus)和Logs进行关联,当收到一个告警时,可以直接从Metrics仪表盘下钻到对应的Trace,再关联到具体的错误日志,形成一个完整的、立体的可观测性体系。