基于 APISIX 网关集成 Algolia 与 Trino 实现动态请求路由与数据增强


在构建一个流量巨大的个性化推荐系统中,我们面临一个棘手的架构挑战:需要根据用户的实时意图(通过搜索引擎推断)和其长期的行为模式(从数据湖分析得出)共同决定后端服务的路由策略,并对请求进行动态增强。具体来说,一个请求到达网关后,必须在几十毫秒内完成以下操作:

  1. 携带用户搜索词,请求 Algolia 获取最相关的商品类目 ID。
  2. 使用获取到的类目 ID 和用户 ID,查询 Trino (Presto) 分析集群,获取该用户在此类目下的历史偏好度(例如,点击率、转化率等)。
  3. 根据偏好度得分,将请求动态路由到不同的推荐服务集群(例如,“高意向用户集群”或“探索型用户集群”),并将偏好度得分作为 Header 注入到上游请求中。

这个问题的核心矛盾在于,需要在同步请求路径上,将一个低延迟的在线服务(Algolia)和一个高延迟的分析引擎(Trino)进行聚合。任何一个环节的抖动都可能导致整个用户请求超时,对用户体验造成毁灭性打击。

方案A: 独立的聚合服务层

一个直接的思路是在网关后面增加一个专门的聚合服务(Aggregation Service)。

graph TD
    A[Client] --> B{API Gateway};
    B --> C[Aggregation Service];
    C --> D[Algolia];
    C --> E[Trino Cluster];
    D --> C;
    E --> C;
    C -- Enriched Request --> F[Upstream Service A];
    C -- Enriched Request --> G[Upstream Service B];

优势分析:

  1. 关注点分离: 网关只负责认证、限流等通用职责。复杂的聚合逻辑被封装在独立的聚合服务中,团队可以独立开发、测试和部署,技术栈选择也更自由(例如,使用 Go 或 Rust 实现高性能)。
  2. 资源隔离: 聚合服务可以独立扩缩容,其资源消耗(CPU、内存)不会影响到网关本身的稳定性。

劣势分析:

  1. 引入额外网络跳数: Client -> Gateway -> Aggregation Service -> Upstream。这至少增加了一个内部网络往返时间(RTT),在对延迟极度敏感的场景下,这几毫秒甚至几十毫秒的延迟是无法接受的。
  2. 运维复杂性: 增加了一个新的服务节点,意味着额外的部署、监控、告警和故障排查成本。它自身也成为了一个新的潜在单点故障。
  3. 数据冗余传递: 用户身份、请求参数等信息需要在网关和聚合服务之间完整传递,增加了数据处理的复杂性。

在我们的场景中,目标是 P99 延迟控制在 50ms 以内。一次额外的内部网络调用,加上聚合服务自身的处理时间,很轻易就会消耗掉 10-20ms,这使得方案A的延迟预算非常紧张。

方案B: 在网关层实现聚合逻辑

另一种更为激进的方案是将聚合逻辑前置到网关层,通过网关的插件化能力直接实现。我们选择 Apache APISIX 作为基础网关,因为它基于 OpenResty (Nginx + LuaJIT),提供了极高的性能和灵活的 Lua 插件开发能力。

graph TD
    A[Client] --> B{API Gateway / APISIX};
    subgraph B
        direction LR
        C[Request] --> D{Custom Lua Plugin};
        D -- call --> E[Algolia];
        D -- call --> F[Trino Cluster];
        E --> D;
        F --> D;
        D -- Routing Decision & Enrichment --> G[Route to Upstream];
    end
    B --> H[Upstream Service A];
    B --> I[Upstream Service B];

优势分析:

  1. 极致的低延迟: 消除了网关到聚合服务的网络跳数。所有逻辑在同一个 Nginx Worker 进程的内存中完成,性能潜力是所有方案中最高的。
  2. 简化架构: 减少了一个独立的服务组件,降低了运维的复杂度和成本。
  3. 上下文共享: 插件可以直接访问请求的全部上下文信息(Headers, Body, Args),无需跨服务传递。

劣势分析:

  1. 网关稳定性风险: 复杂的插件代码,特别是涉及阻塞IO或CPU密集型操作,可能会严重影响 Nginx Worker 进程的事件循环,导致整个网关性能下降甚至瘫痪。这是一个致命的风险。
  2. 技术栈限制: 必须使用 Lua 进行开发,这对团队的技术储备提出了要求。同时,Lua 的生态和调试工具相比主流后端语言要薄弱一些。
  3. 逻辑耦合: 业务逻辑与基础设施(网关)耦合在一起,违反了单一职责原则。

