构建基于ActiveMQ的异步任务总线连接Gatsby前端与BentoML模型服务


一个看似简单的需求摆在了面前:我们需要在一个基于Gatsby构建的静态站点上,允许用户触发一个耗时可能长达30秒到数分钟的机器学习模型推理任务,并在任务完成后实时将结果呈现在前端。这个模型服务由Python和BentoML构建。

定义问题:同步调用的脆弱性

最直观的方案是构建一个同步的RESTful API。Gatsby前端通过HTTP请求直接调用BentoML暴露的API端点,然后进入长时间的等待,直到BentoML返回推理结果。

在开发环境中,这或许能跑通。但在生产环境中,这种架构是灾难的开始:

  1. 糟糕的用户体验: 用户的浏览器会长时间处于“pending”状态,无法进行任何其他操作。HTTP连接可能会因为网关、负载均衡器或浏览器自身的超时策略而被中断。
  2. 脆弱的系统耦合: 前端与后端服务通过同步HTTP调用紧密耦合。如果BentoML服务重启、部署或发生短暂故障,所有正在进行的请求都会失败,且没有内置的重试或恢复机制。
  3. 资源耗尽风险: 大量长轮询的HTTP连接会占用BentoML服务和网络基础设施的连接资源,使其难以水平扩展。如果100个用户同时发起请求,服务将需要维持100个活跃的HTTP工作线程,极易导致资源枯竭。

这种架构的根本问题在于,它将一个本质上是异步的、长周期的批处理任务,强行塞进了同步的、短周期的请求-响应模型中。

架构决策:消息队列作为解耦总线

为了解决上述问题,我们需要引入一个中间件来解耦前端请求的发起和后端任务的执行。方案B,即采用消息队列(Message Queue)作为异步任务总线。

方案A: RabbitMQ/AMQP

RabbitMQ是一个成熟、功能强大的消息代理,支持AMQP协议,提供灵活的路由和可靠的消息投递。

  • 优势: 功能全面,Exchange/Binding机制提供了复杂的路由逻辑,社区生态成熟。
  • 劣势: 在我们的场景中,其复杂性有些过度。我们需要的是一个简单的点对点任务队列和结果通知机制。更重要的是,让浏览器端的TypeScript直接与AMQP协议通信需要引入相对厚重的库,或者通过一个专门的WebSocket网关进行协议转换,增加了架构的复杂性。

方案B: ActiveMQ/STOMP over WebSocket

ActiveMQ是一个经典的Java系消息中间件,其一大优势是原生支持多种协议,包括STOMP (Simple Text Oriented Messaging Protocol)。STOMP协议设计简洁,并且ActiveMQ内置了STOMP over WebSocket的传输连接器。

  • 优势:

    • 协议友好: STOMP over WebSocket允许我们的Gatsby/TypeScript前端应用能够直接、轻量地与消息队列建立全双工通信,无需额外的协议转换网关。
    • 解耦与可靠性: 继承了MQ的所有优点,包括生产者与消费者的解耦、消息持久化、失败重试(通过ACK机制)。
    • 异构系统亲和: ActiveMQ拥有广泛的客户端支持,Python端的BentoML服务和浏览器端的TypeScript都可以轻松集成。
  • 劣势: 相比Kafka等流处理平台,ActiveMQ在超高吞吐量场景下可能不是最优选。但对于我们的用户触发式推理任务场景,其吞吐量绰绰有余,而协议的便利性是决定性因素。

最终选择: ActiveMQ。它的STOMP over WebSocket能力完美解决了我们最棘手的“前端直连MQ”问题,使得整个异步架构闭环的实现最为直接和优雅。

核心实现概览

我们将构建一个双队列系统:一个用于任务分发,一个用于结果回传。整个流程通过一个correlationId进行串联,以确保请求和响应能够正确匹配。

