构建基于 Kong 自定义插件与 Fluentd 的高可靠 API 审计追踪系统


最初的需求很简单:审计所有对内部核心账务服务发起的POSTPUTDELETE操作。Kong 作为 API 网关,自然是数据捕获的第一站。团队的第一反应是利用 Kong 自带的 tcp-loghttp-log 插件,将请求日志转发到一个集中的日志服务。这个方案在 PoC 阶段就暴露了致命缺陷:

  1. 信息缺失: 默认日志格式无法捕获我们最需要审计的字段,例如 JWT 载荷中的用户主体 (sub claim) 或特定业务请求头。更重要的是,我们不仅需要请求信息,还需要响应状态码和响应体(脱敏后)来判断操作是否成功。
  2. 数据噪音: 插件会记录所有请求,包括 GETOPTIONS,导致审计日志中充满了大量无关信息,增加了下游处理和存储的成本。
  3. 可靠性不足: tcp-loghttp-log 插件本质上是“尽力而为”的交付。如果下游日志接收端暂时不可用,日志就会被直接丢弃。对于审计日志这种高价值数据,这是不可接受的。

初步构想是依赖下游的日志处理系统进行过滤和丰富,但这很快被证明是一个糟糕的实践。当日志离开源头时,很多上下文信息已经丢失,后续的弥补成本极高。在真实项目中,正确的做法是在数据产生的源头,即 Kong 内部,就将日志结构化、精确化。

为此,我们决定放弃通用插件,转而构建一个专为审计目的设计的技术栈。整个架构的核心思路是:

  • 在 Kong 端:通过自定义 Lua 插件,在请求生命周期的 log 阶段,精确捕获所需的所有上下文信息,并组装成结构化的 JSON 日志。
  • 在传输与聚合端:使用 Fluentd 作为日志聚合层。它不仅仅是一个转发器,更是可靠性的保障。利用其强大的缓冲机制(Buffer),确保在下游服务(如 Elasticsearch)抖动或离线时,日志数据不会丢失,而是在本地暂存,待服务恢复后重发。
  • 在配置管理端:所有 Fluentd 的配置都通过一个独立的 GitHub 仓库进行管理,并使用 GitHub Actions 自动化部署。这实现了对日志处理逻辑的 GitOps 管理,任何变更都有迹可循,可审计,可回滚。

整体数据流如下:

graph TD
    A[Client] -->|API Request| B(Kong Gateway);
    B -->|Custom Lua Plugin| C{log phase};
    C -->|Structured JSON via TCP| D[Fluentd Agent];
    D -->|Buffer & Retry Logic| E(Output: Elasticsearch);
    D -->|Buffer & Retry Logic| F(Output: Cold Storage/S3);

    G[GitHub Repo] -->|Push to main| H(GitHub Actions);
    H -->|Deploy Config| D;

    subgraph Kong Node
        B
        C
    end

    subgraph Observability Infra
        D
        E
        F
    end

    subgraph Management Plane
        G
        H
    end

第一步:编写精准捕获的 Kong 自定义插件

要实现精准捕获,我们需要编写自己的 Lua 插件。一个 Kong 插件至少包含两个核心文件:schema.lua 用于定义插件配置,handler.lua 用于实现核心逻辑。

我们的插件命名为 audit-log-forwarder

audit-log-forwarder/schema.lua

这里的配置项非常简单,只定义了 Fluentd 的地址和端口,以及一个开关来控制是否记录请求体。在生产环境中,记录完整的请求体需要非常谨慎,通常只对非敏感、小体积的 body 开启。

-- schema.lua
-- 定义插件的配置项
local typedefs = require "kong.db.schema.typedefs"

return {
  name = "audit-log-forwarder",
  fields = {
    -- 插件可以应用在 service, route 或 consumer 上
    { consumer = typedefs.no_consumer },
    { config = {
        type = "record",
        fields = {
          { host = { type = "string", required = true, default = "127.0.0.1" } },
          { port = { type = "integer", required = true, default = 24224 } },
          { tag = { type = "string", required = true, default = "kong.audit" } },
          { include_request_body = { type = "boolean", default = false } },
        },
      },
    },
  },
}

audit-log-forwarder/handler.lua

这是插件的灵魂。我们选择在 log 阶段执行代码,因为此时 Kong 已经完成了对上游服务的代理,并拿到了响应信息,包括状态码和响应头。

