构建基于Playwright微服务和事件驱动架构的弹性数据采集管道


传统的单体式爬虫脚本在面对动态、高反扒的现代Web应用时,其脆弱性暴露无遗。一个简单的cron任务,执行一个庞大的Python脚本,往往因为一个JavaScript渲染超时、一个选择器变更或是一个临时的网络波动而全盘崩溃。更糟糕的是,这类脚本难以水平扩展,当采集任务从几十个增长到成千上万个时,维护和调度就成了一场噩梦。失败的任务、丢失的数据、难以调试的执行环境,这些都是真实项目中无法回避的痛点。

问题的核心在于职责耦合与缺乏隔离。采集任务的派发、浏览器实例的生命周期管理、数据的解析与清洗、最终的入库存储,所有环节都纠缠在同一个进程中。我们需要一种更具弹性的架构,将这些职责拆分、解耦,并允许它们独立伸缩和容错。

初步的构想是借鉴微服务的思想,将整个数据采集流程拆分成一组相互协作的、轻量级的服务。每个服务只负责一项单一的职责。它们之间不直接调用,而是通过一个异步的消息总线进行通信,这便是事件驱动架构(EDA)的精髓。这种模式天然地提供了削峰填谷、异步处理和故障隔离的能力。

技术选型决策如下:

  1. 浏览器自动化: Playwright。相较于Selenium,它提供了更现代、更稳定的API,尤其是在处理复杂的单页应用(SPA)时,其自动等待机制和网络拦截能力非常出色。
  2. 服务容器化: Docker。这是解决Playwright环境依赖复杂性(需要特定版本的浏览器和系统库)的最佳方案。Docker确保了每个服务实例的运行环境都是一致、可复现的。
  3. 事件总线: Redis Streams。对于中等规模的系统,它比Kafka更轻量,部署简单,同时提供了持久化、消费组等企业级消息队列所需的关键特性。
  4. 数据着陆区 (Data Lake Landing Zone): MinIO。作为一个S3兼容的对象存储服务,它非常适合作为原始数据的“着陆区”。数据可以先以原始格式(HTML、JSON)快速存入,再进行后续的ETL处理,实现了读写的解耦。
  5. 服务编排: Docker Compose。在开发和中小型部署场景下,它足以管理我们这套微服务的生命周期。

整体架构图如下所示:

graph TD
    subgraph "Docker Network"
        A[API服务 / task-api] -- 1. 发布采集任务 --> R[Redis Streams: tasks_stream];
        R -- 2. 消费任务 --> B((Playwright 工作节点 / playwright-worker));
        B -- 3. 抓取Web页面 --> W[外部网站];
        B -- 4. 发布原始数据 --> R2[Redis Streams: raw_data_stream];
        R2 -- 5. 消费原始数据 --> C[入湖服务 / ingestion-service];
        C -- 6. 清洗/转换 --> C;
        C -- 7. 写入Parquet文件 --> D[MinIO 数据湖];
    end

    U[用户/调度器] --> A;

第一步: 定义服务间通信的事件契约

在事件驱动架构中,明确事件的结构至关重要。我们定义两种核心事件:

  • TaskEvent: 任务派发事件,包含目标URL和一些元数据。
  • RawDataEvent: 原始数据事件,包含任务ID、抓取状态和页面内容。
# common/events.py
import json
from dataclasses import dataclass, asdict, field
from typing import Optional, Dict
import uuid

@dataclass
class TaskEvent:
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    url: str
    metadata: Optional[Dict] = None

    def to_json(self) -> str:
        return json.dumps(asdict(self))

@dataclass
class RawDataEvent:
    task_id: str
    url: str
    status: str  # 'SUCCESS' or 'FAILED'
    content: Optional[str] = None
    error_message: Optional[str] = None
    metadata: Optional[Dict] = None

    def to_json(self) -> str:
        return json.dumps(asdict(self))

第二步: 构建核心的Playwright工作节点

这是系统中最核心、也是环境最复杂的服务。它负责消费任务、启动浏览器、执行抓取,并发布结果。

