【JAVA】CountDownLatch和CompleteFuture的学习
2025-12-01
JAVA
00

目录

CountDownLatch
定义
关键组件
工作原理
应用场景
CyclicBarrier
定义
工作原理
CompletableFuture
工作原理
1、异步执行
2、任务异步回调
1、thenRun/thenRunAsync
2、thenAccept/thenAcceptAsync
3、thenApply/thenApplyAsync
4、exceptionally
5、whenComplete方法
6、 handle方法
3、多任务组合管理
1、AND
2、OR
3、AllOf
4、AnyOf
5、thenCompose
4、适用注意点
使用示例:CompleteFuture实现异步转同步
简单对比

接触到ws、mqtt等异步的通信方式转同步,学习CountDownLatch和CompleteFuture在java中的使用和原理

CountDownLatch

定义

CountDownLatch 是 Java 中的一个用于管理并发控制的同步辅助类,作用是允许一个或多个线程等待其他线程完成操作。顾名思义,它的工作机制类似于“倒计时闩锁”,线程会阻塞等待,直到闩锁的计数器减少到 0,然后才能继续执行。

CountDownLatch是java.util.concurrent 包的一部分,用于同步一个或多个线程以等待特定条件的满足。它在创建时初始化一个给定的计数,表示必须发生的事件数量,才能使线程继续执行。这个计数通过调用 countDown() 方法来递减,等待该条件的线程调用 await() 方法来阻塞,直到计数达到零。

关键组件

  • 计数:CountDownLatch 的核心概念是计数。它从创建锁存器时指定的初始值开始,只能递减,不能重置。
  • await():线程使用此方法等待计数达到零。如果当前计数大于零,这些线程将被置于等待状态。可以设置超时时间,避免阻塞。
  • countDown() :调用此方法以递减计数。当计数达到零时,所有等待的线程将被释放。
  • 线程安全:CountDownLatch 是线程安全的,它使用内部的 AQS(AbstractQueuedSynchronizer)来管理状态,确保计数的可见性和原子性。

工作原理

CountDownLatch 本质上是一种简化的信号量(Semaphore)。它的核心思想是设定一个计数器,当计数器值为 0 时,其他被阻塞的线程才会开始运行,线程的释放建立在调用 countDown 方法去减少计数器次数的基础上。

  • 使多个线程等待一系列事件发生。
  • 让一个线程等待完成多个步协作操作的线程。
  • 在某个条件达到之前阻塞线程。

使用await()和countdown()函数就能实现简单的异步转同步。核心设计思路:

  1. 主线程发送 WebSocket 消息后,调用 latch.await() 阻塞,等待响应;
  2. WebSocket 回调线程收到响应后,调用 latch.countDown() 唤醒主线程;
  3. 主线程被唤醒后,从线程安全容器(如 ConcurrentHashMap)中获取响应结果。
展开代码
// 关键逻辑片段 public String sendSyncWithLatch(String message) throws Exception { String requestId = UUID.randomUUID().toString(); String sendMsg = wrapMessage(requestId, message); CountDownLatch latch = new CountDownLatch(1); responseCache.put(requestId, null); latchCache.put(requestId, latch); try { webSocket.send(sendMsg); // 阻塞主线程,直到 countDown() 或超时 if (!latch.await(5, TimeUnit.SECONDS)) { throw new TimeoutException("超时"); } return responseCache.remove(requestId); } finally { // 清理缓存 responseCache.remove(requestId); latchCache.remove(requestId); } } @Override public void onMessage(WebSocket webSocket, String text) { String requestId = parseRequestId(text); if (requestId != null && latchCache.containsKey(requestId)) { responseCache.put(requestId, text); latchCache.remove(requestId).countDown(); // 唤醒主线程 } }

其中latchCache存储CountDownLatch对象,responseCache存储返回值,两者都要用线程安全的本地存储实现(ConcurentHashMap)。

应用场景

  1. 批量任务协调
  2. 并行计算:计算任务很耗时,但是可以分成多个部分并行处理,然后将结果进行合并。
  3. 服务启动检查:可以为应用服务做“健康检查”。例如,系统在完全启动之前,需要依赖多个外部服务,那么我们可以通过异步方式检测各个服务的健康状态,只有当所有服务都正常启动时,才允许继续执行下一步。

