基于 Actix-web 与 Consul 构建 Grafana 动态日志关联面板的实践


技术痛点:隔离的数据孤岛

在复杂的微服务环境中,Grafana、Loki 和 Consul 是我们的标准技术栈。Grafana 负责展示,Loki 存储海量日志,Consul 管理服务发现和配置。日常排障时,我们面临一个典型困境:在 Loki 面板中看到一条来自服务 A 的错误日志,但无法立刻确定这条日志产生时,该服务实例的具体配置是什么、它依赖的下游服务版本、或者当时是否有相关的部署事件。这些上下文信息通常分散在 Consul 的 KV 存储、服务元数据或者其他系统中。分析过程需要在多个 Grafana 面板甚至多个系统之间来回切换,效率低下且容易遗漏关键信息。标准的 Loki 查询面板只能展示日志本身,无法将这些动态的、来自异构系统的上下文信息进行实时关联与聚合。

我们需要的是一个“智能”的 Grafana 面板,它不仅能展示日志,还能在每一条日志旁边,自动附加上下文信息。例如,当一条日志显示时,面板能自动从 Consul 查询并展示该服务实例的元数据、当前的动态配置,甚至关联的部署标签。这就要求我们构建一个超越 Grafana 原生数据源能力的中间层,并以自定义面板的形式呈现出来。

架构构想与技术选型

为了解决这个问题,我们决定开发一个自定义的 Grafana Panel 插件。这个插件的架构分为两部分:

  1. 前端插件 (Panel Plugin): 运行在浏览器中,负责 UI 渲染和与 Grafana 后端交互。
  2. 后端关联服务 (Correlation Service): 一个独立的高性能微服务,作为插件的数据中转和处理中心。它负责接收前端的查询请求,然后分别从 Loki 获取原始日志,从 Consul 获取上下文元数据,将两者聚合后返回给前端。

这样的设计将复杂的业务逻辑后置,让前端插件只专注于数据展示,同时后端服务可以独立迭代和扩缩容。

技术选型决策如下:

  • 后端关联服务: 我们选择了 Actix-web (Rust)。对于一个核心的可观测性组件,性能、内存安全和高并发处理能力是首要考量。Rust 语言的特性恰好满足这些要求,而 Actix-web 是该生态中最成熟和高性能的 Web 框架之一。
  • 服务配置与发现: Consul。这已经是我们现有的基础设施。后端服务将通过 Consul API 动态获取 Loki 的地址,以及查询服务实例的元数据和 KV 配置。这避免了硬编码,使整个系统更加动态和健壮。
  • 日志存储: Loki。作为 Grafana 生态的一等公民,集成成本最低。
  • 前端插件构建: 官方推荐使用 Webpack,但我们选择了 Rollup。对于一个功能相对单一的 Panel 插件,Rollup 的配置更简洁,其基于 ES Module 的 tree-shaking 能力也可能带来更小的打包体积。在真实项目中,选择更熟悉的工具链能提升开发效率。

整体数据流如下:

sequenceDiagram
    participant User as 用户
    participant PanelPlugin as Grafana 面板 (Rollup构建)
    participant GrafanaBE as Grafana 后端代理
    participant CorrelationSvc as Actix-web 关联服务
    participant Loki
    participant Consul

    User->>PanelPlugin: 查看面板
    PanelPlugin->>GrafanaBE: 发起数据查询请求 (代理)
    GrafanaBE->>CorrelationSvc: POST /api/correlate
    CorrelationSvc->>Consul: 获取 Loki 地址和查询元数据
    Consul-->>CorrelationSvc: 返回配置信息
    CorrelationSvc->>Loki: 查询原始日志 (LogQL)
    Loki-->>CorrelationSvc: 返回日志流
    CorrelationSvc->>CorrelationSvc: 聚合日志与 Consul 元数据
    CorrelationSvc-->>GrafanaBE: 返回聚合后的 JSON 数据
    GrafanaBE-->>PanelPlugin: 返回数据
    PanelPlugin->>User: 渲染富文本日志视图

步骤化实现:从后端服务到前端插件

1. 构建 Actix-web 关联服务

这是整个系统的核心。它需要能够处理并发请求,并高效地与 Consul 和 Loki 通信。

首先,定义项目依赖 Cargo.toml:

[package]
name = "grafana-correlation-service"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
config = "0.13"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"

服务入口与配置 main.rs

我们将服务配置(如 Consul 地址、Loki 地址等)通过一个 config.toml 文件管理,并在启动时加载。

// src/main.rs
use actix_web::{web, App, HttpServer, Responder, HttpResponse, middleware::Logger};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::info;

mod error;
mod consul;
mod loki;
mod config;

