构建从 Pinia 到 PostgreSQL 的全链路追踪 Celery 异步任务上下文传递实战


一个线上问题的排查请求打破了周五下午的平静。用户反馈:“提交数据分析任务后,等了快一分钟才在页面上看到结果,系统是不是变慢了?” 我查了API网关日志,对应接口的响应时间稳定在50ms以内。再看应用服务器日志,请求处理也一切正常。问题显然出在了我们通过Celery处理的那个异步分析任务上,但麻烦的是,从API服务器的日志里,我们完全看不到是哪个Celery worker、在哪台机器上、具体执行了什么数据库查询导致了缓慢。API请求的日志和Celery worker的日志是完全割裂的,我们丢失了上下文。

这个场景在真实项目中太常见了。HTTP请求触发一个异步任务,然后快速返回。后台的异步任务可能经过多个队列、多个worker,最终操作数据库。一旦出现性能问题或错误,想要把最初的用户操作和最终的底层异常关联起来,无异于大海捞捞针。解决方案很明确:引入分布式链路追踪。目标是,从用户在前端UI(使用Vue和Pinia)的点击开始,生成一个唯一的Trace ID,并让这个ID贯穿整个调用链:Frontend -> API Backend -> Celery Broker -> Celery Worker -> PostgreSQL

第一步:技术选型与基础设定

我们的技术栈是固定的:Python FastAPI作为后端,Celery处理异步任务,PostgreSQL作为主数据库,Vue 3 + Pinia构建前端。为了实现追踪,我们选择OpenTelemetry,它是CNCF的孵化项目,提供了厂商中立的API和SDK,避免了技术锁定。

整个追踪系统的核心在于“上下文传播”(Context Propagation)。OpenTelemetry遵循W3C Trace Context规范,通过一个名为 traceparent 的HTTP Header在服务间传递追踪信息。我们的任务就是确保这个 traceparent Header能从Pinia发起的请求开始,一路传递到最终执行SQL查询的那个Celery Worker进程中。

这是我们将要构建的追踪流程图:

sequenceDiagram
    participant User as 用户 (浏览器)
    participant Pinia as Pinia状态管理
    participant FastAPI as FastAPI应用
    participant CeleryBroker as 消息队列 (Redis/RabbitMQ)
    participant CeleryWorker as Celery Worker
    participant PostgreSQL as 数据库

    User->>Pinia: 点击“开始分析”按钮
    Pinia->>FastAPI: 发起API请求 (携带 traceparent Header)
    FastAPI->>FastAPI: 创建Root Span
    FastAPI->>CeleryBroker: 推送任务 (在消息头中注入traceparent)
    FastAPI-->>Pinia: 立即返回任务ID
    CeleryBroker-->>CeleryWorker: 消费任务
    CeleryWorker->>CeleryWorker: 从消息头中提取traceparent, 创建Child Span
    CeleryWorker->>PostgreSQL: 执行耗时查询
    PostgreSQL-->>CeleryWorker: 返回查询结果
    CeleryWorker->>CeleryWorker: 更新任务状态

第二步:后端深度埋点 - 让Celery感知Trace

后端的埋点是整个链路中最复杂,也是最关键的一环。我们需要对FastAPI、Celery和PostgreSQL驱动进行埋点。

2.1. OpenTelemetry基础配置

首先,在我们的Python项目中安装必要的库:

pip install "opentelemetry-distro[otlp]" \
             opentelemetry-instrumentation-fastapi \
             opentelemetry-instrumentation-celery \
             opentelemetry-instrumentation-psycopg2

然后,创建一个tracing.py模块来集中管理OpenTelemetry的初始化。在真实项目中,配置项应该来自环境变量。

# file: my_project/tracing.py

import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracer(service_name: str):
    """
    为指定服务配置并初始化全局Tracer
    """
    # 从环境变量获取OTLP导出端点,方便在不同环境(本地/CI/生产)中配置
    # 比如在CircleCI中,我们可以启动一个Jaeger容器并设置此变量
    otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

    # Resource用于标识当前服务的信息,会附加到所有Span上
    resource = Resource.create(attributes={
        "service.name": service_name,
        "service.instance.id": os.uname().nodename,
    })

    # 设置全局的TracerProvider
    provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(provider)

    # 使用OTLP gRPC Exporter将追踪数据发送到收集器(如Jaeger, OpenTelemetry Collector)
    # 在生产环境中,推荐使用OTLP Collector作为中间层
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)

    # BatchSpanProcessor会批量异步发送Span,性能更好
    span_processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(span_processor)

    print(f"Tracer configured for service '{service_name}' to endpoint '{otlp_endpoint}'")
    
    return trace.get_tracer(service_name)