playwright-worker/main.py:

import os
import logging
import asyncio
from typing import Optional, List, Dict

import redis
from playwright.async_api import async_playwright, Browser, Page, Playwright, TimeoutError as PlaywrightTimeoutError

# 模拟common.events模块
from events import TaskEvent, RawDataEvent

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
TASK_STREAM_NAME = 'tasks_stream'
RAW_DATA_STREAM_NAME = 'raw_data_stream'
CONSUMER_GROUP_NAME = 'playwright_workers'
WORKER_ID = f"worker-{os.getpid()}"

# Redis连接
try:
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    redis_client.ping()
    logging.info("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    logging.error(f"Failed to connect to Redis: {e}")
    exit(1)

# 确保消费组存在
try:
    redis_client.xgroup_create(TASK_STREAM_NAME, CONSUMER_GROUP_NAME, id='0', mkstream=True)
    logging.info(f"Consumer group '{CONSUMER_GROUP_NAME}' created for stream '{TASK_STREAM_NAME}'.")
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" in str(e):
        logging.info(f"Consumer group '{CONSUMER_GROUP_NAME}' already exists.")
    else:
        raise e

async def scrape_page(browser: Browser, task: TaskEvent) -> RawDataEvent:
    """
    使用独立的BrowserContext执行抓取,提供更好的隔离性。
    """
    context = None
    try:
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        )
        page = await context.new_page()
        await page.goto(task.url, wait_until='domcontentloaded', timeout=30000)
        
        # 这里的逻辑可以非常复杂,例如等待特定元素、执行JS、处理登录等
        # 作为一个生产级示例,我们只获取页面内容
        content = await page.content()

        logging.info(f"Successfully scraped URL: {task.url}")
        return RawDataEvent(
            task_id=task.task_id,
            url=task.url,
            status='SUCCESS',
            content=content,
            metadata=task.metadata
        )
    except PlaywrightTimeoutError:
        logging.error(f"Timeout while scraping URL: {task.url}")
        return RawDataEvent(task_id=task.task_id, url=task.url, status='FAILED', error_message='TimeoutError')
    except Exception as e:
        logging.error(f"An unexpected error occurred for URL {task.url}: {e}", exc_info=True)
        return RawDataEvent(task_id=task.task_id, url=task.url, status='FAILED', error_message=str(e))
    finally:
        if context:
            await context.close()


async def main():
    """
    主工作循环,从Redis Stream消费任务并处理。
    """
    async with async_playwright() as p:
        # 在真实项目中,可能需要根据环境选择浏览器,例如chromium
        browser = await p.chromium.launch(headless=True)
        logging.info("Playwright browser launched.")
        
        try:
            while True:
                # XREADGROUP
                # BLOCK 0: 阻塞直到有新消息
                # COUNT 1: 一次只取一个任务
                response = redis_client.xreadgroup(
                    CONSUMER_GROUP_NAME,
                    WORKER_ID,
                    {TASK_STREAM_NAME: '>'},
                    count=1,
                    block=0
                )

                if not response:
                    continue

                stream, messages = response[0]
                message_id, task_data = messages[0]

                try:
                    task_dict = json.loads(task_data['data'])
                    task = TaskEvent(**task_dict)
                    logging.info(f"Received task {task.task_id} for URL: {task.url}")

                    # 执行抓取
                    result_event = await scrape_page(browser, task)
                    
                    # 将结果发布到下一个Stream
                    redis_client.xadd(RAW_DATA_STREAM_NAME, {'data': result_event.to_json()})
                    logging.info(f"Published raw data for task {task.task_id}")

                    # 确认消息已被处理
                    redis_client.xack(TASK_STREAM_NAME, CONSUMER_GROUP_NAME, message_id)

                except json.JSONDecodeError:
                    logging.error(f"Failed to decode task data: {task_data}")
                    # 对于无法解析的消息,也需要ack,避免无限重试
                    redis_client.xack(TASK_STREAM_NAME, CONSUMER_GROUP_NAME, message_id)
                except Exception as e:
                    logging.error(f"Critical error during task processing: {e}", exc_info=True)
                    # 这里的错误处理策略很关键,可以选择不ack,让其他消费者重试,
                    # 或者移入死信队列。为简单起见,我们选择ack。
                    redis_client.xack(TASK_STREAM_NAME, CONSUMER_GROUP_NAME, message_id)

        finally:
            await browser.close()
            logging.info("Playwright browser closed.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Worker shutting down.")

