构建一个基于 Clojure、Node.js 与 OpenSearch 的异构事件溯源状态管理引擎


当一个业务流程的生命周期跨越数天,涉及多个外部系统交互,并且对审计和回溯的要求极为严苛时,传统的基于 CRUD 和可变状态的数据库模型就开始变得捉襟见肘。状态转换的逻辑散落在代码各处,每一次状态变更都直接覆盖旧数据,导致历史轨迹的丢失。排查问题时,我们只能看到“现在是什么”,却无从知晓“它是如何变成这样的”。这种信息熵的持续增加,是复杂系统可维护性下降的根源。

我们遇到的正是这样一个问题:一个复杂的金融衍生品交易清算流程。这个流程可能持续72小时,包含头寸计算、风险评估、资金划拨、监管上报等多个步骤,其中任何一步都可能因外部市场波动或对手方系统响应而暂停、重试或回滚。我们需要一个不仅能记录最终结果,更能精确记录每一个决策和变化的系统。

最初的构想是采用事件溯源(Event Sourcing)。这个模式的核心思想很简单:不保存对象当前的状态,而是保存导致该状态的所有事件序列。任何时候,我们都可以通过重放这些事件来重建当前状态。这天然地提供了一个完美的审计日志。但一个纯粹的单体事件溯源系统在我们的技术栈中显得格格不入,并且难以利用现有团队在不同技术领域的专长。

最终,我们敲定了一个异构(Polyglot)方案,目标是利用每种技术的长处,构建一个职责清晰、可扩展性强的状态管理引擎。

  • Clojure 负责核心领域逻辑:交易清算的核心逻辑复杂且对正确性要求极高。Clojure 的不可变数据结构和对纯函数的推崇,使其成为实现事件溯源中“聚合”(Aggregate)状态转换的理想选择。状态转换就是一个 (reduce apply-event initial-state events) 的过程,清晰、可测试且无副作用。
  • Node.js 负责 I/O 边界:作为系统的入口和出口,处理 API 请求、与消息队列交互等 I/O 密集型任务,是 Node.js 的强项。其庞大的生态系统和异步非阻塞模型能为系统提供高性能的接入层和事件投射层。
  • OpenSearch 作为事件存储与读取模型:我们需要的不仅仅是一个事件的存储仓库,更是一个可供分析和查询的事件日志。OpenSearch 不仅能作为 append-only 的事件存储,其强大的搜索和聚合能力,使得我们可以对事件本身进行复杂的查询,这对于调试和业务分析是巨大的优势。同时,它也能完美地承载 CQRS 模式中的读取模型(Projections)。

这个架构将命令处理、事件持久化和状态投射的职责彻底分离,形成了一个清晰的数据流。

graph TD
    subgraph "客户端"
        A[Client]
    end

    subgraph "Node.js 边界层 (I/O Intensive)"
        B(Fastify API Server)
        C{Command Bus}
        D{Event Bus}
        E(Event Projector)
    end

    subgraph "Clojure 核心逻辑层 (CPU Intensive)"
        F(Command Handler)
        G[Aggregate Logic]
    end

    subgraph "OpenSearch 持久化层"
        H[(Event Store Index)]
        I[(Projections Index)]
    end

    A -- HTTP POST /commands --> B
    B -- Publishes Command --> C
    C -- Delivers Command --> F
    F -- Loads Events --> H
    F -- Invokes --> G
    G -- Returns New Events --> F
    F -- Persists Events with Optimistic Locking --> H
    H -- Success --> F
    F -- Publishes Events --> D
    D -- Delivers Events --> E
    E -- Updates Read Model --> I

Clojure 核心:不可变的领域堡垒

系统的核心是处理命令并生成事件的 Clojure 服务。这里的关键是聚合(Aggregate),它封装了业务规则和状态。在我们的场景中,一个“清算流程”(ClearingProcess)就是一个聚合。

首先,我们定义聚合的状态。使用 defrecord 是个好选择,它提供了结构化的数据类型。

;; src/clearing/aggregate.clj

(ns clearing.aggregate)

;; 定义清算流程的聚合状态
(defrecord ClearingProcess [id version status trade-id calculated-position risk-assessed? funds-settled?])

;; 聚合的初始状态
(defn initial-state []
  (map->ClearingProcess {:version 0 :status :nascent}))

接下来是最核心的部分:apply-event 函数。我们使用多重方法(multimethod)来根据事件类型分派到不同的处理函数。每个函数都是一个纯函数,接收当前状态和事件,返回一个全新的状态。

