The initial Epic handed to our team sounded like the classic problem every data-driven company runs into: unify the online and offline feature engineering pipelines. The reality was that our online serving side (Java stack) and our offline analytics side (Python stack) each maintained their own feature computation logic, breeding duplicated code, inconsistent definitions, and a steady stream of feature-leakage incidents. Whenever a model behaved differently online versus offline, the root cause was usually a feature whose computation window was off by a few hours. This mess came up in retrospective after retrospective across several Scrum sprints, and eventually got the platform approved as a project.
Our goal was clear: build a hybrid feature platform with a single source of truth and unified computation logic. It had to serve millisecond-level point lookups of features for online inference, while also letting data scientists run large-scale offline feature backfills and analysis with the Python tooling they already know.
Sprint 1-2: Technology Selection and the Data Model Foundation
In the first few technical reviews, the debate centered on the storage layer. Option A was Redis + S3: Redis for hot online data, S3 for the full history. It is a common setup, but the trouble lies in the complexity of keeping the two stores in sync and guaranteeing consistency. We could already foresee piles of glue code and potential data lag, which was exactly the kind of problem we set out to eliminate.
Option B, and the one we ultimately chose: HBase. Its advantages:
- Native support for sparse, wide tables: a perfect fit for feature storage. One entity (e.g., a user ID) becomes the RowKey and can carry hundreds or thousands of features, each stored as a Qualifier.
- Low-latency random reads: with its LSM-tree architecture and BlockCache, and given a carefully designed RowKey, HBase can comfortably serve single-digit-millisecond point lookups, which meets the online serving requirement.
- MapReduce/Spark integration: very friendly to offline batch processing; the table can be scanned at scale directly as a data source.
With HBase settled, we immediately started designing the core data model. It is the foundation of the whole system, and mistakes here are extremely expensive to fix later. The RowKey design is critical because it has to serve both point lookups and range scans.
[entity_id]#[reversed_timestamp]
- entity_id: the unique identifier of the entity, such as a user ID or item ID.
- #: the separator.
- reversed_timestamp: Long.MAX_VALUE - timestamp. This makes the newest data sort first inside HBase, which makes it easy to fetch the latest feature version or to scan a time window.
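For concreteness, here is a minimal sketch of how a compute job would assemble such a RowKey (the helper name and example values are illustrative, not production code):
# rowkey_sketch.py -- illustrative only
LONG_MAX = 2**63 - 1  # Java Long.MAX_VALUE, the basis of the reversed timestamp

def build_row_key(entity_id: str, event_ts_ms: int) -> bytes:
    """Builds '[entity_id]#[reversed_timestamp]' so that newer rows sort first."""
    reversed_ts = LONG_MAX - event_ts_ms
    return f"{entity_id}#{reversed_ts}".encode("utf-8")

# build_row_key("user_12345", 1700000000000)
# -> b'user_12345#9223370336854775807'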
A typical feature row looks like this:
- RowKey: user_12345#9223372036854775807 (the reversed-timestamp component here is illustrative)
- Column Family: f (features)
- Qualifiers: f:last_7d_purchase_amount, f:realtime_clicks_per_hour, f:user_profile_vector_v1
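On the read side, because reversed timestamps sort newest first, the freshest feature row for an entity can be fetched with a short prefix scan. Here is a minimal Python sketch using happybase (our online service actually goes through the native Java client; host and table names match the ones used later in this post):
# read_path_sketch.py -- illustrative only
import happybase

connection = happybase.Connection('hbase-thrift.internal.service')
table = connection.table('feature_platform:features')

# limit=1 on the prefix scan returns the latest feature version for the entity,
# since rows with newer original timestamps sort first.
for row_key, cells in table.scan(row_prefix=b'user_12345#', limit=1):
    print(row_key, cells.get(b'f:last_7d_purchase_amount'))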
We defined this core feature table with the HBase shell:
# feature_store.hbase_schema.sh
# ------------------------------------------------
# Core feature table definition
# Enable a Bloom filter to speed up row lookups
# Use SNAPPY compression to reduce storage cost
# Pre-split to avoid region hotspots; 16 regions as an example
# ------------------------------------------------
create 'feature_platform:features', {
  NAME => 'f',
  VERSIONS => 3,    # keep the 3 most recent versions of each feature value, for backfills and model debugging
  BLOCKCACHE => 'true',
  BLOOMFILTER => 'ROW',
  COMPRESSION => 'SNAPPY',
  TTL => '2592000'  # features are kept for 30 days by default
},
{
  # Split points are illustrative; real ones should follow the entity_id distribution.
  # Zero-padded hex keeps the split keys in lexicographic order.
  SPLITS => (1..15).map { |i| "user_%02x" % (i * 16) }
}
# An extra column family for metadata and debugging information
alter 'feature_platform:features', {
  NAME => 'meta',
  VERSIONS => 1,
  BLOCKCACHE => 'false'
}
# Grant the roles used by the compute service and the API service
grant 'feature_compute_role', 'RWXCA', 'feature_platform:features'
grant 'feature_api_role', 'R', 'feature_platform:features'
This simple schema passed the sprint review. A common mistake at this point is to over-engineer and create many column families. In real projects, unless access patterns differ drastically, a single column family usually performs better because it keeps a row's data physically co-located.
Sprint 3-4: Automating Immutable Infrastructure
With storage settled, the next challenge was the compute environment. Our data scientists live in the Python and SciPy/NumPy ecosystem, while production demands stability and reproducibility. The old pain point was that every developer's environment differed subtly from production, giving us the classic "works on my machine" problem.
We decided to adopt the Immutable Infrastructure pattern: every compute node boots from a pre-built machine image (AMI) and is never modified while running, which eliminates configuration drift. Packer is an ideal tool for this.
We defined a Packer template that builds the "feature compute base image" containing the Python environment, SciPy, the HBase client, and our internal libraries.
// packer/feature-compute.pkr.hcl
packer {
required_plugins {
amazon = {
version = ">= 1.0.0"
source = "github.com/hashicorp/amazon"
}
}
}
variable "aws_region" {
type = string
default = "us-east-1"
}
variable "source_ami" {
type = string
description = "Base AMI with corporate security policies"
}
variable "app_version" {
type = string
description = "Version of the feature engineering application"
}
source "amazon-ebs" "feature-compute" {
region = var.aws_region
source_ami = var.source_ami
instance_type = "m5.large"
ssh_username = "ec2-user"
ami_name = "feature-compute-node-${var.app_version}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
tags = {
Name = "FeatureComputeNode"
Version = var.app_version
CreatedBy = "Packer"
}
}
build {
sources = ["source.amazon-ebs.feature-compute"]
provisioner "shell" {
inline = [
"sudo yum update -y",
"sudo yum install -y python3-devel gcc krb5-devel",
"sudo pip3 install --upgrade pip"
]
}
provisioner "shell" {
script = "./scripts/setup_python_env.sh"
}
provisioner "file" {
source = "../dist/feature-engine.tar.gz"
destination = "/tmp/feature-engine.tar.gz"
}
provisioner "shell" {
inline = [
"echo 'Installing application code...'",
"sudo mkdir -p /opt/app",
"sudo tar -xzf /tmp/feature-engine.tar.gz -C /opt/app",
"rm /tmp/feature-engine.tar.gz",
"echo 'Installation complete.'"
]
}
}
The setup_python_env.sh script installs the core dependencies. The gotcha here is that versions must be pinned exactly, otherwise a future image build can behave differently just because a dependency shipped a new minor version; a small runtime check (sketched after the script) can also guard against drift.
#!/bin/bash
# scripts/setup_python_env.sh
set -e # Exit immediately if a command exits with a non-zero status.
echo "Setting up Python environment..."
# We use specific versions to ensure reproducibility
sudo pip3 install \
numpy==1.24.3 \
scipy==1.10.1 \
pandas==2.0.1 \
happybase==1.2.0 \
requests==2.31.0
# Install HBase client native libs if needed
# ...
echo "Python environment setup complete."
This flow is wired into CI/CD: every merge to the main branch triggers a Packer build of a new AMI. Deploying is then just a matter of pointing the Auto Scaling Group's launch template at the new AMI and performing a rolling update. The whole process is zero-downtime and fully automated, which has greatly improved deployment reliability.
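The rollout step itself can be driven by a small script in the pipeline. A hedged sketch using boto3 (launch template and Auto Scaling Group names are placeholders; in practice we also poll the refresh status and gate on health checks):
# deploy/rollout_ami.py -- illustrative sketch of the rolling update step
import boto3

def roll_out_new_ami(ami_id: str, launch_template_name: str, asg_name: str) -> None:
    ec2 = boto3.client("ec2")
    autoscaling = boto3.client("autoscaling")

    # Register a new launch template version that points at the freshly built AMI
    response = ec2.create_launch_template_version(
        LaunchTemplateName=launch_template_name,
        SourceVersion="$Latest",
        LaunchTemplateData={"ImageId": ami_id},
    )
    new_version = str(response["LaunchTemplateVersion"]["VersionNumber"])
    ec2.modify_launch_template(
        LaunchTemplateName=launch_template_name,
        DefaultVersion=new_version,
    )

    # Ask the Auto Scaling Group to replace instances gradually
    autoscaling.start_instance_refresh(
        AutoScalingGroupName=asg_name,
        Preferences={"MinHealthyPercentage": 90},
    )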
Sprint 5-7: Implementing the SciPy Feature Computation Engine
With solid infrastructure in place, we started writing the core feature computation logic. We picked a representative business scenario: computing behavioral sequence features for users of a financial product. It involves signal processing and smoothing, for which SciPy is an ideal tool.
Below is a simplified feature computation job: it reads raw events, filters them with scipy.signal, and writes the results to HBase.
# feature_engine/processors/user_behavior.py
import logging
import time
from typing import List, Dict

import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import happybase

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# HBase connection pool should be managed globally in a real application.
# For simplicity, we establish a connection here.
HBASE_HOST = 'hbase-thrift.internal.service'
HBASE_TABLE = 'feature_platform:features'
COLUMN_FAMILY = 'f'


class FeatureProcessor:
    def __init__(self, hbase_host: str, table_name: str):
        self.hbase_host = hbase_host
        self.table_name = table_name
        self.connection = None
        self._connect()

    def _connect(self):
        """Establishes a connection to HBase. Includes basic retry logic."""
        retries = 3
        for i in range(retries):
            try:
                self.connection = happybase.Connection(self.hbase_host, autoconnect=False)
                self.connection.open()
                self.table = self.connection.table(self.table_name)
                logging.info(f"Successfully connected to HBase at {self.hbase_host}")
                return
            except Exception as e:
                logging.error(f"HBase connection failed (attempt {i+1}/{retries}): {e}")
                time.sleep(2 ** i)
        raise ConnectionError("Could not connect to HBase after several retries.")

    def process_user_transactions(self, user_id: str, events: List[Dict]):
        """
        Calculates advanced behavioral features from a user's transaction events.
        """
        if not events:
            logging.warning(f"No events found for user {user_id}. Skipping.")
            return
        try:
            df = pd.DataFrame(events).sort_values('timestamp')
            # Ensure timestamps are datetimes so .diff()/.dt and .timestamp() below work
            df['timestamp'] = pd.to_datetime(df['timestamp'])

            # Feature 1: Smoothed transaction amount over time
            # Using Savitzky-Golay filter to smooth out noise while preserving signal shape.
            # This is more robust than a simple moving average for certain patterns.
            if len(df['amount']) > 5:  # savgol needs at least window_length samples (and window_length > polyorder)
                smoothed_amounts = savgol_filter(df['amount'], window_length=5, polyorder=2)
                latest_smoothed_amount = smoothed_amounts[-1]
            else:
                latest_smoothed_amount = df['amount'].mean()

            # Feature 2: Inter-transaction time statistics
            time_deltas = df['timestamp'].diff().dropna().dt.total_seconds()
            mean_time_delta = time_deltas.mean()
            std_time_delta = time_deltas.std()

            # Prepare data for HBase
            # The timestamp for the features is the timestamp of the latest event
            latest_timestamp = df['timestamp'].iloc[-1].timestamp()
            reversed_ts = 2**63 - 1 - int(latest_timestamp * 1000)
            row_key = f"{user_id}#{reversed_ts}".encode('utf-8')

            features_to_write = {
                f'{COLUMN_FAMILY}:txn_amount_smoothed_sgf': str(latest_smoothed_amount).encode('utf-8'),
                f'{COLUMN_FAMILY}:txn_time_delta_mean_sec': str(mean_time_delta).encode('utf-8'),
                f'{COLUMN_FAMILY}:txn_time_delta_std_sec': str(std_time_delta).encode('utf-8'),
            }

            # Single-row put; HBase guarantees row-level atomicity for all cells in it
            self.table.put(row_key, features_to_write)
            logging.info(f"Successfully processed and stored features for user {user_id}")
        except Exception as e:
            logging.error(f"Error processing features for user {user_id}: {e}", exc_info=True)
            # In a real system, failed events would be sent to a dead-letter queue.

    def close(self):
        if self.connection:
            self.connection.close()
            logging.info("HBase connection closed.")


# --- Unit Test Thinking ---
# To test `process_user_transactions`:
# 1. Mock the `happybase.Connection` and `table.put` methods.
# 2. Create sample event dataframes of varying lengths (empty, <5, >5).
# 3. Assert that `savgol_filter` is called with correct parameters.
# 4. Assert that the `table.put` method is called with a correctly formatted row key and feature values.
# 5. Test edge cases like DataFrames with NaNs or single events.
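Following that checklist, a first test might look like the sketch below (assumes pytest and unittest.mock; the import path matches the file header above):
# tests/test_user_behavior.py -- minimal sketch covering points 1 and 4
from unittest.mock import MagicMock, patch

import pandas as pd

from feature_engine.processors.user_behavior import FeatureProcessor

def test_put_receives_prefixed_row_key_and_features():
    with patch('feature_engine.processors.user_behavior.happybase.Connection') as connection_cls:
        mock_table = MagicMock()
        connection_cls.return_value.table.return_value = mock_table

        processor = FeatureProcessor('fake-host', 'feature_platform:features')
        events = [
            {'timestamp': pd.Timestamp('2024-01-01') + pd.Timedelta(hours=i), 'amount': 10.0 + i}
            for i in range(6)  # more than 5 events, so the Savitzky-Golay branch runs
        ]

        processor.process_user_transactions('user_42', events)

        row_key, features = mock_table.put.call_args[0]
        assert row_key.startswith(b'user_42#')
        assert 'f:txn_amount_smoothed_sgf' in features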
This compute script is baked into the Packer-built image and scheduled either through Airflow or a simple cron job. Its robustness comes from the logging, the connection retries, and the graceful handling of abnormal cases.
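For the Airflow route, the DAG stays deliberately thin: it only shells out to the entry point baked into the image. A hedged sketch (the DAG id, schedule, and CLI entry point are assumptions):
# dags/user_behavior_features_dag.py -- illustrative Airflow 2.x DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="user_behavior_features",
    start_date=datetime(2024, 1, 1),
    schedule_interval="*/30 * * * *",  # every 30 minutes
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    compute_features = BashOperator(
        task_id="compute_user_behavior_features",
        bash_command="python3 /opt/app/feature-engine/run_user_behavior.py",
    )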
Sprint 8-10: A Real-Time Monitoring Dashboard and State Management with Zustand
Once the platform went live, a new problem surfaced: operations and the data scientists needed a UI to monitor feature generation status, data freshness, and the health of the compute nodes. This is a typical "internal tool" requirement, and we did not want to pull in something as heavyweight as Redux for it.
Zustand won us over with its minimal API and hooks-based model. It let us build a reactive dashboard quickly without writing piles of boilerplate.
The dashboard's core requirements were:
- Show the feature computation jobs that are currently running, in real time.
- Show the most recent generation time (freshness) of each feature.
- Visualize resource utilization of the compute cluster.
A WebSocket service pushes the backend monitoring data to the frontend, and the Zustand store manages these real-time data streams.
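The contract between the backend pusher and the store is just a handful of JSON message types. A sketch of the producer side in Python (illustrative; the payload shapes mirror the switch cases in the store below):
# monitoring/messages.py -- illustrative payload builders for the dashboard WebSocket feed
import json
import time

def job_started(job_id: str, user_id: str) -> str:
    return json.dumps({
        "type": "JOB_STARTED",
        "payload": {"jobId": job_id, "userId": user_id, "status": "RUNNING", "startTime": time.time()},
    })

def feature_updated(feature_name: str, updated_at_epoch_s: float) -> str:
    return json.dumps({
        "type": "FEATURE_UPDATED",
        "payload": {"featureName": feature_name, "timestamp": updated_at_epoch_s},
    })

def cluster_stats(cpu_usage: float, memory_usage: float, node_count: int) -> str:
    return json.dumps({
        "type": "CLUSTER_STATS",
        "payload": {"cpuUsage": cpu_usage, "memoryUsage": memory_usage, "nodeCount": node_count},
    })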
// stores/monitoringStore.js
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';
// This store manages the real-time state of the feature engineering platform.
export const useMonitoringStore = create(
immer((set, get) => ({
// State for computation jobs
jobs: {
active: {}, // { jobId: { status, userId, startTime } }
completedCount: 0,
failedCount: 0,
},
// State for feature freshness
features: {
// { featureName: { lastUpdated, lagSeconds } }
},
// State for cluster health
cluster: {
cpuUsage: 0,
memoryUsage: 0,
nodeCount: 0,
},
connectionStatus: 'disconnected',
webSocket: null,
// Actions
connect: () => {
if (get().webSocket) return; // Already connected or connecting
const socket = new WebSocket('wss://monitoring.internal.service/ws');
set((state) => {
state.connectionStatus = 'connecting';
state.webSocket = socket;
});
socket.onopen = () => {
set((state) => {
state.connectionStatus = 'connected';
});
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
// Using immer makes state updates here immutable and safe
set((state) => {
switch (data.type) {
case 'JOB_STARTED':
state.jobs.active[data.payload.jobId] = data.payload;
break;
case 'JOB_COMPLETED':
delete state.jobs.active[data.payload.jobId];
state.jobs.completedCount += 1;
break;
case 'JOB_FAILED':
delete state.jobs.active[data.payload.jobId];
state.jobs.failedCount += 1;
break;
case 'FEATURE_UPDATED':
state.features[data.payload.featureName] = {
lastUpdated: data.payload.timestamp,
lagSeconds: (Date.now() / 1000) - data.payload.timestamp,
};
break;
case 'CLUSTER_STATS':
state.cluster = data.payload;
break;
default:
break;
}
});
};
socket.onclose = () => {
set((state) => {
state.connectionStatus = 'disconnected';
state.webSocket = null;
});
// Implement reconnection logic here
};
socket.onerror = (error) => {
console.error('WebSocket Error:', error);
socket.close();
};
},
disconnect: () => {
get().webSocket?.close();
},
}))
);
React components can subscribe to these state changes very concisely:
// components/JobsDashboard.jsx
import React, { useEffect } from 'react';
import { useMonitoringStore } from '../stores/monitoringStore';
export const JobsDashboard = () => {
// Select slices individually so the component re-renders only when they actually change.
// (Returning a fresh object or array from a selector would force a re-render on every store update.)
const activeJobsMap = useMonitoringStore((state) => state.jobs.active);
const completedCount = useMonitoringStore((state) => state.jobs.completedCount);
const failedCount = useMonitoringStore((state) => state.jobs.failedCount);
const connectionStatus = useMonitoringStore((state) => state.connectionStatus);
const connect = useMonitoringStore((state) => state.connect);
const disconnect = useMonitoringStore((state) => state.disconnect);
const activeJobs = Object.values(activeJobsMap);
useEffect(() => {
connect();
return () => disconnect(); // Clean up on unmount
}, [connect, disconnect]);
return (
<div>
<h2>Job Status ({connectionStatus})</h2>
<p>Active: {activeJobs.length} | Completed: {completedCount} | Failed: {failedCount}</p>
<table>
<thead>
<tr>
<th>Job ID</th>
<th>User ID</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{activeJobs.map(job => (
<tr key={job.jobId}>
<td>{job.jobId}</td>
<td>{job.userId}</td>
<td>{job.status}</td>
</tr>
))}
</tbody>
</table>
</div>
);
};
We used the immer middleware, which lets us write state updates in a "mutating" style while it handles immutability behind the scenes; the code reads more naturally and leaves less room for mistakes. Shipping this monitoring dashboard quickly earned us high praise from the business side at the Scrum review.
Platform Limitations and Future Iterations
Although the current platform addresses the core pain point of online/offline feature inconsistency, it is far from perfect. The architecture still has some clear limitations:
- Operational complexity of HBase: HBase is powerful, but it is a complex distributed system. Region hotspots, compaction storms, and Thrift Server tuning all demand specialized operational expertise, which is an ongoing burden for a small team.
- No feature discovery or governance: the platform is currently a black box; data scientists struggle to find out which features already exist, and feature definitions and lineage are not well managed. The next big Epic will be a feature metadata center.
- A single computation mode: today we mainly support Python-based batch computation. Supporting lower-latency streaming features (e.g., with Flink) will require extending the architecture.
- The limits of Zustand: for the current monitoring dashboard, Zustand is a great fit. But if this internal tool ever becomes far more complex, with large amounts of state spanning multiple domains and heavy side-effect management, we may need to re-evaluate whether to adopt a more structured solution, or split the store into more, smaller logical units.