2.2. FastAPI应用埋点

对FastAPI的埋点相对简单,opentelemetry-instrumentation-fastapi 提供了中间件。

# file: my_project/main.py

from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from .tracing import configure_tracer
from .tasks import run_data_analysis

# 在应用启动时配置Tracer
tracer = configure_tracer(service_name="api-service")
app = FastAPI()

@app.post("/analyze")
async def start_analysis(data: dict):
    # FastAPIInstrumentor会自动创建一个Root Span
    # 这里的业务逻辑是触发一个Celery任务
    task = run_data_analysis.delay(data)
    return {"task_id": task.id}

# 在应用启动后应用埋点中间件
FastAPIInstrumentor.instrument_app(app)

2.3. Celery上下文传播 - 核心难点

这是最容易出坑的地方。opentelemetry-instrumentation-celery 提供了基础的埋点能力,但要确保上下文正确传递,需要仔细配置。它通过Celery的信号机制(signals)在任务发布前注入traceparent到消息头,在任务执行前再从中提取出来。

首先配置Celery实例,并应用埋点。

# file: my_project/celery_app.py

from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from .tracing import configure_tracer

# 在Celery worker启动时也需要配置Tracer
# 注意:service_name不同,以便在追踪系统中区分
configure_tracer(service_name="celery-worker-service")

celery_app = Celery(
    "tasks",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
)

# 应用Celery埋点,这是实现自动上下文传播的关键
CeleryInstrumentor().instrument(celery_app=celery_app)

# 导入任务模块
celery_app.autodiscover_tasks(['my_project.tasks'])

接下来是任务本身的定义。为了让数据库操作也被追踪,我们需要在Celery worker进程中对psycopg2进行埋点。

# file: my_project/tasks.py

import time
import psycopg2
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from .celery_app import celery_app
from opentelemetry import trace

# 在定义任务的模块中,确保数据库驱动已被埋点
# 这样,当Celery worker执行到数据库代码时,会自动创建DB Span
Psycopg2Instrumentor().instrument()

# 获取当前模块的tracer
tracer = trace.get_tracer(__name__)

def get_db_connection():
    # 模拟从连接池获取连接
    # 真实项目中应使用类似psycopg2.pool的库
    return psycopg2.connect(
        dbname="testdb",
        user="testuser",
        password="testpassword",
        host="localhost"
    )

@celery_app.task
def run_data_analysis(data: dict):
    # CeleryInstrumentor会自动恢复上下文,所以这里的Span会成为FastAPI Span的子Span
    with tracer.start_as_current_span("run_data_analysis_task") as task_span:
        task_span.set_attribute("analysis.data.items", len(data))
        
        # 1. 模拟数据预处理
        time.sleep(1) 
        
        # 2. 执行数据库查询
        # Psycopg2Instrumentor会自动为下面的数据库操作创建Span
        conn = get_db_connection()
        cursor = conn.cursor()
        
        # 这里的SQL查询会作为一个独立的Span出现在链路中
        cursor.execute("SELECT pg_sleep(0.5);") # 模拟慢查询
        cursor.execute("INSERT INTO analysis_results (data) VALUES (%s);", (str(data),))
        conn.commit()
        
        cursor.close()
        conn.close()
        
        task_span.add_event("Database operations complete")
        return {"status": "complete"}

这里的关键在于:

  1. CeleryInstrumentor 负责在任务入队时将当前Tracer的上下文序列化到消息头,并在Worker消费时反序列化,恢复上下文。
  2. Psycopg2Instrumentor 必须在Worker进程中被调用 instrument(),这样它才能劫持psycopg2库的内部方法,自动为每个SQL查询创建Span,并将其作为当前Celery任务Span的子Span。

第三步:前端埋点 - 链路的起点

前端是追踪的源头。我们需要在用户交互时创建第一个Span,并将 traceparent Header注入到发往后端的API请求中。

3.1. 前端OpenTelemetry配置

首先安装依赖:

npm install @opentelemetry/sdk-trace-web @opentelemetry/api @opentelemetry/context-zone \
            @opentelemetry/exporter-trace-otlp-http @opentelemetry/instrumentation-fetch \
            @opentelemetry/propagator-w3c

创建一个tracing.js文件来初始化Web Tracer。

// file: src/tracing.js

import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { W3CTraceContextPropagator } from '@opentelemetry/propagator-w3c';

const provider = new WebTracerProvider({
  resource: {
    'service.name': 'frontend-vue-app',
  },
});

// 使用OTLP HTTP Exporter将数据发送到收集器
const exporter = new OTLPTraceExporter({
  url: import.meta.env.VITE_OTEL_EXPORTER_OTLP_URL || 'http://localhost:4318/v1/traces',
});

provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

// 设置全局的TracerProvider和ContextManager
provider.register({
  contextManager: new ZoneContextManager(),
  propagator: new W3CTraceContextPropagator(), // 关键:使用W3C传播器
});

// 自动埋点fetch请求
registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      // 可以在这里配置哪些请求需要被追踪
      // propagateTraceHeaderCorsUrls: [/.+/g] 允许向所有URL传播追踪头
      propagateTraceHeaderCorsUrls: [new RegExp(import.meta.env.VITE_API_BASE_URL)],
    }),
  ],
});

export const tracer = provider.getTracer('pinia-instrumentation');

在你的main.jsmain.ts中引入并执行它:

// file: src/main.js
import './tracing'; // 确保在应用启动前初始化
// ... 其他vue app的创建逻辑

3.2. 结合Pinia和业务逻辑

现在,我们要在Pinia的action中触发API调用。由于FetchInstrumentation已经自动工作,我们几乎不需要修改业务代码。但为了让链路更清晰,我们可以手动创建一个父Span来包裹整个用户操作。

// file: src/stores/analysis.js

import { defineStore } from 'pinia';
import { tracer } from '@/tracing';
import { trace, context } from '@opentelemetry/api';

export const useAnalysisStore = defineStore('analysis', {
  state: () => ({
    taskId: null,
    status: 'idle',
    error: null,
  }),
  actions: {
    async startAnalysis(analysisData) {
      // 手动创建一个Span来包裹整个用户操作
      const span = tracer.startSpan('user-clicks-start-analysis-button');
      
      // 在这个Span的上下文中执行后续操作
      await context.with(trace.setSpan(context.active(), span), async () => {
        try {
          this.status = 'loading';
          span.addEvent('Analysis process started from UI');

          // FetchInstrumentation会自动从当前上下文中获取span信息
          // 并将traceparent header注入到下面的fetch请求中
          const response = await fetch(`${import.meta.env.VITE_API_BASE_URL}/analyze`, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
            },
            body: JSON.stringify(analysisData),
          });

          if (!response.ok) {
            throw new Error('API request failed');
          }

          const result = await response.json();
          this.taskId = result.task_id;
          this.status = 'submitted';
          span.setAttribute('app.task.id', result.task_id);

        } catch (e) {
          this.error = e.message;
          this.status = 'error';
          span.recordException(e);
          span.setStatus({ code: trace.SpanStatusCode.ERROR, message: e.message });
        } finally {
          span.end();
        }
      });
    },
  },
});

这个实现中,FetchInstrumentation是魔法的关键。它会自动将user-clicks-start-analysis-button这个Span的上下文信息(Trace ID, Span ID)打包成traceparent Header,附加到发往/analyze的请求上。后端FastAPIInstrumentor收到后,会解析这个Header,创建一个子Span,从而将前后端链路连接起来。

第四步:用CircleCI验证链路完整性

代码写完,如何保证这套复杂的追踪系统在未来不会因为某次代码重构而失效?答案是自动化测试。我们要在CI/CD流程中加入一个集成测试,它能端到端地验证追踪链路的完整性。

我们将使用CircleCI和Docker Compose来搭建一个完整的测试环境,包括我们的应用、数据库、Celery,以及一个追踪数据收集器Jaeger。

4.1. Docker Compose配置测试环境

# file: docker-compose.ci.yml
version: '3.8'
services:
  postgres:
    image: postgres:14
    environment:
      - POSTGRES_USER=testuser
      - POSTGRES_PASSWORD=testpassword
      - POSTGRES_DB=testdb
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"

  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "4317:4317"  # OTLP gRPC endpoint
      - "16686:16686" # Jaeger UI

  api:
    build: .
    command: uvicorn my_project.main:app --host 0.0.0.0 --port 8000
    depends_on:
      - redis
      - postgres
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
    ports:
      - "8000:8000"

  worker:
    build: .
    command: celery -A my_project.celery_app worker --loglevel=info
    depends_on:
      - redis
      - postgres
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317

4.2. CircleCI配置

现在,我们来编写.circleci/config.yml。核心是一个integration-test作业,它会启动上述Docker Compose环境,运行一个测试脚本,该脚本会调用API,然后去查询Jaeger的API,断言追踪数据是否符合预期。

# file: .circleci/config.yml
version: 2.1
orbs:
  python: circleci/[email protected]