;; src/clearing/aggregate.clj

;; ... (defrecord and initial-state)

;; 定义一个多重方法,根据事件的 :event/type 分派
(defmulti apply-event (fn [_state event] (:event/type event)))

;; 默认实现,遇到未知事件时直接返回原状态
(defmethod apply-event :default [state _event]
  state)

(defmethod apply-event :clearing/process-initiated
  [state {:keys [aggregate-id trade-id]}]
  (-> state
      (assoc :id aggregate-id
             :trade-id trade-id
             :status :initiated)))

(defmethod apply-event :clearing/position-calculated
  [state {payload :event/payload}]
  (assoc state :status :position-calculated :calculated-position (:position payload)))

(defmethod apply-event :clearing/risk-assessed
  [state _event]
  (assoc state :risk-assessed? true))

(defmethod apply-event :clearing/funds-settled
  [state _event]
  (assoc state :status :completed :funds-settled? true))

;; 这是一个关键函数:通过折叠事件流来重建聚合的最新状态
(defn hydrate-from-events
  "Given a sequence of events, builds the aggregate state."
  [events]
  (reduce apply-event (initial-state) events))

这段代码的优美之处在于其确定性。给定相同的事件历史,hydrate-from-events 总会得到完全相同的状态。这使得单元测试变得极其简单,我们不需要模拟数据库或任何外部依赖。

命令处理函数则负责执行业务规则。它接收当前状态和命令,决定是否应该产出新的事件。

;; src/clearing/command_handler.clj

(ns clearing.command-handler
  (:require [clearing.aggregate :as agg]))

;; 定义业务错误
(defn- business-error [msg]
  {:error? true :message msg})

;; 命令处理也使用多重方法
(defmulti handle-command (fn [_state command] (:command/type command)))

(defmethod handle-command :clearing/initiate-process
  [{:keys [status]} {:keys [aggregate-id trade-id]}]
  (if (not= status :nascent)
    (business-error "Process already initiated.")
    [{:event/type :clearing/process-initiated
      :aggregate-id aggregate-id
      :trade-id trade-id
      :timestamp (java.time.Instant/now)}]))

(defmethod handle-command :clearing/calculate-position
  [{:keys [status]} {payload :command/payload}]
  (if (not= status :initiated)
    (business-error "Cannot calculate position if process is not initiated.")
    [{:event/type :clearing/position-calculated
      :event/payload {:position (:position payload)}}]))

;; ... 其他命令处理方法

;; 整合函数:加载聚合,处理命令,返回结果
(defn process-command
  "Main entry point. Hydrates aggregate, handles command, returns new events or error."
  [events-history command]
  (let [current-state (agg/hydrate-from-events events-history)]
    ;; 将版本号附加到状态中,以便命令处理器可以访问
    (handle-command (assoc current-state :version (count events-history)) command)))

OpenSearch:一个可查询的事件日志

将 OpenSearch 用作事件存储,我们需要精心设计索引映射。这不仅仅是存储数据,更是为了高效地检索和保证并发安全。

这是 event-store 索引的映射配置:

