接触到ws、mqtt等异步的通信方式转同步,学习CountDownLatch和CompleteFuture在java中的使用和原理
CountDownLatch 是 Java 中的一个用于管理并发控制的同步辅助类,作用是允许一个或多个线程等待其他线程完成操作。顾名思义,它的工作机制类似于“倒计时闩锁”,线程会阻塞等待,直到闩锁的计数器减少到 0,然后才能继续执行。
CountDownLatch是java.util.concurrent 包的一部分,用于同步一个或多个线程以等待特定条件的满足。它在创建时初始化一个给定的计数,表示必须发生的事件数量,才能使线程继续执行。这个计数通过调用 countDown() 方法来递减,等待该条件的线程调用 await() 方法来阻塞,直到计数达到零。
CountDownLatch 本质上是一种简化的信号量(Semaphore)。它的核心思想是设定一个计数器,当计数器值为 0 时,其他被阻塞的线程才会开始运行,线程的释放建立在调用 countDown 方法去减少计数器次数的基础上。
使用await()和countdown()函数就能实现简单的异步转同步。核心设计思路:
展开代码// 关键逻辑片段 public String sendSyncWithLatch(String message) throws Exception { String requestId = UUID.randomUUID().toString(); String sendMsg = wrapMessage(requestId, message); CountDownLatch latch = new CountDownLatch(1); responseCache.put(requestId, null); latchCache.put(requestId, latch); try { webSocket.send(sendMsg); // 阻塞主线程,直到 countDown() 或超时 if (!latch.await(5, TimeUnit.SECONDS)) { throw new TimeoutException("超时"); } return responseCache.remove(requestId); } finally { // 清理缓存 responseCache.remove(requestId); latchCache.remove(requestId); } } @Override public void onMessage(WebSocket webSocket, String text) { String requestId = parseRequestId(text); if (requestId != null && latchCache.containsKey(requestId)) { responseCache.put(requestId, text); latchCache.remove(requestId).countDown(); // 唤醒主线程 } }
其中latchCache存储CountDownLatch对象,responseCache存储返回值,两者都要用线程安全的本地存储实现(ConcurentHashMap)。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
CyclicBarrier它有两个构造函数:
展开代码//parties 是参与线程的个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。 //barrierCommand 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
dowait()方法如下:
展开代码private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //检查当前栅栏是否被打翻 if (g.broken) { throw new BrokenBarrierException(); } //检查当前线程是否被中断 if (Thread.interrupted()) { //如果当前线程被中断会做以下三件事 //1.打翻当前栅栏 //2.唤醒拦截的所有线程 //3.抛出中断异常 breakBarrier(); throw new InterruptedException(); } //每次都将计数器的值减1 int index = --count; //计数器的值减为0则需唤醒所有线程并转换到下一代 if (index == 0) { boolean ranAction = false; try { //唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; //唤醒所有线程并转到下一代 nextGeneration(); return 0; } finally { //确保在任务未成功执行时能将所有线程唤醒 if (!ranAction) { breakBarrier(); } } } //如果计数器不为0则执行此循环 for (;;) { try { //根据传入的参数来决定是定时等待还是非定时等待 if (!timed) { trip.await(); }else if (nanos > 0L) { nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作 Thread.currentThread().interrupt(); } } //如果线程因为打翻栅栏操作而被唤醒则抛出异常 if (g.broken) { throw new BrokenBarrierException(); } //如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) { return index; } //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值,任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,我们怎么处理呢?可使用同步组件CountDownLatch、CyclicBarrier等,但是比较麻烦。其实有简单的方法,就是用CompeletableFuture。
常用实现异步调用的方式(带返回值)
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() { @Override public UserInfo call() throws Exception { return userInfoService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); Thread.sleep(300); //模拟主线程其它操作耗时 FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() { @Override public MedalInfo call() throws Exception { return medalService.getMedalInfo(userId); } }); executorService.submit(medalInfoFutureTask); UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果 MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
Future对于结果的获取并不友好:
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
使用示例:
展开代码public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); Thread.sleep(300); //模拟主线程其它操作耗时 CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId)); UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果 MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
代码相比Future简洁很多,而且CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool。
supplyAsync方法
展开代码//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定义线程,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
展开代码//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
使用示例
展开代码public static void main(String[] args) { //可以自定义线程池 ExecutorService executor = Executors.newCachedThreadPool(); //runAsync的使用 CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run"), executor); //supplyAsync的使用 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.print("supply"); return "supply"; }, executor); //runAsync的future没有返回值,输出null System.out.println(runFuture.join()); //supplyAsync的future,有返回值 System.out.println(supplyFuture.join()); executor.shutdown(); // 线程池需要关闭 }
两个方法对比如下
CompletableFuture的thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。
thenRun/thenRunAsync源码如下;
展开代码public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); }
两者的区别是:如果你执行第一个任务的时候,传入了一个自定义线程池,调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。
使用示例如下:
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); throw new RuntimeException(); } ); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "你的程序异常啦"; }); System.out.println(exceptionFuture.get()); }
CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。这种主要适用于组合任务,将上一个线程的入参当作结果带到下一个线程。
示例如下
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "A"; } ); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("上个任务执行完啦,还把" + a + "传过来"); if ("A".equals(a)) { System.out.println("666"); } System.out.println("233333"); }); System.out.println(rstFuture.get()); }
CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。这种主要适用于组合任务,将上一个线程的结果当作结果带到下一个线程。
示例如下
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "A"; } ); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("上个任务执行完啦,还把" + a + "传过来"); if ("A".equals(a)) { System.out.println("666"); return "B"; } System.out.println("233333"); return null; }); System.out.println(rstFuture.get()); }

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。区别在于:
展开代码public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务"); ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future = CompletableFuture //第二个异步任务 .supplyAsync(() -> "第二个异步任务", executor) // (w, s) -> System.out.println(s) 是第三个任务 .thenCombineAsync(first, (s, w) -> { System.out.println(w); System.out.println(s); return "两个异步任务的组合"; }, executor); System.out.println(future.join()); executor.shutdown(); }

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。区别在于;
展开代码public static void main(String[] args) { //第一个异步任务,休眠2秒,保证它执行晚点 CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{ try{ Thread.sleep(2000L); System.out.println("执行完第一个异步任务");} catch (Exception e){ return "第一个任务异常"; } return "第一个异步任务"; }); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture //第二个异步任务 .supplyAsync(() -> { System.out.println("执行完第二个任务"); return "第二个任务";} , executor) //第三个任务 .acceptEitherAsync(first, System.out::println, executor); executor.shutdown(); }
所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ System.out.println("我执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了"); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); }); }
任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("我也执行完了"); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{ System.out.println("finish"); // return "捡田螺的小男孩"; }); anyOfFuture.join(); }
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
展开代码public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务"); //第二个异步任务 ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "第二个任务", executor) .thenComposeAsync(data -> { System.out.println(data); return f; //使用第一个任务作为返回 }, executor); System.out.println(future.join()); executor.shutdown(); }
展开代码1、定义好缓存,用于存储对象,实现另一侧能获取对象 // 请求缓存:<请求ID, 响应结果> private final ConcurrentMap<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>(); 2、发送消息时定义好id以及设置好超时时间 public String sendServiceSync(String topic, MqttSendParam param) { JSONObject jsonObj = new JSONObject(param); String playLoad = jsonObj.toString(); CompletableFuture<String> responseFuture = new CompletableFuture<>(); try { pendingRequests.put(param.getBid(), responseFuture); log.info("【sendServiceSync】"+param.getMethod()+"||||||"+param.getBid()); mqttProducer.sendMessage(topic, playLoad); return responseFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { throw new BizException("请求MQTT失败"); }finally { pendingRequests.remove(param.getBid()); } } 3、接收回调消息时完成即可 public void dealDrcReplyMsg(String deviceSn,String message) { JSONObject jsonObj = new JSONObject(message); String method = jsonObj.getStr("method"); if(ObjectUtil.isNotNullObj(method)){ String bid = method+"_"+deviceSn; CompletableFuture<String> responseFuture = pendingRequests.get(bid); if (ObjectUtil.isNotNullObj(responseFuture)) { responseFuture.complete(message); }else { // log.warn("未找到请求方法为{}ID为{}的请求",method, bid); } } }
可以看出相比CountDownLatch优雅很多。
对比一下CountDownLatch、CyclicBarrier以及CompletableFuture。
| 对比维度 | CountDownLatch | CyclicBarrier | CompletableFuture |
|---|---|---|---|
| 核心功能 | 让1个或多个线程等待其他N个线程完成任务后再继续执行(“等待-通知”模型) | 让N个线程相互等待,直到所有线程都到达指定屏障点后,再一起继续执行(“同步屏障”) | 异步编程工具,支持非阻塞式任务执行、结果链式处理、多任务组合(串行/并行/聚合) |
| 核心设计思想 | 基于“计数器递减”,计数器为0时触发等待线程唤醒 | 基于“线程到达屏障”,所有线程到达后触发屏障放行(可重复使用) | 基于“未来结果”,任务异步执行,结果通过回调链处理,无需显式等待 |
| 可重用性 | 不可重用:计数器减到0后,无法重置,只能使用一次 | 可重用:所有线程通过屏障后,计数器自动重置,支持多次循环使用 | 不可重用:单个CompletableFuture对应单个异步任务的生命周期,完成后状态固定 |
| 触发条件 | 计数器(count)减至0(由其他线程调用countDown()触发) | 所有注册的线程都调用await()到达屏障点 | 异步任务执行完成(正常完成/异常终止) |
| 线程角色 | 分为“等待线程”(调用await())和“计数线程”(调用countDown()),角色固定 | 所有线程角色一致,均为“参与线程”(既等待其他线程,也被其他线程等待) | 无固定角色,任务执行线程与回调处理线程分离(默认ForkJoinPool,支持指定线程池) |
| 结果传递能力 | 无返回值:仅同步线程执行时机,不传递任务结果 | 无返回值:仅同步线程执行时机,不传递任务结果 | 支持结果传递:异步任务可返回结果,后续通过thenApply/thenAccept等链式处理 |
| 异常处理机制 | 若等待线程被中断,会抛出InterruptedException;计数线程异常不影响计数器(需手动处理) | 若任一线程在await()时异常/中断,会导致其他等待线程抛出BrokenBarrierException,屏障被破坏 | 任务异常会自动封装到Future中,支持exceptionally/whenComplete/handle等显式捕获处理 |
| 多任务组合能力 | 仅支持“等待N个任务完成”的简单同步,无复杂组合能力 | 仅支持“N个线程同步屏障”,无任务组合能力 | 强大的组合能力:串行(thenCompose)、并行聚合(allOf/anyOf)、结果合并(thenCombine) |
| 阻塞特性 | 等待线程会主动阻塞(调用await()),属于“阻塞式同步” | 参与线程会主动阻塞(调用await()),属于“阻塞式同步” | 默认非阻塞:通过回调链处理结果,无需阻塞等待;也可通过get()/join()强制阻塞获取结果 |
| 适用场景 | 1. 主线程等待多个子线程完成初始化后再启动;2. 多个子线程等待主线程下达指令后再执行;3. 批量任务完成后统一汇总(无结果传递) | 1. 多线程分阶段执行,需所有线程完成当前阶段后再进入下一阶段(如多线程数据加载后统一处理);2. 线程间需同步执行节奏的场景 | 1. 异步调用(如HTTP接口、数据库查询)+ 结果链式处理;2. 多异步任务并行执行并聚合结果;3. 非阻塞式业务流程(如异步通知、异步计算);4. 替代传统Future,支持更灵活的回调 |
| 核心方法 | countDown()(计数器减1)、await()(阻塞等待计数器为0)、await(long timeout, TimeUnit unit)(超时等待) | await()(阻塞等待所有线程到达屏障)、await(long timeout, TimeUnit unit)(超时等待)、reset()(重置屏障,支持重用) | 任务创建:supplyAsync()(有返回值)、runAsync()(无返回值);结果处理:thenApply()/thenAccept()/thenRun();异常处理:exceptionally()/handle();组合方法:allOf()/anyOf()/thenCombine() |
关键区别提炼:
参考:
https://juejin.cn/post/7436399136763707418
本文作者:刘涛
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!