CyclicBarrier

定义

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

工作原理

CyclicBarrier它有两个构造函数:

展开代码
//parties 是参与线程的个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。 //barrierCommand 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }

dowait()方法如下:

展开代码
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) { throw new BrokenBarrierException(); } //检查当前线程是否被中断 if (Thread.interrupted()) { //如果当前线程被中断会做以下三件事 //1.打翻当前栅栏 //2.唤醒拦截的所有线程 //3.抛出中断异常 breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值减1 int index = --count; //计数器的值减为0则需唤醒所有线程并转换到下一代 if (index == 0) { boolean ranAction = false; try { //唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; //唤醒所有线程并转到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将所有线程唤醒 if (!ranAction) { breakBarrier(); } } } //如果计数器不为0则执行此循环 for (;;) { try { //根据传入的参数来决定是定时等待还是非定时等待 if (!timed) { trip.await(); }else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作 Thread.currentThread().interrupt(); } } //如果线程因为打翻栅栏操作而被唤醒则抛出异常 if (g.broken) { throw new BrokenBarrierException(); } //如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) { return index; } //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

CompletableFuture

我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值,任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,我们怎么处理呢?可使用同步组件CountDownLatch、CyclicBarrier等,但是比较麻烦。其实有简单的方法,就是用CompeletableFuture。

常用实现异步调用的方式(带返回值)

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() { @Override public UserInfo call() throws Exception { return userInfoService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); Thread.sleep(300); //模拟主线程其它操作耗时 FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() { @Override public MedalInfo call() throws Exception { return medalService.getMedalInfo(userId); } }); executorService.submit(medalInfoFutureTask); UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果 MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }

Future对于结果的获取并不友好:

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

使用示例:

展开代码
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); Thread.sleep(300); //模拟主线程其它操作耗时 CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId)); UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果 MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }

代码相比Future简洁很多,而且CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool。

工作原理

1、异步执行

supplyAsync方法

展开代码
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定义线程,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync方法

展开代码
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

使用示例

展开代码
public static void main(String[] args) { //可以自定义线程池 ExecutorService executor = Executors.newCachedThreadPool(); //runAsync的使用 CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run"), executor); //supplyAsync的使用 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.print("supply"); return "supply"; }, executor); //runAsync的future没有返回值,输出null System.out.println(runFuture.join()); //supplyAsync的future,有返回值 System.out.println(supplyFuture.join()); executor.shutdown(); // 线程池需要关闭 }

两个方法对比如下

  1. 任务接口与返回值(最本质区别)。supplyAsync接收 Supplier T接口,要求任务必须返回一个结果(类型为 T)。因此它创建的 CompletableFuture T 会携带这个返回值,供后续链式调用消费。runAsync接收 Runnable 接口,任务仅执行逻辑(无返回值)。因此它创建的 CompletableFuture Void 没有有效返回值,后续链式调用无法获取任务执行结果,仅能感知任务完成状态。
  2. 链式调用的差异。由于 supplyAsync 有返回值,后续可使用依赖返回值的链式方法(如 thenApply 转换结果、thenCompose 组合异步任务、thenCombine 合并多个结果);而 runAsync 无返回值,仅能使用不依赖结果的方法(如 thenRun 执行后续逻辑、whenComplete 感知完成状态)。
  3. 异常处理的一致性。两者的异常处理逻辑完全一致:任务中抛出的 checked/unchecked 异常都会被封装到 CompletableFuture 中,不会直接抛出,需通过 exceptionally(捕获异常并返回默认值)、whenComplete(感知异常)或 handle(处理结果 / 异常)捕获。

2、任务异步回调

1、thenRun/thenRunAsync

CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。

thenRun/thenRunAsync源码如下;

展开代码
public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); }

两者的区别是:如果你执行第一个任务的时候,传入了一个自定义线程池,调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

2、thenAccept/thenAcceptAsync

CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