最终选择与理由: 带风险控制的方案B

尽管方案B存在显著风险,但其带来的性能优势是方案A无法比拟的。在我们的核心业务场景下,延迟是第一优先级。因此,我们决定采用方案B,并通过一系列严格的工程实践来控制其风险。

我们的核心决策逻辑是:“如果能通过编码和架构设计将风险控制在可接受范围内,那么就应该选择性能最优的方案。”

我们将通过以下手段来 mitigation 风险:

  1. 非阻塞IO: 必须使用 ngx.location.capture_multicosocket API 进行所有外部HTTP调用,确保 Nginx 事件循环不会被阻塞。
  2. 严格的超时控制: 对 Algolia 和 Trino 的调用设置极短且独立的超时时间。
  3. 熔断与降级: 当 Trino 查询超时或失败时,插件必须有明确的降级逻辑,例如,跳过数据增强,直接按默认策略路由。
  4. 结果缓存: 对 Trino 的查询结果进行缓存。用户在短时间内的行为偏好是相对稳定的,使用 lua-resty-lrucache 在 Worker 进程级别缓存结果,可以大幅减少对 Trino 的直接请求。
  5. 并发执行: 对 Algolia 和 Trino 的请求必须并发执行,而不是串行,以缩短总体处理时间。

核心实现:APISIX 自定义插件

我们将插件命名为 dynamic-routing-enrichment

1. 插件配置文件 conf/config.yaml

插件的配置被注入到 APISIX 的路由规则中。

# In APISIX route configuration
routes:
  - uri: "/api/v1/recommend"
    plugins:
      dynamic-routing-enrichment:
        algolia_host: "https://XXXXXX-dsn.algolia.net"
        algolia_api_key: "env://ALGOLIA_API_KEY"
        algolia_app_id: "env://ALGOLIA_APP_ID"
        algolia_index_name: "product_categories"
        algolia_timeout: 50 # in milliseconds

        trino_host: "http://trino-coordinator.internal:8080"
        trino_user: "api_gateway"
        trino_catalog: "hive"
        trino_schema: "user_profiles"
        trino_timeout: 80 # in milliseconds
        trino_cache_ttl: 300 # cache TTL in seconds
        trino_cache_size: 5000 # max number of cached items

        default_upstream_id: "default-recommend-service"
        high_intent_upstream_id: "high-intent-recommend-service"
        preference_threshold: 0.75
  • env://…: 从环境变量加载敏感信息,这是生产环境的最佳实践。
  • 独立的超时设置: 为 Algolia 和 Trino 设置了不同的超时,Trino 的超时更长,但仍然非常严格。
  • 缓存配置: 为 Trino 查询结果配置了 TTL 和缓存大小。

2. 插件核心代码 apisix/plugins/dynamic-routing-enrichment.lua

这是插件的完整实现,包含了配置读取、并发请求、错误处理、缓存和动态路由逻辑。

-- apisix/plugins/dynamic-routing-enrichment.lua

local core = require("apisix.core")
local http = require("resty.http")
local lrucache = require("resty.lrucache")
local cjson = require("cjson.safe")

-- Plugin schema definition for validation
local schema = {
    type = "object",
    properties = {
        algolia_host = { type = "string" },
        algolia_api_key = { type = "string" },
        algolia_app_id = { type = "string" },
        algolia_index_name = { type = "string" },
        algolia_timeout = { type = "number", minimum = 10, default = 100 },

        trino_host = { type = "string" },
        trino_user = { type = "string" },
        trino_catalog = { type = "string" },
        trino_schema = { type = "string" },
        trino_timeout = { type = "number", minimum = 20, default = 200 },
        trino_cache_ttl = { type = "number", minimum = 1, default = 300 },
        trino_cache_size = { type = "number", minimum = 100, default = 1000 },

        default_upstream_id = { type = "string" },
        high_intent_upstream_id = { type = "string" },
        preference_threshold = { type = "number", minimum = 0, maximum = 1, default = 0.5 },
    },
    required = {"algolia_host", "algolia_api_key", "algolia_app_id", "trino_host", "default_upstream_id", "high_intent_upstream_id"}
}

-- Module-level cache instance
-- One cache per Nginx worker process
local trino_cache, cache_init_err = lrucache.new(2000) -- Default size, will be re-initialized
if not trino_cache then
    core.log.error("Failed to initialize LRU cache: ", cache_init_err)
end

local plugin_name = "dynamic-routing-enrichment"

