基于HBase与SciPy构建混合特征平台的不可变基础设施与状态管理实践


我们团队接到的初始Epic,听起来像是每个数据驱动型公司都会遇到的典型问题:统一在线和离线特征工程流程。现实情况是,我们的在线服务(Java栈)和离线分析(Python栈)各自维护着一套特征计算逻辑,代码重复、逻辑不一致、特征穿越问题层出不穷。一个模型在线上和线下的表现差异,追根溯源往往就是因为某个特征的计算窗口差了几个小时。这种混乱状态,在连续几个Scrum Sprint的复盘会上被反复提及,最终促成了这个平台的立项。

我们的目标很明确:构建一个单一数据源、统一计算逻辑的混合特征平台。它需要能为在线推理系统提供毫秒级的单点特征查询,同时也要支持数据科学家们用熟悉的Python工具进行大规模的离线特征回溯和分析。

Sprint 1-2: 技术选型与数据模型地基

在最初的几个技术评审会中,争论的焦点集中在存储层。方案A是Redis + S3,Redis负责在线热数据,S3存储全量历史数据。这个方案很常见,但问题在于数据同步的复杂性和一致性保障。我们预见到大量的胶水代码和潜在的数据延迟,这正是我们想解决的问题。

方案B,也是我们最终的选择:HBase。它的优势在于:

  1. 原生支持稀疏宽表: 完美契合特征存储场景。一个实体(如用户ID)作为RowKey,其下可以有成百上千个特征,每个特征是一个Qualifier。
  2. 低延迟随机读: 基于LSM树的架构和BlockCache机制,通过精心设计的RowKey,HBase可以轻松实现个位数毫秒级的点查,满足在线服务要求。
  3. MapReduce/Spark集成: 对离线批处理非常友好,可以直接作为数据源进行大规模扫描。

确定了HBase,我们立即开始设计核心的数据模型。这是整个系统的基石,一旦出错,后期修改成本极高。我们设计的RowKey至关重要,它需要同时服务于点查和范围扫描。

[entity_id]#[reversed_timestamp]

  • entity_id: 用户ID、商品ID等实体的唯一标识。
  • #: 分隔符。
  • reversed_timestamp: Long.MAX_VALUE - timestamp。这样做是为了让最新的数据在HBase内部排在最前面,方便获取最新的特征版本或进行时间窗口扫描。

一个典型的特征行看起来会是这样:

  • RowKey: user_12345#9223372036854775807
  • Column Family: f (features)
  • Qualifiers: f:last_7d_purchase_amount, f:realtime_clicks_per_hour, f:user_profile_vector_v1
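这种RowKey设计的直接收益是:读取某个实体的最新特征,只需要按前缀扫描并取第一行。下面是一个最小的Python示意(基于happybase,连接地址、解码方式均为示意值,并非平台的最终实现):

# rowkey_demo.py -- 示意:利用反转时间戳获取实体最新特征
import happybase

LONG_MAX = 2**63 - 1

def build_row_key(entity_id: str, event_ts_ms: int) -> bytes:
    """按 [entity_id]#[reversed_timestamp] 规则构造RowKey,最新数据排在最前。"""
    return f"{entity_id}#{LONG_MAX - event_ts_ms}".encode("utf-8")

def get_latest_features(table: "happybase.Table", entity_id: str) -> dict:
    """前缀扫描该实体的所有行;由于时间戳被反转,limit=1即为最新一行。"""
    prefix = f"{entity_id}#".encode("utf-8")
    for _, data in table.scan(row_prefix=prefix, limit=1):
        return {k.decode("utf-8"): v.decode("utf-8") for k, v in data.items()}
    return {}

# 用法示意
# conn = happybase.Connection("hbase-thrift.example.internal")  # 示意地址
# features = get_latest_features(conn.table("feature_platform:features"), "user_12345")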

我们用HBase Shell定义了这张核心的特征表:

# feature_store.hbase_schema.sh
# ------------------------------------------------
# 核心特征表定义
# 启用布隆过滤器以加速行查询
# 使用SNAPPY压缩减少存储成本
# 预分区以避免热点问题,这里以16个region为例
# ------------------------------------------------

create 'feature_platform:features', {
    NAME => 'f',
    VERSIONS => 3,                 # 保留最近3个版本的特征值,用于回溯或模型调试
    BLOCKCACHE => 'true',
    BLOOMFILTER => 'ROW',
    COMPRESSION => 'SNAPPY',
    TTL => '2592000'               # 特征默认保留30天
},
{
    SPLITS => (1..15).map { |i| "user_%02x" % (i * 16) }   # 切分点须按字典序递增;实际取值应依据entity_id分布采样确定
}

# 增加一个列族用于存放元数据或调试信息
alter 'feature_platform:features', {
    NAME => 'meta',
    VERSIONS => 1,
    BLOCKCACHE => 'false'
}

# 授权给计算服务和API服务对应的角色
grant 'feature_compute_role', 'RWXCA', 'feature_platform:features'
grant 'feature_api_role', 'R', 'feature_platform:features'

这个简单的Schema定义,在Sprint评审会上通过了。一个常见的错误是在这里过度设计,创建多个列族。在真实项目中,除非访问模式有巨大差异,否则单个列族通常性能更好,因为它能保证同一行的数据局部性。

Sprint 3-4: 不可变基础设施的自动化构建

解决了存储,下一个挑战是计算环境。我们的数据科学家习惯于Python和SciPy/NumPy生态,而生产环境要求稳定、可复现。过去的痛点是每个人的开发环境和生产环境都存在细微差异,导致“在我机器上能跑”的经典问题。

我们决定采用不可变基础设施(Immutable Infrastructure)模式。所有计算节点的环境都是从一个预先构建好的机器镜像(AMI)启动的,运行期间不做任何变更。这杜绝了配置漂移。Packer是实现这一点的完美工具。

我们定义了一个Packer模板,用于构建包含Python环境、SciPy、HBase客户端以及我们内部库的“特征计算基础镜像”。

// packer/feature-compute.pkr.hcl
packer {
  required_plugins {
    amazon = {
      version = ">= 1.0.0"
      source  = "github.com/hashicorp/amazon"
    }
  }
}

variable "aws_region" {
  type    = string
  default = "us-east-1"
}

variable "source_ami" {
  type    = string
  description = "Base AMI with corporate security policies"
}

variable "app_version" {
  type    = string
  description = "Version of the feature engineering application"
}

source "amazon-ebs" "feature-compute" {
  region        = var.aws_region
  source_ami    = var.source_ami
  instance_type = "m5.large"
  ssh_username  = "ec2-user"
  ami_name      = "feature-compute-node-${var.app_version}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
  
  tags = {
    Name        = "FeatureComputeNode"
    Version     = var.app_version
    CreatedBy   = "Packer"
  }
}

build {
  sources = ["source.amazon-ebs.feature-compute"]

  provisioner "shell" {
    inline = [
      "sudo yum update -y",
      "sudo yum install -y python3-devel gcc krb5-devel",
      "sudo pip3 install --upgrade pip"
    ]
  }

  provisioner "shell" {
    script = "./scripts/setup_python_env.sh"
  }
  
  provisioner "file" {
    source      = "../dist/feature-engine.tar.gz"
    destination = "/tmp/feature-engine.tar.gz"
  }

  provisioner "shell" {
    inline = [
      "echo 'Installing application code...'",
      "sudo mkdir -p /opt/app",
      "sudo tar -xzf /tmp/feature-engine.tar.gz -C /opt/app",
      "rm /tmp/feature-engine.tar.gz",
      "echo 'Installation complete.'"
    ]
  }
}

setup_python_env.sh脚本负责安装核心依赖。这里的坑在于,必须精确锁定版本,避免未来构建时因依赖库的次要版本更新导致行为不一致。

#!/bin/bash
# scripts/setup_python_env.sh
set -e # Exit immediately if a command exits with a non-zero status.

echo "Setting up Python environment..."

# We use specific versions to ensure reproducibility
sudo pip3 install \
    numpy==1.24.3 \
    scipy==1.10.1 \
    pandas==2.0.1 \
    happybase==1.2.0 \
    requests==2.31.0

# Install HBase client native libs if needed
# ...

echo "Python environment setup complete."

这个流程被集成到CI/CD中。每次代码合并到主分支,都会触发Packer构建一个新的AMI。部署时,我们只需要用新的AMI替换旧的Auto Scaling Group启动模板,然后进行滚动更新即可。整个过程零停机、完全自动化,极大地提升了部署的可靠性。
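
滚动更新这一步本身也可以脚本化。下面是一个思路示意(Python/boto3,启动模板名、ASG名与参数均为假设值,实际替换由CI/CD流水线中的部署任务完成):

# rollout_demo.py -- 示意:用新AMI创建启动模板新版本,并触发ASG的Instance Refresh
import boto3

def roll_out_new_ami(ami_id: str,
                     launch_template_name: str = "feature-compute-lt",  # 假设的启动模板名
                     asg_name: str = "feature-compute-asg") -> str:      # 假设的ASG名
    ec2 = boto3.client("ec2")
    autoscaling = boto3.client("autoscaling")

    # 基于最新版本复制一份启动模板,仅替换ImageId
    ec2.create_launch_template_version(
        LaunchTemplateName=launch_template_name,
        SourceVersion="$Latest",
        LaunchTemplateData={"ImageId": ami_id},
    )
    # 将默认版本指向刚创建的版本
    ec2.modify_launch_template(
        LaunchTemplateName=launch_template_name,
        DefaultVersion="$Latest",
    )
    # 触发滚动替换:ASG按MinHealthyPercentage逐批替换旧实例,实现零停机
    resp = autoscaling.start_instance_refresh(
        AutoScalingGroupName=asg_name,
        Preferences={"MinHealthyPercentage": 90, "InstanceWarmup": 120},
    )
    return resp["InstanceRefreshId"]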

Sprint 5-7: SciPy特征计算引擎的实现

有了坚实的基础设施,我们开始编写核心的特征计算逻辑。我们选择了一个典型的业务场景:计算用户在金融产品上的行为序列特征。这涉及到信号处理和平滑,SciPy是理想的工具。

下面是一个简化的特征计算任务,它读取原始事件,使用scipy.signal进行滤波,然后将结果写入HBase。

# feature_engine/processors/user_behavior.py
import logging
import time
from typing import List, Dict

import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import happybase

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# HBase connection pool should be managed globally in a real application
# For simplicity, we establish a connection here.
HBASE_HOST = 'hbase-thrift.internal.service'
HBASE_TABLE = 'feature_platform:features'
COLUMN_FAMILY = 'f'

class FeatureProcessor:
    def __init__(self, hbase_host: str, table_name: str):
        self.hbase_host = hbase_host
        self.table_name = table_name
        self.connection = None
        self._connect()

    def _connect(self):
        """Establishes a connection to HBase. Includes basic retry logic."""
        retries = 3
        for i in range(retries):
            try:
                self.connection = happybase.Connection(self.hbase_host, autoconnect=False)
                self.connection.open()
                self.table = self.connection.table(self.table_name)
                logging.info(f"Successfully connected to HBase at {self.hbase_host}")
                return
            except Exception as e:
                logging.error(f"HBase connection failed (attempt {i+1}/{retries}): {e}")
                time.sleep(2 ** i)
        raise ConnectionError("Could not connect to HBase after several retries.")

    def process_user_transactions(self, user_id: str, events: List[Dict]):
        """
        Calculates advanced behavioral features from a user's transaction events.
        """
        if not events:
            logging.warning(f"No events found for user {user_id}. Skipping.")
            return

        try:
            # Timestamps are assumed to be datetime-like values; the diff()/
            # .dt.total_seconds() and .timestamp() calls below rely on that.
            df = pd.DataFrame(events).sort_values('timestamp')
            
            # Feature 1: Smoothed transaction amount over time
            # Using Savitzky-Golay filter to smooth out noise while preserving signal shape.
            # This is more robust than a simple moving average for certain patterns.
            if len(df) >= 5:  # savgol_filter requires len(x) >= window_length (5) and window_length > polyorder
                smoothed_amounts = savgol_filter(df['amount'], window_length=5, polyorder=2)
                latest_smoothed_amount = smoothed_amounts[-1]
            else:
                latest_smoothed_amount = df['amount'].mean()

            # Feature 2: Inter-transaction time statistics
            time_deltas = df['timestamp'].diff().dropna().dt.total_seconds()
            mean_time_delta = time_deltas.mean()
            std_time_delta = time_deltas.std()

            # Prepare data for HBase
            # The timestamp for the features is the timestamp of the latest event
            latest_timestamp = df['timestamp'].iloc[-1].timestamp()
            reversed_ts = 2**63 - 1 - int(latest_timestamp * 1000)
            row_key = f"{user_id}#{reversed_ts}".encode('utf-8')
            
            features_to_write = {
                f'{COLUMN_FAMILY}:txn_amount_smoothed_sgf': str(latest_smoothed_amount).encode('utf-8'),
                f'{COLUMN_FAMILY}:txn_time_delta_mean_sec': str(mean_time_delta).encode('utf-8'),
                f'{COLUMN_FAMILY}:txn_time_delta_std_sec': str(std_time_delta).encode('utf-8'),
            }
            
            # Single-row put: all qualifiers under this row key are written atomically
            self.table.put(row_key, features_to_write)
            logging.info(f"Successfully processed and stored features for user {user_id}")

        except Exception as e:
            logging.error(f"Error processing features for user {user_id}: {e}", exc_info=True)
            # In a real system, failed events would be sent to a dead-letter queue.
            
    def close(self):
        if self.connection:
            self.connection.close()
            logging.info("HBase connection closed.")

# --- Unit Test Thinking ---
# To test `process_user_transactions`:
# 1. Mock the `happybase.Connection` and `table.put` methods.
# 2. Create sample event dataframes of varying lengths (empty, <5, >5).
# 3. Assert that `savgol_filter` is called with correct parameters.
# 4. Assert that the `table.put` method is called with a correctly formatted row key and feature values.
# 5. Test edge cases like DataFrames with NaNs or single events.

这个计算脚本被打包到我们用Packer构建的镜像中,并通过Airflow或一个简单的CRON作业调度执行。它的健壮性体现在日志记录、连接重试和对异常情况的优雅处理。
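
作为参考,调度侧的批处理入口大致如下(假设性示例,事件取数由上游作业准备好,这里仅示意FeatureProcessor的生命周期管理):

# batch_entry.py -- 示意:调度器侧的批处理入口
from feature_engine.processors.user_behavior import FeatureProcessor, HBASE_HOST, HBASE_TABLE

def run_batch(user_events: dict) -> None:
    """user_events: {user_id: [event_dict, ...]},由上游作业(如Airflow任务)提供。"""
    processor = FeatureProcessor(HBASE_HOST, HBASE_TABLE)
    try:
        for user_id, events in user_events.items():
            processor.process_user_transactions(user_id, events)
    finally:
        # 无论单个用户是否处理失败,都保证HBase连接被释放
        processor.close()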

Sprint 8-10: 实时监控仪表盘与Zustand的状态管理

平台上线后,新的问题出现了。运维和数据科学家们需要一个界面来监控特征的生成状态、数据新鲜度以及计算节点的健康状况。这是一个典型的“内部工具”需求,我们不想为此引入Redux那样复杂的解决方案。

Zustand以其极简的API和基于Hooks的模式吸引了我们。它让我们能快速构建一个响应式的仪表盘,而无需编写大量的样板代码。

我们的仪表盘核心需求是:

  1. 实时展示正在运行的特征计算任务。
  2. 显示每个特征的最新生成时间(新鲜度)。
  3. 可视化计算集群的资源利用率。

我们通过一个WebSocket服务将后端的监控数据推送到前端。Zustand store负责管理这些实时数据流。

// stores/monitoringStore.js
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';

// This store manages the real-time state of the feature engineering platform.
export const useMonitoringStore = create(
  immer((set, get) => ({
    // State for computation jobs
    jobs: {
      active: {}, // { jobId: { jobId, userId, status, startTime } }
      completedCount: 0,
      failedCount: 0,
    },
    // State for feature freshness
    features: {
      // { featureName: { lastUpdated, lagSeconds } }
    },
    // State for cluster health
    cluster: {
      cpuUsage: 0,
      memoryUsage: 0,
      nodeCount: 0,
    },
    connectionStatus: 'disconnected',
    webSocket: null,

    // Actions
    connect: () => {
      if (get().webSocket) return; // Already connected or connecting

      const socket = new WebSocket('wss://monitoring.internal.service/ws');
      set((state) => {
        state.connectionStatus = 'connecting';
        state.webSocket = socket;
      });

      socket.onopen = () => {
        set((state) => {
          state.connectionStatus = 'connected';
        });
      };

      socket.onmessage = (event) => {
        const data = JSON.parse(event.data);
        // Using immer makes state updates here immutable and safe
        set((state) => {
          switch (data.type) {
            case 'JOB_STARTED':
              state.jobs.active[data.payload.jobId] = data.payload;
              break;
            case 'JOB_COMPLETED':
              delete state.jobs.active[data.payload.jobId];
              state.jobs.completedCount += 1;
              break;
            case 'JOB_FAILED':
              delete state.jobs.active[data.payload.jobId];
              state.jobs.failedCount += 1;
              break;
            case 'FEATURE_UPDATED':
              state.features[data.payload.featureName] = {
                lastUpdated: data.payload.timestamp,
                lagSeconds: (Date.now() / 1000) - data.payload.timestamp,
              };
              break;
            case 'CLUSTER_STATS':
              state.cluster = data.payload;
              break;
            default:
              break;
          }
        });
      };

      socket.onclose = () => {
        set((state) => {
          state.connectionStatus = 'disconnected';
          state.webSocket = null;
        });
        // Implement reconnection logic here
      };
      
      socket.onerror = (error) => {
        console.error('WebSocket Error:', error);
        socket.close();
      };
    },
    
    disconnect: () => {
      get().webSocket?.close();
    },
  }))
);

React组件可以非常简洁地订阅这些状态变化:

// components/JobsDashboard.jsx
import React, { useEffect } from 'react';
import { useMonitoringStore } from '../stores/monitoringStore';

export const JobsDashboard = () => {
  // Select narrow state slices. Selectors should return stable references:
  // returning a fresh object/array from a selector defeats zustand's equality
  // check and causes unnecessary re-renders, so we select values individually.
  const activeJobsMap = useMonitoringStore((state) => state.jobs.active);
  const activeJobs = Object.values(activeJobsMap);
  const completedCount = useMonitoringStore((state) => state.jobs.completedCount);
  const failedCount = useMonitoringStore((state) => state.jobs.failedCount);
  const connect = useMonitoringStore((state) => state.connect);
  const disconnect = useMonitoringStore((state) => state.disconnect);
  const connectionStatus = useMonitoringStore((state) => state.connectionStatus);

  useEffect(() => {
    connect();
    return () => disconnect(); // Clean up on unmount
  }, [connect, disconnect]);

  return (
    <div>
      <h2>Job Status ({connectionStatus})</h2>
      <p>Active: {activeJobs.length} | Completed: {completedCount} | Failed: {failedCount}</p>
      <table>
        <thead>
          <tr>
            <th>Job ID</th>
            <th>User ID</th>
            <th>Status</th>
          </tr>
        </thead>
        <tbody>
          {activeJobs.map(job => (
            <tr key={job.jobId}>
              <td>{job.jobId}</td>
              <td>{job.userId}</td>
              <td>{job.status}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

我们使用了immer中间件,它让我们能够用“可变”的语法来更新状态,背后则自动处理了不可变性,代码更直观,也减少了出错的可能。这个监控仪表盘的快速交付,让我们在Scrum评审会上获得了业务方的高度评价。

平台的局限性与未来迭代

尽管目前的平台解决了在线/离线特征不一致的核心痛点,但它远非完美。我们当前的架构仍然存在一些明显的局限性:

  1. HBase的运维复杂性: 虽然HBase性能强大,但它是一个复杂的分布式系统。Region热点、Compaction风暴、Thrift Server的调优等问题,都需要专业的运维经验。这对于一个小团队来说,是一个持续的负担。
  2. 特征发现与治理缺失: 平台现在像一个黑盒,数据科学家很难发现已经存在哪些特征,特征的定义、血缘关系也没有被很好地管理。下一个大的Epic将是构建一个特征元数据中心。
  3. 计算模式单一: 目前主要支持基于Python的批处理计算。对于需要更低延迟的流式特征(如Flink),平台还需要进行架构扩展。
  4. Zustand的适用边界: 对于我们当前的监控仪表盘,Zustand非常合适。但如果未来这个内部工具变得极其复杂,涉及跨多个领域的大量状态和复杂的副作用管理,我们可能需要重新评估是否引入更结构化的方案,或者将store拆分成更多更小的逻辑单元。