jobs:
  build-and-test:
    docker:
      - image: cimg/python:3.10
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
      - run:
          name: Run Unit Tests
          command: pytest tests/unit

  integration-test:
    machine:
      image: ubuntu-2004:202111-01
    steps:
      - checkout
      - run:
          name: Install Docker Compose
          command: |
            sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
            sudo chmod +x /usr/local/bin/docker-compose
      - run:
          name: Start Environment
          command: docker-compose -f docker-compose.ci.yml up -d
      - run:
          name: Wait for services to be ready
          command: |
            echo "Waiting for Jaeger..."
            sleep 10 # 在真实CI中应使用更健壮的等待脚本
            echo "Waiting for API..."
            docker-compose -f docker-compose.ci.yml run --rm api /bin/bash -c "pip install requests && python tests/integration/wait_for_api.py"
      - run:
          name: Run Tracing E2E Test
          command: docker-compose -f docker-compose.ci.yml run --rm api /bin/bash -c "pip install pytest requests && pytest tests/integration/test_tracing.py"

workflows:
  main:
    jobs:
      - build-and-test
      - integration-test:
          requires:
            - build-and-test

4.3. 编写集成测试脚本

这是验证工作的最后一步,也是最重要的一步。这个pytest测试会:

  1. 向我们的API发送一个请求来触发Celery任务。
  2. 等待一段时间,确保任务已完成。
  3. 通过HTTP请求查询Jaeger的API,获取刚才生成的Trace。
  4. 断言Trace的结构是否正确:是否包含来自api-servicecelery-worker-service的Spans,以及数据库查询的Span。
# file: tests/integration/test_tracing.py
import requests
import time
import os
import pytest

API_URL = "http://api:8000"
JAEGER_API_URL = "http://jaeger:16686/api/traces"

def test_full_trace_propagation():
    # 1. 触发API,它会启动一个Celery任务
    response = requests.post(f"{API_URL}/analyze", json={"user": "ci-test"})
    assert response.status_code == 200
    task_id = response.json()["task_id"]

    # 2. 等待足够的时间让Celery任务完成并上报trace
    # 在生产级CI中,这里应该轮询任务结果,而不是固定等待
    time.sleep(5)

    # 3. 从Jaeger查询这个服务的traces
    params = {'service': 'api-service', 'limit': 20}
    response = requests.get(JAEGER_API_URL, params=params)
    assert response.status_code == 200
    traces_data = response.json()['data']
    
    assert len(traces_data) > 0, "No traces found in Jaeger for api-service"

    # 寻找与我们任务相关的trace
    target_trace = None
    for trace in traces_data:
        for span in trace['spans']:
            for tag in span['tags']:
                if tag['key'] == 'http.target' and tag['value'] == '/analyze':
                    target_trace = trace
                    break
        if target_trace:
            break

    assert target_trace is not None, "Could not find trace for the /analyze request"

    # 4. 断言trace的结构
    processes = target_trace['processes']
    spans = target_trace['spans']

    # 检查是否包含来自API和Celery Worker的span
    services_found = {p['serviceName'] for p in processes.values()}
    assert "api-service" in services_found
    assert "celery-worker-service" in services_found, f"Services found: {services_found}"
    
    # 检查关键Span是否存在
    span_names = {s['operationName'] for s in spans}
    assert "POST /analyze" in span_names # FastAPI Span
    assert "run_data_analysis" in span_names # Celery Task Span
    assert "run_data_analysis_task" in span_names # Custom Span within task
    
    # 检查是否有数据库Span,它的名字通常是SQL命令
    db_span_found = any(s['operationName'].startswith("INSERT") or s['operationName'].startswith("SELECT") for s in spans)
    assert db_span_found, f"No DB span found. Spans available: {span_names}"

    print("Trace validation successful!")

现在,每次代码提交到CircleCI,我们都会自动验证这套复杂的分布式追踪系统是否依然健壮,极大地增强了我们对系统可观测性的信心。

局限与未来路径

这套方案虽然解决了核心的上下文传递问题,但在生产环境中还需考虑更多。首先是采样率,全量追踪在高流量下成本过高,需要配置动态采样策略,比如只对错误链路或慢链路进行全量采样。其次,当前的方案依赖于OpenTelemetry提供的自动埋点库,如果项目中使用了某些冷门的、没有现成埋点库的组件,就需要手动创建和传递上下文,工作量会增加。

未来的优化方向可以是在CircleCI的集成测试中引入对Span属性(tags/attributes)的更精细断言,比如检查数据库Span中是否包含了正确的db.statement。此外,还可以将追踪数据与Metrics(如Prometheus)和Logs进行关联,当收到一个告警时,可以直接从Metrics仪表盘下钻到对应的Trace,再关联到具体的错误日志,形成一个完整的、立体的可观测性体系。


  目录