这里的坑在于,如何高效且安全地将日志发送出去。直接使用阻塞的 IO 操作会严重影响 Kong 的性能。因此,必须使用 Kong 提供的 cosocket API 来实现非阻塞的 TCP 通信。

-- handler.lua
local cjson = require "cjson.safe"
local socket = require "socket"
local log_serializer = require "kong.plugins.log-serializers.basic"

local AuditLogHandler = {}

AuditLogHandler.PRIORITY = 10
AuditLogHandler.VERSION = "0.1.0"

-- 核心逻辑在 log 阶段执行
function AuditLogHandler:log(conf)
  -- 1. 获取基础日志信息
  -- 我们复用 Kong 的基础序列化器,避免重复造轮子
  local message = log_serializer.serialize(ngx)
  
  -- 2. 过滤我们不关心的请求
  local request_method = ngx.req.get_method()
  if request_method == "GET" or request_method == "OPTIONS" or request_method == "HEAD" then
    return
  end

  -- 3. 丰富审计所需的核心字段
  -- 这里的关键是获取认证插件(如 aai-jwt-ext)设置在 ngx.ctx 中的用户信息
  -- 这是一个常见的错误:不同插件间的数据传递应该通过 ngx.ctx,而不是全局变量
  local consumer = message.consumer
  local user_principal = "anonymous"
  if consumer and consumer.custom_id then
      user_principal = consumer.custom_id
  elseif ngx.ctx.authenticated_credential and ngx.ctx.authenticated_credential.key then
      -- 兼容 Key-Auth 等场景
      user_principal = ngx.ctx.authenticated_credential.key
  elseif ngx.ctx.aai_jwt_ext and ngx.ctx.aai_jwt_ext.sub then
      -- 假设我们有一个名为 aai-jwt-ext 的自定义JWT插件,它将 `sub` 放入 ngx.ctx
      user_principal = ngx.ctx.aai_jwt_ext.sub
  end
  
  message.audit_principal = user_principal

  -- 4. 按需捕获请求体
  if conf.include_request_body then
    -- 注意:这将读取整个 body 到内存,对大请求体有性能影响
    ngx.req.read_body()
    local body_data, err = ngx.req.get_body_data()
    if err then
      kong.log.err("failed to get request body: ", err)
    elseif body_data then
      message.request.body = body_data
    end
  end
  
  -- 5. 序列化为 Fluentd 的 MessagePack 格式
  -- [tag, timestamp, record]
  local payload = {
    conf.tag,
    ngx.time(),
    message
  }

  local bytes, err = cjson.encode(payload)
  if err then
    kong.log.err("could not JSON encode payload: ", err)
    return
  end
  
  -- 6. 使用非阻塞的 cosocket 发送数据
  -- 这是确保性能的关键,绝不能在这里使用阻塞IO
  local sock, err = ngx.socket.tcp()
  if not sock then
    kong.log.err("failed to create tcp socket: ", err)
    return
  end
  
  -- 设置超时,防止永久阻塞
  sock:settimeouts(1000, 1000, 1000) -- connect, send, read timeouts in ms

  local ok, err = sock:connect(conf.host, conf.port)
  if not ok then
    kong.log.err("failed to connect to fluentd host: ", conf.host, ":", conf.port, " err: ", err)
    return
  end

  local bytes_sent, err = sock:send(bytes .. "\n") -- Fluentd forward协议需要换行符
  if not bytes_sent then
    kong.log.err("failed to send audit log to fluentd: ", err)
  end

  -- 关闭 socket 并释放资源
  sock:close()
end

return AuditLogHandler

这个插件实现了我们的核心目标:

  • 选择性记录: 只记录 POST, PUT, DELETE
  • 上下文丰富: 从 ngx.ctx 中提取了关键的用户身份信息。
  • 结构化输出: 生成完整的 JSON 对象。
  • 高性能IO: 使用 cosocket 保证了异步非阻塞的网络通信,对 Kong 自身性能影响降到最低。

第二步:构建高可靠的 Fluentd 管道

Fluentd 的配置是保障日志不丢失的核心。我们不把它当作简单的转发器,而是一个带有持久化缓冲区的智能路由。

以下是一个生产级的 fluent.conf 核心配置片段,它负责接收来自 Kong 的日志,并将其转发到 Elasticsearch 和 S3。

# /etc/fluent/fluent.conf

