Our computer-vision (CV) based image moderation service recently hit a wall. Once a task was submitted, the business side had no visibility into its progress and could only wait passively for the result. When processing was delayed, we could not quickly tell whether the bottleneck was a backlog in the message queue or slow CV model inference. Without end-to-end observability, the whole system was a black box. Pulling in a full APM system such as SkyWalking or Pinpoint would have been overkill for this small, self-contained system; we needed something lightweight and self-hosted.
The initial idea was a real-time dashboard. Beyond basic queue state (such as message backlog), it had to trace each task from the moment it entered the queue until processing finished. That requires a mechanism that correlates the front-end request, the message in the queue, and the back-end CV task through a single unique identifier.
We made the following technology choices:
- Message queue (ActiveMQ): The team knows the JMS spec well, and ActiveMQ is already used internally. Its message properties are the key to end-to-end tracing: they can carry the trace ID and timestamps. Its Advisory Messages mechanism also lets us monitor consumer connection state, giving us worker-node status monitoring out of the box.
- CV worker (Java + DJL): The main processing logic is written in Java, since its ActiveMQ integration is the most mature. CV capability comes from Amazon's Deep Java Library (DJL), which makes it easy to load pre-trained PyTorch or TensorFlow models, such as YOLOv5 for object detection.
- Containerization (Jib): For a Java application, Jib provides containerization without writing a Dockerfile. It plugs directly into the Maven build, produces optimized layered images, greatly simplifies CI/CD, and is faster than docker build.
- Dashboard front end (Chakra UI): We needed a functional internal dashboard, fast. Chakra UI's composable components let us focus on data-presentation logic instead of fiddly CSS.
- Real-time transport (WebSockets): Traditional HTTP polling is inefficient and high-latency for pushing back-end state to the browser. We chose WebSocket: a dedicated Monitor Service aggregates back-end state and pushes it to the Chakra UI dashboard over a long-lived WebSocket connection.
The overall architecture:
graph TD
subgraph "User Browser"
A[Chakra UI 仪表盘]
end
subgraph "Backend Services (containerized with Jib)"
B(API Gateway) -- HTTP POST --> C{ActiveMQ Broker}
C -- cv-task-queue --> D1[CV Worker 1]
C -- cv-task-queue --> D2[CV Worker 2]
C -- cv-task-queue --> Dn[CV Worker N]
D1 -- cv-status-topic --> E[Monitor Service]
D2 -- cv-status-topic --> E
Dn -- cv-status-topic --> E
C -- Advisory Messages --> E
end
A <-. WebSocket .-> E
style A fill:#D6EAF8,stroke:#3498DB
style E fill:#D5F5E3,stroke:#2ECC71
style C fill:#FCF3CF,stroke:#F1C40F
style D1 fill:#EBDEF0,stroke:#8E44AD
style D2 fill:#EBDEF0,stroke:#8E44AD
style Dn fill:#EBDEF0,stroke:#8E44AD
Step 1: Build the core CV worker and containerize it with Jib
The CV worker is the system's core processing unit. It consumes tasks from the cv-task-queue queue, runs model inference, and publishes results with performance metrics to the cv-status-topic topic.
First, the Maven dependencies: ActiveMQ, DJL (with its YOLOv5 model), Spring Boot, and the Jib plugin.
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot for simplified setup -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- DJL for Computer Vision -->
<dependency>
<groupId>ai.djl</groupId>
<artifactId>api</artifactId>
<version>0.26.0</version>
</dependency>
<dependency>
<groupId>ai.djl.pytorch</groupId>
<artifactId>pytorch-engine</artifactId>
<version>0.26.0</version>
</dependency>
<dependency>
<groupId>ai.djl.pytorch</groupId>
<artifactId>pytorch-model-zoo</artifactId>
<version>0.26.0</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>3.4.0</version>
<configuration>
<from>
<!-- Use an optimized base image for Java -->
<image>eclipse-temurin:17-jre-jammy</image>
</from>
<to>
<!-- Target image name -->
<image>my-registry/cv-worker:1.0.0</image>
</to>
<container>
<mainClass>com.example.cvworker.CvWorkerApplication</mainClass>
<ports>
<!-- Expose any necessary ports, though this worker might not need any -->
</ports>
<jvmFlags>
<jvmFlag>-Xms512m</jvmFlag>
<jvmFlag>-Xmx1024m</jvmFlag>
<jvmFlag>-Djava.awt.headless=true</jvmFlag>
</jvmFlags>
</container>
</configuration>
</plugin>
</plugins>
</build>
The Jib configuration is straightforward: base image, target image name, main class, and JVM flags. In a real project, my-registry should be replaced with your actual Docker registry. Running mvn compile jib:build builds and pushes the image with no local Docker daemon required.
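For reference, the two Jib goals you would typically run (assuming registry credentials are already configured, e.g. in ~/.m2/settings.xml or via a credential helper):

```shell
# Build and push the image straight to the registry -- no local Docker daemon needed
mvn -DskipTests compile jib:build

# Or build into the local Docker daemon for testing before pushing
mvn -DskipTests compile jib:dockerBuild
```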
Next, the worker's business logic.
// CvTaskListener.java
package com.example.cvworker;
import ai.djl.Application;
import ai.djl.inference.Predictor;
import ai.djl.modality.cv.Image;
import ai.djl.modality.cv.ImageFactory;
import ai.djl.modality.cv.output.DetectedObjects;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ZooModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public class CvTaskListener {
private static final Logger logger = LoggerFactory.getLogger(CvTaskListener.class);
private final JmsTemplate jmsTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Predictor<Image, DetectedObjects> predictor;
@Autowired
public CvTaskListener(JmsTemplate jmsTemplate) throws Exception {
this.jmsTemplate = jmsTemplate;
// Initialize the DJL model once on startup
// This is a time-consuming operation and should not be done per message.
Criteria<Image, DetectedObjects> criteria = Criteria.builder()
.optApplication(Application.CV.OBJECT_DETECTION)
.setTypes(Image.class, DetectedObjects.class)
.optFilter("backbone", "yolov5s")
.optEngine("PyTorch")
.build();
ZooModel<Image, DetectedObjects> model = criteria.loadModel();
this.predictor = model.newPredictor();
logger.info("YOLOv5 model loaded successfully.");
}
@JmsListener(destination = "cv-task-queue")
public void receiveMessage(Message message) {
String traceId = null;
long queueEntryTimestamp = 0L;
try {
// 1. Extract tracing metadata from the message properties.
// getLongProperty would throw if the property were absent, so check existence first.
traceId = message.getStringProperty("traceId");
if (traceId == null || !message.propertyExists("startTimestamp")) {
logger.warn("Received a message without tracing properties. Ignoring.");
return;
}
queueEntryTimestamp = message.getLongProperty("startTimestamp");
// Notify monitor service that processing has started
publishStatus(traceId, queueEntryTimestamp, "PROCESSING", null, -1);
long processingStart = System.currentTimeMillis();
// 2. Process the image data
if (!(message instanceof BytesMessage)) {
throw new IllegalArgumentException("Message must be of type BytesMessage");
}
BytesMessage bytesMessage = (BytesMessage) message;
byte[] imageBytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(imageBytes);
Image img = ImageFactory.getInstance().fromInputStream(new ByteArrayInputStream(imageBytes));
DetectedObjects detections = predictor.predict(img);
long processingEnd = System.currentTimeMillis();
long processingTimeMs = processingEnd - processingStart;
logger.info("TraceID [{}]: CV processing completed in {} ms. Found {} objects.",
traceId, processingTimeMs, detections.getNumberOfObjects());
// 3. Publish completion status to the status topic
publishStatus(traceId, queueEntryTimestamp, "COMPLETED", detections.toString(), processingTimeMs);
} catch (Exception e) {
logger.error("TraceID [{}]: Failed to process CV task.", traceId, e);
if (traceId != null && queueEntryTimestamp != 0) {
try {
// Publish failure status
publishStatus(traceId, queueEntryTimestamp, "FAILED", e.getMessage(), -1);
} catch (Exception publishEx) {
logger.error("TraceID [{}]: Failed to publish failure status.", traceId, publishEx);
}
}
// In a real system, you would likely move the message to a Dead Letter Queue (DLQ).
// Spring JMS can be configured to do this automatically.
}
}
private void publishStatus(String traceId, long queueEntryTimestamp, String status, String result, long processingTimeMs) throws Exception {
Map<String, Object> statusUpdate = new HashMap<>();
statusUpdate.put("traceId", traceId);
statusUpdate.put("status", status);
statusUpdate.put("result", result);
statusUpdate.put("processingTimeMs", processingTimeMs);
statusUpdate.put("queueEntryTimestamp", queueEntryTimestamp);
statusUpdate.put("eventTimestamp", System.currentTimeMillis());
String jsonPayload = objectMapper.writeValueAsString(statusUpdate);
jmsTemplate.convertAndSend("cv-status-topic", jsonPayload);
}
}
The core of this code:
- Model preloading: the DJL model is loaded once in the constructor, avoiding the huge cost of reloading it for every message.
- Metadata extraction: the JmsListener method first pulls traceId and startTimestamp out of the message properties; this is what makes tracing possible.
- Status publishing: on processing start, completion, and failure, publishStatus sends a JSON message to cv-status-topic carrying the trace ID, status, processing time, and everything else the front end needs. This is a typical event-driven update pattern.
- Error handling: basic exception capture, publishing a FAILED status when processing fails.
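The DLQ behavior mentioned in the code comments can be sketched with ActiveMQ's client-side RedeliveryPolicy: a message that keeps failing is redelivered a bounded number of times, then routed by the broker to the ActiveMQ.DLQ destination. The bean name and broker URL below are placeholders, and the exact wiring depends on your acknowledgment mode:

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch: bound redeliveries so a poison message ends up in ActiveMQ.DLQ
// instead of being retried forever. Broker URL is a placeholder.
@Configuration
public class JmsRetryConfig {
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(3);          // after this, the broker routes to DLQ
        policy.setInitialRedeliveryDelay(1000L);   // 1s before the first retry
        policy.setUseExponentialBackOff(true);
        factory.setRedeliveryPolicy(policy);
        return factory;
    }
}
```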
Step 2: The API gateway and the Monitor Service
We need two small Spring Boot applications: an API gateway that accepts HTTP requests and enqueues tasks, and a monitor service that consumes status updates and pushes them out over WebSocket.
API Gateway (api-gateway service):
// TaskController.java
package com.example.apigateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import jakarta.jms.BytesMessage;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/api/tasks")
public class TaskController {
@Autowired
private JmsTemplate jmsTemplate;
@PostMapping(consumes = "multipart/form-data")
public Map<String, String> submitTask(@RequestParam("image") MultipartFile imageFile) throws Exception {
if (imageFile.isEmpty()) {
throw new IllegalArgumentException("Image file is empty.");
}
final String traceId = UUID.randomUUID().toString();
final long startTimestamp = System.currentTimeMillis();
jmsTemplate.send("cv-task-queue", session -> {
BytesMessage message = session.createBytesMessage();
message.writeBytes(imageFile.getBytes());
// Set tracing properties
message.setStringProperty("traceId", traceId);
message.setLongProperty("startTimestamp", startTimestamp);
return message;
});
return Map.of("traceId", traceId, "status", "QUEUED");
}
}
The controller accepts an image file, generates a traceId and startTimestamp, sets them as JMS message properties, and sends the image bytes as the message body to cv-task-queue.
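Assuming the gateway listens on port 8080 (the port and sample filename are assumptions; the /api/tasks path comes from the controller above), a task can be submitted from the command line like this:

```shell
# Submit an image for moderation; the JSON response carries the traceId
# to look for on the dashboard, e.g. {"traceId":"...","status":"QUEUED"}
curl -s -F "image=@sample.jpg" http://localhost:8080/api/tasks
```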
Monitor Service (monitor-service):
This service is the bridge between the back end and the front end. It needs the spring-boot-starter-websocket and spring-boot-starter-activemq dependencies.
// WebSocketConfig.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private StatusUpdateHandler statusUpdateHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(statusUpdateHandler, "/ws/status").setAllowedOrigins("*");
}
}
// StatusUpdateHandler.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
public class StatusUpdateHandler extends TextWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(StatusUpdateHandler.class);
// CopyOnWriteArraySet allows thread-safe iteration while sessions are added/removed
private final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
logger.info("New WebSocket connection established: {}", session.getId());
}
public void broadcast(String message) {
for (WebSocketSession session : sessions) {
try {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
} catch (IOException e) {
logger.error("Failed to send message to session {}", session.getId(), e);
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
logger.info("WebSocket connection closed: {}. Status: {}", session.getId(), status);
}
}
// StatusTopicListener.java
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class StatusTopicListener {
@Autowired
private StatusUpdateHandler statusUpdateHandler;
// Listens to the status topic from CV workers
@JmsListener(destination = "cv-status-topic")
public void receiveStatusUpdate(String jsonPayload) {
// Simply forward the raw JSON payload to all connected WebSocket clients
statusUpdateHandler.broadcast(jsonPayload);
}
// Listens to the ActiveMQ advisory topic for consumer changes on cv-task-queue
@JmsListener(destination = "ActiveMQ.Advisory.Consumer.Queue.cv-task-queue")
public void handleConsumerAdvisory(ActiveMQMessage message) throws Exception {
// The broker attaches a "consumerCount" property to every consumer advisory,
// so we do not have to count consumers ourselves.
int consumerCount = message.getIntProperty("consumerCount");
// A ConsumerInfo payload means a consumer subscribed; a RemoveInfo payload
// means one disconnected.
DataStructure ds = message.getDataStructure();
String eventType = (ds instanceof ConsumerInfo) ? "CONSUMER_STARTED" : "CONSUMER_STOPPED";
// Broadcast a synthetic JSON event so the dashboard can update its worker count
String advisoryPayload = String.format(
"{\"type\":\"ADVISORY\",\"event\":\"%s\",\"consumerCount\":%d}",
eventType,
consumerCount
);
statusUpdateHandler.broadcast(advisoryPayload);
}
}
The key points here:
- WebSocket handler: StatusUpdateHandler tracks all connected WebSocket sessions and exposes a broadcast method that sends a message to every client.
- JMS listeners: StatusTopicListener listens on two destinations:
  - cv-status-topic: receives status updates from the CV workers and broadcasts them to the front end as-is.
  - ActiveMQ.Advisory.Consumer.Queue.cv-task-queue: an ActiveMQ Advisory Topic. Whenever a consumer connects to or disconnects from cv-task-queue, the broker publishes a message here. Listening to it lets us update the "active workers" count in real time, which amounts to a very lightweight service-discovery mechanism.
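If you want counting that survives duplicate or out-of-order advisory events, key on the consumer's unique id instead of incrementing a counter. A minimal, framework-free sketch (plain strings stand in for ActiveMQ's ConsumerId):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Tracks active consumers by id, so a duplicate or re-delivered advisory
// event cannot skew the count the way a naive +1/-1 counter can.
public class ConsumerRegistry {
    private final Set<String> active = ConcurrentHashMap.newKeySet();

    /** Returns the active-consumer count after registering consumerId. */
    public int onConsumerStarted(String consumerId) {
        active.add(consumerId); // idempotent: duplicates are harmless
        return active.size();
    }

    /** Returns the active-consumer count after removing consumerId. */
    public int onConsumerStopped(String consumerId) {
        active.remove(consumerId); // unknown ids are ignored
        return active.size();
    }
}
```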
Step 3: The Chakra UI real-time dashboard
The front end is built with React and Chakra UI. At its core, a WebSocket connection receives the real-time data stream from monitor-service and updates React state.
// Dashboard.js
import React, { useState, useEffect, useRef } from 'react';
import {
ChakraProvider, Box, Heading, Table, Thead, Tbody, Tr, Th, Td,
Tag, Stat, StatLabel, StatNumber, StatGroup, Code, useToast, theme
} from '@chakra-ui/react';
const TaskStatusTag = ({ status }) => {
const colorScheme = {
QUEUED: 'gray',
PROCESSING: 'blue',
COMPLETED: 'green',
FAILED: 'red',
}[status];
return <Tag colorScheme={colorScheme}>{status}</Tag>;
};
function App() {
const [tasks, setTasks] = useState({});
const [activeWorkers, setActiveWorkers] = useState(0); // This part is a simplification
const ws = useRef(null);
const toast = useToast();
useEffect(() => {
// The WebSocket URL should point to your Monitor Service
const WEBSOCKET_URL = 'ws://localhost:8082/ws/status';
ws.current = new WebSocket(WEBSOCKET_URL);
ws.current.onopen = () => {
console.log('WebSocket Connected');
toast({ title: 'Connected to Monitor Service', status: 'success', duration: 2000, isClosable: true });
};
ws.current.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
// Handle advisory messages for worker count
if (message.type === 'ADVISORY') {
// This is a simplified logic. A real implementation would need a more robust way to count.
if (message.event === 'CONSUMER_STARTED') {
setActiveWorkers(prev => prev + 1);
} else if (message.event === 'CONSUMER_STOPPED') {
setActiveWorkers(prev => Math.max(0, prev - 1));
}
return;
}
// Handle regular task status updates
const { traceId, status, queueEntryTimestamp, processingTimeMs, eventTimestamp } = message;
setTasks(prevTasks => {
const existingTask = prevTasks[traceId] || { traceId, status: 'QUEUED', startTime: queueEntryTimestamp };
const endToEndLatency = status === 'COMPLETED' ? eventTimestamp - existingTask.startTime : null;
const updatedTask = {
...existingTask,
status,
processingTimeMs: processingTimeMs > 0 ? processingTimeMs : existingTask.processingTimeMs,
endToEndLatency,
lastUpdated: eventTimestamp,
};
return { ...prevTasks, [traceId]: updatedTask };
});
} catch (error) {
console.error('Failed to parse WebSocket message:', event.data, error);
}
};
ws.current.onclose = () => {
console.log('WebSocket Disconnected');
toast({ title: 'Disconnected from Monitor Service', description: 'Attempting to reconnect...', status: 'error', duration: 5000, isClosable: true });
// You might want to implement a reconnection logic here
};
return () => {
ws.current.close();
};
}, [toast]);
const sortedTasks = Object.values(tasks).sort((a, b) => b.lastUpdated - a.lastUpdated);
return (
<ChakraProvider theme={theme}>
<Box p={8}>
<Heading mb={6}>CV Processing Pipeline - Real-time Dashboard</Heading>
<StatGroup mb={8}>
<Stat>
<StatLabel>Total Tasks Tracked</StatLabel>
<StatNumber>{sortedTasks.length}</StatNumber>
</Stat>
<Stat>
<StatLabel>Active CV Workers</StatLabel>
<StatNumber>{activeWorkers}</StatNumber>
</Stat>
</StatGroup>
<Table variant="simple">
<Thead>
<Tr>
<Th>Trace ID</Th>
<Th>Status</Th>
<Th isNumeric>Model Inference Time (ms)</Th>
<Th isNumeric>End-to-End Latency (ms)</Th>
</Tr>
</Thead>
<Tbody>
{sortedTasks.slice(0, 20).map(task => ( // Display latest 20 tasks
<Tr key={task.traceId}>
<Td><Code>{task.traceId}</Code></Td>
<Td><TaskStatusTag status={task.status} /></Td>
<Td isNumeric>{task.processingTimeMs || 'N/A'}</Td>
<Td isNumeric>{task.endToEndLatency || 'In Progress'}</Td>
</Tr>
))}
</Tbody>
</Table>
</Box>
</ChakraProvider>
);
}
export default App;
The front-end logic is straightforward:
- State management: useState holds a tasks object keyed by traceId, which makes updating an individual task's state efficient.
- WebSocket connection: a useEffect hook establishes and cleans up the connection; useRef keeps a stable reference to the WebSocket instance across renders.
- Message handling: the onmessage handler parses incoming JSON. For task status updates it computes the end-to-end latency (eventTimestamp - queueEntryTimestamp) and updates tasks; for advisory messages it updates activeWorkers.
- Rendering: Chakra UI's Table, Tag, and Stat components present the data clearly; TaskStatusTag is a small component that colors a tag according to the task's status.
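The reconnection TODO in the onclose handler can be filled in with capped exponential backoff. The delay schedule is the only non-trivial part; connect() below is a hypothetical stand-in for whatever function creates the WebSocket and wires up the handlers:

```javascript
// Capped exponential backoff for WebSocket reconnection attempts:
// attempt 0 -> 1s, 1 -> 2s, 2 -> 4s, ... capped at 30s.
function reconnectDelayMs(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Sketch of how the dashboard's onclose handler could use it.
function scheduleReconnect(connect, attempt) {
  setTimeout(() => connect(attempt + 1), reconnectDelayMs(attempt));
}
```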
Limitations of the current approach and future iterations
This home-grown lightweight monitoring system effectively solved the original black-box problem, but it is far from a complete production-grade observability platform. Its limitations are clear:
- Monitor Service is a single point of failure: monitor-service is a single instance, and its WebSocket session state lives in memory. If it restarts, all front-end connections drop and no historical state survives. In production, run multiple instances and broadcast through Redis Pub/Sub to address both the single point of failure and state sharing.
- Limited trace scope: tracing currently starts when a message enters ActiveMQ and ends when the CV worker finishes. It does not cover the HTTP leg from the browser to the API gateway. True end-to-end tracing requires an OpenTelemetry-compliant library propagating a unified trace context through both HTTP headers and JMS message properties.
- No metric aggregation or persistence: the system shows only instantaneous state, with no historical trend analysis, alerting, or SLO computation. A natural evolution is to turn the status events monitor-service collects (processing times, task success/failure rates) into Prometheus metrics exposed at a /metrics endpoint, then use Grafana for storage and richer visualization.
- Advisory-message fragility: counting consumers via advisory messages may be inaccurate in clustered or complex network environments. Periodically querying the broker's queue statistics over JMX is more reliable, but it trades event-driven updates for polling.
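The Prometheus direction is mostly wiring: Micrometer ships with Spring Boot, so the status listener could record each event as it arrives. This is a sketch under assumptions; the metric and class names are illustrative, and exposure at /actuator/prometheus requires spring-boot-starter-actuator plus micrometer-registry-prometheus on the classpath:

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
import java.time.Duration;

// Sketch: turn CV status events into Prometheus-scrapeable metrics.
@Component
public class CvMetrics {
    private final Counter completed;
    private final Counter failed;
    private final Timer inferenceTimer;

    public CvMetrics(MeterRegistry registry) {
        this.completed = registry.counter("cv_tasks_total", "status", "completed");
        this.failed = registry.counter("cv_tasks_total", "status", "failed");
        this.inferenceTimer = registry.timer("cv_inference_duration");
    }

    // Call this from the cv-status-topic listener alongside the broadcast.
    public void record(String status, long processingTimeMs) {
        if ("COMPLETED".equals(status)) {
            completed.increment();
            inferenceTimer.record(Duration.ofMillis(processingTimeMs));
        } else if ("FAILED".equals(status)) {
            failed.increment();
        }
    }
}
```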
Despite these limitations, the approach is a solid starting point. By combining ActiveMQ's message properties, Jib's frictionless containerization, and Chakra UI's rapid development, we built a highly customized real-time monitoring dashboard at low cost, laying the groundwork for a more complete observability stack later on.