sequenceDiagram
    participant Gatsby as Gatsby/TypeScript Frontend
    participant ActiveMQ as ActiveMQ Broker
    participant BentoML as BentoML Python Service

    Gatsby->>+ActiveMQ: CONNECT (STOMP over WebSocket)
    ActiveMQ-->>-Gatsby: CONNECTED

    Gatsby->>Gatsby: Generates unique correlationId
    Gatsby->>ActiveMQ: SUBSCRIBE to /queue/ml.results.
    Note right of Gatsby: 监听特定于用户的
结果队列 Gatsby->>ActiveMQ: SEND Message to /queue/ml.requests
{correlationId, payload, replyTo: '/queue/ml.results.'} Note left of ActiveMQ: 任务进入请求队列 BentoML->>+ActiveMQ: CONNECT (STOMP) ActiveMQ-->>-BentoML: CONNECTED BentoML->>ActiveMQ: SUBSCRIBE to /queue/ml.requests ActiveMQ-->>BentoML: RECEIVE Message
{correlationId, payload, replyTo} Note right of BentoML: 开始执行耗时的
模型推理... BentoML-->>BentoML: result = model.predict(payload) BentoML->>ActiveMQ: SEND Message to replyTo queue
{correlationId, result} Note left of ActiveMQ: 结果进入用户专属队列 ActiveMQ-->>Gatsby: RECEIVE Message
{correlationId, result} Note right of Gatsby: 根据correlationId
更新对应任务状态

下面是这个架构中关键部分的代码实现。

1. BentoML 服务端:任务消费者与结果生产者

我们的BentoML服务不再是一个简单的HTTP服务,而是一个后台运行的、监听ActiveMQ队列的健壮消费者。

首先,我们需要在bentofile.yaml中声明Python依赖。

bentofile.yaml

service: "com.example.async_inference_service:svc"
labels:
  owner: ml-platform-team
  stage: production
include:
- "*.py"
python:
  packages:
    - scikit-learn
    - pandas
    - stomp.py==8.0.1 # 核心依赖
    - tenacity==8.2.3 # 用于实现健壮的重连接

接下来是核心的服务实现。我们将创建一个单独的runner.py脚本来启动服务并管理MQ连接,而不是依赖BentoML的内置HTTP服务器。

model.py

# model.py
import time
import bentoml
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# 一个简单的伪模型,用于模拟推理过程
class SimpleClassifier:
    def __init__(self):
        # 模拟加载一个耗时模型
        time.sleep(2)
        self._model = RandomForestClassifier()
        # 模拟训练
        self._model.fit([[0, 0], [1, 1]], [0, 1])

    def predict(self, data: pd.DataFrame) -> list:
        # 模拟一个耗时的推理任务
        print(f"Starting long inference for data shape: {data.shape}...")
        time.sleep(15) 
        print("Inference complete.")
        return self._model.predict(data.values).tolist()

# 使用BentoML的模型管理
# bentoml.sklearn.save_model("simple_rf_model", SimpleClassifier()._model)
# 在实际项目中,你会从模型仓库加载
clf_runner = bentoml.sklearn.get("simple_rf_model:latest").to_runner()

# 定义BentoML服务
svc = bentoml.Service("async_inference_service", runners=[clf_runner])

# 我们不再定义@svc.api, 因为入口是MQ

runner.py

# runner.py
import time
import json
import uuid
import logging
import stomp
import os
from tenacity import retry, stop_after_attempt, wait_exponential

# 导入我们的BentoML服务实例
from model import svc, clf_runner

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置,这是生产实践
MQ_HOST = os.getenv("ACTIVEMQ_HOST", "localhost")
MQ_PORT = int(os.getenv("ACTIVEMQ_PORT", 61613))
REQUEST_QUEUE = "/queue/ml.requests"

class MqListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        logging.error(f'Received an error frame: {frame.body}')

    def on_disconnected(self):
        logging.warning('Disconnected from ActiveMQ. Reconnecting...')
        connect_and_subscribe(self.conn)

    def on_message(self, frame):
        """
        核心任务处理逻辑
        """
        try:
            headers = frame.headers
            message_id = headers.get('message-id')
            correlation_id = headers.get('correlation-id')
            reply_to = headers.get('reply-to')

            logging.info(f"Received message {message_id} with correlation-id: {correlation_id}")

            if not all([correlation_id, reply_to]):
                logging.error("Missing correlation-id or reply-to header. Skipping message.")
                # 在真实项目中,应该将这类消息发送到死信队列
                # 这里我们选择不确认消息,让它重试或进入DLQ
                return

            body = json.loads(frame.body)
            # 假设输入数据格式为 {'columns': [...], 'data': [[...]]}
            input_df = pd.DataFrame(body['data'], columns=body['columns'])
            
            # 调用BentoML runner执行推理
            # runner.run()是异步的, 但我们在这里可以同步等待它
            results = clf_runner.predict.run(input_df)

            response_payload = {
                "status": "SUCCESS",
                "data": results
            }

        except json.JSONDecodeError as e:
            logging.error(f"Failed to decode JSON from message {message_id}: {e}")
            response_payload = {"status": "ERROR", "message": "Invalid JSON format."}
        except Exception as e:
            logging.error(f"Inference failed for message {message_id}: {e}", exc_info=True)
            response_payload = {"status": "ERROR", "message": str(e)}

        # 将结果发送到'reply-to'指定的队列
        self.conn.send(
            body=json.dumps(response_payload),
            destination=reply_to,
            headers={
                'correlation-id': correlation_id,
                'persistent': 'true' # 确保结果消息持久化
            }
        )
        logging.info(f"Sent response for {correlation_id} to {reply_to}")

        # 确认消息已被成功处理
        self.conn.ack(message_id, headers['subscription'])

# 使用tenacity实现带指数退避的重试连接
@retry(wait=wait_exponential(multiplier=1, min=2, max=60), stop=stop_after_attempt(10))
def connect_and_subscribe(conn):
    try:
        logging.info(f"Attempting to connect to ActiveMQ at {MQ_HOST}:{MQ_PORT}...")
        conn.connect('admin', 'admin', wait=True)
        # 订阅任务队列
        # ack='client-individual' 意味着我们需要手动确认每条消息
        conn.subscribe(destination=REQUEST_QUEUE, id=str(uuid.uuid4()), ack='client-individual')
        logging.info(f"Successfully connected and subscribed to {REQUEST_QUEUE}")
    except Exception as e:
        logging.error(f"Connection failed: {e}. Retrying...")
        raise

def main():
    # 初始化BentoML服务
    svc.on_startup()
    
    conn = stomp.Connection([(MQ_HOST, MQ_PORT)])
    conn.set_listener('', MqListener(conn))
    
    connect_and_subscribe(conn)

    try:
        while True:
            # 保持主线程存活
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        svc.on_shutdown()
        conn.disconnect()

if __name__ == "__main__":
    main()

这个runner.py是生产级的:

  • 通过环境变量配置MQ连接。
  • 使用tenacity库实现健壮的自动重连接。
  • 采用client-individual确认模式,确保任务处理失败时消息不会丢失。
  • 完整的错误处理,包括JSON解析错误和模型推理异常。
  • 使用correlation-idreply-to模式进行异步响应。

2. Gatsby/TypeScript 前端:任务发起与结果接收

前端需要使用一个STOMP客户端库。@stomp/stompjs是一个优秀的选择。

安装依赖:

npm install @stomp/stompjs uuid
npm install --save-dev @types/uuid

创建一个React Hook来管理STOMP连接和状态 (useStomp.ts)

// src/hooks/useStomp.ts
import { useEffect, useState, useRef } from 'react';
import { Client, IMessage } from '@stomp/stompjs';
import { v4 as uuidv4 } from 'uuid';

