CompletableFuture 入门与实战
170
类别: 
开发交流

CompletableFuture 入门与实战

前言

在现代Java应用开发中,异步编程已经成为提升应用性能和用户体验的重要手段。Java 8引入的CompletableFuture为我们提供了强大的异步编程能力,它不仅实现了Future接口,还提供了丰富的链式调用方法,使得编写复杂的异步流程变得更加简单优雅。

本文将带你深入了解CompletableFuture的基本概念、核心方法以及实际应用场景,帮助你掌握这一重要的并发工具。

什么是 CompletableFuture

CompletableFuture是Java 8引入的一个实现Future接口的类,它可以显式地设置计算结果和异常,支持链式调用,并且提供了丰富的API来进行异步编程。

相比传统的Future,CompletableFuture的主要优势包括:

  1. 非阻塞操作:不需要手动调用get()方法阻塞线程等待结果
  2. 链式调用:支持流式API,可以方便地组合多个异步操作
  3. 异常处理:提供完整的异常处理机制
  4. 回调机制:可以在计算完成时自动触发回调函数

基本用法

1. 创建CompletableFuture

使用runAsync()

执行无返回值的异步任务:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("执行无返回值的任务");
});

使用supplyAsync()

执行有返回值的异步任务:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello CompletableFuture";
});

2. 获取结果

使用get()

阻塞等待结果(不推荐在主线程中使用):

String result = future.get(); // 可能抛出异常

使用join()

阻塞等待结果(不抛出受检异常):

String result = future.join();

使用whenComplete()

非阻塞方式处理结果:

future.whenComplete((result, throwable) -> {
    if (throwable != null) {
        System.out.println("发生异常: " + throwable.getMessage());
    } else {
        System.out.println("执行结果: " + result);
    }
});

核心方法详解

1. 转换操作

thenApply()

对结果进行转换:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenApply(String::toUpperCase);

System.out.println(future.join()); // 输出: HELLO WORLD

2. 消费操作

thenAccept()

消费结果而不返回新值:

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(System.out::println)
    .join(); // 输出: Hello

3. 组合操作

thenCompose()

组合两个异步操作,第二个操作依赖第一个的结果:

public CompletableFuture<String> getUserById(Long id) {
    return CompletableFuture.supplyAsync(() -> {
        // 模拟数据库查询
        return "User" + id;
    });
}

public CompletableFuture<String> getPhoneNumber(String userName) {
    return CompletableFuture.supplyAsync(() -> {
        // 模拟获取手机号
        return "13800138000";
    });
}

CompletableFuture<String> result = getUserById(1L)
    .thenCompose(this::getPhoneNumber);

thenCombine()

组合两个独立的异步操作:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combinedFuture = future1
    .thenCombine(future2, (s1, s2) -> s1 + " " + s2);

System.out.println(combinedFuture.join()); // 输出: Hello World

4. 并行执行

allOf()

等待所有CompletableFuture完成:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

// 等待所有任务完成
allFutures.join();

// 获取各个任务的结果
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();

anyOf()

等待任意一个CompletableFuture完成:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(2000); // 模拟耗时操作
    return "Result 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000); // 模拟耗时操作
    return "Result 2";
});

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

System.out.println(anyFuture.join()); // 输出: Result 2 (较快完成的那个)

异常处理

CompletableFuture提供了完善的异常处理机制:

1. exceptionally()

处理异常并提供默认值:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("发生异常");
    }
    return "正常结果";
}).exceptionally(throwable -> {
    System.out.println("捕获异常: " + throwable.getMessage());
    return "默认值";
});

System.out.println(future.join()); // 输出: 默认值

2. handle()

无论是否发生异常都执行处理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("发生异常");
    }
    return "正常结果";
}).handle((result, throwable) -> {
    if (throwable != null) {
        System.out.println("处理异常: " + throwable.getMessage());
        return "错误结果";
    }
    return result;
});

System.out.println(future.join()); // 输出: 错误结果

实战案例

案例1:电商商品详情页数据聚合

在电商平台中,商品详情页往往需要从多个服务获取数据,如商品基本信息、价格、库存、评论等。使用CompletableFuture可以并行获取这些数据,提高响应速度。

