构建基于 Pulsar 事件流的实时特征增强型 RAG 微服务架构


要构建一个能应对生产环境复杂性的检索增强生成(RAG)系统,单纯依赖静态文档向量化是远远不够的。真正的挑战在于如何将非结构化的知识(文档)与实时变化的结构化数据(用户画像、商品状态等)高效融合,并以低延迟、高可用的方式提供服务。

一个常见的错误是直接在一个单体应用中堆叠 LlamaIndex 向量检索、特征获取和 LLM 调用。这种设计在原型阶段或许可行,但在真实项目中,数据源是多样的、更新是异步的,服务之间需要解耦以保证各自的伸缩性和稳定性。

本文将探讨一种架构决策过程,旨在解决上述挑战。我们将对比两种截然不同的架构方案,最终选择并实现一个基于 Apache Pulsar 事件流的、微服务化的实时特征增强型 RAG 架构。其核心目标是实现数据更新的实时性、服务间的高度解耦以及整个系统的横向扩展能力。

定义复杂技术问题

我们的目标是构建一个 RAG 系统,它不仅能根据用户问题从知识库中检索相关文档,还能动态地将与问题相关的实时特征(如用户信息、产品实时库存、当前市场价格等)注入到提供给大语言模型(LLM)的上下文中。

具体的技术要求如下:

  1. 数据实时性: 文档知识库和特征数据必须能够近乎实时地更新。一份新文档或一个特征的变更,应在秒级内反映到系统中。
  2. 架构解耦: 数据摄入、向量化、特征处理和查询服务必须是相互独立的微服务,任何一个服务的故障或扩容不应直接影响其他服务。
  3. 查询低延迟: 终端用户查询的端到端延迟(从接收请求到返回 LLM 生成结果)必须在可接受范围内,通常要求 p99 延迟在2秒以内。
  4. 高可用与可扩展性: 整个系统必须是可水平扩展的,并且对消息队列、数据存储等关键组件的故障具有容错能力。

方案A:同步耦合的 REST API 架构

这是一种直觉上最简单的实现方式。我们将系统拆分为几个通过同步 REST API 通信的微服务。

  • Ingestion Service: 提供 API 端点,用于接收新文档和特征更新。它会同步调用 LlamaIndex Service 和 Feature Store Service 的接口来更新数据。
  • LlamaIndex Service: 封装 LlamaIndex 核心逻辑,提供文档索引和向量检索的 API。
  • Feature Store Service: 提供 API 用于读写结构化特征数据。
  • Query Orchestration Service: 作为入口,接收用户查询,依次同步调用 LlamaIndex Service 和 Feature Store Service,组合上下文,最后调用 LLM。

其架构图如下:

graph TD
    subgraph "同步调用链路"
        User -- HTTP Request --> QOS[Query Orchestration Service]
        QOS -- 1. REST Call --> LIS[LlamaIndex Service]
        LIS --> VS[(Vector Store)]
        LIS -- Vector Search Results --> QOS
        QOS -- 2. REST Call --> FSS[Feature Store Service]
        FSS --> FS[(Feature Store)]
        FSS -- Feature Data --> QOS
        QOS -- 3. Prompt --> LLM[Large Language Model]
        LLM -- Response --> QOS
        QOS -- HTTP Response --> User
    end

    subgraph "数据更新链路"
        DataSource -- HTTP POST --> IS[Ingestion Service]
        IS -- sync call --> LIS
        IS -- sync call --> FSS
    end

方案A的优势

  • 简单直观: 逻辑清晰,易于理解和初期实现。同步调用的心智负担较低。
  • 强一致性: 数据更新是同步的,调用方可以立即知道更新是否成功,数据状态相对确定。

方案A的劣势

  • 性能瓶颈与高延迟: 在查询路径上,串行调用 LlamaIndex 和 Feature Store 服务,延迟会累加。任何一个下游服务的抖动都会直接影响总查询时间。
  • 紧密耦合: Ingestion Service 与 LlamaIndex Service、Feature Store Service 强耦合。如果需要增加一个新的数据消费者(例如,一个审计服务),就需要修改 Ingestion Service 的代码。
  • 可用性差: Ingestion Service 严重依赖下游服务的可用性。如果 Feature Store Service 短暂不可用,整个文档摄入流程都会失败,尽管文档向量化本身可能与特征存储无关。这种“级联失败”在分布式系统中是致命的。
  • 削峰填谷能力弱: 面对突发的数据写入请求,同步处理模型会给下游服务带来巨大压力,容易导致系统过载。