playwright-worker/Dockerfile:

FROM mcr.microsoft.com/playwright/python:v1.39.0-jammy

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 这里的安装命令至关重要,它会下载Playwright所需的浏览器二进制文件
# 在官方镜像中已包含,但如果使用普通Python镜像则必须执行
# RUN playwright install --with-deps chromium

COPY . .

# 设置环境变量,使其可在容器内配置
ENV REDIS_HOST=redis
ENV REDIS_PORT=6379

CMD ["python", "main.py"]

这里的Dockerfile是关键。直接使用Playwright官方镜像 (mcr.microsoft.com/playwright/python) 可以省去大量处理浏览器依赖的麻烦。在真实项目中,这是一个常见的最佳实践。

第三步: 实现数据入湖服务

这个服务消费原始数据,进行简单的格式转换(例如,存为Parquet),然后上传到MinIO。

ingestion-service/main.py:

import os
import logging
import json
from io import BytesIO

import redis
import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio
from minio.error import S3Error

# 模拟common.events模块
from events import RawDataEvent

# 配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
RAW_DATA_STREAM_NAME = 'raw_data_stream'
CONSUMER_GROUP_NAME = 'ingestion_workers'
WORKER_ID = f"ingestor-{os.getpid()}"

MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin')
MINIO_BUCKET = 'datalake'

# Redis连接
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

# MinIO客户端
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False # 在生产环境中应为True
)

# 确保消费组和Bucket存在
try:
    redis_client.xgroup_create(RAW_DATA_STREAM_NAME, CONSUMER_GROUP_NAME, id='0', mkstream=True)
    logging.info(f"Consumer group '{CONSUMER_GROUP_NAME}' created.")
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" in str(e):
        logging.info(f"Consumer group '{CONSUMER_GROUP_NAME}' already exists.")
    else:
        raise e

found = minio_client.bucket_exists(MINIO_BUCKET)
if not found:
    minio_client.make_bucket(MINIO_BUCKET)
    logging.info(f"Bucket '{MINIO_BUCKET}' created.")
else:
    logging.info(f"Bucket '{MINIO_BUCKET}' already exists.")


def process_and_store(event: RawDataEvent):
    """
    将成功的事件数据转换为Parquet格式并存入MinIO
    """
    if event.status != 'SUCCESS':
        logging.warning(f"Skipping failed task {event.task_id} for URL {event.url}")
        return

    try:
        # 在真实场景中,这里会有复杂的HTML解析逻辑
        # 这里我们简化为直接存储原始HTML和其他元数据
        data = {
            'task_id': [event.task_id],
            'url': [event.url],
            'content_length': [len(event.content) if event.content else 0],
            'scraped_at': [pa.timestamp('ns')],
            # 可以添加解析后的字段
            # 'title': [extract_title(event.content)]
        }
        table = pa.Table.from_pydict(data)

        # 写入内存中的Parquet文件
        buffer = BytesIO()
        pq.write_table(table, buffer)
        buffer.seek(0)

        # 文件路径可以按日期分区,例如:raw/YYYY/MM/DD/task_id.parquet
        object_name = f"raw/{event.task_id}.parquet"

        minio_client.put_object(
            MINIO_BUCKET,
            object_name,
            buffer,
            length=buffer.getbuffer().nbytes,
            content_type='application/octet-stream'
        )
        logging.info(f"Successfully stored task {event.task_id} to MinIO as {object_name}")

    except S3Error as e:
        logging.error(f"MinIO error while storing task {event.task_id}: {e}", exc_info=True)
        # 异常处理:可以重试,或将事件发往死信队列
        raise
    except Exception as e:
        logging.error(f"Error processing event {event.task_id}: {e}", exc_info=True)
        raise


