技术痛点:隔离的数据孤岛
在复杂的微服务环境中,Grafana、Loki 和 Consul 是我们的标准技术栈。Grafana 负责展示,Loki 存储海量日志,Consul 管理服务发现和配置。日常排障时,我们面临一个典型困境:在 Loki 面板中看到一条来自服务 A 的错误日志,但无法立刻确定这条日志产生时,该服务实例的具体配置是什么、它依赖的下游服务版本、或者当时是否有相关的部署事件。这些上下文信息通常分散在 Consul 的 KV 存储、服务元数据或者其他系统中。分析过程需要在多个 Grafana 面板甚至多个系统之间来回切换,效率低下且容易遗漏关键信息。标准的 Loki 查询面板只能展示日志本身,无法将这些动态的、来自异构系统的上下文信息进行实时关联与聚合。
我们需要的是一个“智能”的 Grafana 面板,它不仅能展示日志,还能在每一条日志旁边,自动附加上下文信息。例如,当一条日志显示时,面板能自动从 Consul 查询并展示该服务实例的元数据、当前的动态配置,甚至关联的部署标签。这就要求我们构建一个超越 Grafana 原生数据源能力的中间层,并以自定义面板的形式呈现出来。
架构构想与技术选型
为了解决这个问题,我们决定开发一个自定义的 Grafana Panel 插件。这个插件的架构分为两部分:
- 前端插件 (Panel Plugin): 运行在浏览器中,负责 UI 渲染和与 Grafana 后端交互。
- 后端关联服务 (Correlation Service): 一个独立的高性能微服务,作为插件的数据中转和处理中心。它负责接收前端的查询请求,然后分别从 Loki 获取原始日志,从 Consul 获取上下文元数据,将两者聚合后返回给前端。
这样的设计将复杂的业务逻辑后置,让前端插件只专注于数据展示,同时后端服务可以独立迭代和扩缩容。
技术选型决策如下:
- 后端关联服务: 我们选择了 Actix-web (Rust)。对于一个核心的可观测性组件,性能、内存安全和高并发处理能力是首要考量。Rust 语言的特性恰好满足这些要求,而 Actix-web 是该生态中最成熟和高性能的 Web 框架之一。
- 服务配置与发现: Consul。这已经是我们现有的基础设施。后端服务将通过 Consul API 动态获取 Loki 的地址,以及查询服务实例的元数据和 KV 配置。这避免了硬编码,使整个系统更加动态和健壮。
- 日志存储: Loki。作为 Grafana 生态的一等公民,集成成本最低。
- 前端插件构建: 官方推荐使用 Webpack,但我们选择了 Rollup。对于一个功能相对单一的 Panel 插件,Rollup 的配置更简洁,其基于 ES Module 的 tree-shaking 能力也可能带来更小的打包体积。在真实项目中,选择更熟悉的工具链能提升开发效率。
整体数据流如下:
sequenceDiagram
participant User as 用户
participant PanelPlugin as Grafana 面板 (Rollup构建)
participant GrafanaBE as Grafana 后端代理
participant CorrelationSvc as Actix-web 关联服务
participant Loki
participant Consul
User->>PanelPlugin: 查看面板
PanelPlugin->>GrafanaBE: 发起数据查询请求 (代理)
GrafanaBE->>CorrelationSvc: POST /api/correlate
CorrelationSvc->>Consul: 获取 Loki 地址和查询元数据
Consul-->>CorrelationSvc: 返回配置信息
CorrelationSvc->>Loki: 查询原始日志 (LogQL)
Loki-->>CorrelationSvc: 返回日志流
CorrelationSvc->>CorrelationSvc: 聚合日志与 Consul 元数据
CorrelationSvc-->>GrafanaBE: 返回聚合后的 JSON 数据
GrafanaBE-->>PanelPlugin: 返回数据
PanelPlugin->>User: 渲染富文本日志视图
步骤化实现:从后端服务到前端插件
1. 构建 Actix-web 关联服务
这是整个系统的核心。它需要能够处理并发请求,并高效地与 Consul 和 Loki 通信。
首先,定义项目依赖 Cargo.toml:
[package]
name = "grafana-correlation-service"
version = "0.1.0"
edition = "2021"
[dependencies]
actix-web = "4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
config = "0.13"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"
服务入口与配置 main.rs
我们将服务配置(如 Consul 地址、Loki 地址等)通过一个 config.toml 文件管理,并在启动时加载。
// src/main.rs
use actix_web::{web, App, HttpServer, Responder, HttpResponse, middleware::Logger};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::info;
mod error;
mod consul;
mod loki;
mod config;
// 定义应用状态,包含 Consul 和 Loki 客户端
pub struct AppState {
consul_client: consul::ConsulClient,
loki_client: loki::LokiClient,
}
// 定义前端请求的 Body 结构
#[derive(Deserialize, Debug)]
struct CorrelateRequest {
log_query: String, // LogQL 查询语句
service_tags: Vec<String>, // 用于从 Consul 查找元数据的服务标签
}
// 定义返回给前端的聚合日志结构
#[derive(Serialize, Debug)]
struct EnrichedLogEntry {
timestamp: String,
line: String,
labels: serde_json::Value,
consul_metadata: Option<consul::ServiceMetadata>,
}
// 核心处理逻辑
async fn correlate_logs(
state: web::Data<Arc<AppState>>,
req: web::Json<CorrelateRequest>,
) -> impl Responder {
info!("Received correlation request: {:?}", req);
// 1. 从 Loki 查询日志
let loki_result = state.loki_client.query(&req.log_query).await;
let log_entries = match loki_result {
Ok(entries) => entries,
Err(e) => {
return HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }));
}
};
// 2. 在真实项目中,这里会基于日志标签(如 service name, instance id)
// 去 Consul 查询对应的服务元数据。为简化示例,我们用请求中的 tags。
let consul_metadata = state.consul_client.get_service_metadata(&req.service_tags).await.ok();
// 3. 聚合数据
let enriched_logs: Vec<EnrichedLogEntry> = log_entries.into_iter().map(|log| {
EnrichedLogEntry {
timestamp: log.timestamp,
line: log.line,
labels: log.labels,
// 克隆元数据附加到每条日志上
// 生产环境中应有更精细的匹配逻辑
consul_metadata: consul_metadata.clone(),
}
}).collect();
HttpResponse::Ok().json(enriched_logs)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// 初始化日志
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
// 加载配置
let settings = config::Settings::new().expect("Failed to load configuration");
info!("Configuration loaded: {:?}", settings);
// 创建客户端实例
let consul_client = consul::ConsulClient::new(settings.consul.address);
// 从 Consul KV 获取 Loki 地址,这是一个常见的实践
let loki_address = consul_client.get_kv("config/loki/address")
.await
.unwrap_or(settings.loki.address);
info!("Using Loki address: {}", loki_address);
let loki_client = loki::LokiClient::new(loki_address);
// 创建共享的应用状态
let app_state = Arc::new(AppState {
consul_client,
loki_client,
});
info!("Starting server at http://127.0.0.1:8080");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(app_state.clone()))
.wrap(Logger::default())
.service(web::resource("/api/correlate").route(web::post().to(correlate_logs)))
})
.bind("0.0.0.0:8080")?
.run()
.await
}
Consul 客户端 consul.rs
这个模块负责与 Consul API 交互,获取服务元数据和 KV 配置。这里的坑在于,Consul API 的返回结构可能很复杂,需要精确定义 serde 结构体。
// src/consul.rs
use crate::error::AppError;
use reqwest::Client;
use serde::{Deserialize, Serialize};
#[derive(Clone)]
pub struct ConsulClient {
client: Client,
address: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServiceMetadata {
pub node: String,
pub address: String,
pub service_name: String,
pub service_port: u16,
pub service_meta: serde_json::Value,
}
impl ConsulClient {
pub fn new(address: String) -> Self {
Self {
client: Client::new(),
address,
}
}
// 从 Consul KV 获取值
pub async fn get_kv(&self, key: &str) -> Result<String, AppError> {
let url = format!("{}/v1/kv/{}", self.address, key);
let res = self.client.get(&url).send().await?.json::<Vec<serde_json::Value>>().await?;
// Consul KV API 返回一个数组,即使只有一个 key
if let Some(item) = res.get(0) {
if let Some(value_b64) = item.get("Value").and_then(|v| v.as_str()) {
let bytes = base64::decode(value_b64)?;
return Ok(String::from_utf8(bytes)?);
}
}
Err(AppError::ConsulError("KV key not found".to_string()))
}
// 根据标签获取服务元数据
pub async fn get_service_metadata(&self, tags: &[String]) -> Result<ServiceMetadata, AppError> {
// 生产环境应该支持更复杂的查询和过滤逻辑
if tags.is_empty() {
return Err(AppError::ConsulError("No tags provided".to_string()));
}
let service_name = &tags[0]; // 简化:使用第一个 tag 作为服务名
let url = format!("{}/v1/catalog/service/{}", self.address, service_name);
let services = self.client.get(&url).send().await?
.json::<Vec<serde_json::Value>>().await?;
// 简化:返回找到的第一个服务实例的元数据
if let Some(service) = services.get(0) {
Ok(ServiceMetadata {
node: service["Node"].as_str().unwrap_or_default().to_string(),
address: service["Address"].as_str().unwrap_or_default().to_string(),
service_name: service["ServiceName"].as_str().unwrap_or_default().to_string(),
service_port: service["ServicePort"].as_u64().unwrap_or_default() as u16,
service_meta: service["ServiceMeta"].clone(),
})
} else {
Err(AppError::ConsulError(format!("Service '{}' not found", service_name)))
}
}
}
Loki 客户端 loki.rs
这个模块封装了对 Loki 的查询。Loki 的查询 API (LogQL) 功能强大,但返回的数据结构也需要小心处理。
// src/loki.rs
use crate::error::AppError;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::form_urlencoded;
#[derive(Clone)]
pub struct LokiClient {
client: Client,
address: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LokiLogEntry {
pub timestamp: String,
pub line: String,
pub labels: serde_json::Value,
}
#[derive(Deserialize, Debug)]
struct LokiResponse {
status: String,
data: LokiData,
}
#[derive(Deserialize, Debug)]
struct LokiData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<LokiStream>,
}
#[derive(Deserialize, Debug)]
struct LokiStream {
stream: serde_json::Value,
values: Vec<(String, String)>,
}
impl LokiClient {
pub fn new(address: String) -> Self {
Self {
client: Client::new(),
address,
}
}
pub async fn query(&self, query: &str) -> Result<Vec<LokiLogEntry>, AppError> {
let encoded_query = form_urlencoded::byte_serialize(query.as_bytes()).collect::<String>();
let url = format!("{}/loki/api/v1/query_range?query={}", self.address, encoded_query);
// 在真实项目中,start, end, limit 等参数应该由前端传递
let res = self.client.get(&url)
.query(&[("limit", "100")])
.send().await?
.json::<LokiResponse>().await?;
if res.status != "success" {
return Err(AppError::LokiError("Query failed".to_string()));
}
let mut entries = Vec::new();
for stream in res.data.result {
for (ts, line) in stream.values {
entries.push(LokiLogEntry {
timestamp: ts,
line,
labels: stream.stream.clone(),
});
}
}
Ok(entries)
}
}
其余的 config.rs 和 error.rs 模块用于配置加载和统一错误处理,这里不再赘述。这个服务现在已经具备了核心的关联查询能力。
2. 开发 Grafana 面板插件 (Rollup 构建)
Grafana 插件本质上是一个 React 应用。我们将重点放在构建配置和与后端服务的通信上。
项目结构如下:
grafana-correlation-panel/
├── src/
│ ├── components/
│ │ └── CorrelationPanel.tsx
│ ├── module.ts
│ └── plugin.json
├── package.json
└── rollup.config.js
plugin.json - 插件元数据
{
"type": "panel",
"name": "Correlation Panel",
"id": "my-correlation-panel",
"info": {
"description": "A panel to show logs enriched with Consul metadata",
"author": {
"name": "Your Name"
},
"version": "1.0.0"
},
"dependencies": {
"grafanaVersion": "8.x.x",
"plugins": []
}
}
rollup.config.js - 使用 Rollup 进行构建
这里的关键是正确配置插件,以处理 TypeScript、React JSX、CSS 以及 Grafana 依赖的外部化。一个常见的错误是把 Grafana 的核心库打包进去,导致插件体积巨大且可能与宿主环境冲突。
// rollup.config.js
import resolve from '@rollup/plugin-node-resolve';
import commonjs from '@rollup/plugin-commonjs';
import typescript from 'rollup-plugin-typescript2';
import postcss from 'rollup-plugin-postcss';
import replace from '@rollup/plugin-replace';
import { terser } from 'rollup-plugin-terser';
const pkg = require('./package.json');
const grafanaExternals = [
'react',
'react-dom',
'react-router-dom',
'@grafana/data',
'@grafana/ui',
'@grafana/runtime',
];
export default {
input: 'src/module.ts',
output: {
file: 'dist/module.js',
format: 'amd', // Grafana 插件必须是 AMD 格式
sourcemap: true,
},
plugins: [
resolve({
browser: true,
}),
commonjs(),
typescript({
tsconfig: './tsconfig.json',
// 避免 rollup 和 tsc 的缓存冲突
clean: true,
}),
postcss({
modules: true,
}),
replace({
'process.env.NODE_ENV': JSON.stringify('production'),
preventAssignment: true,
}),
terser()
],
// 关键:将 Grafana 提供的库排除在打包之外
external: (id) => grafanaExternals.some(prefix => id.startsWith(prefix)),
};
CorrelationPanel.tsx - React 组件
这个组件负责发起请求并渲染数据。它使用 @grafana/runtime 中的 getBackendSrv() 来发送请求,这会自动利用 Grafana 的后端代理,解决跨域问题。
// src/components/CorrelationPanel.tsx
import React, { useState, useEffect } from 'react';
import { PanelProps } from '@grafana/data';
import { getBackendSrv } from '@grafana/runtime';
// 假设 options 中配置了查询参数
interface PanelOptions {
logQuery: string;
serviceTags: string[];
}
interface EnrichedLog {
timestamp: string;
line: string;
labels: Record<string, string>;
consul_metadata?: any;
}
export const CorrelationPanel: React.FC<PanelProps<PanelOptions>> = ({ options, width, height }) => {
const [data, setData] = useState<EnrichedLog[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
const fetchData = async () => {
setLoading(true);
setError(null);
try {
// Grafana Panel 插件通过 datasourceRequest 调用自定义后端服务
// 这里需要配置一个 JSON API 数据源指向我们的 Actix-web 服务
// 或者更简单的,通过插件路由代理
// 为了演示,我们假设插件有一个后端路由 `my-correlation-panel/correlate`
const response = await getBackendSrv().post('/api/plugins/my-correlation-panel/routes/correlate', {
log_query: options.logQuery,
service_tags: options.serviceTags,
});
setData(response);
} catch (err: any) {
setError(err.message || 'Failed to fetch data');
} finally {
setLoading(false);
}
};
fetchData();
}, [options.logQuery, options.serviceTags]);
if (loading) {
return <div>Loading...</div>;
}
if (error) {
return <div>Error: {error}</div>;
}
return (
<div style={{ width, height, overflow: 'auto' }}>
<table>
<thead>
<tr>
<th>Timestamp</th>
<th>Log Line</th>
<th>Consul Metadata</th>
</tr>
</thead>
<tbody>
{data.map((log, index) => (
<tr key={index}>
<td>{new Date(parseInt(log.timestamp.slice(0, 13))).toISOString()}</td>
<td><pre>{log.line}</pre></td>
<td>
{log.consul_metadata ? (
<pre>{JSON.stringify(log.consul_metadata, null, 2)}</pre>
) : (
'N/A'
)}
</td>
</tr>
))}
</tbody>
</table>
</div>
);
};
为了让 getBackendSrv().post('/api/plugins/my-correlation-panel/routes/correlate', ...) 工作,还需要在 plugin.json 中启用后端插件部分,并实现一个简单的 Go 代理。但一个更 Hacky 但快速的方法是,在 Grafana 配置中允许插件向任意 URL 发出请求,然后直接调用 http://actix-service:8080/api/correlate。在真实项目中,使用 Grafana 的后端代理是更安全、更规范的做法。
局限性与未来展望
这套方案有效地解决了最初的痛点,但它并非完美。当前的实现还存在一些可以迭代优化的方向:
- 查询性能: 当日志量巨大时,在后端服务中进行全量聚合可能会成为瓶颈。可以考虑引入流式处理或者分页机制,只对可见部分的数据进行关联查询。
- 关联逻辑的普适性: 目前的关联逻辑是基于服务标签进行的简单匹配。一个更强大的系统应该支持更复杂的关联规则,例如根据日志中的
traceID去 Jaeger 查询链路信息,或者根据userID去用户中心查询用户信息。这需要将关联服务设计得更具扩展性,比如采用插件化或规则引擎。 - 实时性: 当前是面板刷新时才拉取数据。对于需要实时监控的场景,可以改造为 WebSocket 推送模式,由 Actix-web 服务在检测到新的重要日志时,主动将富文本信息推送给前端面板。
- UI 体验: 目前的表格渲染非常简陋。生产级的面板需要更精细的 UI 设计,例如可折叠的元数据、标签高亮、以及点击元数据跳转到对应 Consul 页面的交互功能。
尽管存在这些待办事项,这个架构已经证明了其价值:通过将一个高性能的自定义后端服务与 Grafana 灵活的插件系统相结合,我们可以打破数据源的壁垒,构建出远超原生能力的、高度定制化的可观测性视图。