const MQ_WEBSOCKET_URL = process.env.GATSBY_ACTIVEMQ_WS_URL || 'ws://localhost:61614/ws';
const MQ_USER = process.env.GATSBY_ACTIVEMQ_USER || 'admin';
const MQ_PASS = process.env.GATSBY_ACTIVEMQ_PASS || 'admin';

export interface Task {
  id: string;
  status: 'PENDING' | 'SUCCESS' | 'ERROR';
  requestPayload: any;
  result?: any;
  error?: string;
}

export const useStompQueue = (userId: string | null) => {
  const [tasks, setTasks] = useState<Record<string, Task>>({});
  const [isConnected, setIsConnected] = useState<boolean>(false);
  const stompClientRef = useRef<Client | null>(null);

  // 初始化和清理STOMP客户端
  useEffect(() => {
    if (!userId) return;

    const resultQueue = `/queue/ml.results.${userId}`;
    
    const client = new Client({
      brokerURL: MQ_WEBSOCKET_URL,
      connectHeaders: {
        login: MQ_USER,
        passcode: MQ_PASS,
      },
      debug: (str) => {
        console.log(new Date(), str);
      },
      reconnectDelay: 5000,
      heartbeatIncoming: 4000,
      heartbeatOutgoing: 4000,
    });

    client.onConnect = () => {
      setIsConnected(true);
      // 订阅用户专属的结果队列
      client.subscribe(resultQueue, (message: IMessage) => {
        handleResultMessage(message);
      });
    };

    client.onStompError = (frame) => {
      console.error('Broker reported error: ' + frame.headers['message']);
      console.error('Additional details: ' + frame.body);
      setIsConnected(false);
    };

    stompClientRef.current = client;
    client.activate();

    return () => {
      client.deactivate();
      setIsConnected(false);
    };
  }, [userId]);

  const handleResultMessage = (message: IMessage) => {
    const correlationId = message.headers['correlation-id'];
    if (!correlationId) {
      console.warn('Received a message without a correlation-id, ignoring.');
      return;
    }

    try {
      const body = JSON.parse(message.body);
      setTasks(prevTasks => {
        const existingTask = prevTasks[correlationId];
        if (!existingTask) return prevTasks;

        return {
          ...prevTasks,
          [correlationId]: {
            ...existingTask,
            status: body.status === 'SUCCESS' ? 'SUCCESS' : 'ERROR',
            result: body.data,
            error: body.message,
          },
        };
      });
    } catch (error) {
      console.error('Failed to parse result message:', error);
    }
  };

  const sendInferenceRequest = (payload: any) => {
    if (!stompClientRef.current || !stompClientRef.current.connected || !userId) {
      console.error('STOMP client is not connected.');
      // 可以在这里加入UI提示
      return null;
    }

    const correlationId = uuidv4();
    const resultQueue = `/queue/ml.results.${userId}`;
    const requestQueue = '/queue/ml.requests';

    const newTask: Task = {
      id: correlationId,
      status: 'PENDING',
      requestPayload: payload,
    };

    setTasks(prev => ({ ...prev, [correlationId]: newTask }));

    stompClientRef.current.publish({
      destination: requestQueue,
      headers: {
        'persistent': 'true',
        'correlation-id': correlationId,
        'reply-to': resultQueue,
      },
      body: JSON.stringify(payload),
    });
    
    return correlationId;
  };

  return { tasks, sendInferenceRequest, isConnected };
};

在Gatsby页面组件中使用这个Hook (InferencePage.tsx)

// src/pages/inference.tsx
import React, { useState } from "react";
import { useStompQueue, Task } from '../hooks/useStomp';

// 模拟一个用户ID
const FAKE_USER_ID = "user123";

