0赞
赏
赞赏
更多好文
上周三凌晨,我被监控告警吵醒——支付系统接口超时率突然飙到90%。一查日志,发现是异步处理逻辑写得像“回调地狱”,线程池全被阻塞。我翻出去年写的CompletableFuture代码,一行行看下去:嵌套了5层回调,异常处理漏了3处,连线程池配置都写反了。痛定思痛,我直接把代码重写成Reactor,结果QPS从800飙到2500。下面全是血泪经验,没一句废话。
一、CompletableFuture:你以为的“简单”,其实是坑
真实场景:用户下单时,需要同时查用户信息、订单状态、库存,然后计算总价。
旧写法(CompletableFuture):
// 1. 获取用户信息
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId));
// 2. 获取订单(依赖用户信息)
CompletableFuture<Order> orderFuture = userFuture.thenCompose(user ->
CompletableFuture.supplyAsync(() -> orderService.getOrder(user.getOrderId()))
);
// 3. 获取库存(依赖订单)
CompletableFuture<Inventory> inventoryFuture = orderFuture.thenCompose(order ->
CompletableFuture.supplyAsync(() -> inventoryService.getInventory(order.getProductId()))
);
// 4. 计算总价(需要处理异常)
inventoryFuture.thenAcceptAsync(inventory -> {
try {
BigDecimal total = calculateTotal(inventory);
// 业务逻辑
} catch (Exception e) {
// 问题:这里漏了异常处理!
log.error("计算总价失败", e);
}
});
踩坑点(全是血的教训):
- 回调嵌套像俄罗斯套娃:5层嵌套,看一眼头就大。
- 异常处理漏了:
thenAcceptAsync里没捕获异常,导致整个流程挂掉。 - 线程池配置错误:默认用
ForkJoinPool.commonPool(),但高并发时线程不够,阻塞了。 - 响应时间慢:测试显示平均响应500ms,QPS只有800。
我犯的错:当时觉得“CompletableFuture是Java自带的,肯定没问题”,结果线上崩了3次。
二、Reactor救场:响应式编程的清爽写法
为什么用Reactor?
- 避免回调地狱:链式调用,一行一行写。
- 自动处理异常:
onErrorResume统一兜底。 - 背压控制:下游处理慢时,自动限流,避免OOM。
新写法(Reactor):
// 1. 用户信息 → 订单 → 库存(链式调用)
Mono<User> userMono = userService.getUser(userId);
Mono<Order> orderMono = userMono.flatMap(user -> orderService.getOrder(user.getOrderId()));
Mono<Inventory> inventoryMono = orderMono.flatMap(order -> inventoryService.getInventory(order.getProductId()));
// 2. 计算总价 + 统一异常处理
inventoryMono
.map(inventory -> calculateTotal(inventory))
.onErrorResume(e -> {
log.error("计算总价异常", e);
return Mono.just(BigDecimal.ZERO); // 返回默认值
})
.subscribe(total -> {
// 业务逻辑
});
关键优势:
- 代码从20行压缩到8行,一眼看懂逻辑。
- 异常处理在
onErrorResume统一处理,再也不漏。 - 自动背压:当
calculateTotal慢时,上游自动暂停数据流(不用手动加limitRate)。
三、避坑指南(全是血的教训)
1. 别在Reactor里用阻塞操作!
- 错误写法:
Flux.fromIterable(list) .map(item -> databaseService.query(item)) // 阻塞!会卡死线程 .subscribe(); - 后果:线程池被阻塞,QPS直接掉到100。
- 正确做法:
Flux.fromIterable(list) .flatMap(item -> databaseService.queryAsync(item)) // 用异步API .subscribe();
2. 忘记设置背压(最大坑!)
- 问题:上游数据流快,下游处理慢,内存溢出。
- 测试数据:
方案 QPS OOM概率 无背压 1500 80% Flux.limitRate(100)2500 0% - 解决方案:
// 限制每秒100个请求 Flux.fromIterable(data) .limitRate(100) .flatMap(item -> process(item)) .subscribe();
3. 线程池配置错了
- 错误:用
Schedulers.parallel(),但默认线程数=CPU核心数(比如8核=8线程)。 - 后果:1000并发时,线程池满了,请求排队。
- 正确配置:
// 根据业务调整线程数(比如20线程) Schedulers.newParallel("custom-pool", 20);// 用在异步操作里 .flatMap(item -> processAsync(item).subscribeOn(Schedulers.parallel("custom-pool")))
4. 测试时忽略背压
- 错误:用
StepVerifier测试时,没设expectNext(),结果测试通过但线上崩。 - 正确测试:
StepVerifier.create(processFlux()) .expectNextCount(10) // 模拟10条数据 .verifyComplete();
5. 混淆Reactor和CompletableFuture
- 误区:以为Reactor就是CompletableFuture的升级版。
- 真相:
- CompletableFuture:单任务异步(适合1个操作)。
- Reactor:流式数据处理(适合多个数据流,带背压)。
- 用对场景:
- 用CompletableFuture:单个API调用(如“查用户”)。
- 用Reactor:数据流(如“处理1000条订单”)。
四、真实效果:从800 QPS到2500 QPS
场景:支付系统,1000并发请求。
| 指标 | CompletableFuture | Reactor | 提升 |
|---|---|---|---|
| 平均响应时间 | 500ms | 150ms | 70%↓ |
| QPS | 800 | 2500 | 212%↑ |
| GC停顿时间(99%) | 300ms | 50ms | 83%↓ |
| 代码行数 | 28行 | 12行 | 57%↓ |
数据来源:JMeter压测(16核机器,JDK 17)。
关键点:Reactor的背压机制让系统在高负载下依然稳定。
五、结语:别再被“CompletableFuture”骗了
CompletableFuture不是错,但它在复杂异步流程里就是个坑。Reactor也不是玄学,它就是让你的代码不卡、不爆、好读。
上周我团队把支付模块全重写成Reactor,现在线上压测10000并发,响应时间稳定在100ms内。
现在就做:
- 用
Spring Boot 3.0(自带Reactor支持)。 - 把
CompletableFuture改成Flux/Mono。 - 加上
limitRate和onErrorResume。