3、thenApply/thenApplyAsync

CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

4、exceptionally

CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。

使用示例如下:

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); throw new RuntimeException(); } ); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "你的程序异常啦"; }); System.out.println(exceptionFuture.get()); }

5、whenComplete方法

CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。这种主要适用于组合任务,将上一个线程的入参当作结果带到下一个线程。

示例如下

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "A"; } ); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("上个任务执行完啦,还把" + a + "传过来"); if ("A".equals(a)) { System.out.println("666"); } System.out.println("233333"); }); System.out.println(rstFuture.get()); }

6、 handle方法

CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。这种主要适用于组合任务,将上一个线程的结果当作结果带到下一个线程。

示例如下

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "A"; } ); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("上个任务执行完啦,还把" + a + "传过来"); if ("A".equals(a)) { System.out.println("666"); return "B"; } System.out.println("233333"); return null; }); System.out.println(rstFuture.get()); }

3、多任务组合管理

1、AND

image.png

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
展开代码
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务"); ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future = CompletableFuture //第二个异步任务 .supplyAsync(() -> "第二个异步任务", executor) // (w, s) -> System.out.println(s) 是第三个任务 .thenCombineAsync(first, (s, w) -> { System.out.println(w); System.out.println(s); return "两个异步任务的组合"; }, executor); System.out.println(future.join()); executor.shutdown(); }

2、OR

image.png

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。区别在于;

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
展开代码
public static void main(String[] args) { //第一个异步任务,休眠2秒,保证它执行晚点 CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{ try{ Thread.sleep(2000L); System.out.println("执行完第一个异步任务");} catch (Exception e){ return "第一个任务异常"; } return "第一个异步任务"; }); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture //第二个异步任务 .supplyAsync(() -> { System.out.println("执行完第二个任务"); return "第二个任务";} , executor) //第三个任务 .acceptEitherAsync(first, System.out::println, executor); executor.shutdown(); }

3、AllOf

所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ System.out.println("我执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了"); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); }); }

4、AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了"); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); // return "捡田螺的小男孩"; }); anyOfFuture.join(); }

5、thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

展开代码
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务"); //第二个异步任务 ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "第二个任务", executor) .thenComposeAsync(data -> { System.out.println(data); return f; //使用第一个任务作为返回 }, executor); System.out.println(future.join()); executor.shutdown(); }

4、适用注意点

  1. Future需要获取返回值,才能获取异常信息。也就是调用get方法后才能抛出异常
  2. CompletableFuture的get()方法是阻塞的。
  3. CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
  4. 自定义线程池时,注意饱和策略,但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

使用示例:CompleteFuture实现异步转同步

展开代码
1、定义好缓存,用于存储对象,实现另一侧能获取对象 // 请求缓存:<请求ID, 响应结果> private final ConcurrentMap<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>(); 2、发送消息时定义好id以及设置好超时时间 public String sendServiceSync(String topic, MqttSendParam param) { JSONObject jsonObj = new JSONObject(param); String playLoad = jsonObj.toString(); CompletableFuture<String> responseFuture = new CompletableFuture<>(); try { pendingRequests.put(param.getBid(), responseFuture); log.info("【sendServiceSync】"+param.getMethod()+"||||||"+param.getBid()); mqttProducer.sendMessage(topic, playLoad); return responseFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { throw new BizException("请求MQTT失败"); }finally { pendingRequests.remove(param.getBid()); } } 3、接收回调消息时完成即可 public void dealDrcReplyMsg(String deviceSn,String message) { JSONObject jsonObj = new JSONObject(message); String method = jsonObj.getStr("method"); if(ObjectUtil.isNotNullObj(method)){ String bid = method+"_"+deviceSn; CompletableFuture<String> responseFuture = pendingRequests.get(bid); if (ObjectUtil.isNotNullObj(responseFuture)) { responseFuture.complete(message); }else { // log.warn("未找到请求方法为{}ID为{}的请求",method, bid); } } }

可以看出相比CountDownLatch优雅很多。

简单对比

对比一下CountDownLatch、CyclicBarrier以及CompletableFuture。

