1. 为什么需要 CompletableFuture?
- Future 的局限性:传统 Future 获取结果会阻塞线程(get()),无法优雅处理任务依赖关系。
- 异步编程需求:现代应用需高效利用 CPU/IO 资源,避免线程空等。
- 解决方案:CompletableFuture(Java 8+)支持非阻塞调用、链式处理和组合异步任务。
2. 核心原理
- CompletionStage 接口:定义异步操作步骤(如 thenApply, thenCombine)。
- 非阻塞回调:通过回调函数处理结果,避免线程阻塞。
- 依赖关系管理:内置任务依赖图,自动触发后续操作。
3. 创建 CompletableFuture
// 1. 使用默认线程池(ForkJoinPool)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result");
// 2. 指定自定义线程池
ExecutorService customPool = Executors.newFixedThreadPool(4);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result", customPool);
// 3. 手动完成
CompletableFuture<String> future3 = new CompletableFuture<>();
future3.complete("Manual Result");
4. 关键操作详解
4.1 转换结果(thenApply)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "42")
.thenApply(s -> Integer.parseInt(s)); // 转换 String → Int
4.2 组合任务(thenCompose)
CompletableFuture<String> userFuture = getUserById(1);
CompletableFuture<Integer> ageFuture = userFuture.thenCompose(user -> getAge(user));
4.3 合并结果(thenCombine)
CompletableFuture<Integer> futureA = getPriceAsync("A");
CompletableFuture<Integer> futureB = getPriceAsync("B");
futureA.thenCombine(futureB, (a, b) -> a + b) // 合并两个结果
.thenAccept(total -> System.out.println("Total: " + total));
4.4 异常处理
CompletableFuture.supplyAsync(() -> {
if (error) throw new RuntimeException("Oops!");
return "OK";
}).exceptionally(ex -> { // 捕获异常并恢复
System.err.println("Error: " + ex.getMessage());
return "Fallback";
});
5. 多任务协作
5.1 所有任务完成(allOf)
CompletableFuture<Void> all = CompletableFuture.allOf(futureA, futureB, futureC);
all.thenRun(() -> {
Integer a = futureA.join(); // 不会阻塞
Integer b = futureB.join();
// 处理全部结果
});
5.2 任一任务完成(anyOf)
CompletableFuture<Object> any = CompletableFuture.anyOf(futureA, futureB);
any.thenAccept(result -> System.out.println("First result: " + result));
6. 生产实践技巧
6.1 超时控制
CompletableFuture<String> future = fetchDataAsync()
.orTimeout(2, TimeUnit.SECONDS) // Java 9+
.exceptionally(ex -> "Fallback due to timeout");
6.2 线程池选择
- CPU 密集型:固定大小线程池(Executors.newFixedThreadPool)
- IO 密集型:缓存线程池(Executors.newCachedThreadPool)
- 避免使用默认线程池:防止任务相互影响。
6.3 避免阻塞
// 错误:在异步线程中调用 get()
future.thenApply(result -> {
String data = anotherFuture.get(); // 阻塞!
return process(result, data);
});
// 正确:使用 thenCompose
future.thenCompose(result ->
anotherFuture.thenApply(data -> process(result, data))
);
7. 完整示例:订单服务
// 获取用户信息 → 查询商品 → 计算总价 → 发送通知
CompletableFuture<Order> orderFuture = getUserAsync(userId)
.thenCompose(user -> getProductAsync(productId)
.thenCombine(getDiscountAsync(), (product, discount) ->
new Order(user, product, discount.calculate(product.price()))
)
.thenCompose(order ->
sendEmailAsync(user.email, "Order Confirmed: " + order.totalPrice)
.thenApply(emailStatus -> order)
);
// 超时与回退
orderFuture
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("Order failed", ex);
return createFallbackOrder();
});
8. 常见陷阱
- 异常吞噬:链中某步异常未处理,导致后续不执行(使用 handle() 或 exceptionally())。
- 线程泄露:未关闭自定义线程池(用 shutdown())。
- 回调地狱:过度嵌套链式调用(拆分步骤或使用 CompletableFuture 组合器)。
9. 性能优化
- 监控:记录任务耗时(thenApply 中添加日志)。
- 批处理:使用 allOf 并行独立任务。
- 资源隔离:关键任务使用独立线程池。
总结
- 适用场景:IO 密集型操作(网络请求、数据库访问)、并行计算。
- 核心优势:非阻塞、链式调用、组合异步任务。
- 最佳实践:始终处理异常、控制超时、合理选择线程池。
最后:通过 CompletableFuture,Java 开发者能以声明式风格编写高效异步代码,显著提升系统吞吐量。