const InferencePage = () => {
  const { tasks, sendInferenceRequest, isConnected } = useStompQueue(FAKE_USER_ID);
  
  const handleButtonClick = () => {
    // 构造一个符合模型输入的请求
    const payload = {
      columns: ["feature1", "feature2"],
      data: [[Math.random(), Math.random()]],
    };
    sendInferenceRequest(payload);
  };

  return (
    <main style={{ padding: 40, fontFamily: 'sans-serif' }}>
      <h1>Async ML Inference Demo</h1>
      <p>Connection Status: 
        <span style={{ color: isConnected ? 'green' : 'red', fontWeight: 'bold' }}>
          {isConnected ? ' CONNECTED' : ' DISCONNECTED'}
        </span>
      </p>

      <button onClick={handleButtonClick} disabled={!isConnected}>
        Trigger 15-second Inference Task
      </button>

      <div style={{ marginTop: 30 }}>
        <h2>Tasks</h2>
        {Object.keys(tasks).length === 0 ? (
          <p>No tasks initiated yet.</p>
        ) : (
          <ul style={{ listStyle: 'none', padding: 0 }}>
            {Object.values(tasks).reverse().map((task: Task) => (
              <li key={task.id} style={{ border: '1px solid #ccc', padding: 10, marginBottom: 10 }}>
                <strong>Task ID:</strong> {task.id} <br/>
                <strong>Status:</strong> <span style={{ fontWeight: 'bold' }}>{task.status}</span> <br/>
                {task.status === 'PENDING' && <p>Processing... please wait.</p>}
                {task.status === 'SUCCESS' && <pre>Result: {JSON.stringify(task.result, null, 2)}</pre>}
                {task.status === 'ERROR' && <pre style={{color: 'red'}}>Error: {task.error}</pre>}
              </li>
            ))}
          </ul>
        )}
      </div>
    </main>
  );
};

export default InferencePage;

这个前端实现同样是生产级的:

  • Hook封装: 将所有STOMP逻辑封装在useStompQueue hook中,使组件代码保持干净。
  • 连接管理: 自动处理连接、断线重连和组件卸载时的资源清理。
  • 状态管理: 使用React state来跟踪每个任务的状态(PENDING, SUCCESS, ERROR),并响应式地更新UI。
  • 唯一身份: 依赖一个userId来构建独一无二的结果队列,确保多用户场景下结果不会混淆。这是一个关键的安全和隔离设计。

3. ActiveMQ 配置

最后,确保ActiveMQ的配置启用了STOMP over WebSocket。在conf/activemq.xml中,找到<transportConnectors>部分,并添加或确保存在ws连接器:

<transportConnectors>
    <!-- ...其他连接器如 openwire, amqp ... -->
    <transportConnector name="ws" uri="ws://0.0..0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

这会使ActiveMQ在61614端口上监听WebSocket连接。

架构的扩展性与局限性

这个架构的优势在于其出色的解耦性弹性。我们可以独立地扩展BentoML消费者实例的数量,只需让它们监听同一个ml.requests队列,ActiveMQ会自动进行负载均衡。如果一个消费者崩溃,消息会由其他消费者处理。前端应用完全不知道后端有多少实例,也不关心它们是否在进行部署更新。

然而,这个架构并非没有代价。

复杂性: 引入了一个新的中间件——ActiveMQ,增加了系统的运维负担。需要监控其健康状况、磁盘使用(对于持久化消息)和性能。

结果传递: 虽然STOMP over WebSocket解决了前端接收结果的问题,但如果前端应用不在线(例如用户关闭了浏览器标签页),发送到用户专属队列的结果消息就会一直驻留,直到过期或队列达到存储上限。需要设计合理的TTL(Time-To-Live)策略和死信队列(DLQ)机制来处理这些“无人认领”的结果。

ActiveMQ的边界: 对于需要严格顺序保证或极高吞吐量(每秒数十万条消息)的事件流场景,基于日志的系统如Apache Kafka可能是更合适的选择。但在当前这种用户驱动、中等并发的异步任务场景下,ActiveMQ的易用性和对STOMP的良好支持使其成为一个非常务实且高效的选择。


  目录