项目启动时,我们面临一个棘手的技术需求:构建一个用户画像系统,它不仅需要向前端实时推送用户的最新特征(如“最近活跃度”、“购买倾向指数”),还需要支持对用户之间深层、复杂的关系网络进行即时分析(如“探索与该用户有三度社交关联且具有相似购物偏好的群体”)。前者要求极低的读取延迟和实时更新能力,后者则需要强大的图计算和遍历能力。
单一数据库方案很快被否决。如果只用 Firestore 这样的文档数据库,实时推送和简单的键值查询性能优异,但执行多层关系查询几乎是不可能的,需要在应用层做大量的循环和拼接,性能会随着关系深度的增加呈指数级下降。反之,如果只用 Dgraph 这样的图数据库,虽然能完美处理复杂关系查询,但将其用于高并发、低延迟的用户单点特征读取场景,不仅是功能上的冗余,其事务模型和数据结构也不如文档数据库那样为前端应用优化得好。
这导向了一个清晰的结论:采用多模数据库(Polyglot Persistence)架构。让每种数据存储只做它最擅长的事情。我们的最终选型是 Dask 用于离线数据处理,Dgraph 存储关系图谱,Firestore 存储实时特征,而前端则使用 Valtio 来聚合这两个异构数据源的状态。
架构决策:为何是 Dask + Dgraph + Firestore?
这个决策的核心是数据流和读写模式的分离。
Dask:海量数据处理的引擎
我们的原始数据源是存储在对象存储中的数十 TB 的用户行为日志(Parquet 格式)。我们需要一个能够并行处理这些数据、计算用户特征并提取实体间关系的工具。Dask 是一个自然的选择,它能用熟悉的 Pandas 和 NumPy API 来操作无法装入单机内存的数据集,并能轻松扩展到多机集群。在真实项目中,这意味着我们可以用较低的成本完成大规模的 ETL 和特征工程。Dgraph:复杂关系网络的归宿
用户之间的社交关系、用户与产品的交互关系、产品之间的关联关系……这些构成了一个巨大的图。Dgraph 原生支持 GraphQL,并且其底层设计就是为图遍历优化的。将这些关系数据存入 Dgraph,意味着我们可以用一条简单的 GraphQL 查询替代传统数据库中需要多次 JOIN 甚至递归查询的复杂操作。Firestore:面向前端的实时特征缓存层
Dask 计算出的用户特征,例如last_active_timestamp,propensity_score,是扁平的键值对结构。这类数据需要被前端以极低的延迟获取,并且当特征更新时,UI 应当实时响应。Firestore 的实时监听能力和为客户端 SDK 优化的数据结构使其成为这个场景下的不二之选。我们把它看作一个持久化的、实时的、面向最终用户的“物化视图”。Valtio:弥合前端状态的裂缝
前端需要同时消费来自 Firestore 的实时数据和来自 Dgraph 的按需图查询数据。这两种数据的更新频率和获取方式完全不同。使用 Valtio 这种基于 Proxy 的状态管理器,我们可以将这两个数据源的逻辑封装在统一的状态对象中,而 UI 组件只需订阅这个状态,无需关心数据具体来自哪里,从而极大简化了前端的复杂度。
整体数据流如下所示:
graph TD
subgraph "离线处理层 (Batch Processing)"
A[S3/GCS: Raw Logs in Parquet] --> B(Dask Cluster);
end
subgraph "数据持久化层 (Persistence Layer)"
B --> C{Dgraph: Graph Data};
B --> D[Firestore: Real-time Features];
end
subgraph "服务与展现层 (Serving & Presentation)"
E[Backend API Gateway] --> C;
E --> D;
F[React Frontend] --> E;
F -- Real-time Updates --> D;
end
subgraph "前端状态管理"
G(Valtio Store)
end
F -- Manages State --> G;
核心实现:从数据处理到前端呈现
下面我们将通过代码来展示这个架构的关键环节。
1. Dask 并行处理与双写数据库
这是整个流程的起点。我们假设有一个 Dask 作业,定期运行,处理新的日志数据。
process_user_data.py
import dask.dataframe as dd
import pandas as pd
import logging
from typing import Dict, Any
from config import DaskConfig, FirestoreConfig, DgraphConfig
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 模拟数据库客户端(在生产中应使用真实的SDK)
class MockFirestoreClient:
def collection(self, name):
logging.info(f"[Firestore] Accessing collection: {name}")
return self
def document(self, doc_id):
logging.info(f"[Firestore] Accessing document: {doc_id}")
return self
def set(self, data, merge=False):
logging.info(f"[Firestore] Setting data (merge={merge}): {data}")
# 在真实实现中,这里会有网络调用
pass
class MockDgraphClient:
def txn(self):
return self
def mutate(self, set_nquads):
logging.info(f"[Dgraph] Mutating nquads:\n{set_nquads}")
# 在真实实现中,这里会有网络调用
pass
def commit(self):
logging.info("[Dgraph] Committing transaction.")
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
else:
logging.error(f"Transaction failed: {exc_val}")
# 实际项目中,这里会是真实的客户端初始化
# from google.cloud import firestore
# import pydgraph
# firestore_client = firestore.Client(project=FirestoreConfig.PROJECT_ID)
# dgraph_client_stub = pydgraph.DgraphClientStub(f"{DgraphConfig.HOST}:{DgraphConfig.PORT}")
# dgraph_client = pydgraph.DgraphClient(dgraph_client_stub)
firestore_client = MockFirestoreClient()
dgraph_client = MockDgraphClient()
def process_partition(df: pd.DataFrame) -> int:
"""
处理单个 Pandas DataFrame 分区。
这个函数会被 Dask 并行调用。
"""
processed_count = 0
try:
user_features = {}
mutations = []
for row in df.itertuples():
user_id = str(row.user_id)
# 1. 计算扁平化的用户特征 (用于 Firestore)
if user_id not in user_features:
user_features[user_id] = {'interaction_count': 0, 'last_seen': row.timestamp}
user_features[user_id]['interaction_count'] += 1
if row.timestamp > user_features[user_id]['last_seen']:
user_features[user_id]['last_seen'] = str(row.timestamp)
# 2. 提取图关系 (用于 Dgraph)
# 这里的 N-Quads 格式是 Dgraph 的标准输入格式
mutations.append(f'_:user_{user_id} <dgraph.type> "User" .')
mutations.append(f'_:user_{user_id} <user_id> "{user_id}" .')
if hasattr(row, 'friend_id') and pd.notna(row.friend_id):
friend_id = str(int(row.friend_id))
mutations.append(f'_:user_{user_id} <knows> _:user_{friend_id} .')
mutations.append(f'_:user_{friend_id} <dgraph.type> "User" .')
mutations.append(f'_:user_{friend_id} <user_id> "{friend_id}" .')
processed_count += 1
# 批量写入 Firestore
# 在真实项目中,这里应该使用 Firestore 的 BatchWriter 来提高性能
for user_id, features in user_features.items():
doc_ref = firestore_client.collection('user_features').document(user_id)
doc_ref.set(features, merge=True)
# 批量写入 Dgraph
nquads_payload = "\n".join(mutations)
if nquads_payload:
with dgraph_client.txn() as txn:
txn.mutate(set_nquads=nquads_payload)
logging.info(f"Processed partition with {len(df)} rows. Found {len(user_features)} users.")
return processed_count
except Exception as e:
logging.error(f"Error processing partition: {e}", exc_info=True)
# 这里的错误处理很关键,是保证作业稳定性的基础
# 可以加入重试机制或者将失败的分区信息记录到死信队列
return 0
def run_feature_engineering_job(source_path: str):
"""
主作业函数,使用 Dask 读取数据并分发处理任务。
"""
logging.info(f"Starting job. Reading data from {source_path}")
# 使用 Dask 读取大型 Parquet 文件集
# blocksize 控制了每个分区的大小,这是性能调优的关键参数
ddf = dd.read_parquet(
source_path,
engine='pyarrow',
blocksize=DaskConfig.BLOCK_SIZE
)
# 在 Dask 中,一个常见的错误是直接在 ddf 上进行迭代或操作。
# 正确的方式是使用 map_partitions,它将函数应用到每个底层的 Pandas DataFrame。
# meta 参数定义了输出的结构,有助于 Dask 优化执行计划。
result = ddf.map_partitions(process_partition, meta=(None, 'int64')).compute()
total_processed = result.sum()
logging.info(f"Job finished. Total rows processed: {total_processed}")
if __name__ == "__main__":
# 模拟一个 Parquet 数据源目录
# 在生产环境中,这会指向 S3 或 GCS 上的真实路径
# e.g., "s3://my-data-bucket/user-logs/date=2023-10-27/"
mock_source_path = "./mock_data.parquet"
# 创建模拟数据
mock_df = pd.DataFrame({
'user_id': [101, 102, 101, 103, 102],
'timestamp': pd.to_datetime(['2023-10-27 10:00', '2023-10-27 10:05', '2023-10-27 10:10', '2023-10-27 10:15', '2023-10-27 10:20']),
'action': ['login', 'view_item', 'add_to_cart', 'login', 'knows_friend'],
'friend_id': [None, None, None, None, 101]
})
mock_df.to_parquet(mock_source_path)
run_feature_engineering_job(mock_source_path)
这里的关键在于 process_partition 函数。它在一个原子操作单元(一个 Pandas DataFrame)内,同时准备了写入 Firestore 的数据和写入 Dgraph 的 N-Quads。这个双写操作的原子性是一个挑战。当前实现不是严格事务性的,Dask 作业的重试可能导致数据不一致。在生产环境中,一个更健壮的方案是 Dask 先将处理结果写入一个可靠的消息队列(如 Kafka),然后由两个独立的消费者分别写入 Dgraph 和 Firestore,通过消费者组的位移来保证至少一次或精确一次的处理。但对于可接受最终一致性的特征计算场景,当前方案因其简单性而具备优势。
2. 后端 API 网关:聚合异构数据源
后端需要提供一个接口,能够根据请求,智能地从 Dgraph 或组合两个数据源的数据返回给前端。我们使用 FastAPI 来演示。
main.py
from fastapi import FastAPI, HTTPException
import httpx
import logging
from typing import Dict, Any
# 配置
DGRAPH_GQL_ENDPOINT = "http://localhost:8080/graphql" # Dgraph GraphQL endpoint
app = FastAPI()
logger = logging.getLogger("api")
# Dgraph 查询模板
# 使用参数化查询防止注入
FIND_FRIENDS_OF_FRIENDS_QUERY = """
query GetRelatedUsers($userId: string, $depth: int) {
queryUser(filter: { user_id: { eq: $userId } }) {
user_id
knows @cascade(depth: $depth) {
user_id
}
}
}
"""
@app.get("/user/{user_id}/graph")
async def get_user_graph_data(user_id: str, depth: int = 2):
"""
从 Dgraph 获取用户的图关系数据。
这里的坑在于超时和错误处理。对图数据库的深度查询可能非常耗时。
"""
if depth > 3:
# 保护性编程:限制查询深度,防止恶意请求拖垮数据库
raise HTTPException(status_code=400, detail="Query depth cannot exceed 3.")
variables = {"userId": user_id, "depth": depth}
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.post(
DGRAPH_GQL_ENDPOINT,
json={"query": FIND_FRIENDS_OF_FRIENDS_QUERY, "variables": variables},
)
response.raise_for_status() # 如果状态码是 4xx 或 5xx,则抛出异常
data = response.json()
if data.get("errors"):
logger.error(f"Dgraph query failed: {data['errors']}")
raise HTTPException(status_code=500, detail="Error querying graph database.")
return data.get("data", {})
except httpx.ReadTimeout:
logger.warning(f"Dgraph query timed out for user {user_id} with depth {depth}")
raise HTTPException(status_code=504, detail="Graph query timed out.")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error.")
这个 API 端点只查询 Dgraph。在更复杂的场景中,一个端点可能会先从 Firestore 获取用户的基本特征,再根据某些特征决定是否需要去 Dgraph 进行深度图查询,最后将结果合并返回。这种模式将复杂性隔离在了后端,为前端提供了清晰、统一的接口。
3. 前端状态管理:Valtio 的优雅之道
前端的挑战在于如何优雅地管理两个来源的数据:一个是来自 Firestore 的实时推送,另一个是用户手动触发的、对后端 API 的异步请求(查询 Dgraph)。
store.ts
import { proxy, subscribe } from 'valtio';
import { onSnapshot, doc, DocumentData } from 'firebase/firestore';
import { firestore } from './firebaseConfig'; // 假设这是你的 Firebase 初始化文件
interface UserGraph {
loading: boolean;
error: string | null;
data: any; // 实际项目中应定义更精确的类型
}
interface AppState {
currentUser: {
id: string | null;
features: DocumentData | null;
unsubscribe: (() => void) | null;
};
userGraph: UserGraph;
}
// Valtio 的核心:一个可变的代理对象
export const state = proxy<AppState>({
currentUser: {
id: null,
features: null,
unsubscribe: null,
},
userGraph: {
loading: false,
error: null,
data: null,
},
});
// --- Actions ---
// 监听指定用户的实时特征
export function watchUserFeatures(userId: string) {
if (state.currentUser.unsubscribe) {
state.currentUser.unsubscribe(); // 取消上一个监听
}
if (!userId) {
state.currentUser.id = null;
state.currentUser.features = null;
state.currentUser.unsubscribe = null;
return;
}
state.currentUser.id = userId;
const docRef = doc(firestore, 'user_features', userId);
// onSnapshot 会在数据首次加载和后续每次变更时触发
const unsubscribe = onSnapshot(docRef, (docSnap) => {
if (docSnap.exists()) {
// 直接修改 proxy 对象,Valtio 会自动通知相关组件
state.currentUser.features = docSnap.data();
} else {
console.warn(`User features for ${userId} not found.`);
state.currentUser.features = null;
}
}, (error) => {
console.error("Firestore subscription error:", error);
state.currentUser.features = null; // 清理状态
});
state.currentUser.unsubscribe = unsubscribe;
}
// 从我们的后端 API 获取图数据
export async function fetchUserGraph(userId: string, depth: number) {
if (!userId) return;
state.userGraph.loading = true;
state.userGraph.error = null;
try {
const response = await fetch(`/api/user/${userId}/graph?depth=${depth}`);
if (!response.ok) {
throw new Error(`API request failed with status ${response.status}`);
}
const data = await response.json();
state.userGraph.data = data;
} catch (error: any) {
state.userGraph.error = error.message;
state.userGraph.data = null; // 出错时清空数据
} finally {
state.userGraph.loading = false;
}
}
// 这是一个单元测试的思路
// 我们可以模拟 firestore 和 fetch,然后调用 action,检查 state 的变化
// e.g. using vitest
// test('watchUserFeatures should update state', () => {
// // mock onSnapshot
// watchUserFeatures('user123');
// // assert that state.currentUser.features is updated
// });
UserProfile.tsx (React Component)
import React, { useEffect } from 'react';
import { useSnapshot } from 'valtio';
import { state, watchUserFeatures, fetchUserGraph } from './store';
const UserProfile: React.FC<{ userId: string }> = ({ userId }) => {
// useSnapshot 创建了对 state 的一个不可变快照
// 当 state 变化时,组件会自动重渲染
const snap = useSnapshot(state);
useEffect(() => {
watchUserFeatures(userId);
// 组件卸载时,清理 Firestore 监听器
return () => {
if (snap.currentUser.unsubscribe) {
snap.currentUser.unsubscribe();
}
};
}, [userId]); // 当 userId 变化时,重新设置监听
const handleFetchGraph = () => {
fetchUserGraph(userId, 2);
};
return (
<div>
<h1>User Profile: {userId}</h1>
<div style={{ border: '1px solid #ccc', padding: '10px', marginBottom: '20px' }}>
<h2>Real-time Features (from Firestore)</h2>
{snap.currentUser.features ? (
<ul>
{Object.entries(snap.currentUser.features).map(([key, value]) => (
<li key={key}><strong>{key}:</strong> {JSON.stringify(value)}</li>
))}
</ul>
) : (
<p>Loading real-time features...</p>
)}
</div>
<div style={{ border: '1px solid #ccc', padding: '10px' }}>
<h2>Graph Relationships (from Dgraph)</h2>
<button onClick={handleFetchGraph} disabled={snap.userGraph.loading}>
{snap.userGraph.loading ? 'Loading...' : 'Fetch 2-degree Connections'}
</button>
{snap.userGraph.error && <p style={{ color: 'red' }}>Error: {snap.userGraph.error}</p>}
{snap.userGraph.data && (
<pre>{JSON.stringify(snap.userGraph.data, null, 2)}</pre>
)}
</div>
</div>
);
};
export default UserProfile;
Valtio 的美妙之处在于其极简的 API。我们只需要在一个 proxy 对象上执行异步操作来修改数据,任何使用了 useSnapshot 的组件都会自动、高效地更新。这里,UserProfile 组件完全不知道 currentUser.features 来自 Firestore 的实时流,而 userGraph.data 来自一次性的 fetch 请求。这种关注点分离使得组件逻辑保持纯粹,而数据获取的复杂性则被封装在 store.ts 的 action 中。
架构的局限性与未来迭代
此架构并非没有代价。最主要的挑战是维护两个数据库之间的数据一致性。Dask 作业的双写操作在失败和重试时可能导致 Firestore 和 Dgraph 之间的数据出现偏差。一个长期的优化路径是引入变更数据捕获(CDC)机制。例如,Dask 可以只写入一个主数据库(如 PostgreSQL 或 TiDB),然后使用 Debezium 等工具捕获变更,再通过 Kafka 将这些变更流式传输到 Dgraph 和 Firestore 的物化视图构建器中。这将以增加系统复杂性为代价,换取更强的数据一致性保证。
另一个考量是运维成本。管理一个 Dask 集群、一个 Dgraph 集群和一个 Firestore 实例,比管理单一数据库要复杂得多,需要团队在监控、备份、容灾方面投入更多精力。
最后,前端状态聚合也存在优化空间。当图数据变得非常庞大时,一次性将其全部加载到前端状态中是不明智的。可以考虑在前端实现虚拟化渲染,或者后端 API 支持对图数据进行分页,让 Valtio store 只持有当前视图所需的数据。