# 1. Source: 接收来自 Kong 插件的数据
# 使用 forward 协议,这是 Fluentd 的标准输入方式
<source>
  @type forward
  @id kong_audit_input
  port 24224
  bind 0.0.0.0
</source>

# 2. Filter: 清洗和预处理
# 虽然插件已经做了很多,但在这里可以做一些通用处理
# 比如解析嵌套的JSON字符串
<filter kong.audit.**>
  @type parser
  key_name message # `message` 是我们自定义插件传过来的整个JSON对象
  reserve_data true # 保留原始字段
  <parse>
    @type json
  </parse>
</filter>

# 3. Match: 将日志路由到最终目的地
# 这是最关键的部分,定义了输出和缓冲策略

# 3.1 输出到 Elasticsearch 用于实时查询和分析
<match kong.audit.**>
  @type elasticsearch
  @id es_audit_output
  host "elasticsearch-master.default.svc.cluster.local"
  port 9200
  logstash_format true
  logstash_prefix "kong-audit"
  logstash_dateformat "%Y.%m.%d"
  type_name "_doc"
  
  # 关键的缓冲配置,这是高可靠性的核心
  <buffer>
    @type file # 使用文件作为缓冲区,重启后数据不丢失
    path /var/log/fluentd/buffer/es-audit # 必须是持久化路径
    
    # 块(chunk)是 Fluentd 处理数据的基本单位
    chunk_limit_size 16m # 每个块的最大大小
    queue_limit_length 128 # 内存中最多缓存的块数量
    
    # 刷新策略
    flush_interval 5s # 每5秒刷新一次
    flush_thread_count 4 # 使用4个线程并行发送数据到ES
    
    # 重试策略,应对下游服务不可用
    retry_type exponential_backoff # 指数退避
    retry_wait 1s # 初始等待1秒
    retry_max_interval 60s # 最大重试间隔60秒
    retry_max_times 15 # 最多重试15次,超过则认为失败
    
    # 对于无法发送的数据,可以转移到死信队列或直接丢弃
    # 在审计场景下,我们倾向于无限重试,或者转存到更可靠的地方
    retry_forever true 
  </buffer>
</match>

# 3.2 (可选) 备份到 S3 作为冷存储,满足合规要求
<match kong.audit.**>
  @type s3
  @id s3_archive_output
  aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
  aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
  s3_bucket "my-audit-log-archive-bucket"
  s3_region "ap-northeast-1"
  path "logs/kong-audit/%Y/%m/%d/"
  
  <buffer>
    @type file
    path /var/log/fluentd/buffer/s3-archive
    timekey 3600 # 每小时生成一个文件
    timekey_wait 10m # 等待10分钟,确保数据完整性
    chunk_limit_size 256m
    flush_at_shutdown true # 关闭时确保刷新所有缓冲区
    retry_forever true
  </buffer>
  
  <format>
    @type json
  </format>
</match>

这份配置的几个关键点:

  • @type file 缓冲: 这是最重要的配置。当 Elasticsearch 或网络出现故障时,Fluentd 会将日志块写入到本地文件系统。这能有效对抗长达数小时甚至数天的服务中断,只要 Fluentd 所在节点的磁盘空间足够。
  • retry_forever true: 对于审计日志,我们选择无限重试。这意味着除非手动干预,Fluentd 会一直尝试发送,直到成功。这是一种强保障策略。
  • 双写策略 (<match><match>): Fluentd 的一个强大之处在于可以将同一份数据流复制并发送到多个目的地。这里我们同时写入了用于实时分析的 Elasticsearch 和用于长期归档的 S3,满足了不同需求。

第三步:用 GitHub 实现 Fluentd 配置的 GitOps

