CompletableFuture 入门与实战
前言
在现代Java应用开发中,异步编程已经成为提升应用性能和用户体验的重要手段。Java 8引入的CompletableFuture为我们提供了强大的异步编程能力,它不仅实现了Future接口,还提供了丰富的链式调用方法,使得编写复杂的异步流程变得更加简单优雅。
本文将带你深入了解CompletableFuture的基本概念、核心方法以及实际应用场景,帮助你掌握这一重要的并发工具。
什么是 CompletableFuture
CompletableFuture是Java 8引入的一个实现Future接口的类,它可以显式地设置计算结果和异常,支持链式调用,并且提供了丰富的API来进行异步编程。
相比传统的Future,CompletableFuture的主要优势包括:
- 非阻塞操作:不需要手动调用get()方法阻塞线程等待结果
- 链式调用:支持流式API,可以方便地组合多个异步操作
- 异常处理:提供完整的异常处理机制
- 回调机制:可以在计算完成时自动触发回调函数
基本用法
1. 创建CompletableFuture
使用runAsync()
执行无返回值的异步任务:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("执行无返回值的任务");
});
使用supplyAsync()
执行有返回值的异步任务:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello CompletableFuture";
});
2. 获取结果
使用get()
阻塞等待结果(不推荐在主线程中使用):
String result = future.get(); // 可能抛出异常
使用join()
阻塞等待结果(不抛出受检异常):
String result = future.join();
使用whenComplete()
非阻塞方式处理结果:
future.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("发生异常: " + throwable.getMessage());
} else {
System.out.println("执行结果: " + result);
}
});
核心方法详解
1. 转换操作
thenApply()
对结果进行转换:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
System.out.println(future.join()); // 输出: HELLO WORLD
2. 消费操作
thenAccept()
消费结果而不返回新值:
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(System.out::println)
.join(); // 输出: Hello
3. 组合操作
thenCompose()
组合两个异步操作,第二个操作依赖第一个的结果:
public CompletableFuture<String> getUserById(Long id) {
return CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询
return "User" + id;
});
}
public CompletableFuture<String> getPhoneNumber(String userName) {
return CompletableFuture.supplyAsync(() -> {
// 模拟获取手机号
return "13800138000";
});
}
CompletableFuture<String> result = getUserById(1L)
.thenCompose(this::getPhoneNumber);
thenCombine()
组合两个独立的异步操作:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1
.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combinedFuture.join()); // 输出: Hello World
4. 并行执行
allOf()
等待所有CompletableFuture完成:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
// 等待所有任务完成
allFutures.join();
// 获取各个任务的结果
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
anyOf()
等待任意一个CompletableFuture完成:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(2000); // 模拟耗时操作
return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000); // 模拟耗时操作
return "Result 2";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
System.out.println(anyFuture.join()); // 输出: Result 2 (较快完成的那个)
异常处理
CompletableFuture提供了完善的异常处理机制:
1. exceptionally()
处理异常并提供默认值:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("发生异常");
}
return "正常结果";
}).exceptionally(throwable -> {
System.out.println("捕获异常: " + throwable.getMessage());
return "默认值";
});
System.out.println(future.join()); // 输出: 默认值
2. handle()
无论是否发生异常都执行处理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("发生异常");
}
return "正常结果";
}).handle((result, throwable) -> {
if (throwable != null) {
System.out.println("处理异常: " + throwable.getMessage());
return "错误结果";
}
return result;
});
System.out.println(future.join()); // 输出: 错误结果
实战案例
案例1:电商商品详情页数据聚合
在电商平台中,商品详情页往往需要从多个服务获取数据,如商品基本信息、价格、库存、评论等。使用CompletableFuture可以并行获取这些数据,提高响应速度。
@Service
public class ProductDetailService {
@Autowired
private ProductService productService;
@Autowired
private PriceService priceService;
@Autowired
private InventoryService inventoryService;
@Autowired
private ReviewService reviewService;
public ProductDetail getProductDetail(Long productId) {
// 并行获取各项数据
CompletableFuture<Product> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProductById(productId));
CompletableFuture<BigDecimal> priceFuture = CompletableFuture
.supplyAsync(() -> priceService.getPrice(productId));
CompletableFuture<Integer> inventoryFuture = CompletableFuture
.supplyAsync(() -> inventoryService.getInventory(productId));
CompletableFuture<List<Review>> reviewsFuture = CompletableFuture
.supplyAsync(() -> reviewService.getReviews(productId));
// 组合所有结果
CompletableFuture<ProductDetail> resultFuture = productFuture
.thenCombine(priceFuture, (product, price) ->
new ProductDetail(product, price))
.thenCombine(inventoryFuture, (detail, inventory) -> {
detail.setInventory(inventory);
return detail;
})
.thenCombine(reviewsFuture, (detail, reviews) -> {
detail.setReviews(reviews);
return detail;
});
return resultFuture.join();
}
}
案例2:多数据源查询
当需要从多个数据源查询数据并合并结果时:
public class MultiDataSourceQueryService {
public List<User> searchUsers(String keyword) {
// 从不同数据源并行查询
CompletableFuture<List<User>> dbFuture = CompletableFuture
.supplyAsync(() -> queryFromDatabase(keyword));
CompletableFuture<List<User>> cacheFuture = CompletableFuture
.supplyAsync(() -> queryFromCache(keyword));
CompletableFuture<List<User>> externalFuture = CompletableFuture
.supplyAsync(() -> queryFromExternalService(keyword));
// 合并所有结果并去重
return CompletableFuture.allOf(dbFuture, cacheFuture, externalFuture)
.thenApply(v -> {
List<User> result = new ArrayList<>();
result.addAll(dbFuture.join());
result.addAll(cacheFuture.join());
result.addAll(externalFuture.join());
return result.stream()
.distinct()
.collect(Collectors.toList());
})
.join();
}
private List<User> queryFromDatabase(String keyword) {
// 数据库查询实现
return Collections.emptyList();
}
private List<User> queryFromCache(String keyword) {
// 缓存查询实现
return Collections.emptyList();
}
private List<User> queryFromExternalService(String keyword) {
// 外部服务查询实现
return Collections.emptyList();
}
}
最佳实践
1. 合理使用线程池
默认情况下,CompletableFuture使用ForkJoinPool.commonPool()作为执行器。在生产环境中,建议自定义线程池:
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 业务逻辑
return "Result";
}, executor);
2. 设置超时时间
避免任务无限期执行:
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 可能长时间运行的任务
Thread.sleep(5000);
return "Result";
});
// 设置超时时间
String result = future.get(3, TimeUnit.SECONDS);
3. 注意异常传播
确保异常得到正确处理:
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 可能抛出异常的操作
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "Success";
})
.exceptionally(throwable -> {
// 记录日志
log.error("任务执行失败", throwable);
// 返回默认值
return "Default Value";
});
总结
CompletableFuture是Java并发编程的强大工具,它极大地简化了异步编程的复杂性。通过合理使用CompletableFuture,我们可以:
- 提高应用程序的响应性和吞吐量
- 简化异步代码的编写和维护
- 更好地处理并发场景下的异常情况
- 实现复杂的异步流程编排
在实际开发中,我们应该根据具体场景选择合适的CompletableFuture方法,合理配置线程池,并做好异常处理,这样才能充分发挥其优势,构建高性能的Java应用。





