CompletableFuture 合并操作
背景
最近在使用CompletableFuture做异步代码的重构,其中遇到了一些问题,记录下来。
需求
批量并发调用下游RPC(每次调用返回一个List),然后把所有返回结果合并为一个List
代码
1.模拟调用下游请求
/**
 * Simulates a slow downstream call: asynchronously waits one second and then produces
 * {@code size} consecutive integers beginning at {@code startIndex}.
 *
 * <p>The work is submitted with {@code CompletableFuture.supplyAsync} and no explicit executor.
 *
 * @param startIndex first integer of the generated sequence
 * @param size       number of consecutive integers to generate
 * @return a future that yields the list {@code startIndex, startIndex + 1, ...} of length {@code size}
 */
private CompletableFuture<List<Integer>> getIntegerList(int startIndex, int size) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the thread's interrupt flag; set it again
            // so whoever owns this worker thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        // Best effort: even when the simulated delay is cut short, still return the data.
        return Stream.iterate(startIndex, item -> item + 1)
                .limit(size)
                .collect(Collectors.toList());
    });
}
2.使用CompletableFuture.thenCombine来聚合请求
/**
 * Fires three simulated downstream calls (start indices 1, 11 and 21, ten integers each),
 * folds the resulting futures pairwise with {@code thenCombine}, and prints the merged list
 * followed by the elapsed time in whole seconds.
 */
@Test
public void testCompletableFutureWithCombine() {
    List<CompletableFuture<List<Integer>>> pending = new ArrayList<>(4);
    Instant begin = Instant.now();

    int start = 1;
    while (start <= 30) {
        pending.add(this.getIntegerList(start, 10));
        start += 10;
    }

    // Pairwise fold: each step yields a future holding the left list followed by the right list.
    CompletableFuture<List<Integer>> merged = pending.stream()
            .reduce((left, right) -> left.thenCombine(
                    right,
                    (head, tail) -> Stream.concat(head.stream(), tail.stream())
                            .collect(Collectors.toList())))
            // No futures at all: fall back to an already-completed empty list.
            .orElse(CompletableFuture.completedFuture(Collections.emptyList()));

    List<Integer> all = merged.join();
    System.out.println(all);

    Instant finish = Instant.now();
    System.out.println(Duration.between(begin, finish).getSeconds());
}
3.使用CompletableFuture.allOf来聚合请求
/**
 * Fires three simulated downstream calls (start indices 1, 11 and 21, ten integers each),
 * gathers them through {@link #sequence(Collection)}, flattens the per-call lists into one,
 * and prints the merged list followed by the elapsed time in whole seconds.
 */
@Test
public void testCompletableFutureForAll() {
    List<CompletableFuture<List<Integer>>> pending = new ArrayList<>(4);
    Instant begin = Instant.now();

    int start = 1;
    while (start <= 30) {
        pending.add(this.getIntegerList(start, 10));
        start += 10;
    }

    CompletableFuture<List<Integer>> flattened = this.sequence(pending)
            .thenApply(chunks -> chunks.stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList()));

    List<Integer> all = flattened.join();
    System.out.println(all);

    Instant finish = Instant.now();
    System.out.println(Duration.between(begin, finish).getSeconds());
}

/**
 * Turns a collection of futures into one future of a list of their results.
 *
 * @param futureList the futures to wait for
 * @param <T>        the result type of each future
 * @return a future that, once all inputs are done, yields their results in the
 *         iteration order of {@code futureList}
 */
public <T> CompletableFuture<List<T>> sequence(Collection<CompletableFuture<T>> futureList) {
    CompletableFuture<?>[] all = futureList.toArray(new CompletableFuture<?>[0]);
    CompletableFuture<Void> barrier = CompletableFuture.allOf(all);
    return barrier.thenApply(ignored -> {
        // Runs only after allOf has completed, so each join() returns without waiting.
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futureList) {
            results.add(future.join());
        }
        return results;
    });
}