@Service
public class ProductDetailService {
    
    @Autowired
    private ProductService productService;
    
    @Autowired
    private PriceService priceService;
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private ReviewService reviewService;
    
    public ProductDetail getProductDetail(Long productId) {
        // 并行获取各项数据
        CompletableFuture<Product> productFuture = CompletableFuture
            .supplyAsync(() -> productService.getProductById(productId));
            
        CompletableFuture<BigDecimal> priceFuture = CompletableFuture
            .supplyAsync(() -> priceService.getPrice(productId));
            
        CompletableFuture<Integer> inventoryFuture = CompletableFuture
            .supplyAsync(() -> inventoryService.getInventory(productId));
            
        CompletableFuture<List<Review>> reviewsFuture = CompletableFuture
            .supplyAsync(() -> reviewService.getReviews(productId));
        
        // 组合所有结果
        CompletableFuture<ProductDetail> resultFuture = productFuture
            .thenCombine(priceFuture, (product, price) -> 
                new ProductDetail(product, price))
            .thenCombine(inventoryFuture, (detail, inventory) -> {
                detail.setInventory(inventory);
                return detail;
            })
            .thenCombine(reviewsFuture, (detail, reviews) -> {
                detail.setReviews(reviews);
                return detail;
            });
            
        return resultFuture.join();
    }
}

案例2:多数据源查询

当需要从多个数据源查询数据并合并结果时:

public class MultiDataSourceQueryService {
    
    public List<User> searchUsers(String keyword) {
        // 从不同数据源并行查询
        CompletableFuture<List<User>> dbFuture = CompletableFuture
            .supplyAsync(() -> queryFromDatabase(keyword));
            
        CompletableFuture<List<User>> cacheFuture = CompletableFuture
            .supplyAsync(() -> queryFromCache(keyword));
            
        CompletableFuture<List<User>> externalFuture = CompletableFuture
            .supplyAsync(() -> queryFromExternalService(keyword));
        
        // 合并所有结果并去重
        return CompletableFuture.allOf(dbFuture, cacheFuture, externalFuture)
            .thenApply(v -> {
                List<User> result = new ArrayList<>();
                result.addAll(dbFuture.join());
                result.addAll(cacheFuture.join());
                result.addAll(externalFuture.join());
                return result.stream()
                    .distinct()
                    .collect(Collectors.toList());
            })
            .join();
    }
    
    private List<User> queryFromDatabase(String keyword) {
        // 数据库查询实现
        return Collections.emptyList();
    }
    
    private List<User> queryFromCache(String keyword) {
        // 缓存查询实现
        return Collections.emptyList();
    }
    
    private List<User> queryFromExternalService(String keyword) {
        // 外部服务查询实现
        return Collections.emptyList();
    }
}

最佳实践

1. 合理使用线程池

默认情况下,CompletableFuture使用ForkJoinPool.commonPool()作为执行器。在生产环境中,建议自定义线程池:

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        // 业务逻辑
        return "Result";
    }, executor);

2. 设置超时时间

避免任务无限期执行:

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        // 可能长时间运行的任务
        Thread.sleep(5000);
        return "Result";
    });

// 设置超时时间
String result = future.get(3, TimeUnit.SECONDS);

3. 注意异常传播

确保异常得到正确处理:

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        // 可能抛出异常的操作
        if (Math.random() > 0.5) {
            throw new RuntimeException("随机异常");
        }
        return "Success";
    })
    .exceptionally(throwable -> {
        // 记录日志
        log.error("任务执行失败", throwable);
        // 返回默认值
        return "Default Value";
    });

总结

CompletableFuture是Java并发编程的强大工具,它极大地简化了异步编程的复杂性。通过合理使用CompletableFuture,我们可以:

  1. 提高应用程序的响应性和吞吐量
  2. 简化异步代码的编写和维护
  3. 更好地处理并发场景下的异常情况
  4. 实现复杂的异步流程编排

在实际开发中,我们应该根据具体场景选择合适的CompletableFuture方法,合理配置线程池,并做好异常处理,这样才能充分发挥其优势,构建高性能的Java应用。

标签:
评论 0
/ 1000
0
0
收藏