// 定义应用状态,包含 Consul 和 Loki 客户端
pub struct AppState {
    consul_client: consul::ConsulClient,
    loki_client: loki::LokiClient,
}

// 定义前端请求的 Body 结构
#[derive(Deserialize, Debug)]
struct CorrelateRequest {
    log_query: String, // LogQL 查询语句
    service_tags: Vec<String>, // 用于从 Consul 查找元数据的服务标签
}

// 定义返回给前端的聚合日志结构
#[derive(Serialize, Debug)]
struct EnrichedLogEntry {
    timestamp: String,
    line: String,
    labels: serde_json::Value,
    consul_metadata: Option<consul::ServiceMetadata>,
}

// 核心处理逻辑
async fn correlate_logs(
    state: web::Data<Arc<AppState>>,
    req: web::Json<CorrelateRequest>,
) -> impl Responder {
    info!("Received correlation request: {:?}", req);

    // 1. 从 Loki 查询日志
    let loki_result = state.loki_client.query(&req.log_query).await;
    let log_entries = match loki_result {
        Ok(entries) => entries,
        Err(e) => {
            return HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }));
        }
    };

    // 2. 在真实项目中,这里会基于日志标签(如 service name, instance id)
    //    去 Consul 查询对应的服务元数据。为简化示例,我们用请求中的 tags。
    let consul_metadata = state.consul_client.get_service_metadata(&req.service_tags).await.ok();

    // 3. 聚合数据
    let enriched_logs: Vec<EnrichedLogEntry> = log_entries.into_iter().map(|log| {
        EnrichedLogEntry {
            timestamp: log.timestamp,
            line: log.line,
            labels: log.labels,
            // 克隆元数据附加到每条日志上
            // 生产环境中应有更精细的匹配逻辑
            consul_metadata: consul_metadata.clone(),
        }
    }).collect();

    HttpResponse::Ok().json(enriched_logs)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 初始化日志
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // 加载配置
    let settings = config::Settings::new().expect("Failed to load configuration");
    info!("Configuration loaded: {:?}", settings);

    // 创建客户端实例
    let consul_client = consul::ConsulClient::new(settings.consul.address);
    // 从 Consul KV 获取 Loki 地址,这是一个常见的实践
    let loki_address = consul_client.get_kv("config/loki/address")
        .await
        .unwrap_or(settings.loki.address);
    info!("Using Loki address: {}", loki_address);
    let loki_client = loki::LokiClient::new(loki_address);
    
    // 创建共享的应用状态
    let app_state = Arc::new(AppState {
        consul_client,
        loki_client,
    });

    info!("Starting server at http://127.0.0.1:8080");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(app_state.clone()))
            .wrap(Logger::default())
            .service(web::resource("/api/correlate").route(web::post().to(correlate_logs)))
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

Consul 客户端 consul.rs

这个模块负责与 Consul API 交互,获取服务元数据和 KV 配置。这里的坑在于,Consul API 的返回结构可能很复杂,需要精确定义 serde 结构体。

// src/consul.rs
use crate::error::AppError;
use reqwest::Client;
use serde::{Deserialize, Serialize};

#[derive(Clone)]
pub struct ConsulClient {
    client: Client,
    address: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServiceMetadata {
    pub node: String,
    pub address: String,
    pub service_name: String,
    pub service_port: u16,
    pub service_meta: serde_json::Value,
}

impl ConsulClient {
    pub fn new(address: String) -> Self {
        Self {
            client: Client::new(),
            address,
        }
    }

    // 从 Consul KV 获取值
    pub async fn get_kv(&self, key: &str) -> Result<String, AppError> {
        let url = format!("{}/v1/kv/{}", self.address, key);
        let res = self.client.get(&url).send().await?.json::<Vec<serde_json::Value>>().await?;
        
        // Consul KV API 返回一个数组,即使只有一个 key
        if let Some(item) = res.get(0) {
            if let Some(value_b64) = item.get("Value").and_then(|v| v.as_str()) {
                let bytes = base64::decode(value_b64)?;
                return Ok(String::from_utf8(bytes)?);
            }
        }
        Err(AppError::ConsulError("KV key not found".to_string()))
    }