local _M = {
    version = 0.1,
    priority = 1000, -- Execute early in the request lifecycle
    name = plugin_name,
    schema = schema,
}

-- check_schema: Validates the plugin configuration
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

-- init: Plugin initialization per worker
function _M.init(conf)
    -- Re-initialize cache with user-defined size
    trino_cache, cache_init_err = lrucache.new(conf.trino_cache_size)
    if not trino_cache then
        core.log.error("Failed to re-initialize LRU cache with size ",
                       conf.trino_cache_size, ": ", cache_init_err)
        return false, "failed to init lrucache"
    end
    core.log.info("LRU cache for trino results initialized with size: ", conf.trino_cache_size)
    return true
end

-- access: Main logic executed in the access phase
function _M.access(conf, ctx)
    local args = core.request.get_uri_args(ctx)
    local query = args.q
    local user_id = core.request.get_header(ctx, "x-user-id")

    if not query or not user_id then
        core.log.warn("Missing 'q' query parameter or 'x-user-id' header")
        -- Fallback to default, do nothing
        return
    end

    -- 1. Concurrently fetch data from Algolia and Trino
    local results, err = core.utils.async_call(function()
        local algolia_req = {
            path = "/1/indexes/" .. conf.algolia_index_name .. "/query",
            method = "POST",
            headers = {
                ["X-Algolia-API-Key"] = conf.algolia_api_key,
                ["X-Algolia-Application-Id"] = conf.algolia_app_id,
                ["Content-Type"] = "application/json",
            },
            body = cjson.encode({ params = "query=" .. query .. "&hitsPerPage=1" })
        }

        -- For Trino, we first check the cache
        local cache_key = user_id .. ":" .. query
        local cached_preference = trino_cache:get(cache_key)
        if cached_preference then
            -- Cache hit! We create a fake response to unify the processing flow.
            core.log.info("Trino cache hit for key: ", cache_key)
            return {
                { status = 200, body = cjson.encode({ data = {{ cached_preference }} }) }, -- Algolia is placeholder
                { status = 200, body = cjson.encode({ data = {{ cached_preference }} }) }  -- Use cached data
            }
        end
        core.log.info("Trino cache miss for key: ", cache_key)

        local trino_sql = string.format(
            "SELECT preference_score FROM %s.%s.user_category_preference WHERE user_id = '%s' AND category_id = (SELECT category_id FROM %s.%s.product_catalog WHERE product_name = '%s' LIMIT 1) LIMIT 1",
            conf.trino_catalog, conf.trino_schema, user_id,
            conf.trino_catalog, conf.trino_schema, query -- Simplified query for example
        )
        
        local trino_req = {
            path = "/v1/statement",
            method = "POST",
            headers = {
                ["X-Trino-User"] = conf.trino_user,
                ["Content-Type"] = "text/plain",
            },
            body = trino_sql,
        }

        -- ngx.location.capture_multi performs parallel subrequests
        return ngx.location.capture_multi({
            { conf.algolia_host, { args = algolia_req } },
            { conf.trino_host,   { args = trino_req } }
        })
    end)
    
    if err then
        core.log.error("async_call failed: ", err)
        -- Fallback to default route on any error
        core.log.warn("Fallback to default upstream: ", conf.default_upstream_id)
        core.service.set_upstream(ctx, conf.default_upstream_id)
        return
    end

    local algolia_res, trino_res = results[1], results[2]
    local preference_score = 0.0

    -- 2. Process Trino response and handle caching
    -- A real Trino client would handle pagination via `nextUri`, this is a simplified example for a single result set.
    if trino_res.status == 200 then
        local trino_body, json_err = cjson.decode(trino_res.body)
        if not json_err and trino_body.data and trino_body.data[1] and trino_body.data[1][1] then
            preference_score = tonumber(trino_body.data[1][1]) or 0.0
            local cache_key = user_id .. ":" .. query
            local ok, set_err = trino_cache:set(cache_key, preference_score, conf.trino_cache_ttl)
            if not ok then
                core.log.error("Failed to set trino cache: ", set_err)
            end
        else
            core.log.warn("Trino response is valid but data is empty or malformed: ", trino_res.body)
        end
    else
        core.log.error("Trino request failed with status: ", trino_res.status, ", body: ", trino_res.body)
        -- On failure, we proceed with a default score of 0. This is our degradation strategy.
    end

    -- 3. Process Algolia response and enrich request header
    if algolia_res.status == 200 then
        local algolia_body, json_err = cjson.decode(algolia_res.body)
        if not json_err and algolia_body.hits and #algolia_body.hits > 0 then
            local category_id = algolia_body.hits[1].category_id
            if category_id then
                core.request.set_header(ctx, "X-Detected-Category-Id", category_id)
            end
        else
            core.log.warn("Algolia response is valid but hits are empty or malformed")
        end
    else
        core.log.error("Algolia request failed with status: ", algolia_res.status)
        -- We can still proceed even if Algolia fails, enrichment is optional
    end

    -- 4. Set final enrichment header
    core.request.set_header(ctx, "X-User-Preference-Score", tostring(preference_score))
    
    -- 5. Make routing decision
    local target_upstream_id = conf.default_upstream_id
    if preference_score >= conf.preference_threshold then
        target_upstream_id = conf.high_intent_upstream_id
        core.log.info("Routing user ", user_id, " to high-intent upstream based on score ", preference_score)
    end
    
    -- The core action: dynamically set the upstream for this request
    local ok, set_err = core.service.set_upstream(ctx, target_upstream_id)
    if not ok then
        core.log.error("Failed to set upstream to ", target_upstream_id, ": ", set_err)
        -- Critical error, maybe terminate the request
        return 500, { message = "Failed to route request internally" }
    end