在真实项目中,这种架构很快会暴露出其脆弱性。一个常见的场景是,批量导入文档时,同步调用会导致 Ingestion Service 长时间阻塞,甚至超时,而下游的向量化服务和特征服务也因瞬时高并发而性能下降或崩溃。

方案B:基于 Pulsar 的事件驱动解耦架构

为了解决方案A的根本性问题,我们引入一个消息中间件作为系统的“神经中枢”,将同步调用改为异步消息驱动。Apache Pulsar 是一个优秀的选择,其分层存储、多租户、统一消息模型(流和队列)等特性非常适合构建复杂的事件驱动系统。

架构的核心思想是“发布-订阅”模式:

  • Producers: 数据源(或一个轻量级的 Ingestion API Gateway)将原始数据(新文档、特征变更)作为事件发布到 Pulsar 的特定主题(Topic)中。
  • Topics: Pulsar 中定义了不同的主题,如 unstructured-docs-topicstructured-features-topic,用于隔离不同类型的数据流。
  • Consumers: 各个处理单元作为消费者,独立订阅它们关心的主题,并进行异步处理。
    • Indexing Consumer: 订阅 unstructured-docs-topic,负责文档的解析、向量化,并更新到向量数据库。
    • Feature Consumer: 订阅 structured-features-topic,负责更新到 Feature Store。
  • Query Orchestration Service: 查询路径与方案A类似,但数据更新是完全异步解耦的。

其架构图如下:

graph TD
    subgraph "异步数据流 (Event-Driven)"
        DataSource1[Unstructured Data Source] -- Event --> APIG[Ingestion API Gateway]
        DataSource2[Structured Data Source] -- Event --> APIG
        APIG -- Publishes --> Pulsar((Apache Pulsar))
        Pulsar -- unstructured-docs-topic --> IC[Indexing Consumer]
        IC -- Upsert --> VS[(Vector Store)]
        Pulsar -- structured-features-topic --> FC[Feature Consumer]
        FC -- Upsert --> FS[(Feature Store)]
    end
    
    subgraph "查询链路 (Low-Latency Query Path)"
        User -- HTTP Request --> QOS[Query Orchestration Service]
        QOS -- Vector Search --> VS
        QOS -- Feature Lookup --> FS
        QOS -- Enriched Prompt --> LLM[Large Language Model]
        LLM -- Response --> QOS
        QOS -- HTTP Response --> User
    end

方案B的优势

  • 高解耦与可扩展性: 服务之间通过 Pulsar 通信,互不直接依赖。增加新的数据处理服务(如日志审计、实时监控)只需添加一个新的消费者订阅相应主题即可,无需改动现有服务。
  • 韧性与容错: 如果 Feature Consumer 宕机,Indexing Consumer 依然可以正常处理文档,不会造成整个数据管道中断。Pulsar 的消息持久化确保了在消费者恢复后可以继续处理积压的消息。
  • 异步处理与削峰填谷: 数据摄入方只需将消息快速发布到 Pulsar 即可返回,响应时间极短。下游消费者可以按照自己的节奏消费数据,有效应对流量洪峰。
  • 实时性保障: Pulsar 设计上就是为低延迟场景服务的,可以满足秒级的数据更新需求。

方案B的挑战

  • 架构复杂性增加: 引入消息队列带来了额外的运维成本和技术复杂性,例如需要管理 Pulsar 集群、处理消息序列、幂等性、死信队列等问题。
  • 最终一致性: 数据在多个系统(Vector Store, Feature Store)之间的同步是异步的,存在短暂的数据不一致窗口。需要设计好业务逻辑来容忍或处理这种最终一致性。

最终选择与理由

对于一个严肃的、面向生产环境的 RAG 系统,方案B(事件驱动架构)是明显更优的选择。尽管其初始实现更复杂,但它提供的解耦、韧性和可扩展性是系统长期健康发展的基石。在真实项目中,数据源的种类和数量总是会不断增加,业务逻辑也会越来越复杂,一个灵活、可演进的架构至关重要。

