一个看似简单的需求摆在了面前:我们需要在一个基于Gatsby构建的静态站点上,允许用户触发一个耗时可能长达30秒到数分钟的机器学习模型推理任务,并在任务完成后实时将结果呈现在前端。这个模型服务由Python和BentoML构建。
定义问题:同步调用的脆弱性
最直观的方案是构建一个同步的RESTful API。Gatsby前端通过HTTP请求直接调用BentoML暴露的API端点,然后进入长时间的等待,直到BentoML返回推理结果。
在开发环境中,这或许能跑通。但在生产环境中,这种架构是灾难的开始:
- 糟糕的用户体验: 用户的浏览器会长时间处于“pending”状态,无法进行任何其他操作。HTTP连接可能会因为网关、负载均衡器或浏览器自身的超时策略而被中断。
- 脆弱的系统耦合: 前端与后端服务通过同步HTTP调用紧密耦合。如果BentoML服务重启、部署或发生短暂故障,所有正在进行的请求都会失败,且没有内置的重试或恢复机制。
- 资源耗尽风险: 大量长轮询的HTTP连接会占用BentoML服务和网络基础设施的连接资源,使其难以水平扩展。如果100个用户同时发起请求,服务将需要维持100个活跃的HTTP工作线程,极易导致资源枯竭。
这种架构的根本问题在于,它将一个本质上是异步的、长周期的批处理任务,强行塞进了同步的、短周期的请求-响应模型中。
架构决策:消息队列作为解耦总线
为了解决上述问题,我们需要引入一个中间件来解耦前端请求的发起和后端任务的执行。方案B,即采用消息队列(Message Queue)作为异步任务总线。
方案A: RabbitMQ/AMQP
RabbitMQ是一个成熟、功能强大的消息代理,支持AMQP协议,提供灵活的路由和可靠的消息投递。
- 优势: 功能全面,Exchange/Binding机制提供了复杂的路由逻辑,社区生态成熟。
- 劣势: 在我们的场景中,其复杂性有些过度。我们需要的是一个简单的点对点任务队列和结果通知机制。更重要的是,让浏览器端的TypeScript直接与AMQP协议通信需要引入相对厚重的库,或者通过一个专门的WebSocket网关进行协议转换,增加了架构的复杂性。
方案B: ActiveMQ/STOMP over WebSocket
ActiveMQ是一个经典的Java系消息中间件,其一大优势是原生支持多种协议,包括STOMP (Simple Text Oriented Messaging Protocol)。STOMP协议设计简洁,并且ActiveMQ内置了STOMP over WebSocket的传输连接器。
优势:
- 协议友好: STOMP over WebSocket允许我们的Gatsby/TypeScript前端应用能够直接、轻量地与消息队列建立全双工通信,无需额外的协议转换网关。
- 解耦与可靠性: 继承了MQ的所有优点,包括生产者与消费者的解耦、消息持久化、失败重试(通过ACK机制)。
- 异构系统亲和: ActiveMQ拥有广泛的客户端支持,Python端的BentoML服务和浏览器端的TypeScript都可以轻松集成。
劣势: 相比Kafka等流处理平台,ActiveMQ在超高吞吐量场景下可能不是最优选。但对于我们的用户触发式推理任务场景,其吞吐量绰绰有余,而协议的便利性是决定性因素。
最终选择: ActiveMQ。它的STOMP over WebSocket能力完美解决了我们最棘手的“前端直连MQ”问题,使得整个异步架构闭环的实现最为直接和优雅。
核心实现概览
我们将构建一个双队列系统:一个用于任务分发,一个用于结果回传。整个流程通过一个correlationId
进行串联,以确保请求和响应能够正确匹配。
sequenceDiagram participant Gatsby as Gatsby/TypeScript Frontend participant ActiveMQ as ActiveMQ Broker participant BentoML as BentoML Python Service Gatsby->>+ActiveMQ: CONNECT (STOMP over WebSocket) ActiveMQ-->>-Gatsby: CONNECTED Gatsby->>Gatsby: Generates unique correlationId Gatsby->>ActiveMQ: SUBSCRIBE to /queue/ml.results.Note right of Gatsby: 监听特定于用户的
结果队列 Gatsby->>ActiveMQ: SEND Message to /queue/ml.requests
{correlationId, payload, replyTo: '/queue/ml.results.'} Note left of ActiveMQ: 任务进入请求队列 BentoML->>+ActiveMQ: CONNECT (STOMP) ActiveMQ-->>-BentoML: CONNECTED BentoML->>ActiveMQ: SUBSCRIBE to /queue/ml.requests ActiveMQ-->>BentoML: RECEIVE Message
{correlationId, payload, replyTo} Note right of BentoML: 开始执行耗时的
模型推理... BentoML-->>BentoML: result = model.predict(payload) BentoML->>ActiveMQ: SEND Message to replyTo queue
{correlationId, result} Note left of ActiveMQ: 结果进入用户专属队列 ActiveMQ-->>Gatsby: RECEIVE Message
{correlationId, result} Note right of Gatsby: 根据correlationId
更新对应任务状态
下面是这个架构中关键部分的代码实现。
1. BentoML 服务端:任务消费者与结果生产者
我们的BentoML服务不再是一个简单的HTTP服务,而是一个后台运行的、监听ActiveMQ队列的健壮消费者。
首先,我们需要在bentofile.yaml
中声明Python依赖。
bentofile.yaml
service: "com.example.async_inference_service:svc"
labels:
owner: ml-platform-team
stage: production
include:
- "*.py"
python:
packages:
- scikit-learn
- pandas
- stomp.py==8.0.1 # 核心依赖
- tenacity==8.2.3 # 用于实现健壮的重连接
接下来是核心的服务实现。我们将创建一个单独的runner.py
脚本来启动服务并管理MQ连接,而不是依赖BentoML的内置HTTP服务器。
model.py
# model.py
import time
import bentoml
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
# 一个简单的伪模型,用于模拟推理过程
class SimpleClassifier:
def __init__(self):
# 模拟加载一个耗时模型
time.sleep(2)
self._model = RandomForestClassifier()
# 模拟训练
self._model.fit([[0, 0], [1, 1]], [0, 1])
def predict(self, data: pd.DataFrame) -> list:
# 模拟一个耗时的推理任务
print(f"Starting long inference for data shape: {data.shape}...")
time.sleep(15)
print("Inference complete.")
return self._model.predict(data.values).tolist()
# 使用BentoML的模型管理
# bentoml.sklearn.save_model("simple_rf_model", SimpleClassifier()._model)
# 在实际项目中,你会从模型仓库加载
clf_runner = bentoml.sklearn.get("simple_rf_model:latest").to_runner()
# 定义BentoML服务
svc = bentoml.Service("async_inference_service", runners=[clf_runner])
# 我们不再定义@svc.api, 因为入口是MQ
runner.py
# runner.py
import time
import json
import uuid
import logging
import stomp
import os
from tenacity import retry, stop_after_attempt, wait_exponential
# 导入我们的BentoML服务实例
from model import svc, clf_runner
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 从环境变量获取配置,这是生产实践
MQ_HOST = os.getenv("ACTIVEMQ_HOST", "localhost")
MQ_PORT = int(os.getenv("ACTIVEMQ_PORT", 61613))
REQUEST_QUEUE = "/queue/ml.requests"
class MqListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
logging.error(f'Received an error frame: {frame.body}')
def on_disconnected(self):
logging.warning('Disconnected from ActiveMQ. Reconnecting...')
connect_and_subscribe(self.conn)
def on_message(self, frame):
"""
核心任务处理逻辑
"""
try:
headers = frame.headers
message_id = headers.get('message-id')
correlation_id = headers.get('correlation-id')
reply_to = headers.get('reply-to')
logging.info(f"Received message {message_id} with correlation-id: {correlation_id}")
if not all([correlation_id, reply_to]):
logging.error("Missing correlation-id or reply-to header. Skipping message.")
# 在真实项目中,应该将这类消息发送到死信队列
# 这里我们选择不确认消息,让它重试或进入DLQ
return
body = json.loads(frame.body)
# 假设输入数据格式为 {'columns': [...], 'data': [[...]]}
input_df = pd.DataFrame(body['data'], columns=body['columns'])
# 调用BentoML runner执行推理
# runner.run()是异步的, 但我们在这里可以同步等待它
results = clf_runner.predict.run(input_df)
response_payload = {
"status": "SUCCESS",
"data": results
}
except json.JSONDecodeError as e:
logging.error(f"Failed to decode JSON from message {message_id}: {e}")
response_payload = {"status": "ERROR", "message": "Invalid JSON format."}
except Exception as e:
logging.error(f"Inference failed for message {message_id}: {e}", exc_info=True)
response_payload = {"status": "ERROR", "message": str(e)}
# 将结果发送到'reply-to'指定的队列
self.conn.send(
body=json.dumps(response_payload),
destination=reply_to,
headers={
'correlation-id': correlation_id,
'persistent': 'true' # 确保结果消息持久化
}
)
logging.info(f"Sent response for {correlation_id} to {reply_to}")
# 确认消息已被成功处理
self.conn.ack(message_id, headers['subscription'])
# 使用tenacity实现带指数退避的重试连接
@retry(wait=wait_exponential(multiplier=1, min=2, max=60), stop=stop_after_attempt(10))
def connect_and_subscribe(conn):
try:
logging.info(f"Attempting to connect to ActiveMQ at {MQ_HOST}:{MQ_PORT}...")
conn.connect('admin', 'admin', wait=True)
# 订阅任务队列
# ack='client-individual' 意味着我们需要手动确认每条消息
conn.subscribe(destination=REQUEST_QUEUE, id=str(uuid.uuid4()), ack='client-individual')
logging.info(f"Successfully connected and subscribed to {REQUEST_QUEUE}")
except Exception as e:
logging.error(f"Connection failed: {e}. Retrying...")
raise
def main():
# 初始化BentoML服务
svc.on_startup()
conn = stomp.Connection([(MQ_HOST, MQ_PORT)])
conn.set_listener('', MqListener(conn))
connect_and_subscribe(conn)
try:
while True:
# 保持主线程存活
time.sleep(1)
except KeyboardInterrupt:
logging.info("Shutting down...")
finally:
svc.on_shutdown()
conn.disconnect()
if __name__ == "__main__":
main()
这个runner.py
是生产级的:
- 通过环境变量配置MQ连接。
- 使用
tenacity
库实现健壮的自动重连接。 - 采用
client-individual
确认模式,确保任务处理失败时消息不会丢失。 - 完整的错误处理,包括JSON解析错误和模型推理异常。
- 使用
correlation-id
和reply-to
模式进行异步响应。
2. Gatsby/TypeScript 前端:任务发起与结果接收
前端需要使用一个STOMP客户端库。@stomp/stompjs
是一个优秀的选择。
安装依赖:
npm install @stomp/stompjs uuid
npm install --save-dev @types/uuid
创建一个React Hook来管理STOMP连接和状态 (useStomp.ts
)
// src/hooks/useStomp.ts
import { useEffect, useState, useRef } from 'react';
import { Client, IMessage } from '@stomp/stompjs';
import { v4 as uuidv4 } from 'uuid';
const MQ_WEBSOCKET_URL = process.env.GATSBY_ACTIVEMQ_WS_URL || 'ws://localhost:61614/ws';
const MQ_USER = process.env.GATSBY_ACTIVEMQ_USER || 'admin';
const MQ_PASS = process.env.GATSBY_ACTIVEMQ_PASS || 'admin';
export interface Task {
id: string;
status: 'PENDING' | 'SUCCESS' | 'ERROR';
requestPayload: any;
result?: any;
error?: string;
}
export const useStompQueue = (userId: string | null) => {
const [tasks, setTasks] = useState<Record<string, Task>>({});
const [isConnected, setIsConnected] = useState<boolean>(false);
const stompClientRef = useRef<Client | null>(null);
// 初始化和清理STOMP客户端
useEffect(() => {
if (!userId) return;
const resultQueue = `/queue/ml.results.${userId}`;
const client = new Client({
brokerURL: MQ_WEBSOCKET_URL,
connectHeaders: {
login: MQ_USER,
passcode: MQ_PASS,
},
debug: (str) => {
console.log(new Date(), str);
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
client.onConnect = () => {
setIsConnected(true);
// 订阅用户专属的结果队列
client.subscribe(resultQueue, (message: IMessage) => {
handleResultMessage(message);
});
};
client.onStompError = (frame) => {
console.error('Broker reported error: ' + frame.headers['message']);
console.error('Additional details: ' + frame.body);
setIsConnected(false);
};
stompClientRef.current = client;
client.activate();
return () => {
client.deactivate();
setIsConnected(false);
};
}, [userId]);
const handleResultMessage = (message: IMessage) => {
const correlationId = message.headers['correlation-id'];
if (!correlationId) {
console.warn('Received a message without a correlation-id, ignoring.');
return;
}
try {
const body = JSON.parse(message.body);
setTasks(prevTasks => {
const existingTask = prevTasks[correlationId];
if (!existingTask) return prevTasks;
return {
...prevTasks,
[correlationId]: {
...existingTask,
status: body.status === 'SUCCESS' ? 'SUCCESS' : 'ERROR',
result: body.data,
error: body.message,
},
};
});
} catch (error) {
console.error('Failed to parse result message:', error);
}
};
const sendInferenceRequest = (payload: any) => {
if (!stompClientRef.current || !stompClientRef.current.connected || !userId) {
console.error('STOMP client is not connected.');
// 可以在这里加入UI提示
return null;
}
const correlationId = uuidv4();
const resultQueue = `/queue/ml.results.${userId}`;
const requestQueue = '/queue/ml.requests';
const newTask: Task = {
id: correlationId,
status: 'PENDING',
requestPayload: payload,
};
setTasks(prev => ({ ...prev, [correlationId]: newTask }));
stompClientRef.current.publish({
destination: requestQueue,
headers: {
'persistent': 'true',
'correlation-id': correlationId,
'reply-to': resultQueue,
},
body: JSON.stringify(payload),
});
return correlationId;
};
return { tasks, sendInferenceRequest, isConnected };
};
在Gatsby页面组件中使用这个Hook (InferencePage.tsx
)
// src/pages/inference.tsx
import React, { useState } from "react";
import { useStompQueue, Task } from '../hooks/useStomp';
// 模拟一个用户ID
const FAKE_USER_ID = "user123";
const InferencePage = () => {
const { tasks, sendInferenceRequest, isConnected } = useStompQueue(FAKE_USER_ID);
const handleButtonClick = () => {
// 构造一个符合模型输入的请求
const payload = {
columns: ["feature1", "feature2"],
data: [[Math.random(), Math.random()]],
};
sendInferenceRequest(payload);
};
return (
<main style={{ padding: 40, fontFamily: 'sans-serif' }}>
<h1>Async ML Inference Demo</h1>
<p>Connection Status:
<span style={{ color: isConnected ? 'green' : 'red', fontWeight: 'bold' }}>
{isConnected ? ' CONNECTED' : ' DISCONNECTED'}
</span>
</p>
<button onClick={handleButtonClick} disabled={!isConnected}>
Trigger 15-second Inference Task
</button>
<div style={{ marginTop: 30 }}>
<h2>Tasks</h2>
{Object.keys(tasks).length === 0 ? (
<p>No tasks initiated yet.</p>
) : (
<ul style={{ listStyle: 'none', padding: 0 }}>
{Object.values(tasks).reverse().map((task: Task) => (
<li key={task.id} style={{ border: '1px solid #ccc', padding: 10, marginBottom: 10 }}>
<strong>Task ID:</strong> {task.id} <br/>
<strong>Status:</strong> <span style={{ fontWeight: 'bold' }}>{task.status}</span> <br/>
{task.status === 'PENDING' && <p>Processing... please wait.</p>}
{task.status === 'SUCCESS' && <pre>Result: {JSON.stringify(task.result, null, 2)}</pre>}
{task.status === 'ERROR' && <pre style={{color: 'red'}}>Error: {task.error}</pre>}
</li>
))}
</ul>
)}
</div>
</main>
);
};
export default InferencePage;
这个前端实现同样是生产级的:
- Hook封装: 将所有STOMP逻辑封装在
useStompQueue
hook中,使组件代码保持干净。 - 连接管理: 自动处理连接、断线重连和组件卸载时的资源清理。
- 状态管理: 使用React state来跟踪每个任务的状态(PENDING, SUCCESS, ERROR),并响应式地更新UI。
- 唯一身份: 依赖一个
userId
来构建独一无二的结果队列,确保多用户场景下结果不会混淆。这是一个关键的安全和隔离设计。
3. ActiveMQ 配置
最后,确保ActiveMQ的配置启用了STOMP over WebSocket。在conf/activemq.xml
中,找到<transportConnectors>
部分,并添加或确保存在ws
连接器:
<transportConnectors>
<!-- ...其他连接器如 openwire, amqp ... -->
<transportConnector name="ws" uri="ws://0.0..0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
这会使ActiveMQ在61614
端口上监听WebSocket连接。
架构的扩展性与局限性
这个架构的优势在于其出色的解耦性和弹性。我们可以独立地扩展BentoML消费者实例的数量,只需让它们监听同一个ml.requests
队列,ActiveMQ会自动进行负载均衡。如果一个消费者崩溃,消息会由其他消费者处理。前端应用完全不知道后端有多少实例,也不关心它们是否在进行部署更新。
然而,这个架构并非没有代价。
复杂性: 引入了一个新的中间件——ActiveMQ,增加了系统的运维负担。需要监控其健康状况、磁盘使用(对于持久化消息)和性能。
结果传递: 虽然STOMP over WebSocket解决了前端接收结果的问题,但如果前端应用不在线(例如用户关闭了浏览器标签页),发送到用户专属队列的结果消息就会一直驻留,直到过期或队列达到存储上限。需要设计合理的TTL(Time-To-Live)策略和死信队列(DLQ)机制来处理这些“无人认领”的结果。
ActiveMQ的边界: 对于需要严格顺序保证或极高吞吐量(每秒数十万条消息)的事件流场景,基于日志的系统如Apache Kafka可能是更合适的选择。但在当前这种用户驱动、中等并发的异步任务场景下,ActiveMQ的易用性和对STOMP的良好支持使其成为一个非常务实且高效的选择。