Building End-to-End Tracing from Jetpack Compose to OpenFaaS over WebSocket


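In a system made of a native client, a real-time communication layer, and a serverless backend, finding the source of a performance bottleneck or an error is hard. When a user reports that an action in an Android app built with Jetpack Compose is slow, the problem could lie in client rendering, network transport, processing in the WebSocket gateway, or execution of the backend OpenFaaS function. Each component's logs are isolated, so they never form a complete call chain. We need a way to trace the lifecycle of a single request end to end, from the moment the user taps the screen until the serverless function finishes processing.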

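Our goal is to use OpenTelemetry to build a single trace that links the Jetpack Compose client, an intermediate WebSocket gateway, and the final OpenFaaS function, and to collect the trace data in Jaeger for visualization. The core challenge is propagating the Trace Context reliably over WebSocket: headers exist only on the initial HTTP upgrade request, and individual WebSocket messages have no built-in place for per-message metadata.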

Architecture overview and technology choices

The end-to-end flow is designed as follows:

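  1. Client (Jetpack Compose): The user triggers an action in the Android app. The in-app OpenTelemetry SDK creates a root span and injects the Trace Context into the JSON payload that is about to be sent over WebSocket.
  2. WebSocket gateway (Go): A Go service that maintains long-lived WebSocket connections with clients. It extracts the Trace Context from the incoming JSON payload, creates a child span, and then injects that context into the headers of the HTTP request that invokes the OpenFaaS function, following the W3C Trace Context standard.
  3. Backend (OpenFaaS function): A function written in Python. The OpenFaaS gateway forwards the HTTP headers to the function, where the OpenTelemetry WSGI middleware extracts the Trace Context and continues the trace; the function then creates its own business-logic spans.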
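The contract between client and gateway therefore comes down to a JSON envelope with an extra `trace` object. The following is a minimal, stdlib-only Python sketch of that contract. The `action`/`data`/`trace` keys mirror the payload used later in this article, while `build_message` and `read_trace_fields` are hypothetical helpers, not part of any SDK. The sample `traceparent` value is the example from the W3C Trace Context specification.

```python
import json
from typing import Dict


def build_message(action: str, data: str, trace: Dict[str, str]) -> str:
    """Client side (hypothetical helper): serialize the application-level envelope.

    The 'trace' object is the propagation carrier, e.g. {"traceparent": ...}.
    """
    return json.dumps({"action": action, "data": data, "trace": trace})


def read_trace_fields(raw: str) -> Dict[str, str]:
    """Gateway side (hypothetical helper): pull the carrier back out of the envelope.

    A missing or malformed 'trace' field yields an empty carrier, so the
    gateway can fall back to starting a new root trace instead of failing.
    """
    msg = json.loads(raw)
    trace = msg.get("trace")
    if not isinstance(trace, dict):
        return {}
    # Propagators treat carriers as str -> str maps; drop non-string values.
    return {k: v for k, v in trace.items() if isinstance(v, str)}


raw = build_message(
    "analyze",
    "ERROR disk full",
    {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"},
)
print(read_trace_fields(raw))
```

Falling back to an empty carrier keeps the gateway tolerant of older clients that do not send the `trace` field yet.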

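These technology choices are deliberate. Jetpack Compose offers a modern, declarative way to build UIs and suits data-driven, real-time views. WebSocket provides low-latency, bidirectional communication between client and backend over a single long-lived connection. OpenFaaS provides event-driven, autoscaling compute, which avoids the cost of running an always-on service for infrequent requests. Jaeger, a CNCF graduated project, is a mature solution for distributed tracing.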

sequenceDiagram
    participant Client as Jetpack Compose App
    participant Gateway as WebSocket Gateway (Go)
    participant OpenFaaS as OpenFaaS Gateway
    participant Function as Python Function
    participant Jaeger as Jaeger Collector

    Client->>+Gateway: WebSocket Connect
    Note over Client: User Action (e.g., Button Click)
    Client->>Client: OTel SDK: Start Root Span (Span A)
    Client->>Client: Inject TraceContext into JSON payload
    Client->>Gateway: Send WebSocket Message with TraceContext
    Gateway->>Gateway: Extract TraceContext from payload
    Gateway->>Gateway: OTel SDK: Start Child Span (Span B), parent=A
    Gateway->>Gateway: Inject TraceContext into HTTP Headers
    Gateway->>+OpenFaaS: Invoke Function (HTTP POST)
    OpenFaaS->>+Function: Forward Request with Headers
    Function->>Function: OTel SDK: Auto-extract Context, Start Child Span (Span C), parent=B
    Note over Function: Business Logic Execution
    Function->>Jaeger: Export Span C
    Function-->>-OpenFaaS: Return Result
    OpenFaaS-->>-Gateway: Return Result
    Gateway->>Jaeger: Export Span B
    Gateway->>Client: Push Result via WebSocket
    Client->>Jaeger: Export Span A

Step 1: Instrumenting the Jetpack Compose client

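Integrating OpenTelemetry into the Android client is where the trace begins. We need to capture the user interaction and make sure the Trace Context is generated and propagated correctly.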

1. Dependency configuration

First, add the OpenTelemetry dependencies to build.gradle.kts. We need the core API, the SDK, and the OTLP (OpenTelemetry Protocol) exporter used to send data to Jaeger.

// app/build.gradle.kts

dependencies {
    // OpenTelemetry Core
    implementation("io.opentelemetry:opentelemetry-api:1.28.0")
    implementation("io.opentelemetry:opentelemetry-sdk:1.28.0")

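    // OpenTelemetry Exporter for OTLP (to Jaeger)
    implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.28.0")

    // Semantic conventions (provides ResourceAttributes, imported in TracingSdk.kt)
    implementation("io.opentelemetry:opentelemetry-semconv:1.28.0-alpha")

    // OkHttp instrumentation for potential REST calls
    implementation("io.opentelemetry.instrumentation:opentelemetry-okhttp-3.0:1.28.0-alpha")
    
    // For WebSocket client
    implementation("com.squareup.okhttp3:okhttp:4.11.0")

    // JSON serialization of WebSocket messages (Gson is used in MainViewModel.kt)
    implementation("com.google.code.gson:gson:2.10.1")
}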

2. Initializing the OpenTelemetry SDK

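In the app's Application class, we configure and initialize a global OpenTelemetry instance. The key part is the SpanExporter, which determines where trace data is sent. In a real project, the Jaeger Collector address should be read from configuration rather than hardcoded.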

// TracingSdk.kt
package com.example.tracingapp.tracing

import android.content.Context
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import java.util.concurrent.TimeUnit

object TracingSdk {

    private const val JAEGER_HOST = "10.0.2.2" // Emulator's alias for host machine
    private const val JAEGER_PORT = 4317 // OTLP GRPC port
    private const val SERVICE_NAME = "jetpack-compose-client"

    private lateinit var openTelemetry: OpenTelemetry

    fun initialize(context: Context) {
        if (::openTelemetry.isInitialized) return

        val resource = Resource.create(
            Attributes.of(ResourceAttributes.SERVICE_NAME, SERVICE_NAME)
        )

        val spanExporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://$JAEGER_HOST:$JAEGER_PORT")
            .setTimeout(2, TimeUnit.SECONDS)
            .build()

        val spanProcessor = BatchSpanProcessor.builder(spanExporter)
            .setScheduleDelay(100, TimeUnit.MILLISECONDS)
            .build()

        val tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(spanProcessor)
            .setResource(resource)
            .build()

        openTelemetry = OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .buildAndRegisterGlobal()
    }

    fun getTracer(instrumentationName: String): io.opentelemetry.api.trace.Tracer {
        return openTelemetry.getTracer(instrumentationName)
    }

    fun getPropagator(): ContextPropagators {
        return openTelemetry.propagators
    }
}

3. Injecting the Trace Context into WebSocket messages

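This is the most critical step. W3C Trace Context is normally carried in an HTTP header (traceparent), but WebSocket messages have no standardized header mechanism for this kind of metadata: headers exist only on the initial handshake, which happens once per connection rather than once per request. A pragmatic approach is to embed the context in our application-level message payload.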

We define a TextMapSetter that knows how to write traceparent into a MutableMap<String, String>; this map is later serialized as part of the JSON message.

// WebSocketTraceContext.kt
package com.example.tracingapp.tracing

import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter

// Custom setter to inject context into a mutable map, which will become part of our JSON payload.
object WebSocketTraceContext {

    val setter = TextMapSetter<MutableMap<String, String>> { carrier, key, value ->
        carrier?.set(key, value)
    }

    fun inject(context: Context, carrier: MutableMap<String, String>) {
        TracingSdk.getPropagator().getTextMapPropagator().inject(context, carrier, setter)
    }
}
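With the W3C propagator configured, `inject` writes a single `traceparent` entry of the form `version-traceid-parentid-flags` into the carrier. To make that concrete, here is a hand-rolled Python sketch of the same formatting. It is not the OpenTelemetry SDK, and `new_trace_id`, `new_span_id`, and `inject` are hypothetical helpers: a 16-byte trace ID and an 8-byte span ID encoded as lowercase hex, with the low bit of the flags byte marking the trace as sampled.

```python
import secrets


def new_trace_id() -> str:
    # 16 random bytes as 32 lowercase hex chars; all-zero is invalid per W3C.
    while True:
        tid = secrets.token_hex(16)
        if tid != "0" * 32:
            return tid


def new_span_id() -> str:
    # 8 random bytes as 16 lowercase hex chars; all-zero is invalid per W3C.
    while True:
        sid = secrets.token_hex(8)
        if sid != "0" * 16:
            return sid


def inject(trace_id: str, span_id: str, sampled: bool, carrier: dict) -> None:
    """Write a version-00 traceparent value into a str -> str carrier."""
    flags = "01" if sampled else "00"
    carrier["traceparent"] = f"00-{trace_id}-{span_id}-{flags}"


carrier = {}
inject(new_trace_id(), new_span_id(), True, carrier)
print(carrier)
```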

Now, in the ViewModel or service layer, when we send a WebSocket message we first create a span and then use the helper above to inject the context.

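// MainViewModel.kt
package com.example.tracingapp

import androidx.lifecycle.ViewModel
import com.example.tracingapp.tracing.TracingSdk
import com.example.tracingapp.tracing.WebSocketTraceContext
import com.google.gson.Gson
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context

class MainViewModel : ViewModel() {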
    private val tracer = TracingSdk.getTracer("com.example.tracingapp.main")
    private val webSocketClient = WebSocketClient() // Assuming this class handles WebSocket connection

    fun sendAnalysisRequest(logLine: String) {
        val span = tracer.spanBuilder("client.sendAnalysisRequest").startSpan()

        // Make the new span the current one in the scope
        span.makeCurrent().use {
            try {
                // The carrier for our trace context
                val traceContextCarrier = mutableMapOf<String, String>()
                
                // Inject the current context (which includes info from 'span') into the carrier
                WebSocketTraceContext.inject(Context.current(), traceContextCarrier)

                val messagePayload = mapOf(
                    "action" to "analyze",
                    "data" to logLine,
                    "trace" to traceContextCarrier // Embed the context
                )
                
                // Serialize and send
                val jsonMessage = Gson().toJson(messagePayload)
                webSocketClient.send(jsonMessage)
                span.setAttribute("message.payload.size", jsonMessage.length.toLong())

            } catch (e: Exception) {
                span.setStatus(StatusCode.ERROR, "Failed to send WebSocket message")
                span.recordException(e)
                throw e
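            } finally {
                // The span ends once the message is handed to the socket, so it
                // measures the send only, not the round trip to the function.
                span.end()
            }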
        }
    }
}

Step 2: Implementing the WebSocket gateway and propagating the context

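The gateway is the hub that connects the two sides. It must parse the custom payload sent by the client, extract the Trace Context, and convert it into standard HTTP headers so that the downstream OpenFaaS function can recognize it. We implement it in Go with the gorilla/websocket library.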

1. Dependencies and SDK initialization

First, fetch the Go dependencies.

go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/trace
go get go.opentelemetry.io/otel/sdk
go get go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
go get github.com/gorilla/websocket

Next, initialize the Go OpenTelemetry SDK. The logic mirrors the Android side.

// tracing/tracing.go
package tracing

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func InitTracer(serviceName, jaegerEndpoint string) (*sdktrace.TracerProvider, error) {
	exporter, err := otlptracegrpc.New(context.Background(),
		otlptracegrpc.WithInsecure(),
		otlptracegrpc.WithEndpoint(jaegerEndpoint),
	)
	if err != nil {
		return nil, err
	}

	res, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		),
	)
	if err != nil {
		return nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(res),
	)

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{}, // W3C Trace Context
		propagation.Baggage{},
	))

	log.Println("Tracer initialized")
	return tp, nil
}

2. Extracting the context, creating a span, and injecting it

Extraction and injection happen in the handler that processes WebSocket messages.

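First, define a TextMapCarrier that lets the OpenTelemetry SDK read traceparent from a map[string]string (the otel propagation package also ships an equivalent propagation.MapCarrier):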

// tracing/propagator.go
package tracing

// TextMapCarrier implements the TextMapCarrier interface for map[string]string.
type TextMapCarrier map[string]string

// Get returns the value associated with the given key.
func (c TextMapCarrier) Get(key string) string {
	return c[key]
}

// Set sets the value for the given key.
func (c TextMapCarrier) Set(key, value string) {
	c[key] = value
}

// Keys returns a slice of all keys in the carrier.
func (c TextMapCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}
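What `propagator.Extract` does with such a carrier can be approximated in a few lines. The Python sketch below is a simplified stand-in for a W3C Trace Context extractor, not the otel-go implementation: it reads `traceparent` from a plain dict (the role `TextMapCarrier` plays in Go), validates it, and returns `None` for anything malformed. `RemoteSpanContext` and `extract` are hypothetical names, and the harder parts of the spec, such as future versions and `tracestate`, are deliberately left out.

```python
import re
from typing import NamedTuple, Optional

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00)
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


class RemoteSpanContext(NamedTuple):
    trace_id: str
    span_id: str  # the sender's span id, i.e. the receiver's parent
    sampled: bool


def extract(carrier: dict) -> Optional[RemoteSpanContext]:
    """Return the remote parent context, or None if absent or invalid.

    Returning None instead of raising lets the caller simply start a new
    root span, which is also what happens when an SDK extractor finds no
    valid context.
    """
    value = carrier.get("traceparent")
    if not isinstance(value, str):
        return None
    m = _TRACEPARENT.fullmatch(value.strip())
    if m is None:
        return None
    version, trace_id, span_id, flags = m.groups()
    # This sketch accepts version 00 only; all-zero ids are invalid per the spec.
    if version != "00" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return RemoteSpanContext(trace_id, span_id, bool(int(flags, 16) & 0x01))


print(extract({"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}))
```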

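Then, in the main logic, we unmarshal the JSON, use TextMapCarrier to extract the parent span context, start a new child span, and finally inject the context into the HTTP request headers when calling OpenFaaS.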

// main.go
package main

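import (
	"bytes"
	"context"
	"encoding/json"
	"log"
	"net/http"

	"github.com/gorilla/websocket"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"

	// Hypothetical module path: Go modules do not allow relative imports such
	// as "./tracing"; use the module path declared in your go.mod.
	"example.com/ws-gateway/tracing"
)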

// Represents the incoming WebSocket message structure
type WsMessage struct {
	Action string            `json:"action"`
	Data   string            `json:"data"`
	Trace  map[string]string `json:"trace"`
}

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Upgrade error:", err)
		return
	}
	defer conn.Close()

	tracer := otel.Tracer("ws-gateway")
	propagator := otel.GetTextMapPropagator()

	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			log.Println("Read error:", err)
			break
		}
		
		var msg WsMessage
		if err := json.Unmarshal(message, &msg); err != nil {
			log.Println("Unmarshal error:", err)
			continue
		}

		// 1. Extract context from the incoming message payload
		carrier := tracing.TextMapCarrier(msg.Trace)
		parentCtx := propagator.Extract(r.Context(), carrier)

		// 2. Start a new child span for this message
		ctx, span := tracer.Start(parentCtx, "gateway.processMessage")

		// 3. Invoke OpenFaaS function
		invokeOpenFaaS(ctx, msg.Data)

		// End the span per message. A `defer` inside this loop would only run
		// when the connection closes, keeping every message's span open until then.
		span.End()
	}
}

func invokeOpenFaaS(ctx context.Context, data string) {
	// Function URL
	faasURL := "http://127.0.0.1:8080/function/log-analyzer"
	
	reqBody, _ := json.Marshal(map[string]string{"log": data})
	req, err := http.NewRequestWithContext(ctx, "POST", faasURL, bytes.NewBuffer(reqBody))
	if err != nil {
		log.Printf("Failed to create request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// 4. CRITICAL: Inject the context into the outgoing HTTP request headers
	propagator := otel.GetTextMapPropagator()
	propagator.Inject(ctx, propagation.HeaderCarrier(req.Header))

	// Get span from context to record http attributes
	span := trace.SpanFromContext(ctx)
	span.SetAttributes(
		attribute.String("http.method", "POST"),
		attribute.String("http.url", faasURL),
	)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("Failed to invoke function: %v", err)
		span.RecordError(err)
		return
	}
	defer resp.Body.Close()
	
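	// trace.Span exposes SetAttributes (plural), which takes attribute.KeyValue values
	span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))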
	log.Printf("Function returned status: %d", resp.StatusCode)
}

func main() {
	// Initialize tracer
	tp, err := tracing.InitTracer("websocket-gateway", "localhost:4317")
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()
	
	http.HandleFunc("/ws", handleWebSocket)
	log.Println("WebSocket server starting on :8090")
	log.Fatal(http.ListenAndServe(":8090", nil))
}

The key line is propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)): it serializes the currently active span context into a traceparent header and adds it to the HTTP request bound for the OpenFaaS gateway.
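It helps to be precise about what changes at this hop. The trace ID passes through unchanged, while the parent-id field now holds the gateway span's own ID, so the function's span becomes a child of `gateway.processMessage` rather than of the client span. The following Python sketch shows that transformation on the header value. `child_traceparent` is a hypothetical helper; in the actual gateway the Go SDK does this inside `tracer.Start` and `Inject`.

```python
import secrets


def child_traceparent(parent: str) -> str:
    """Build the traceparent the gateway sends downstream.

    Keeps the trace id and flags from the incoming value and replaces the
    parent-id field with a fresh span id for the gateway's own span.
    Assumes `parent` was already validated by an extract step.
    """
    _version, trace_id, _client_span_id, flags = parent.split("-")
    span_id = secrets.token_hex(8)
    while span_id == "0" * 16:  # all-zero span ids are invalid
        span_id = secrets.token_hex(8)
    return f"00-{trace_id}-{span_id}-{flags}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
headers = {"Content-Type": "application/json", "traceparent": child_traceparent(incoming)}
print(headers)
```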

Step 3: Continuing the trace in the OpenFaaS function

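The final step is to make the OpenFaaS function recognize and continue the trace. Most of this work is already handled by the OpenTelemetry SDK and its instrumentation for common Python web frameworks.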

1. Function and dependencies

We need a requirements.txt for the Python function:

# requirements.txt
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation-wsgi
flask

The implementation of handler.py is straightforward. We use OpenTelemetry's WSGI middleware to handle incoming requests automatically: it checks for the traceparent header and creates a child span.

# log-analyzer/handler.py
import os
import logging
from flask import Flask, request
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware

# --- Boilerplate OTel Initialization ---
SERVICE_NAME = os.getenv("SERVICE_NAME", "log-analyzer-fn")
JAEGER_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://jaeger-collector.openfaas:4317")

# Configure the tracer provider
resource = Resource(attributes={"service.name": SERVICE_NAME})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer_provider = trace.get_tracer_provider()

# Configure the OTLP exporter
exporter = OTLPSpanExporter(endpoint=JAEGER_ENDPOINT, insecure=True)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))

tracer = trace.get_tracer(__name__)

# --- Flask App ---
app = Flask(__name__)

# Wrap the Flask app with the OTel middleware
# This middleware automatically extracts context and creates a span for each request.
app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app)

@app.route('/', methods=['POST'])
def handle():
    # The middleware has already created a parent span for this request.
    # We can get the current span and add attributes or create child spans.
    current_span = trace.get_current_span()

    try:
        data = request.get_json()
        if not data or 'log' not in data:
            current_span.set_attribute("app.error", "InvalidPayload")
            return {"status": "error", "message": "Missing 'log' field"}, 400
        
        log_line = data['log']
        current_span.set_attribute("app.log.length", len(log_line))

        # Create a child span for the actual business logic
        with tracer.start_as_current_span("business_logic.analyze_log") as logic_span:
            logic_span.add_event("Analysis started")
            # In a real scenario, this would be complex analysis.
            # Here we just simulate some work.
            result = "benign"
            if "error" in log_line.lower():
                result = "critical"
                logic_span.set_attribute("app.analysis.result", "critical")
            
            logic_span.add_event("Analysis finished")
            
            return {"status": "success", "analysis_result": result}, 200

    except Exception as e:
        current_span.set_status(trace.Status(trace.StatusCode.ERROR))
        current_span.record_exception(e)
        return {"status": "error", "message": "Internal server error"}, 500

# This part is for local testing, not used by OpenFaaS
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

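OpenTelemetryMiddleware does the heavy lifting here. It intercepts every incoming WSGI request and looks for headers such as traceparent; if they are present, it creates a span whose parent is the remote context and makes it the current span (without them, it starts a new root span). That is why the span returned by trace.get_current_span() inside handle is already linked to its parent.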
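The first thing any such middleware has to do is find the header, and WSGI exposes request headers as CGI-style environ keys, so `traceparent` arrives as `HTTP_TRACEPARENT`. The stdlib-only sketch below shows just that lookup step. `TraceparentMiddleware` and the `example.parent_traceparent` environ key are hypothetical; this is not how `OpenTelemetryMiddleware` is implemented internally, and it creates no spans.

```python
from typing import Callable, Iterable, List, Optional


class TraceparentMiddleware:
    """Hypothetical WSGI middleware that only reads the incoming traceparent."""

    def __init__(self, app: Callable) -> None:
        self.app = app
        self.seen: List[Optional[str]] = []  # headers observed, for illustration

    def __call__(self, environ: dict, start_response: Callable) -> Iterable[bytes]:
        # The HTTP header "traceparent" is exposed as environ["HTTP_TRACEPARENT"].
        incoming = environ.get("HTTP_TRACEPARENT")
        self.seen.append(incoming)
        # A real tracer would parse this and start a span; we just pass it along.
        environ["example.parent_traceparent"] = incoming
        return self.app(environ, start_response)


def app(environ: dict, start_response: Callable) -> Iterable[bytes]:
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [str(environ.get("example.parent_traceparent")).encode()]


wrapped = TraceparentMiddleware(app)
environ = {
    "REQUEST_METHOD": "POST",
    "PATH_INFO": "/",
    "HTTP_TRACEPARENT": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
}
body = b"".join(wrapped(environ, lambda status, headers: None))
print(body)
```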

Results and limitations

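With these three steps, a trace that begins with a user action on the mobile client crosses the WebSocket layer, despite its lack of per-message headers, and continues into the serverless function on the backend. In the Jaeger UI, we see a complete, hierarchical trace: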

  • client.sendAnalysisRequest (root span from Jetpack Compose)
    • gateway.processMessage (child span from WebSocket Gateway)
      • POST / (child span from OpenFaaS function’s WSGI middleware)
        • business_logic.analyze_log (child span created manually inside the function)
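This hierarchy is reconstructed purely from the parent span ID recorded on each span. The Python sketch below rebuilds the same indentation from `(span_id, parent_id, name)` records; the span IDs are placeholders and `render_tree` is a hypothetical helper, not a Jaeger API. It also covers the case where a parent never arrives, for example if the client failed to export its span: the sketch renders the orphaned span as a root instead of silently dropping it.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Span = Tuple[str, Optional[str], str]  # (span_id, parent_span_id, name)


def render_tree(spans: List[Span]) -> List[str]:
    names: Dict[str, str] = {sid: name for sid, _, name in spans}
    children: Dict[str, List[str]] = defaultdict(list)
    roots: List[str] = []
    for sid, parent, _ in spans:
        # A span whose parent is None or was never received is treated as a root.
        if parent is None or parent not in names:
            roots.append(sid)
        else:
            children[parent].append(sid)

    lines: List[str] = []

    def walk(sid: str, depth: int) -> None:
        lines.append("  " * depth + names[sid])
        for child in children[sid]:
            walk(child, depth + 1)

    for root in roots:
        walk(root, 0)
    return lines


spans = [
    ("c1", None, "client.sendAnalysisRequest"),
    ("g1", "c1", "gateway.processMessage"),
    ("f1", "g1", "POST /"),
    ("f2", "f1", "business_logic.analyze_log"),
]
print("\n".join(render_tree(spans)))
```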

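This pays off in practice. The latency of each hop can now be measured, and an error in any component can be traced quickly to a specific span and its logs. Note that, as written, the client's root span ends as soon as the message is sent, so it does not cover the round trip; to measure full end-to-end latency on the client, that span would have to stay open until the response arrives over the WebSocket.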

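However, this approach has limitations. The main one is that we rely on carrying the trace context inside the application-level message payload. The client and server must therefore agree on the structure of the trace field, which couples them at the protocol level. If the WebSocket client is provided by a third party, this approach may not be feasible. We also did not implement any sampling strategy. In production, tracing 100% of requests is too expensive, so head-based or tail-based sampling has to be introduced, which further increases the complexity of the gateway and the client. A possible direction for future work is to evaluate emerging RPC-over-WebSocket protocols and see whether they offer more standardized support for passing metadata.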
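One common form of head-based sampling decides once, at the root, using the trace ID itself, so every downstream hop that sees the same ID reaches the same decision; the decision then travels in the sampled bit of `traceparent`. The Python sketch below is loosely modeled on the idea behind the `TraceIdRatioBased` sampler in the OpenTelemetry SDKs (compare the low 64 bits of the trace ID with `ratio * 2**64`). `should_sample` is a hypothetical helper for illustration, not the SDK's code.

```python
TRACE_ID_LOW64 = (1 << 64) - 1  # mask for the low 64 bits of a 128-bit trace id


def should_sample(trace_id_hex: str, ratio: float) -> bool:
    """Deterministic head-sampling decision for a 32-hex-char trace id."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be within [0.0, 1.0]")
    bound = round(ratio * (TRACE_ID_LOW64 + 1))
    return (int(trace_id_hex, 16) & TRACE_ID_LOW64) < bound


# The same trace id always yields the same decision, at every hop.
tid = "4bf92f3577b34da6a3ce929d0e0e4736"
print(should_sample(tid, 0.1), should_sample(tid, 0.1))
```

Tail-based sampling, by contrast, needs the whole trace before deciding, which usually means buffering spans in a collector rather than deciding in the client or gateway.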

