在构建一个流量巨大的个性化推荐系统中,我们面临一个棘手的架构挑战:需要根据用户的实时意图(通过搜索引擎推断)和其长期的行为模式(从数据湖分析得出)共同决定后端服务的路由策略,并对请求进行动态增强。具体来说,一个请求到达网关后,必须在几十毫秒内完成以下操作:
- 携带用户搜索词,请求 Algolia 获取最相关的商品类目 ID。
- 使用获取到的类目 ID 和用户 ID,查询 Trino (Presto) 分析集群,获取该用户在此类目下的历史偏好度(例如,点击率、转化率等)。
- 根据偏好度得分,将请求动态路由到不同的推荐服务集群(例如,“高意向用户集群”或“探索型用户集群”),并将偏好度得分作为 Header 注入到上游请求中。
这个问题的核心矛盾在于,需要在同步请求路径上,将一个低延迟的在线服务(Algolia)和一个高延迟的分析引擎(Trino)进行聚合。任何一个环节的抖动都可能导致整个用户请求超时,对用户体验造成毁灭性打击。
方案A: 独立的聚合服务层
一个直接的思路是在网关后面增加一个专门的聚合服务(Aggregation Service)。
graph TD A[Client] --> B{API Gateway}; B --> C[Aggregation Service]; C --> D[Algolia]; C --> E[Trino Cluster]; D --> C; E --> C; C -- Enriched Request --> F[Upstream Service A]; C -- Enriched Request --> G[Upstream Service B];
优势分析:
- 关注点分离: 网关只负责认证、限流等通用职责。复杂的聚合逻辑被封装在独立的聚合服务中,团队可以独立开发、测试和部署,技术栈选择也更自由(例如,使用 Go 或 Rust 实现高性能)。
- 资源隔离: 聚合服务可以独立扩缩容,其资源消耗(CPU、内存)不会影响到网关本身的稳定性。
劣势分析:
- 引入额外网络跳数:
Client -> Gateway -> Aggregation Service -> Upstream
。这至少增加了一个内部网络往返时间(RTT),在对延迟极度敏感的场景下,这几毫秒甚至几十毫秒的延迟是无法接受的。 - 运维复杂性: 增加了一个新的服务节点,意味着额外的部署、监控、告警和故障排查成本。它自身也成为了一个新的潜在单点故障。
- 数据冗余传递: 用户身份、请求参数等信息需要在网关和聚合服务之间完整传递,增加了数据处理的复杂性。
在我们的场景中,目标是 P99 延迟控制在 50ms 以内。一次额外的内部网络调用,加上聚合服务自身的处理时间,很轻易就会消耗掉 10-20ms,这使得方案A的延迟预算非常紧张。
方案B: 在网关层实现聚合逻辑
另一种更为激进的方案是将聚合逻辑前置到网关层,通过网关的插件化能力直接实现。我们选择 Apache APISIX 作为基础网关,因为它基于 OpenResty (Nginx + LuaJIT),提供了极高的性能和灵活的 Lua 插件开发能力。
graph TD A[Client] --> B{API Gateway / APISIX}; subgraph B direction LR C[Request] --> D{Custom Lua Plugin}; D -- call --> E[Algolia]; D -- call --> F[Trino Cluster]; E --> D; F --> D; D -- Routing Decision & Enrichment --> G[Route to Upstream]; end B --> H[Upstream Service A]; B --> I[Upstream Service B];
优势分析:
- 极致的低延迟: 消除了网关到聚合服务的网络跳数。所有逻辑在同一个 Nginx Worker 进程的内存中完成,性能潜力是所有方案中最高的。
- 简化架构: 减少了一个独立的服务组件,降低了运维的复杂度和成本。
- 上下文共享: 插件可以直接访问请求的全部上下文信息(Headers, Body, Args),无需跨服务传递。
劣势分析:
- 网关稳定性风险: 复杂的插件代码,特别是涉及阻塞IO或CPU密集型操作,可能会严重影响 Nginx Worker 进程的事件循环,导致整个网关性能下降甚至瘫痪。这是一个致命的风险。
- 技术栈限制: 必须使用 Lua 进行开发,这对团队的技术储备提出了要求。同时,Lua 的生态和调试工具相比主流后端语言要薄弱一些。
- 逻辑耦合: 业务逻辑与基础设施(网关)耦合在一起,违反了单一职责原则。
最终选择与理由: 带风险控制的方案B
尽管方案B存在显著风险,但其带来的性能优势是方案A无法比拟的。在我们的核心业务场景下,延迟是第一优先级。因此,我们决定采用方案B,并通过一系列严格的工程实践来控制其风险。
我们的核心决策逻辑是:“如果能通过编码和架构设计将风险控制在可接受范围内,那么就应该选择性能最优的方案。”
我们将通过以下手段来 mitigation 风险:
- 非阻塞IO: 必须使用
ngx.location.capture_multi
或cosocket
API 进行所有外部HTTP调用,确保 Nginx 事件循环不会被阻塞。 - 严格的超时控制: 对 Algolia 和 Trino 的调用设置极短且独立的超时时间。
- 熔断与降级: 当 Trino 查询超时或失败时,插件必须有明确的降级逻辑,例如,跳过数据增强,直接按默认策略路由。
- 结果缓存: 对 Trino 的查询结果进行缓存。用户在短时间内的行为偏好是相对稳定的,使用
lua-resty-lrucache
在 Worker 进程级别缓存结果,可以大幅减少对 Trino 的直接请求。 - 并发执行: 对 Algolia 和 Trino 的请求必须并发执行,而不是串行,以缩短总体处理时间。
核心实现:APISIX 自定义插件
我们将插件命名为 dynamic-routing-enrichment
。
1. 插件配置文件 conf/config.yaml
插件的配置被注入到 APISIX 的路由规则中。
# In APISIX route configuration
routes:
- uri: "/api/v1/recommend"
plugins:
dynamic-routing-enrichment:
algolia_host: "https://XXXXXX-dsn.algolia.net"
algolia_api_key: "env://ALGOLIA_API_KEY"
algolia_app_id: "env://ALGOLIA_APP_ID"
algolia_index_name: "product_categories"
algolia_timeout: 50 # in milliseconds
trino_host: "http://trino-coordinator.internal:8080"
trino_user: "api_gateway"
trino_catalog: "hive"
trino_schema: "user_profiles"
trino_timeout: 80 # in milliseconds
trino_cache_ttl: 300 # cache TTL in seconds
trino_cache_size: 5000 # max number of cached items
default_upstream_id: "default-recommend-service"
high_intent_upstream_id: "high-intent-recommend-service"
preference_threshold: 0.75
- env://…: 从环境变量加载敏感信息,这是生产环境的最佳实践。
- 独立的超时设置: 为 Algolia 和 Trino 设置了不同的超时,Trino 的超时更长,但仍然非常严格。
- 缓存配置: 为 Trino 查询结果配置了 TTL 和缓存大小。
2. 插件核心代码 apisix/plugins/dynamic-routing-enrichment.lua
这是插件的完整实现,包含了配置读取、并发请求、错误处理、缓存和动态路由逻辑。
-- apisix/plugins/dynamic-routing-enrichment.lua
local core = require("apisix.core")
local http = require("resty.http")
local lrucache = require("resty.lrucache")
local cjson = require("cjson.safe")
-- Plugin schema definition for validation
local schema = {
type = "object",
properties = {
algolia_host = { type = "string" },
algolia_api_key = { type = "string" },
algolia_app_id = { type = "string" },
algolia_index_name = { type = "string" },
algolia_timeout = { type = "number", minimum = 10, default = 100 },
trino_host = { type = "string" },
trino_user = { type = "string" },
trino_catalog = { type = "string" },
trino_schema = { type = "string" },
trino_timeout = { type = "number", minimum = 20, default = 200 },
trino_cache_ttl = { type = "number", minimum = 1, default = 300 },
trino_cache_size = { type = "number", minimum = 100, default = 1000 },
default_upstream_id = { type = "string" },
high_intent_upstream_id = { type = "string" },
preference_threshold = { type = "number", minimum = 0, maximum = 1, default = 0.5 },
},
required = {"algolia_host", "algolia_api_key", "algolia_app_id", "trino_host", "default_upstream_id", "high_intent_upstream_id"}
}
-- Module-level cache instance
-- One cache per Nginx worker process
local trino_cache, cache_init_err = lrucache.new(2000) -- Default size, will be re-initialized
if not trino_cache then
core.log.error("Failed to initialize LRU cache: ", cache_init_err)
end
local plugin_name = "dynamic-routing-enrichment"
local _M = {
version = 0.1,
priority = 1000, -- Execute early in the request lifecycle
name = plugin_name,
schema = schema,
}
-- check_schema: Validates the plugin configuration
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
-- init: Plugin initialization per worker
function _M.init(conf)
-- Re-initialize cache with user-defined size
trino_cache, cache_init_err = lrucache.new(conf.trino_cache_size)
if not trino_cache then
core.log.error("Failed to re-initialize LRU cache with size ",
conf.trino_cache_size, ": ", cache_init_err)
return false, "failed to init lrucache"
end
core.log.info("LRU cache for trino results initialized with size: ", conf.trino_cache_size)
return true
end
-- access: Main logic executed in the access phase
function _M.access(conf, ctx)
local args = core.request.get_uri_args(ctx)
local query = args.q
local user_id = core.request.get_header(ctx, "x-user-id")
if not query or not user_id then
core.log.warn("Missing 'q' query parameter or 'x-user-id' header")
-- Fallback to default, do nothing
return
end
-- 1. Concurrently fetch data from Algolia and Trino
local results, err = core.utils.async_call(function()
local algolia_req = {
path = "/1/indexes/" .. conf.algolia_index_name .. "/query",
method = "POST",
headers = {
["X-Algolia-API-Key"] = conf.algolia_api_key,
["X-Algolia-Application-Id"] = conf.algolia_app_id,
["Content-Type"] = "application/json",
},
body = cjson.encode({ params = "query=" .. query .. "&hitsPerPage=1" })
}
-- For Trino, we first check the cache
local cache_key = user_id .. ":" .. query
local cached_preference = trino_cache:get(cache_key)
if cached_preference then
-- Cache hit! We create a fake response to unify the processing flow.
core.log.info("Trino cache hit for key: ", cache_key)
return {
{ status = 200, body = cjson.encode({ data = {{ cached_preference }} }) }, -- Algolia is placeholder
{ status = 200, body = cjson.encode({ data = {{ cached_preference }} }) } -- Use cached data
}
end
core.log.info("Trino cache miss for key: ", cache_key)
local trino_sql = string.format(
"SELECT preference_score FROM %s.%s.user_category_preference WHERE user_id = '%s' AND category_id = (SELECT category_id FROM %s.%s.product_catalog WHERE product_name = '%s' LIMIT 1) LIMIT 1",
conf.trino_catalog, conf.trino_schema, user_id,
conf.trino_catalog, conf.trino_schema, query -- Simplified query for example
)
local trino_req = {
path = "/v1/statement",
method = "POST",
headers = {
["X-Trino-User"] = conf.trino_user,
["Content-Type"] = "text/plain",
},
body = trino_sql,
}
-- ngx.location.capture_multi performs parallel subrequests
return ngx.location.capture_multi({
{ conf.algolia_host, { args = algolia_req } },
{ conf.trino_host, { args = trino_req } }
})
end)
if err then
core.log.error("async_call failed: ", err)
-- Fallback to default route on any error
core.log.warn("Fallback to default upstream: ", conf.default_upstream_id)
core.service.set_upstream(ctx, conf.default_upstream_id)
return
end
local algolia_res, trino_res = results[1], results[2]
local preference_score = 0.0
-- 2. Process Trino response and handle caching
-- A real Trino client would handle pagination via `nextUri`, this is a simplified example for a single result set.
if trino_res.status == 200 then
local trino_body, json_err = cjson.decode(trino_res.body)
if not json_err and trino_body.data and trino_body.data[1] and trino_body.data[1][1] then
preference_score = tonumber(trino_body.data[1][1]) or 0.0
local cache_key = user_id .. ":" .. query
local ok, set_err = trino_cache:set(cache_key, preference_score, conf.trino_cache_ttl)
if not ok then
core.log.error("Failed to set trino cache: ", set_err)
end
else
core.log.warn("Trino response is valid but data is empty or malformed: ", trino_res.body)
end
else
core.log.error("Trino request failed with status: ", trino_res.status, ", body: ", trino_res.body)
-- On failure, we proceed with a default score of 0. This is our degradation strategy.
end
-- 3. Process Algolia response and enrich request header
if algolia_res.status == 200 then
local algolia_body, json_err = cjson.decode(algolia_res.body)
if not json_err and algolia_body.hits and #algolia_body.hits > 0 then
local category_id = algolia_body.hits[1].category_id
if category_id then
core.request.set_header(ctx, "X-Detected-Category-Id", category_id)
end
else
core.log.warn("Algolia response is valid but hits are empty or malformed")
end
else
core.log.error("Algolia request failed with status: ", algolia_res.status)
-- We can still proceed even if Algolia fails, enrichment is optional
end
-- 4. Set final enrichment header
core.request.set_header(ctx, "X-User-Preference-Score", tostring(preference_score))
-- 5. Make routing decision
local target_upstream_id = conf.default_upstream_id
if preference_score >= conf.preference_threshold then
target_upstream_id = conf.high_intent_upstream_id
core.log.info("Routing user ", user_id, " to high-intent upstream based on score ", preference_score)
end
-- The core action: dynamically set the upstream for this request
local ok, set_err = core.service.set_upstream(ctx, target_upstream_id)
if not ok then
core.log.error("Failed to set upstream to ", target_upstream_id, ": ", set_err)
-- Critical error, maybe terminate the request
return 500, { message = "Failed to route request internally" }
end
end
return _M
代码深度解析
- 并发模型:
ngx.location.capture_multi
是这里的关键。它允许在一个非阻塞的协程中,并发地发起多个子请求。这是在 OpenResty 中实现 I/O 并发的标准做法,能将总等待时间降低为最长的那次调用,而不是所有调用的总和。 - 缓存策略:
resty.lrucache
提供了一个基于共享内存的、worker 进程安全的 LRU 缓存。当 Trino 查询成功后,结果会以user_id:query
为 key 存入缓存。后续请求如果命中缓存,则直接返回缓存值,完全避免了对 Trino 的查询,这是应对 Trino 高延迟的关键武器。 - 健壮的错误处理:
-
core.utils.async_call
封装了ngx.thread.spawn
,用于在一个受保护的、独立的协程中执行并发调用,防止某个协程的异常崩溃影响到主流程。 - 对 Algolia 和 Trino 的响应都检查了 HTTP 状态码。
- 即使 Trino 请求失败或返回空数据,插件并不会中断请求,而是赋予一个默认的
preference_score
(0.0),确保请求可以继续被处理。这是一种优雅降级。 - 对 JSON 解析也进行了错误捕获,防止恶意或错误的响应体导致插件崩溃。
-
- 动态路由实现:
core.service.set_upstream(ctx, target_upstream_id)
是 APISIX 提供的核心 API,它允许插件在运行时动态地改变当前请求的目标上游服务。这是实现我们路由策略的最终执行点。 - 请求增强:
core.request.set_header(ctx, "X-...", ...)
用于向上游服务注入我们聚合得到的数据(类目ID和偏好度得分),这样后端服务无需重复计算,直接消费这些信息即可。
架构的局限性与未来迭代
虽然此方案在性能上达到了我们的预期,但它的局限性也十分明显。将 Trino 这种 OLAP 引擎的查询直接暴露在同步请求路径上,本质上是一种反模式。即使有缓存和严格的超时,一旦缓存穿透且 Trino 集群负载过高,80ms 的超时设置仍然可能导致大量的请求失败或延迟增加。这里的坑在于,我们用应用层的技巧(缓存、超时)来弥补底层架构的不匹配。
一个更稳健的长期演进方向是:
- 数据路径异步化: 采用事件驱动架构。用户的行为数据通过消息队列(如 Kafka)进入一个流处理系统(如 Flink)。
- 预计算与物化: Flink 作业实时/准实时地消费数据,并从 Trino 或其他数据源拉取维表信息,预计算出用户的偏好度得分。
- 推送到低延迟存储: 将计算好的得分物化到一个低延迟的 KV 存储中(如 Redis 或 DynamoDB)。
- 网关插件简化: 网关插件的逻辑将大大简化,不再需要查询 Trino,而是直接从 Redis 中根据
user_id
读取预计算好的偏好度得分。这会将一次高延迟、高风险的分析查询,变成一次稳定的、亚毫秒级的 KV 查询。
这种架构将复杂的计算逻辑从同步路径中彻底剥离,网关插件只负责轻量的查询和路由,从而在保证高性能的同时,获得了更高的系统稳定性和解耦性。当前方案则可视为在该最终架构实现之前,一个兼顾性能和实现成本的过渡性权衡。