选择 Pulsar 而非其他消息队列(如 Kafka)的考量点在于:

  1. 云原生架构: Pulsar 的计算(Broker)与存储(BookKeeper)分离架构,使其在 Kubernetes 等云原生环境中更具弹性伸缩能力。
  2. 内置多租户: 对于大型企业,可以用单个 Pulsar 集群安全地服务于多个业务线或团队,简化了资源管理。
  3. 统一消息模型: Pulsar 天然支持流(Streaming)和队列(Queueing)两种消费模式,无需像 Kafka 那样依赖外部组件(如 Kafka Connect)来实现复杂的数据管道。
  4. 分层存储: Pulsar 可以将老旧数据自动卸载到更廉价的对象存储(如 S3)中,在实现无限事件流存储的同时控制成本。

核心实现概览

我们将使用 Python 生态来实现这个架构的核心组件。假设使用 FastAPI 构建微服务,pulsar-client 与 Pulsar 交互,LlamaIndex 处理向量化,Redis 作为简单的 Feature Store。

1. Pulsar 主题与 Schema 定义

在事件驱动架构中,一个常见的错误是直接在消息体中塞入无结构的 JSON。这会导致后期维护困难。使用 Schema Registry (Pulsar 内置) 和强类型 schema (如 Avro, Protobuf) 是最佳实践。

# common/schemas.py
# 使用 Pydantic 模型,可以方便地转换为 JSON Schema 或 Avro
import pydantic
from typing import Dict, Any, Optional

class DocumentEvent(pydantic.BaseModel):
    """定义非结构化文档事件的 Schema"""
    doc_id: str
    content: str
    metadata: Optional[Dict[str, Any]] = None
    event_timestamp: float

class FeatureEvent(pydantic.BaseModel):
    """定义结构化特征更新事件的 Schema"""
    entity_id: str
    entity_type: str # e.g., 'user', 'product'
    features: Dict[str, Any]
    event_timestamp: float

2. 数据摄入网关 (Ingestion API Gateway)

这是一个轻量级的 FastAPI 服务,负责接收外部数据,校验格式,然后发布到 Pulsar。

# ingestion_gateway/main.py
import pulsar
import json
import logging
from fastapi import FastAPI, HTTPException, status
from common.schemas import DocumentEvent, FeatureEvent
import os

# --- 配置 ---
PULSAR_SERVICE_URL = os.getenv("PULSAR_SERVICE_URL", "pulsar://localhost:6650")
DOC_TOPIC = "persistent://public/default/unstructured-docs"
FEATURE_TOPIC = "persistent://public/default/structured-features"

# --- 日志 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()
client = pulsar.Client(PULSAR_SERVICE_URL)
doc_producer = client.create_producer(DOC_TOPIC)
feature_producer = client.create_producer(FEATURE_TOPIC)

