单体应用里的速率限制实现起来直截了当,一个内存中的哈希表加几行逻辑就能搞定。但在水平扩展的微服务架构里,这种方式立刻失效。每个 Fastify 实例都只维护自身节点的请求计数,无法对来自同一客户端的跨节点请求进行全局限制。这就导致实际的请求上限是 limit * N(N为节点数),完全违背了初衷。
最初的构想是引入一个集中式存储,比如 Redis。使用 INCR 和 EXPIRE 组合可以快速实现一个固定窗口计数器,但这存在精度问题。滑动窗口日志或滑动窗口计数器是更精确的方案,但这通常需要 Lua 脚本来保证原子性。在我们的技术栈里,Consul 已经是服务发现和配置中心的核心组件,为速率限制再引入一个 Redis 实例,增加了运维的复杂性和成本。因此,问题变成了:能否仅用 Consul 实现一个生产级的、精确的分布式速率限制器?
Consul 提供了两个关键特性,让这个构想成为可能:
- 强一致性的 KV 存储: 保证所有节点读取到的是同一份数据。
- Check-And-Set (CAS) 操作: 一种乐观锁机制。只有当键的
ModifyIndex未发生变化时,写入操作才能成功。这正是实现无锁并发更新的关键。
我们将基于这两个特性,为 Fastify 构建一个插件,实现分布式的滑动窗口日志(Sliding Window Log)算法。
架构设计与算法选择
滑动窗口日志算法的核心是记录下每个请求的时间戳。当新请求到达时,我们移除窗口之外的旧时间戳,然后检查窗口内剩余的时间戳数量是否超过了限制。
在分布式环境中,这个简单的逻辑会遇到并发难题。当两个节点的进程同时读取某个用户的请求记录,都在本地计算新状态,然后都尝试写回 Consul,后一个写入操作会覆盖前一个,导致请求计数丢失。
使用 Consul 的事务或锁可以解决这个问题,但性能开销巨大。对每个请求都进行加锁、解锁操作,会使 Consul 集群成为整个系统的性能瓶LEI。
CAS 操作提供了一个更优雅的方案。流程如下:
- 读取 (GET): 从 Consul 读取用户的请求时间戳列表,同时获得该键的
ModifyIndex。 - 计算 (Compute): 在应用内存中,清理掉时间窗口之外的旧时间戳。
- 检查 (Check): 如果清理后的时间戳数量小于阈值,则将当前请求的时间戳加入列表。
- 写入 (CAS): 尝试将新的时间戳列表写回 Consul,同时带上第一步获取的
ModifyIndex作为cas参数。 - 处理结果:
- 如果写入成功,则请求被允许。
- 如果写入失败(意味着在第1步和第4步之间,有其他节点修改了数据),则从第1步开始重试。
这个流程避免了显式锁,只在发生并发冲突时才进行重试,极大地提升了吞吐量。
sequenceDiagram
participant App as Fastify 实例
participant Consul as Consul KV
App->>Consul: GET key=rate-limit/user123
Consul-->>App: (value=[t1,t2], modifyIndex=X)
App->>App: current_time = now()
App->>App: timestamps = filter_old(value, current_time)
alt len(timestamps) < limit
App->>App: new_timestamps = timestamps + [current_time]
App->>Consul: PUT key, value=new_timestamps, cas=X
alt CAS 写入成功
Consul-->>App: true
App->>App: 放行请求 (200 OK)
else CAS 写入失败 (冲突)
Consul-->>App: false
App->>App: 进入重试循环,返回步骤1
end
else
App->>App: 拒绝请求 (429 Too Many Requests)
end
代码实现:构建 Fastify 插件
我们将整个逻辑封装成一个 fastify-plugin,使其易于集成和复用。
1. 基础配置与 Consul 客户端
首先,我们需要一个健壮的 Consul 客户端。官方的 consul 包是首选。在真实项目中,配置应该外部化,并包含重试和超时逻辑。
consul-client.js
'use strict';
const Consul = require('consul');
const { promisify } = require('util');
// 在真实项目中,这些配置应该来自环境变量或配置文件
const CONSUL_HOST = process.env.CONSUL_HOST || '127.0.0.1';
const CONSUL_PORT = process.env.CONSUL_PORT || '8500';
const KV_ACQUIRE_TIMEOUT = '5s'; // 获取 KV 操作的超时
class ConsulClient {
constructor(logger) {
this.logger = logger;
this.consul = new Consul({
host: CONSUL_HOST,
port: CONSUL_PORT,
promisify: true, // 使用 Promise 风格的 API
});
this.logger.info(`Consul client initialized for ${CONSUL_HOST}:${CONSUL_PORT}`);
}
/**
* 获取 KV 值,返回包含 value 和 modifyIndex 的对象
* @param {string} key
* @returns {Promise<{value: any, modifyIndex: string} | null>}
*/
async getKv(key) {
try {
const result = await this.consul.kv.get({ key, timeout: KV_ACQUIRE_TIMEOUT });
if (!result) {
return null;
}
return {
value: JSON.parse(result.Value),
modifyIndex: result.ModifyIndex,
};
} catch (error) {
// JSON.parse 可能会失败
if (error instanceof SyntaxError) {
this.logger.error({ key, err: error }, 'Failed to parse KV value from Consul');
// 在这种情况下可以考虑删除损坏的键
await this.deleteKv(key);
return null;
}
this.logger.error({ key, err: error }, 'Failed to get KV from Consul');
throw error;
}
}
/**
* 使用 CAS 操作设置 KV 值
* @param {string} key
* @param {any} value
* @param {string} casIndex - The ModifyIndex for the CAS operation
* @returns {Promise<boolean>} - true if successful, false if CAS failed
*/
async setKvCas(key, value, casIndex) {
try {
const result = await this.consul.kv.set({
key,
value: JSON.stringify(value),
cas: casIndex,
});
return result;
} catch (error) {
this.logger.error({ key, cas: casIndex, err: error }, 'Failed to set KV with CAS');
// 这里的错误可能是网络问题,而不仅仅是 CAS 失败
throw error;
}
}
/**
* 无条件设置 KV 值,通常用于初始化
* @param {string} key
* @param {any} value
* @returns {Promise<boolean>}
*/
async setKv(key, value) {
try {
return await this.consul.kv.set(key, JSON.stringify(value));
} catch(error) {
this.logger.error({ key, err: error }, 'Failed to set KV unconditionally');
throw error;
}
}
/**
* 删除 KV
* @param {string} key
*/
async deleteKv(key) {
try {
await this.consul.kv.del(key);
this.logger.warn({ key }, 'Deleted potentially corrupted KV entry');
} catch (error) {
this.logger.error({ key, err: error }, 'Failed to delete KV entry');
}
}
}
module.exports = ConsulClient;
这个 ConsulClient 类封装了底层的 API 调用,并增加了日志和错误处理。注意,我们将 Consul 返回的 Value(一个 Buffer 或 String)解析为 JSON 对象,因为我们将以 JSON 数组的形式存储时间戳。
2. Fastify 插件核心逻辑
现在我们来编写插件本身。它将接收配置(如限制次数、窗口大小),并为每个请求执行限流逻辑。
rate-limiter-plugin.js
'use strict';
const fp = require('fastify-plugin');
function consulRateLimiter(fastify, options, done) {
const {
consulClient, // 传入实例化的 ConsulClient
limit = 100, // 默认每分钟100次
windowMs = 60 * 1000, // 默认窗口 60s
keyPrefix = 'rate-limit/', // Consul KV 中的前缀
keyGenerator = (req) => req.ip, // 默认使用 IP 作为客户端标识
maxRetries = 5, // CAS 重试次数上限
retryDelayMs = 50, // 重试间隔
} = options;
if (!consulClient) {
return done(new Error('consulClient is required for fastify-consul-rate-limiter'));
}
const limiter = async (req, reply) => {
const key = keyPrefix + keyGenerator(req);
const now = Date.now();
const windowStart = now - windowMs;
for (let i = 0; i < maxRetries; i++) {
// --- 步骤 1: 读取 ---
const data = await consulClient.getKv(key);
let timestamps = [];
let modifyIndex = '0'; // '0' 用于表示创建新键
if (data) {
// --- 步骤 2: 计算 ---
timestamps = data.value.filter(ts => ts > windowStart);
modifyIndex = data.modifyIndex;
}
// --- 步骤 3: 检查 ---
if (timestamps.length >= limit) {
fastify.log.warn({ key }, 'Rate limit exceeded');
reply.code(429).send({ error: 'Too Many Requests' });
return;
}
const newTimestamps = [...timestamps, now];
try {
// --- 步骤 4: 写入 (CAS) ---
const success = await consulClient.setKvCas(key, newTimestamps, modifyIndex);
if (success) {
// --- 步骤 5: 成功 ---
// 写入成功,放行
return;
}
// CAS 失败,意味着有并发写入
fastify.log.info({ key, attempt: i + 1 }, 'CAS conflict detected, retrying...');
// 等待一小段时间再重试,避免活锁
await new Promise(resolve => setTimeout(resolve, retryDelayMs * (i + 1)));
} catch (err) {
// 网络错误等,不应重试,直接失败并告警
fastify.log.error({ key, err }, 'Failed to update rate limit state in Consul. Failing open.');
// 这里的策略很关键:Failing open (放行) 还是 Failing close (拒绝)?
// 通常选择放行,避免因依赖组件故障导致核心业务不可用。
return;
}
}
// 达到最大重试次数
fastify.log.error({ key, retries: maxRetries }, 'Rate limiter failed after max retries. Failing open.');
// 同样,选择放行
};
// 将限流器挂载为 preHandler hook
fastify.addHook('preHandler', limiter);
done();
}
module.exports = fp(consulRateLimiter, {
fastify: '4.x',
name: 'fastify-consul-rate-limiter',
});
这个插件的实现有几个值得注意的工程细节:
- 依赖注入:
consulClient是从外部传入的,这有利于测试。我们可以轻易地在单元测试中 mock 掉 Consul 的交互。 - CAS 重试循环: 核心逻辑被包裹在一个
for循环中。如果setKvCas返回false,表示ModifyIndex不匹配,循环会继续,重新执行“读-算-写”流程。 - 重试退避: 在重试前加入一个小的、递增的延迟 (
retryDelayMs * (i + 1))。这是一种简单的退避策略,可以减少多个节点同时重试时再次发生冲突的概率。 - 失败策略 (Failing Open): 当与 Consul 的通信出现网络错误,或者 CAS 重试达到上限时,我们选择放行请求(”failing open”)。这是一个重要的容错设计。如果选择“failing close”(拒绝请求),那么当 Consul 集群出现抖动时,所有依赖此限流器的服务都会变得不可用。业务可用性的优先级通常高于严格的速率限制。
-
keyGenerator: 插件的使用者可以自定义如何从请求中生成唯一标识。默认是req.ip,但也可以是用户 ID、API Key 等。
3. 集成与运行
最后,我们创建一个 Fastify 服务器,注册并使用这个插件。
server.js
'use strict';
const Fastify = require('fastify');
const ConsulClient = require('./consul-client');
const rateLimiterPlugin = require('./rate-limiter-plugin');
const server = Fastify({
logger: {
level: 'info',
transport: {
target: 'pino-pretty'
}
},
});
async function main() {
const consulClient = new ConsulClient(server.log);
// 注册我们的速率限制插件
server.register(rateLimiterPlugin, {
consulClient,
limit: 5, // 为了方便测试,设置为每10秒5次
windowMs: 10 * 1000,
keyGenerator: (req) => req.headers['x-client-id'] || req.ip, // 优先使用 x-client-id
});
// 定义一个受保护的路由
server.get('/protected', async (request, reply) => {
return { message: 'You have accessed the protected resource.' };
});
// 定义一个不受保护的路由
server.get('/health', async (request, reply) => {
return { status: 'ok' };
});
try {
const port = process.env.PORT || 3000;
await server.listen({ port, host: '0.0.0.0' });
server.log.info(`Server listening on port ${port}, PID: ${process.pid}`);
} catch (err) {
server.log.error(err);
process.exit(1);
}
}
main();
要测试这个分布式系统,你需要在本地启动一个 Consul agent (consul agent -dev),然后启动多个 server.js 实例:
# Terminal 1
PORT=3000 node server.js
# Terminal 2
PORT=3001 node server.js
# Terminal 3
PORT=3002 node server.js
现在,使用 curl 或任何 HTTP 客户端,在10秒内向这三个端口的 /protected 路由发送超过5次请求。你会发现,无论请求落在哪一个实例上,从第6次请求开始,都会收到 429 Too Many Requests 的响应。
# 使用同一个 client-id
for i in {1..7}; do curl -H "X-Client-ID: client-alpha" http://localhost:3000/protected; done
# ...前5次会成功,后2次会返回 429
# 换一个 client-id,计数器是独立的
curl -H "X-Client-ID: client-beta" http://localhost:3001/protected
# 这次会成功
这个测试证明了我们的限流器状态是在所有 Fastify 实例间共享和同步的。
潜在陷阱与优化考量
尽管这个基于 Consul CAS 的方案在很多场景下是有效的,但在真实生产环境中,还需要考虑以下几点:
Consul KV 性能: Consul 的 KV 存储主要为配置、服务发现等低频读写场景设计。将高频的速率限制数据写入 Consul 会对其性能造成压力。对于请求量极大的核心 API,此方案可能成为瓶颈。它更适用于保护那些非核心但重要的接口,如登录、注册、密码重置等。
数据膨胀: 滑动窗口日志需要存储每个请求的时间戳。如果窗口很大或请求频率很高,存储的 JSON 数组会变得非常大,增加了网络传输和 JSON 解析的开销。一个优化是改用滑动窗口计数器算法,它只存储固定数量的时间槽(buckets),每个槽记录该时间段内的请求数,大大减少了存储空间,但会牺牲一些精度。
时钟同步: 分布式系统普遍依赖于大致同步的节点时钟。如果集群中节点的时钟偏差过大,
windowStart的计算就会不准确,影响限流的精度。在生产环境中,必须通过 NTP 等服务保证服务器时钟同步。“首次创建”的竞争: 当一个新用户首次发起请求时,其对应的 key 在 Consul 中不存在。此时多个节点可能会同时发现 key 不存在,并都尝试创建。虽然 CAS (
modifyIndex=0) 可以保证只有一个节点创建成功,但其他节点都会失败并重试。对于这种情况,可以引入 Consul 的 Session Lock 机制,专门用于 key 的首次创建,创建成功后立即释放锁,后续的更新则完全依赖 CAS。这能减少初始化时的竞争风暴。
方案的局限性与未来展望
此方案的本质是利用一个强一致性系统(Consul)来协调无状态应用(Fastify 实例)的共享状态。它最大的优点是重用了现有基础设施,避免了引入新的技术栈。然而,它的性能上限受制于 Consul KV 的吞吐能力和网络延迟。
对于需要处理每秒数万甚至更高 QPS 的限流场景,更合适的架构是将速率限制逻辑抽离成一个专门的高性能服务。这个服务可以基于内存数据库(如 Redis)或专门的流处理引擎构建,并采用更高效的算法(如 Token Bucket 的变种)。应用节点通过一次轻量级的 RPC 或 HTTP 调用来向这个中心化服务“请求令牌”。
此外,可以探索一种混合模式:每个节点在本地缓存一个令牌配额,并周期性地与 Consul 同步。例如,全局限制是1000 QPS,有10个节点,每个节点初始分配100 QPS的本地额度。当本地额度用尽后,它向 Consul 申请新的额度。这种方式极大地降低了对 Consul 的写压力,将大部分请求的限流决策都放在了本地内存中,代价是可能出现短时间内的速率超发。这是一种在一致性、可用性和性能之间的典型权衡。