end

return _M

代码深度解析

  1. 并发模型: ngx.location.capture_multi 是这里的关键。它允许在一个非阻塞的协程中,并发地发起多个子请求。这是在 OpenResty 中实现 I/O 并发的标准做法,能将总等待时间降低为最长的那次调用,而不是所有调用的总和。
  2. 缓存策略: resty.lrucache 提供了一个基于共享内存的、worker 进程安全的 LRU 缓存。当 Trino 查询成功后,结果会以 user_id:query 为 key 存入缓存。后续请求如果命中缓存,则直接返回缓存值,完全避免了对 Trino 的查询,这是应对 Trino 高延迟的关键武器。
  3. 健壮的错误处理:
    • core.utils.async_call 封装了 ngx.thread.spawn,用于在一个受保护的、独立的协程中执行并发调用,防止某个协程的异常崩溃影响到主流程。
    • 对 Algolia 和 Trino 的响应都检查了 HTTP 状态码。
    • 即使 Trino 请求失败或返回空数据,插件并不会中断请求,而是赋予一个默认的 preference_score (0.0),确保请求可以继续被处理。这是一种优雅降级。
    • 对 JSON 解析也进行了错误捕获,防止恶意或错误的响应体导致插件崩溃。
  4. 动态路由实现: core.service.set_upstream(ctx, target_upstream_id) 是 APISIX 提供的核心 API,它允许插件在运行时动态地改变当前请求的目标上游服务。这是实现我们路由策略的最终执行点。
  5. 请求增强: core.request.set_header(ctx, "X-...", ...) 用于向上游服务注入我们聚合得到的数据(类目ID和偏好度得分),这样后端服务无需重复计算,直接消费这些信息即可。

架构的局限性与未来迭代

虽然此方案在性能上达到了我们的预期,但它的局限性也十分明显。将 Trino 这种 OLAP 引擎的查询直接暴露在同步请求路径上,本质上是一种反模式。即使有缓存和严格的超时,一旦缓存穿透且 Trino 集群负载过高,80ms 的超时设置仍然可能导致大量的请求失败或延迟增加。这里的坑在于,我们用应用层的技巧(缓存、超时)来弥补底层架构的不匹配。

一个更稳健的长期演进方向是:

  1. 数据路径异步化: 采用事件驱动架构。用户的行为数据通过消息队列(如 Kafka)进入一个流处理系统(如 Flink)。
  2. 预计算与物化: Flink 作业实时/准实时地消费数据,并从 Trino 或其他数据源拉取维表信息,预计算出用户的偏好度得分。
  3. 推送到低延迟存储: 将计算好的得分物化到一个低延迟的 KV 存储中(如 Redis 或 DynamoDB)。
  4. 网关插件简化: 网关插件的逻辑将大大简化,不再需要查询 Trino,而是直接从 Redis 中根据 user_id 读取预计算好的偏好度得分。这会将一次高延迟、高风险的分析查询,变成一次稳定的、亚毫秒级的 KV 查询。

这种架构将复杂的计算逻辑从同步路径中彻底剥离,网关插件只负责轻量的查询和路由,从而在保证高性能的同时,获得了更高的系统稳定性和解耦性。当前方案则可视为在该最终架构实现之前,一个兼顾性能和实现成本的过渡性权衡。


  目录