@app.post("/ingest/document", status_code=status.HTTP_202_ACCEPTED)
async def ingest_document(event: DocumentEvent):
    try:
        payload = event.model_dump_json().encode('utf-8')
        doc_producer.send_async(payload, callback=ack_callback)
        logger.info(f"Published document event for doc_id: {event.doc_id}")
        return {"status": "event published"}
    except Exception as e:
        logger.error(f"Failed to publish document event: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

@app.post("/ingest/feature", status_code=status.HTTP_202_ACCEPTED)
async def ingest_feature(event: FeatureEvent):
    try:
        payload = event.model_dump_json().encode('utf-8')
        feature_producer.send_async(payload, callback=ack_callback)
        logger.info(f"Published feature event for entity_id: {event.entity_id}")
        return {"status": "event published"}
    except Exception as e:
        logger.error(f"Failed to publish feature event: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

def ack_callback(res, msg_id):
    if res != pulsar.Result.Ok:
        logger.error(f"Message failed to publish with id {msg_id}: {res}")
    else:
        logger.info(f"Message published successfully with id {msg_id}")

@app.on_event("shutdown")
def shutdown_event():
    logger.info("Closing Pulsar client...")
    client.close()

这里的坑在于 send_async 的使用。对于高吞吐量的摄入服务,同步发送 producer.send() 会阻塞请求线程,严重影响性能。使用异步发送并配合回调函数来处理发送结果,是生产环境下的标准做法。

3. 异步处理消费者 (Indexing Consumer)

这个服务订阅文档主题,使用 LlamaIndex 进行处理。

# indexing_consumer/consumer.py
import pulsar
import json
import logging
import time
import os
from llama_index.core import Document, VectorStoreIndex, StorageContext
from llama_index.vector_stores.milvus import MilvusVectorStore
from common.schemas import DocumentEvent

# --- 配置 ---
PULSAR_SERVICE_URL = os.getenv("PULSAR_SERVICE_URL", "pulsar://localhost:6650")
DOC_TOPIC = "persistent://public/default/unstructured-docs"
SUBSCRIPTION_NAME = "indexing-subscription"
MILVUS_URI = os.getenv("MILVUS_URI", "http://localhost:19530")

# --- 日志 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IndexingService:
    def __init__(self):
        # 这里的向量存储应该是持久化的,例如 Milvus, Pinecone 等
        # 在真实项目中,这里的初始化会更复杂,包括连接池、重试逻辑等
        self.vector_store = MilvusVectorStore(uri=MILVUS_URI, dim=768, overwrite=False)
        self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        self.index = VectorStoreIndex.from_documents(
            [], storage_context=self.storage_context
        )
        logger.info("IndexingService initialized.")

    def process_document(self, doc_event: DocumentEvent):
        """核心处理逻辑:消费消息并更新向量索引"""
        try:
            llama_document = Document(
                text=doc_event.content,
                doc_id=doc_event.doc_id,
                metadata=doc_event.metadata or {}
            )
            # LlamaIndex 的 insert 方法会自动处理 embedding 和 upsert
            self.index.insert(llama_document)
            logger.info(f"Successfully indexed document: {doc_event.doc_id}")
        except Exception as e:
            logger.error(f"Failed to index document {doc_event.doc_id}: {e}", exc_info=True)
            # 异常处理至关重要,这里可以选择抛出异常让 consumer nack 消息
            raise

def run_consumer():
    client = pulsar.Client(PULSAR_SERVICE_URL)
    consumer = client.subscribe(
        DOC_TOPIC,
        subscription_name=SUBSCRIPTION_NAME,
        consumer_type=pulsar.ConsumerType.Shared, # 允许多个实例负载均衡
        negative_acks_redelivery_delay_ms=10000 # 消息处理失败后10秒重投
    )
    
    indexing_service = IndexingService()

    logger.info("Consumer started, waiting for messages...")
    while True:
        try:
            msg = consumer.receive()
            try:
                event_data = json.loads(msg.data().decode('utf-8'))
                doc_event = DocumentEvent(**event_data)
                
                # 核心处理逻辑
                indexing_service.process_document(doc_event)
                
                consumer.acknowledge(msg)
            except Exception as e:
                # 业务逻辑处理失败,nack 消息,Pulsar 会在延迟后重投
                logger.error(f"Message processing failed. Nacking message id={msg.message_id()}. Error: {e}")
                consumer.negative_acknowledge(msg)
        except Exception as e:
            logger.error(f"Pulsar consumer loop error: {e}", exc_info=True)
            time.sleep(5) # 避免循环过快
            
    client.close()

if __name__ == "__main__":
    run_consumer()

这个消费者服务的关键点在于错误处理和消息确认机制。consumer.acknowledge(msg) 告诉 Pulsar 消息已成功处理。如果 process_document 失败,我们调用 consumer.negative_acknowledge(msg),Pulsar 会在配置的延迟后重新投递该消息,给了我们重试的机会。在生产环境中,还需要配置一个死信队列(Dead Letter Queue),用于存放多次重试仍然失败的消息,以便人工介入。

4. 统一查询服务 (Unified Query Service)

这是整个架构的“大脑”,负责整合来自不同数据源的信息。

# query_service/main.py
import redis
import logging
import os
from fastapi import FastAPI
from pydantic import BaseModel
from llama_index.core import VectorStoreIndex, get_response_synthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.llms.openai import OpenAI # 假设使用 OpenAI

# --- 配置 ---
MILVUS_URI = os.getenv("MILVUS_URI", "http://localhost:19530")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# --- 日志 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# --- 初始化连接 ---
# 在真实应用中,这些初始化应该在 app 启动事件中完成,并使用连接池
vector_store = MilvusVectorStore(uri=MILVUS_URI, dim=768, overwrite=False)
index = VectorStoreIndex.from_vector_store(vector_store)
llm = OpenAI(model="gpt-4-turbo", api_key=OPENAI_API_KEY)
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)


class QueryRequest(BaseModel):
    query: str
    user_id: str # 假设需要根据用户ID获取特征

@app.post("/query")
async def handle_query(request: QueryRequest):
    try:
        # 1. 向量检索
        retriever = VectorIndexRetriever(index=index, similarity_top_k=3)
        retrieved_nodes = retriever.retrieve(request.query)
        retrieved_text = "\n\n".join([node.get_content() for node in retrieved_nodes])
        logger.info(f"Retrieved {len(retrieved_nodes)} nodes from vector store.")

        # 2. 实时特征获取
        # 这里的逻辑是业务相关的,可以是从查询中提取实体,也可以是固定的如 user_id
        user_features = {}
        try:
            # 一个常见的错误是不处理 Redis 查询失败的情况
            user_feature_json = redis_client.get(f"user:{request.user_id}")
            if user_feature_json:
                user_features = json.loads(user_feature_json)
                logger.info(f"Fetched features for user {request.user_id}")
        except redis.exceptions.RedisError as e:
            logger.warning(f"Could not fetch features for user {request.user_id} from Redis: {e}")
            # 即使特征获取失败,系统也应该能够继续提供基于文档的回答,这体现了系统的韧性

        # 3. 构建增强的上下文 (Context Enrichment)
        feature_context = "User Profile:\n" + "\n".join([f"- {k}: {v}" for k, v in user_features.items()])
        
        final_prompt_template = f"""
        Based on the following context and user profile, answer the query.
        
        Context from documents:
        ---
        {retrieved_text}
        ---
        
        {feature_context if user_features else ""}
        
        Query: {request.query}
        Answer:
        """
        
        # 4. 调用 LLM 生成回答
        response = llm.complete(final_prompt_template)
        
        return {"answer": response.text, "source_nodes": [node.metadata for node in retrieved_nodes]}

    except Exception as e:
        logger.error(f"Error processing query '{request.query}': {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process query")

这个查询服务的设计体现了架构的最终目的。它并行(或快速串行)地从已经解耦和异步更新的数据源(向量存储和特征库)中拉取信息。即使特征库暂时不可用,核心的文档问答功能依然不受影响。这种设计的健壮性远高于方案A。

架构的扩展性与局限性

扩展性

  • 新增数据源: 如果需要接入新的数据源,例如一个实时的交易流,我们只需要创建一个新的 Producer 将交易事件发送到新的 Pulsar Topic,然后创建一个新的 Consumer 来处理这些事件(比如更新某个实时的商品热度特征),而无需改动任何现有服务。
  • 服务水平扩展: Indexing ConsumerFeature Consumer 成为瓶颈时,由于使用了 Pulsar 的共享订阅模式(Shared Subscription),我们只需简单地增加该服务的实例数量,Pulsar 会自动将消息分发给这些实例,实现负载均衡。
  • 查询能力扩展: 查询服务本身是无状态的,可以轻松地进行水平扩展。

局限性与未来迭代

  • 最终一致性问题: 最大的挑战在于处理数据延迟。可能存在一个短暂的窗口,文档的向量索引已经更新,但与之相关的特征还没有被 Feature Consumer 处理。这意味着查询服务可能会拿到“部分新鲜”的数据。对于一致性要求极高的场景,需要在应用层设计补偿逻辑,或者引入更复杂的分布式事务模式(如 Saga),但这会大大增加系统复杂度。
  • 监控与可观测性: 微服务和事件驱动架构使得端到端的链路追踪变得复杂。必须投入资源建设完善的可观测性体系,使用如 OpenTelemetry 等工具来追踪一个事件从发布到被所有消费者处理完毕的全过程,否则系统出现问题时将难以调试。
  • 死信与重试风暴: 需要精细化设计重试策略和死信队列(DLQ)机制。一个设计不当的重试逻辑,在下游服务持续故障时,可能会引发“重试风暴”,加剧系统雪崩。对进入 DLQ 的消息,需要有自动化的告警和手动的处理预案。

未来的迭代方向可能包括:引入流处理引擎(如 Flink)进行更复杂的事件流处理和特征工程;构建更智能的查询路由,根据查询意图动态决定需要检索哪些数据源;以及对整个数据管道进行更精细化的性能调优和成本控制。


  目录