{
  "mappings": {
    "properties": {
      "aggregateId": { "type": "keyword" },
      "sequenceNumber": { "type": "long" },
      "eventType": { "type": "keyword" },
      "timestamp": { "type": "date" },
      "payload": { "type": "object", "enabled": false },
      "metadata": { "type": "object" }
    }
  },
  "settings": {
    "index.refresh_interval": "1s",
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}
  • aggregateId 是查询特定聚合历史的关键。
  • sequenceNumber 至关重要,它代表事件在聚合历史中的顺序。
  • payloadenabled 设置为 false,意味着我们不索引事件的具体内容,只将其作为原始 JSON 存储,这节省了索引空间并提高了写入性能。

从 OpenSearch 加载事件的 Clojure 函数如下:

;; src/clearing/event_store.clj
(ns clearing.event-store
  (:require [clj-http.client :as http]
            [cheshire.core :as json]))

(def opensearch-host "http://localhost:9200")
(def event-index "event-store")

(defn- format-es-query [aggregate-id]
  {:size 10000 ; A practical limit, for very long histories, pagination is needed.
   :sort [{:sequenceNumber {:order "asc"}}]
   :query {:term {:aggregateId aggregate-id}}})

(defn fetch-events [aggregate-id]
  (try
    (let [response (http/post (str opensearch-host "/" event-index "/_search")
                              {:content-type :json
                               :body (json/generate-string (format-es-query aggregate-id))})
          body (json/parse-string (:body response) true)
          hits (get-in body [:hits :hits])]
      (map :_source hits))
    (catch Exception e
      ;; 在真实项目中,这里需要详细的日志记录
      (println (str "Failed to fetch events for " aggregate-id ": " (.getMessage e)))
      [])))

写入事件是整个流程中最危险的一步,必须处理并发冲突。当两个命令几乎同时处理同一个聚合时,它们可能都基于版本号为 N 的状态,并试图写入版本号为 N+1 的事件。这会导致状态不一致。OpenSearch 的乐观并发控制机制 (if_seq_noif_primary_term) 在这里是救星。

我们不直接写入单个事件,而是使用 _bulk API 事务性地写入一个命令产生的所有事件。

;; src/clearing/event_store.clj
;; ...

(defn- build-bulk-payload [aggregate-id base-version events]
  (->> events
       (map-indexed
        (fn [idx event]
          (let [sequence-number (+ base-version idx 1)
                doc-id (str aggregate-id ":" sequence-number)]
            (str (json/generate-string
                  {:index {:_index event-index :_id doc-id}})
                 "\n"
                 (json/generate-string (assoc event :sequenceNumber sequence-number))
                 "\n"))))
       (apply str)))


(defn persist-events [aggregate-id expected-version events]
  (if (empty? events)
    {:success? true}
    (try
      (let [base-version expected-version
            ;; 这里的 optimistic_locking_parameter 需要从上次读取操作中获得。
            ;; 为简化示例,我们假设它已被获取。在实际应用中,fetch-events
            ;; 需要返回最后一个事件的 `_seq_no` 和 `_primary_term`。
            ;; 这里我们只使用 sequence number 来模拟。
            response (http/post (str opensearch-host "/_bulk?refresh=true&if_seq_no=" expected-version "&if_primary_term=1")
                                {:content-type "application/x-ndjson"
                                 :body (build-bulk-payload aggregate-id base-version events)})]
        (if (not (get-in (json/parse-string (:body response) true) [:errors]))
          {:success? true}
          {:success? false :reason :conflict}))
      (catch Exception e
        ;; 409 Conflict 是预期的并发错误,其他则是系统错误
        (if (= 409 (:status (ex-data e)))
          {:success? false :reason :conflict}
          {:success? false :reason :internal-error :details (.getMessage e)})))))

;; --- 在 command_handler.clj 中如何使用 ---
;; (let [current-version (count events-history)
;;       new-events (handle-command ...)]
;;   (if-not (:error? new-events)
;;     (event-store/persist-events aggregate-id current-version new-events)
;;     ...))

一个常见的错误是:在持久化事件时忽略并发控制。这在低负载测试中不会暴露,但在生产环境中会导致间歇性的、难以复现的数据损坏。使用 if_seq_no 就像给数据库的 UPDATE ... WHERE version = ? 操作一样,是保证一致性的基石。

Node.js 边界:高性能的 I/O 网关

Node.js 层的职责是“脏活累活”:对外提供 HTTP API,与消息队列通信,以及将事件转换成可供查询的读取模型。我们使用 Fastify 框架,因为它以性能著称。

命令 API 的实现非常简单,它只做三件事:接收请求、基本校验、把命令扔到消息总线(例如 NATS 或 Kafka)。

// src/api/server.js
const fastify = require('fastify')({ logger: true });
const { Nats, StringCodec } = require('nats'); // 假设使用 NATS 作为消息总线

// 在真实项目中,NATS 连接应该是单例的
let natsConnection;
const sc = StringCodec();

fastify.post('/commands/:aggregateId', async (request, reply) => {
  const { aggregateId } = request.params;
  const command = request.body;

  if (!command.type || !command.payload) {
    return reply.code(400).send({ error: 'Invalid command structure' });
  }

  try {
    const commandMessage = {
      aggregateId,
      command,
    };
    
    // 将命令发布到 `commands` 主题,由 Clojure 服务监听
    natsConnection.publish('commands', sc.encode(JSON.stringify(commandMessage)));
    
    fastify.log.info(`Published command ${command.type} for ${aggregateId}`);
    
    // 命令是异步处理的,API 立即返回 accepted
    return reply.code(202).send({ message: 'Command accepted for processing.' });
  } catch (err) {
    fastify.log.error(err, 'Failed to publish command');
    return reply.code(500).send({ error: 'Internal server error' });
  }
});

const start = async () => {
  try {
    natsConnection = await Nats.connect({ servers: 'nats://localhost:4222' });
    fastify.log.info('Connected to NATS server');
    await fastify.listen({ port: 3000 });
  } catch (err) {
    fastify.log.error(err);
    process.exit(1);
  }
};

start();

更重要的部分是事件投射器(Projector)。它订阅事件总线,接收 Clojure 服务产生的事件,并据此更新 OpenSearch 中的读取模型。读取模型是一个非规范化的、为特定查询场景优化的数据视图。

// src/projector/index.js
const { Nats, StringCodec } = require('nats');
const { Client } = require('@opensearch-project/opensearch');

const client = new Client({ node: 'http://localhost:9200' });
const PROJECTIONS_INDEX = 'clearing-projections';

async function handleEvent(event) {
  const { eventType, aggregateId, payload, timestamp } = event;
  console.log(`Projecting event ${eventType} for aggregate ${aggregateId}`);

  switch (eventType) {
    case 'clearing/process-initiated':
      // 创建一个新的文档
      await client.index({
        index: PROJECTIONS_INDEX,
        id: aggregateId,
        body: {
          processId: aggregateId,
          tradeId: payload.tradeId,
          status: 'INITIATED',
          createdAt: timestamp,
          updatedAt: timestamp,
          history: [ { type: eventType, at: timestamp } ]
        },
        refresh: true,
      });
      break;

    case 'clearing/position-calculated':
      // 使用 update API 更新现有文档
      await client.update({
        index: PROJECTIONS_INDEX,
        id: aggregateId,
        body: {
          doc: {
            status: 'POSITION_CALCULATED',
            calculatedPosition: payload.position,
            updatedAt: timestamp,
          },
          // 使用painless脚本原子性地向数组中添加元素
          script: {
            source: "ctx._source.history.add(params.event_log)",
            lang: "painless",
            params: {
              event_log: { type: eventType, at: timestamp }
            }
          }
        },
        refresh: true,
      });
      break;
    
    // ... 其他事件类型的处理逻辑

    case 'clearing/funds-settled':
      await client.update({
        index: PROJECTIONS_INDEX,
        id: aggregateId,
        body: {
          doc: {
            status: 'COMPLETED',
            fundsSettled: true,
            updatedAt: timestamp,
          },
          script: {
            source: "ctx._source.history.add(params.event_log)",
            lang: "painless",
            params: {
              event_log: { type: eventType, at: timestamp }
            }
          }
        },
      });
      break;
  }
}

async function run() {
  try {
    const nc = await Nats.connect({ servers: 'nats://localhost:4222' });
    const sc = StringCodec();
    console.log('Projector connected to NATS and listening for events...');

    const sub = nc.subscribe('events');
    for await (const m of sub) {
      try {
        const event = JSON.parse(sc.decode(m.data));
        await handleEvent(event);
      } catch (err) {
        console.error('Error processing event:', err);
        // 在真实项目中,这里需要有错误处理和重试逻辑,例如发送到死信队列
      }
    }
  } catch (err) {
    console.error('NATS connection error:', err);
    process.exit(1);
  }
}

run();

这里的核心是 CQRS 模式的体现。写操作(命令)由 Clojure 服务处理并写入事件存储,而读操作(查询)则完全由这个为读取优化的 clearing-projections 索引来服务。查询可以直接 hitting 这个索引,获取扁平化的、易于消费的 JSON,而无需在每次查询时都重放事件。

方案的局限性与未来迭代

这个架构虽然强大,但也引入了新的复杂性。首先是 最终一致性。从命令发出到读取模型更新,中间经过了消息总线和异步处理,存在秒级甚至更长的延迟。对于需要强一致性读的场景,这个方案需要调整,例如 API 在返回前可以轮询读取模型直到更新完成,但这会牺牲部分性能。

其次是 **事件模式演化 (Schema Evolution)**。随着业务发展,事件的结构几乎肯定会改变。如何处理旧版本的事件是一个重大挑战。我们需要引入事件版本号,并在聚合的水合逻辑中加入“上溯”(Upcasting)机制,即在读取时将旧版事件动态转换为新版结构。

最后,对于生命周期极长、事件数量可能达到数千甚至上万的聚合,每次都从头重放所有事件会成为性能瓶颈。下一步的优化方向是引入 快照(Snapshotting)。我们可以定期(例如每100个事件)将聚合的当前状态序列化并存起来。当需要水合聚合时,只需加载最新的快照,然后重放该快照之后发生的事件即可,大大减少了 I/O 和计算开销。


  目录