def main_loop():
    logging.info("Ingestion service started. Waiting for raw data...")
    while True:
        response = redis_client.xreadgroup(
            CONSUMER_GROUP_NAME,
            WORKER_ID,
            {RAW_DATA_STREAM_NAME: '>'},
            count=1,
            block=0
        )

        if not response:
            continue
        
        stream, messages = response[0]
        message_id, data = messages[0]

        try:
            event_dict = json.loads(data['data'])
            event = RawDataEvent(**event_dict)
            process_and_store(event)
            redis_client.xack(RAW_DATA_STREAM_NAME, CONSUMER_GROUP_NAME, message_id)
        except Exception as e:
            # 如果处理失败,不ack消息,让其他消费者有机会重试。
            # 在生产环境中,需要有重试次数限制和死信队列机制。
            logging.error(f"Failed to process message {message_id}. It will be retried. Error: {e}")


if __name__ == "__main__":
    main_loop()

第四步: 任务API与整体编排

一个简单的API服务用于接收采集请求并将其推送到任务流中。

task-api/main.py (使用Flask):

import os
import json
from flask import Flask, request, jsonify
import redis

# 模拟common.events模块
from events import TaskEvent

app = Flask(__name__)
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
TASK_STREAM_NAME = 'tasks_stream'

redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

@app.route('/submit', methods=['POST'])
def submit_task():
    data = request.get_json()
    if not data or 'url' not in data:
        return jsonify({"error": "URL is required"}), 400

    task = TaskEvent(url=data['url'], metadata=data.get('metadata'))
    
    try:
        # XADD
        message_id = redis_client.xadd(TASK_STREAM_NAME, {'data': task.to_json()})
        return jsonify({"message": "Task submitted", "task_id": task.task_id, "message_id": message_id}), 202
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

最后,使用docker-compose.yml将所有服务粘合在一起。
docker-compose.yml:

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  minio:
    image: minio/minio:RELEASE.2023-10-25T08-35-54Z
    ports:
      - "9000:9000" # API port
      - "9001:9001" # Console port
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data

  task-api:
    build: ./task-api
    ports:
      - "5000:5000"
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis

  playwright-worker:
    build: ./playwright-worker
    # 我们可以在这里轻松扩展工作节点的数量
    deploy:
      replicas: 2
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis
    # 增加shm_size对基于Chromium的浏览器至关重要,防止崩溃
    shm_size: '2gb'

  ingestion-service:
    build: ./ingestion-service
    environment:
      - REDIS_HOST=redis
      - MINIO_ENDPOINT=minio:9000
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    depends_on:
      - redis
      - minio

volumes:
  minio_data:

通过docker-compose up --build --scale playwright-worker=3命令,我们可以启动整个系统,并运行3个Playwright工作节点实例。当通过POST http://localhost:5000/submit提交一个URL时,整个事件流就会被触发,最终数据文件会出现在MinIO的datalake桶中。

这套架构解决了最初的痛点:

  • 弹性: playwright-worker可以根据负载独立扩展。
  • 容错: 单个worker的失败不会影响整个系统。Redis Streams的消息确认机制保证了任务至少被处理一次。
  • 可维护性: 每个服务职责单一,可以独立开发、测试和部署。

当前方案并非没有局限性。首先,错误处理机制相对简单,对于处理失败的消息,仅依赖于消费组的重试,缺乏一个明确的死信队列(Dead Letter Queue)来隔离和分析持续失败的任务。其次,系统的可观测性较弱,我们缺少全链路追踪来监控一个任务从提交到最终入湖的完整生命周期和耗时。未来的迭代方向可以包括引入OpenTelemetry进行分布式追踪,使用更强大的消息队列如Kafka来处理海量数据,并为失败的任务设计完善的重试与告警机制。同时,可以引入一个专门的调度服务,以支持定时、周期性的采集任务,而不仅仅是API触发。


  目录