对比维度CountDownLatchCyclicBarrierCompletableFuture
核心功能让1个或多个线程等待其他N个线程完成任务后再继续执行(“等待-通知”模型)让N个线程相互等待,直到所有线程都到达指定屏障点后,再一起继续执行(“同步屏障”)异步编程工具,支持非阻塞式任务执行、结果链式处理、多任务组合(串行/并行/聚合)
核心设计思想基于“计数器递减”,计数器为0时触发等待线程唤醒基于“线程到达屏障”,所有线程到达后触发屏障放行(可重复使用)基于“未来结果”,任务异步执行,结果通过回调链处理,无需显式等待
可重用性不可重用:计数器减到0后,无法重置,只能使用一次可重用:所有线程通过屏障后,计数器自动重置,支持多次循环使用不可重用:单个CompletableFuture对应单个异步任务的生命周期,完成后状态固定
触发条件计数器(count)减至0(由其他线程调用countDown()触发)所有注册的线程都调用await()到达屏障点异步任务执行完成(正常完成/异常终止)
线程角色分为“等待线程”(调用await())和“计数线程”(调用countDown()),角色固定所有线程角色一致,均为“参与线程”(既等待其他线程,也被其他线程等待)无固定角色,任务执行线程与回调处理线程分离(默认ForkJoinPool,支持指定线程池)
结果传递能力无返回值:仅同步线程执行时机,不传递任务结果无返回值:仅同步线程执行时机,不传递任务结果支持结果传递:异步任务可返回结果,后续通过thenApply/thenAccept等链式处理
异常处理机制若等待线程被中断,会抛出InterruptedException;计数线程异常不影响计数器(需手动处理)若任一线程在await()时异常/中断,会导致其他等待线程抛出BrokenBarrierException,屏障被破坏任务异常会自动封装到Future中,支持exceptionally/whenComplete/handle等显式捕获处理
多任务组合能力仅支持“等待N个任务完成”的简单同步,无复杂组合能力仅支持“N个线程同步屏障”,无任务组合能力强大的组合能力:串行(thenCompose)、并行聚合(allOf/anyOf)、结果合并(thenCombine
阻塞特性等待线程会主动阻塞(调用await()),属于“阻塞式同步”参与线程会主动阻塞(调用await()),属于“阻塞式同步”默认非阻塞:通过回调链处理结果,无需阻塞等待;也可通过get()/join()强制阻塞获取结果
适用场景1. 主线程等待多个子线程完成初始化后再启动;2. 多个子线程等待主线程下达指令后再执行;3. 批量任务完成后统一汇总(无结果传递)1. 多线程分阶段执行,需所有线程完成当前阶段后再进入下一阶段(如多线程数据加载后统一处理);2. 线程间需同步执行节奏的场景1. 异步调用(如HTTP接口、数据库查询)+ 结果链式处理;2. 多异步任务并行执行并聚合结果;3. 非阻塞式业务流程(如异步通知、异步计算);4. 替代传统Future,支持更灵活的回调
核心方法countDown()(计数器减1)、await()(阻塞等待计数器为0)、await(long timeout, TimeUnit unit)(超时等待)await()(阻塞等待所有线程到达屏障)、await(long timeout, TimeUnit unit)(超时等待)、reset()(重置屏障,支持重用)任务创建:supplyAsync()(有返回值)、runAsync()(无返回值);结果处理:thenApply()/thenAccept()/thenRun();异常处理:exceptionally()/handle();组合方法:allOf()/anyOf()/thenCombine()

关键区别提炼:

  • 同步vs异步:CountDownLatch/CyclicBarrier是阻塞式同步工具,核心解决“线程执行时机同步”;CompletableFuture是非阻塞式异步工具,核心解决“异步任务执行+结果处理”;
  • 重用性:仅CyclicBarrier支持重复使用,其余两者均为一次性;
  • 结果传递:仅CompletableFuture支持任务结果传递与链式处理,其余两者无此能力。

参考:

https://juejin.cn/post/7436399136763707418

https://juejin.cn/post/6977549754217529358

https://juejin.cn/post/6970558076642394142

本文作者:刘涛

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!