    // 根据标签获取服务元数据
    pub async fn get_service_metadata(&self, tags: &[String]) -> Result<ServiceMetadata, AppError> {
        // 生产环境应该支持更复杂的查询和过滤逻辑
        if tags.is_empty() {
            return Err(AppError::ConsulError("No tags provided".to_string()));
        }
        let service_name = &tags[0]; // 简化:使用第一个 tag 作为服务名
        let url = format!("{}/v1/catalog/service/{}", self.address, service_name);

        let services = self.client.get(&url).send().await?
            .json::<Vec<serde_json::Value>>().await?;
        
        // 简化:返回找到的第一个服务实例的元数据
        if let Some(service) = services.get(0) {
            Ok(ServiceMetadata {
                node: service["Node"].as_str().unwrap_or_default().to_string(),
                address: service["Address"].as_str().unwrap_or_default().to_string(),
                service_name: service["ServiceName"].as_str().unwrap_or_default().to_string(),
                service_port: service["ServicePort"].as_u64().unwrap_or_default() as u16,
                service_meta: service["ServiceMeta"].clone(),
            })
        } else {
            Err(AppError::ConsulError(format!("Service '{}' not found", service_name)))
        }
    }
}

Loki 客户端 loki.rs

这个模块封装了对 Loki 的查询。Loki 的查询 API (LogQL) 功能强大,但返回的数据结构也需要小心处理。

// src/loki.rs
use crate::error::AppError;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::form_urlencoded;

#[derive(Clone)]
pub struct LokiClient {
    client: Client,
    address: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct LokiLogEntry {
    pub timestamp: String,
    pub line: String,
    pub labels: serde_json::Value,
}

#[derive(Deserialize, Debug)]
struct LokiResponse {
    status: String,
    data: LokiData,
}

#[derive(Deserialize, Debug)]
struct LokiData {
    #[serde(rename = "resultType")]
    result_type: String,
    result: Vec<LokiStream>,
}

#[derive(Deserialize, Debug)]
struct LokiStream {
    stream: serde_json::Value,
    values: Vec<(String, String)>,
}

impl LokiClient {
    pub fn new(address: String) -> Self {
        Self {
            client: Client::new(),
            address,
        }
    }