随着业务发展,Fluentd 的过滤、解析和路由规则会变得越来越复杂。手动SSH到每台机器去修改配置文件是不可靠且危险的。我们使用 GitOps 流程来管理这一切。

  1. 创建配置仓库:
    建立一个专门的 Git 仓库,例如 fluentd-configs。目录结构如下:

    fluentd-configs/
    ├── conf.d/
    │   ├── 01-input-kong.conf
    │   ├── 10-filter-parser.conf
    │   └── 99-output-es-s3.conf
    ├── fluent.conf             # 主配置文件,include conf.d/*
    └── .github/
        └── workflows/
            └── deploy.yml      # GitHub Actions 部署脚本
  2. 编写部署工作流:
    我们使用 GitHub Actions,在 main 分支有新的提交时,自动将配置文件同步到所有 Fluentd 节点并触发服务重载。

    .github/workflows/deploy.yml

    name: Deploy Fluentd Config
    
    on:
      push:
        branches:
          - main
        paths:
          - 'conf.d/**'
          - 'fluent.conf'
    
    jobs:
      deploy:
        runs-on: ubuntu-latest
        strategy:
          matrix:
            # 在这里列出所有 Fluentd 节点的 IP 或主机名
            fluentd_host: [ '10.0.1.10', '10.0.1.11', '10.0.1.12' ]
        
        steps:
        - name: Checkout repository
          uses: actions/checkout@v3
    
        - name: Install SSH Key
          uses: shimataro/ssh-key-action@v2
          with:
            key: ${{ secrets.FLUENTD_SSH_PRIVATE_KEY }}
            name: id_rsa
            known_hosts: 'just-a-placeholder-so-we-can-disable-strict-host-key-checking'
    
        - name: Sync configuration files
          run: |
            # 使用 -o StrictHostKeyChecking=no 简化 CI 环境下的连接
            # 生产环境中建议使用 known_hosts
            scp -o StrictHostKeyChecking=no -r ./fluent.conf ./conf.d devops@${{ matrix.fluentd_host }}:/tmp/fluentd-new-config
            ssh -o StrictHostKeyChecking=no devops@${{ matrix.fluentd_host }} "sudo mv /tmp/fluentd-new-config/* /etc/fluent/ && sudo chown -R fluent:fluent /etc/fluent/"
    
        - name: Validate and Reload Fluentd
          run: |
            # 在重载前先验证配置文件的语法正确性,防止服务起不来
            ssh -o StrictHostKeyChecking=no devops@${{ matrix.fluentd_host }} "sudo /usr/sbin/fluentd --dry-run -c /etc/fluent/fluent.conf"
            
            # 使用 SIGHUP 信号平滑重载配置,不会中断正在处理的日志流
            ssh -o StrictHostKeyChecking=no devops@${{ matrix.fluentd_host }} "sudo systemctl kill -s HUP fluentd.service"
    

    这个工作流做了几件关键的事情:

    • 触发条件: 仅当配置文件发生变化时才触发,避免不必要的操作。
    • 安全连接: 使用部署密钥(secrets.FLUENTD_SSH_PRIVATE_KEY)安全地连接到目标服务器。
    • 原子化更新: 先将文件复制到临时目录,再移动到目标位置,减少配置不一致的窗口期。
    • 预检验 (--dry-run): 这是极其重要的安全措施。在应用新配置前,先检查其有效性。如果配置有语法错误,工作流会失败,从而阻止一个错误的配置被应用到生产环境。
    • 平滑重载 (SIGHUP): 向 Fluentd 进程发送 SIGHUP 信号,使其在不重启进程、不中断现有连接的情况下加载新配置。这保证了服务的连续性。

方案的局限性与未来迭代

这套系统虽然解决了我们最初面临的审计日志可靠性和信息完整性问题,但它并非完美。

首先,引入自定义 Kong 插件增加了维护成本。每当 Kong 大版本升级时,都需要回归测试甚至重写部分插件代码以保证兼容性。这需要团队具备一定的 Lua 开发能力。

其次,Fluentd 的文件缓冲机制会占用大量的磁盘空间,尤其是在下游服务长时间不可用时,可能会导致磁盘被写满。需要配合精细的磁盘监控和告警。同时,虽然 Fluentd 性能优异,但在极端高流量场景下,其基于 Ruby 的单核性能瓶颈可能会显现。

未来的优化路径可以考虑:

  1. 探索 Service Mesh: 对于已经采用服务网格(如 Istio)的架构,可以考虑利用 Envoy 的 access_log 服务和 Lua filter 来实现类似的功能,从而将这部分逻辑从 API 网关解耦。
  2. 替换日志聚合器: 评估更新的、用 Rust/Go 编写的日志工具,如 Vector。Vector 在性能和资源消耗上通常优于 Fluentd,并提供类似的可靠性保证。
  3. 增强死信处理: 当日志重试多次仍然失败后,目前的方案是无限重试。一个更完善的系统应该将这些“死信”日志转储到一个专门的队列(如 Kafka 或 AWS SQS)中,供后续手动分析和修复,而不是无限期地占用本地缓冲区。

  目录