    pub async fn query(&self, query: &str) -> Result<Vec<LokiLogEntry>, AppError> {
        let encoded_query = form_urlencoded::byte_serialize(query.as_bytes()).collect::<String>();
        let url = format!("{}/loki/api/v1/query_range?query={}", self.address, encoded_query);

        // 在真实项目中,start, end, limit 等参数应该由前端传递
        let res = self.client.get(&url)
            .query(&[("limit", "100")])
            .send().await?
            .json::<LokiResponse>().await?;

        if res.status != "success" {
            return Err(AppError::LokiError("Query failed".to_string()));
        }

        let mut entries = Vec::new();
        for stream in res.data.result {
            for (ts, line) in stream.values {
                entries.push(LokiLogEntry {
                    timestamp: ts,
                    line,
                    labels: stream.stream.clone(),
                });
            }
        }
        Ok(entries)
    }
}

其余的 config.rserror.rs 模块用于配置加载和统一错误处理,这里不再赘述。这个服务现在已经具备了核心的关联查询能力。

2. 开发 Grafana 面板插件 (Rollup 构建)

Grafana 插件本质上是一个 React 应用。我们将重点放在构建配置和与后端服务的通信上。

项目结构如下:

grafana-correlation-panel/
├── src/
│   ├── components/
│   │   └── CorrelationPanel.tsx
│   ├── module.ts
│   └── plugin.json
├── package.json
└── rollup.config.js

plugin.json - 插件元数据

{
  "type": "panel",
  "name": "Correlation Panel",
  "id": "my-correlation-panel",
  "info": {
    "description": "A panel to show logs enriched with Consul metadata",
    "author": {
      "name": "Your Name"
    },
    "version": "1.0.0"
  },
  "dependencies": {
    "grafanaVersion": "8.x.x",
    "plugins": []
  }
}

rollup.config.js - 使用 Rollup 进行构建

这里的关键是正确配置插件,以处理 TypeScript、React JSX、CSS 以及 Grafana 依赖的外部化。一个常见的错误是把 Grafana 的核心库打包进去,导致插件体积巨大且可能与宿主环境冲突。

// rollup.config.js
import resolve from '@rollup/plugin-node-resolve';
import commonjs from '@rollup/plugin-commonjs';
import typescript from 'rollup-plugin-typescript2';
import postcss from 'rollup-plugin-postcss';
import replace from '@rollup/plugin-replace';
import { terser } from 'rollup-plugin-terser';

const pkg = require('./package.json');

const grafanaExternals = [
  'react',
  'react-dom',
  'react-router-dom',
  '@grafana/data',
  '@grafana/ui',
  '@grafana/runtime',
];

export default {
  input: 'src/module.ts',
  output: {
    file: 'dist/module.js',
    format: 'amd', // Grafana 插件必须是 AMD 格式
    sourcemap: true,
  },
  plugins: [
    resolve({
      browser: true,
    }),
    commonjs(),
    typescript({
      tsconfig: './tsconfig.json',
      // 避免 rollup 和 tsc 的缓存冲突
      clean: true, 
    }),
    postcss({
      modules: true,
    }),
    replace({
      'process.env.NODE_ENV': JSON.stringify('production'),
      preventAssignment: true,
    }),
    terser()
  ],
  // 关键:将 Grafana 提供的库排除在打包之外
  external: (id) => grafanaExternals.some(prefix => id.startsWith(prefix)),
};

CorrelationPanel.tsx - React 组件

这个组件负责发起请求并渲染数据。它使用 @grafana/runtime 中的 getBackendSrv() 来发送请求,这会自动利用 Grafana 的后端代理,解决跨域问题。

// src/components/CorrelationPanel.tsx
import React, { useState, useEffect } from 'react';
import { PanelProps } from '@grafana/data';
import { getBackendSrv } from '@grafana/runtime';

// 假设 options 中配置了查询参数
interface PanelOptions {
  logQuery: string;
  serviceTags: string[];
}

interface EnrichedLog {
  timestamp: string;
  line: string;
  labels: Record<string, string>;
  consul_metadata?: any;
}

export const CorrelationPanel: React.FC<PanelProps<PanelOptions>> = ({ options, width, height }) => {
  const [data, setData] = useState<EnrichedLog[]>([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    const fetchData = async () => {
      setLoading(true);
      setError(null);

      try {
        // Grafana Panel 插件通过 datasourceRequest 调用自定义后端服务
        // 这里需要配置一个 JSON API 数据源指向我们的 Actix-web 服务
        // 或者更简单的,通过插件路由代理
        // 为了演示,我们假设插件有一个后端路由 `my-correlation-panel/correlate`
        const response = await getBackendSrv().post('/api/plugins/my-correlation-panel/routes/correlate', {
            log_query: options.logQuery,
            service_tags: options.serviceTags,
        });
        setData(response);
      } catch (err: any) {
        setError(err.message || 'Failed to fetch data');
      } finally {
        setLoading(false);
      }
    };

    fetchData();
  }, [options.logQuery, options.serviceTags]);

  if (loading) {
    return <div>Loading...</div>;
  }

  if (error) {
    return <div>Error: {error}</div>;
  }

  return (
    <div style={{ width, height, overflow: 'auto' }}>
      <table>
        <thead>
          <tr>
            <th>Timestamp</th>
            <th>Log Line</th>
            <th>Consul Metadata</th>
          </tr>
        </thead>
        <tbody>
          {data.map((log, index) => (
            <tr key={index}>
              <td>{new Date(parseInt(log.timestamp.slice(0, 13))).toISOString()}</td>
              <td><pre>{log.line}</pre></td>
              <td>
                {log.consul_metadata ? (
                  <pre>{JSON.stringify(log.consul_metadata, null, 2)}</pre>
                ) : (
                  'N/A'
                )}
              </td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

为了让 getBackendSrv().post('/api/plugins/my-correlation-panel/routes/correlate', ...) 工作,还需要在 plugin.json 中启用后端插件部分,并实现一个简单的 Go 代理。但一个更 Hacky 但快速的方法是,在 Grafana 配置中允许插件向任意 URL 发出请求,然后直接调用 http://actix-service:8080/api/correlate。在真实项目中,使用 Grafana 的后端代理是更安全、更规范的做法。

局限性与未来展望

这套方案有效地解决了最初的痛点,但它并非完美。当前的实现还存在一些可以迭代优化的方向:

  1. 查询性能: 当日志量巨大时,在后端服务中进行全量聚合可能会成为瓶颈。可以考虑引入流式处理或者分页机制,只对可见部分的数据进行关联查询。
  2. 关联逻辑的普适性: 目前的关联逻辑是基于服务标签进行的简单匹配。一个更强大的系统应该支持更复杂的关联规则,例如根据日志中的 traceID 去 Jaeger 查询链路信息,或者根据 userID 去用户中心查询用户信息。这需要将关联服务设计得更具扩展性,比如采用插件化或规则引擎。
  3. 实时性: 当前是面板刷新时才拉取数据。对于需要实时监控的场景,可以改造为 WebSocket 推送模式,由 Actix-web 服务在检测到新的重要日志时,主动将富文本信息推送给前端面板。
  4. UI 体验: 目前的表格渲染非常简陋。生产级的面板需要更精细的 UI 设计,例如可折叠的元数据、标签高亮、以及点击元数据跳转到对应 Consul 页面的交互功能。

尽管存在这些待办事项,这个架构已经证明了其价值:通过将一个高性能的自定义后端服务与 Grafana 灵活的插件系统相结合,我们可以打破数据源的壁垒,构建出远超原生能力的、高度定制化的可观测性视图。


  目录