- 学习视频:https://www.bilibili.com/video/BV1ar4y1x727
- 参考书籍:《实战 JAVA 高并发程序设计》 葛一鸣 著
系列文章目录中的前七篇文章的相关学习视频都是:黑马程序员深入学习 Java 并发编程,JUC 并发编程全套教程。然而这个时长为 32h 的视频中并没有 CompletableFuture 的相关知识,这个知识点还是蛮重要的。故找了相关的视频来学习这个重要的知识点,写在了系列文章目录中的第八篇博客里。第八篇的相关学习视频是 尚硅谷 JUC 并发编程(对标阿里 P6-P7)
- 若文章内容或图片失效,请留言反馈。
- 部分素材来自网络,若不小心影响到您的利益,请联系博主删除。
- 写这篇博客旨在制作笔记,方便个人在线阅览,巩固知识。无他用。
Future 的核心思想是异步调用。
Future 接口(FutureTask实现类)会用异步编程的方式去执行一些方法。
如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否执行完毕等。
当我们需要调用一个函数的时候,如果这个函数执行的过程非常的漫长,那我们就需要等待。
但我们不一定急着要该函数的结果。所以我们可以让被调者立即返回,让它在后台慢慢地处理这个请求。
对于调用者来说,可以先处理其他的任务,在真正需要数据的场合,再去尝试获得需要的数据。
在整个的调用过程中,不存在无谓的等待,充分利用了所有的时间片段,从而提高了系统的响应速度。
一句话:Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时费力的复杂业务。
Future 是 Java 5 新加的一个接口,它提供了一种 异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 future 把这个任务放到异步编程中执行。
主线程继续处理其他任务或者先行结束后,再通过 Future 来获取计算结果。
java/util/concurrent/FutureTask.java
public FutureTask(Callable callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
下面是 Future 的一些简单功能
public boolean isCancelled() // 取消任务public boolean isDone() // 是否已经取消public boolean cancel(boolean mayInterruptIfRunning) // 是否已经完成public V get() throws InterruptedException, ExecutionException // 取得返回对象public V get(long timeout, TimeUnit unit) // 取得返回对象,可以设置超时时间
示例代码
@Slf4j(topic = "c.FutureDemo_1")
public class FutureDemo_1 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask futureTask_1 = new FutureTask<>(new MyThread_1(), "Hello MyThread_1");FutureTask futureTask_2 = new FutureTask<>(new MyThread_2());Thread t1 = new Thread(futureTask_1);t1.setName("t1");Thread t2 = new Thread(futureTask_2, "t2");t1.start();t2.start();log.info(futureTask_1.get());log.info(futureTask_2.get());}
}@Slf4j(topic = "c.MyThread_1")
class MyThread_1 implements Runnable {@Overridepublic void run() {log.info("--- Come In MyThread_1.run() ---");}
}@Slf4j(topic = "c.MyThread_2")
class MyThread_2 implements Callable {@Overridepublic String call() throws Exception {log.info("--- Come In MyThread_2.call() ---");return "Hello MyThread_2";}
}
输出结果
22:03:06.526 [t1] INFO c.MyThread_1 - --- Come In MyThread_1.run() ---
22:03:06.526 [t2] INFO c.MyThread_2 - --- Come In MyThread_2.call() ---
22:03:06.538 [main] INFO c.FutureDemo_1 - Hello MyThread_1
22:03:06.538 [main] INFO c.FutureDemo_1 - Hello MyThread_2
示例代码
@Slf4j(topic = "FutureThreadPoolDemo")
public class FutureThreadPoolDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();ExecutorService threadPool = Executors.newFixedThreadPool(3);FutureTask futureTask_1 = new FutureTask<>(() -> {TimeUnit.MILLISECONDS.sleep(500);return "Task_1 over";});threadPool.submit(futureTask_1);FutureTask futureTask_2 = new FutureTask<>(() -> {TimeUnit.MILLISECONDS.sleep(500);return "Task_2 over";});threadPool.submit(futureTask_2);TimeUnit.MILLISECONDS.sleep(300);log.info("{}", futureTask_1.get());// get() 方法会阻塞线程log.info("{}", futureTask_2.get());long endTime = System.currentTimeMillis();log.info("costTime:{} 毫秒", (endTime - startTime));threadPool.shutdown();m1();}private static void m1() throws InterruptedException {long startTime = System.currentTimeMillis();TimeUnit.MILLISECONDS.sleep(500);TimeUnit.MILLISECONDS.sleep(500);TimeUnit.MILLISECONDS.sleep(300);long endTime = System.currentTimeMillis();log.info("m1_costTime:{} 毫秒", (endTime - startTime));}
}
输出
22:45:13.678 [main] INFO FutureThreadPoolDemo - Task_1 over
22:45:13.692 [main] INFO FutureThreadPoolDemo - Task_2 over
22:45:13.692 [main] INFO FutureThreadPoolDemo - costTime:567 毫秒
22:45:15.018 [main] INFO FutureThreadPoolDemo - m1_costTime:1326 毫秒
示例代码
@Slf4j(topic = "c.FutureApiDemo")
public class FutureApiDemo {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask future = new FutureTask(() -> {log.info("{} Come in", Thread.currentThread().getName());TimeUnit.SECONDS.sleep(5);return "TASK OVER !!!";});Thread t1 = new Thread(future, "t1");t1.start();log.info("{} 忙其他任务了", Thread.currentThread().getName());log.info("future.get():{}", future.get());}
}
输出结果
23:15:20.551 [t1] INFO c.FutureApiDemo - t1 Come in
23:15:20.551 [main] INFO c.FutureApiDemo - main 忙其他任务了
23:15:25.565 [main] INFO c.FutureApiDemo - future.get():TASK OVER !!!
上面这种情况是没有什么问题的,但如果我们调换一下 future.get()
的位置呢?
// 将 future.get() 置于前方
log.info("future.get():{}", future.get());
log.info("{} 忙其他任务了", Thread.currentThread().getName());
输出结果(显然,主线程被 future.get()
阻塞住了,只有在 future 的任务解决了之后,主线程才继续运行)
23:16:20.399 [t1] INFO c.FutureApiDemo - t1 Come in
23:16:25.422 [main] INFO c.FutureApiDemo - future.get():TASK OVER !!!
23:16:25.422 [main] INFO c.FutureApiDemo - main 忙其他任务了
更改 future.get()
方法中的参数(不用 public V get()
了,改用 public V get(long timeout, TimeUnit unit)
了)
log.info("future.get():{}", future.get(3, TimeUnit.SECONDS));
log.info("{} 忙其他任务了", Thread.currentThread().getName());
输出结果(直接抛出异常了,不管 future 之外的线程了,主线程的那一行代码完全没有执行)
23:22:57.287 [t1] INFO c.FutureApiDemo - t1 Come in
Exception in thread "main" java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)at org.example.replenish.future.FutureApiDemo.main(FutureApiDemo.java:24)
当然了,如果使用的是 try-catch
的话,主线程倒还是可以在等待时间结束后继续运行的
try {log.info("future.get():{}", future.get(3, TimeUnit.SECONDS));
} catch (TimeoutException e) {log.info("e.printStackTrace()");e.printStackTrace();
}log.info("{} 忙其他任务了", Thread.currentThread().getName());
输出结果
23:38:26.841 [t1] INFO c.FutureApiDemo - t1 Come in
23:38:29.843 [main] INFO c.FutureApiDemo - e.printStackTrace()
23:38:29.843 [main] INFO c.FutureApiDemo - main 忙其他任务了
java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)at org.example.replenish.future.FutureApiDemo.main(FutureApiDemo.java:26)
鉴于上面的异常情况,还是不要调用 public V get(long timeout, TimeUnit unit)
为好,建议使用下面的姿势
while (true) {if (future.isDone()) {log.info("future.get():{}", future.get());break;} else {try {TimeUnit.MILLISECONDS.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}log.info("正在处理中...莫急");System.out.println("... ...");}
}
log.info("{} 忙其他任务了", Thread.currentThread().getName());
输出结果(控制台输出的信息,一切都是岁月静好)
23:46:11.225 [t1] INFO c.FutureApiDemo - t1 Come in
23:46:16.232 [main] INFO c.FutureApiDemo - 正在处理中...莫急
... ...
23:46:21.247 [main] INFO c.FutureApiDemo - 正在处理中...莫急
... ...
23:46:21.247 [main] INFO c.FutureApiDemo - future.get():TASK OVER !!!
23:46:21.247 [main] INFO c.FutureApiDemo - main 忙其他任务了
但是,CPU 资源还是浪费了
想完成一些复杂的任务
Future 提供的 API 的功能并不能完全满足我们的需求,处理起来也不够优雅
此时还是需要 CompletableFuture 来处理这些需求(以声明式的方式)
get() 方法在 Future 计算完成之前会一直处在阻塞状态下,isDone() 方法容易耗费 CPU 资源
对于真正的异步处理,我们希望是可以通过传入回调函数,在 Future 结束的时候自动调用该回调函数,这样我们就可以不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源,故 JDK 8 中出现了 CompletableFuture 这样的设计。
CompletableFuture 提供了一种类似观察者模式类似的机制,可以让任务执行完成后通知监听的一方。(完成好了,就通知我)
CompletionStage 接口有 40 多种方法,是为了函数式编程中的流式调用准备的。
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
CompletableFuture 是 Java 8 新增的一个超大型工具类。
public class CompletableFuture implements Future, CompletionStage
优点
CompletableFuture 中有四个核心的工厂方法
runAsync() 方法用于没有返回值的场景。比如,仅仅是简单地执行某一个异步动作。
public static CompletableFuture runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture runAsync(Runnable runnable, Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}
supplyAsync() 方法用于那些需要有返回值的场景。比如计算某个数据等。
public static CompletableFuture supplyAsync(Supplier supplier) {return asyncSupplyStage(asyncPool, supplier);
}
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}
参考书籍:《实战 JAVA 高并发程序设计》 葛一鸣 著
在上面的两对方法中,都有一个方法可以接收一个 Executor 参数。我们可以借此让Supplier
或者Runnable
在指定的线程池中工作;如果不指定 Executor,则在默认的系统公共的ForkJoinPool.common
线程池中执行。
补充知识:在 Java 8 中,新增了ForkJoinPool.commonPool()
方法。它可以获得一个公共的 ForkJoin 线程池。这个公共线程池中的所有线程都是 Daemon 线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。
下面的示例代码是在验证上面的说法:在上面的四个静态方法中
ForkJoinPool.commonPool()
作为它的线程池执行异步代码;@Slf4j(topic = "c.CompletableFutureDemo_1")
public class CompletableFutureDemo_1 {public static void main(String[] args) throws ExecutionException, InterruptedException {method_1();}// ... ...
}
private static void method_1() throws InterruptedException, ExecutionException {CompletableFuture voidCF = CompletableFuture.runAsync(() -> {log.info("{}", Thread.currentThread().getName());// 线程暂停 1 秒try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }});log.info("{}", voidCF.get());
}
private static void method_2() throws InterruptedException, ExecutionException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture voidCF = CompletableFuture.runAsync(() -> {log.info("{}", Thread.currentThread().getName());// 线程暂停 1 秒try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();}}, threadPool);log.info("{}", voidCF.get());threadPool.shutdown();
}
private static void method_3() throws InterruptedException, ExecutionException {CompletableFuture rCF = CompletableFuture.supplyAsync(() -> {log.info("{}", Thread.currentThread().getName());// 线程暂停 1 秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello SupplyAsync";});log.info("{}", rCF.get());
}
private static void method_4() throws InterruptedException, ExecutionException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture rCF = CompletableFuture.supplyAsync(() -> {log.info("{}", Thread.currentThread().getName());// 线程暂停 1 秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello SupplyAsync";}, threadPool);log.info("{}", rCF.get());threadPool.shutdown();
}
10:48:14.519 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureDemo_1 - ForkJoinPool.commonPool-worker-1
10:48:15.535 [main] INFO c.CompletableFutureDemo_1 - null
10:51:03.789 [pool-1-thread-1] INFO c.CompletableFutureDemo_1 - pool-1-thread-1
10:51:04.813 [main] INFO c.CompletableFutureDemo_1 - null
11:04:24.533 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureDemo_1 - ForkJoinPool.commonPool-worker-1
11:04:25.555 [main] INFO c.CompletableFutureDemo_1 - Hello SupplyAsync
11:05:23.226 [pool-1-thread-1] INFO c.CompletableFutureDemo_1 - pool-1-thread-1
11:05:24.244 [main] INFO c.CompletableFutureDemo_1 - Hello SupplyAsync
从 Java 8 开始引入了 CompletableFuture,它是 Future 的扩展功能增强版,可以减少 Future 中的阻塞和轮询问题。
它可以传入回调对象,当异步任务完成或发生异常的时候,自动调用回调对象的回调方法。
@Slf4j(topic = "c.CompletaleFutureDemo_2")
public class CompletableFutureDemo_2 {public static void main(String[] args) throws ExecutionException, InterruptedException {test_1();}// ... ...
}
熟悉的配方,熟悉的味道
private static void test_1() throws InterruptedException, ExecutionException {CompletableFuture rCF = CompletableFuture.supplyAsync(() -> {log.info("---【{} Come In】---", Thread.currentThread().getName());int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}log.info("---【1 秒后出结果:{}】---", result);return result;});log.info("---【{} 线程先去忙其他任务了】---", Thread.currentThread().getName());log.info("---【result:{}】---", rCF.get());
}
Future 可以做到的事情,CompletableFuture 都可以做到。
11:30:11.460 [main] INFO c.CompletaleFutureDemo_2 - ---【main 线程先去忙其他任务了】---
11:30:11.460 [ForkJoinPool.commonPool-worker-1] INFO c.CompletaleFutureDemo_2 - ---【ForkJoinPool.commonPool-worker-1 Come In】---
11:30:12.487 [ForkJoinPool.commonPool-worker-1] INFO c.CompletaleFutureDemo_2 - ---【1 秒后出结果:9】---
11:30:12.487 [main] INFO c.CompletaleFutureDemo_2 - ---【result:9】---
CompletableFuture 可以传入回调对象,当异步任务完成或发生异常的时候,自动调用回调对象的回调方法。
private static void test_2() throws InterruptedException, ExecutionException {ExecutorService threadPool = Executors.newFixedThreadPool(3);try {CompletableFuture.supplyAsync(() -> {log.info("【{} Come In】", Thread.currentThread().getName());int result = ThreadLocalRandom.current().nextInt(10);try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }log.info("【1 秒后出结果:{}】", result);if (result > 5) { int i = 10 / 0; }return result;}, threadPool).whenComplete((result, exception) -> {log.info("[进入 whenComplete 方法]");if (exception == null) { log.info("【计算完成,更新系统。Update result:{}】", result); }log.info("[退出 whenComplete 方法]");}).exceptionally(exception -> {// exception.printStackTrace();System.out.println("【异常情况 Cause:" + exception.getCause() + "】");log.info("【异常情况 Message:{}】", exception.getMessage());return null;});} catch (Exception e) {// e.printStackTrace();} finally {threadPool.shutdown();}// 【注意】// 在这个程序里,CompletableFuture 默认使用的 [ForkJoinPool] 就是守护线程,[main] 就是用户线程// 如果主线程立刻结束的话,CompletableFuture 默认使用的线程池会立刻关闭// (守护线程要守护的对象不存在了,程序就应该结束)// 我们一般推荐 [自定义线程池 + CompletableFuture] 配合使用log.info("【{} 线程先去忙其他任务了】", Thread.currentThread().getName());
}
控制台输出(没有出现异常的情况,走 whenComplete() 方法)
12:28:54.135 [main] INFO c.CompletaleFutureDemo_2 - 【main 线程先去忙其他任务了】
12:28:54.135 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【pool-1-thread-1 Come In】
12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【1 秒后出结果:5】
12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [进入 whenComplete 方法]
12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【计算完成,更新系统。Update result:5】
12:28:55.151 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [退出 whenComplete 方法]
控制台输出(出现了异常的情况,依然会进入 whenComplete() 方法,之后会走 exceptionally() 方法)
注意:就算是出现了异常,也还是会进入 whenComplete() 方法,故我们还需要在 whenComplete() 加判断处理
12:29:58.062 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【pool-1-thread-1 Come In】
12:29:58.062 [main] INFO c.CompletaleFutureDemo_2 - 【main 线程先去忙其他任务了】
12:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【1 秒后出结果:8】
12:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [进入 whenComplete 方法]
12:29:59.079 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - [退出 whenComplete 方法]
【异常情况 Cause:java.lang.ArithmeticException: / by zero】
12:29:59.080 [pool-1-thread-1] INFO c.CompletaleFutureDemo_2 - 【异常情况 Message:java.lang.ArithmeticException: / by zero】
@FunctionalInterface
public interface Runnable {public abstract void run();
}
@FunctionalInterface
public interface Function {R apply(T t);
}
@FunctionalInterface
public interface Consumer {void accept(T t);
}
@FunctionalInterface
public interface BiConsumer {accept(T t, U u);
}
@FunctionalInterface
public interface Supplier {T get();
}
案例说明:电商比价需求
模拟如下情况:
《MySQL》 in jd price is 88.05
《MySQL》 pdd price is 86.11
《MySQL》 in taobao price is 90.43
step by step
,按部就班,查完京东查淘宝,查完淘宝查天猫 … …all in
,多线程异步任务同时查询NetMall.java
public class NetMall {private String netMallName;public String getNetMallName() {return netMallName;}public NetMall(String netMallName) {this.netMallName = netMallName;}public Double calcPrice(String productName) {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}
CompletableFutureMallDemo.java
@Slf4j(topic = "c.CompletableFutureMallDemo")
public class CompletableFutureMallDemo {static List list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));/*** Step By Step** @param list* @param productName* @return*/public static List getPrice_1(List list, String productName) {// 输出格式:MySQL in taobao price is 90.43List collectList = list.stream().map(netMall -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName))).collect(Collectors.toList());return collectList;}/*** All In** @param list* @param productName* @return*/public static List getPrice_2(List list, String productName) {// 其实可以一步链式到位的,但这里为了便于理解,还是中间截断了一下,设了俩变量List> futureList = list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName)))).collect(Collectors.toList());List collectList = futureList.stream().map(s -> s.join()).collect(Collectors.toList());return collectList;}public static void main(String[] args) {long startTime_1 = System.currentTimeMillis();List lists_1 = getPrice_1(list, "MySQL");for (String element : lists_1) {System.out.println(element);}long endTime_1 = System.currentTimeMillis();log.info("【costTime_1:{} 毫秒】", (endTime_1 - startTime_1));System.out.println("---------------------------------------------");long startTime_2 = System.currentTimeMillis();List lists_2 = getPrice_2(list, "MySQL");for (String element: lists_2){System.out.println(element);}long endTime_2 = System.currentTimeMillis();log.info("【costTime_2:{} 毫秒】", (endTime_2 - startTime_2));}
}
控制台输出
MySQL in jd price is 77.53
MySQL in dangdang price is 77.19
MySQL in taobao price is 77.53
17:27:21.710 [main] INFO c.CompletableFutureMallDemo - 【costTime_1:3068 毫秒】
---------------------------------------------
MySQL in jd price is 78.91
MySQL in dangdang price is 77.80
MySQL in taobao price is 77.56
17:27:22.735 [main] INFO c.CompletableFutureMallDemo - 【costTime_2:1014 毫秒】
获取结果
public T get()
:一旦调用 get() 方法求结果,在 futureTask 没有计算完成的情况下,容易导致线程阻塞public T get(long timeout, TimeUnit unit)
:超过了设置的时间,则不再等候,会发生阻塞public T join()
:和 get() 方法的区别不大,只是其在编译期也不会报错,而 get() 方法必须抛异常public T getNow(T valueIfAbsent)
:在没有计算完成的情况下,会返回一个替代的结果主动触发计算
public boolean complete(T value)
:判断是否是 get() 方法发生打断操作,并立即返回括号里面的值@Slf4j(topic = "c.CompletableFutureAPIDemo")
public class CompletableFutureAPIDemo {// public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {public static void main(String[] args) {CompletableFuture cf = CompletableFuture.supplyAsync(() -> {log.info("【进入 CompletableFuture.supplyAsync()】");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "ABC";});// log.info("【{}】", cf.get());// log.info("【{}】", cf.get(2L, TimeUnit.SECONDS));// log.info("【{}】", cf.join());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// log.info("【{}】", cf.getNow("xxx"));log.info("[{}]\t[{}]", cf.complete("completeValue"), cf.join());}
}
控制台输出结果
19:43:53.219 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo - 【进入 CompletableFuture.supplyAsync()】
19:43:56.223 [main] INFO c.CompletableFutureAPIDemo - [false] [ABC]
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn)
exceptionallly() 类似于 try-catch
whenComplete()、handle() 类似于 try-finally
@Slf4j(topic = "c.CompletableFutureAPIDemo_2")
public class CompletableFutureAPIDemo_2 {public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);// test_1_1(threadPool);// test_2(threadPool);}
}
private static void test_1_1(ExecutorService threadPool) {CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}log.info("【111】");return 1;}, threadPool).thenApply(f -> {int i = 1 / 0; //故意设置异常log.info("【222】");return f + 2;}).thenApply(f -> {log.info("【333】");return f + 3;}).whenComplete((v, e) -> {log.info("---[进入 whenComplete() 方法]---");if (e == null) log.info("【计算结果:{}】", v);log.info("---[即将退出 whenComplete() 方法]---");}).exceptionally(e -> {// e.printStackTrace();log.info("【{}】", e.getMessage());return null;});log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());threadPool.shutdown();
}
private static void test_2(ExecutorService threadPool) {CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}log.info("【111】");return 1;}, threadPool).handle((f, e) -> {int i = 1 / 0;// 此处故意设置异常log.info("【222】");return f + 2;}).handle((f, e) -> {log.info("【333】");return f + 3;}).whenComplete((v, e) -> {log.info("---[进入 whenComplete() 方法]---");if (e == null) log.info("【计算结果:{}】", v);log.info("---[即将退出 whenComplete() 方法]---");}).exceptionally(e -> {// e.printStackTrace();log.info("【{}】", e.getMessage());return null;});log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());threadPool.shutdown();
}
控制台正常输出信息
20:31:23.578 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【111】
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【222】
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【333】
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【计算结果:6】
20:31:24.581 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
控制台输出信息(打印 【222】 的操作部分有异常)使用 thenApply()
20:30:53.304 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【111】
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
20:30:54.302 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【java.lang.ArithmeticException: / by zero】
控制台输出信息(打印 【222】 的操作部分有异常)使用 handle()
20:31:48.706 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【111】
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【333】
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
20:31:49.711 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【java.lang.NullPointerException】
这里我们再看一下 thenApplyAsync() 方法
java/util/concurrent/CompletableFuture.java
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);
}
实际操作
private static void test_1_2(ExecutorService threadPool) {CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }log.info("【111】");return 1;}, threadPool).thenApplyAsync(f -> {log.info("【222】");return f + 2;}, threadPool).thenApplyAsync(f -> {log.info("【333】");return f + 3;}).whenComplete((v, e) -> {log.info("---[进入 whenComplete() 方法]---");if (e == null) log.info("【计算结果:{}】", v);log.info("---[即将退出 whenComplete() 方法]---");}).exceptionally(e -> {// e.printStackTrace();log.info("【{}】", e.getMessage());return null;});log.info("-----【({})线程去忙其它事情了】-----", Thread.currentThread().getName());try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); }threadPool.shutdown();
}
控制信息输出
21:35:21.902 [main] INFO c.CompletableFutureAPIDemo_2 - -----【(main)线程去忙其它事情了】-----
21:35:22.902 [pool-1-thread-1] INFO c.CompletableFutureAPIDemo_2 - 【111】
21:35:22.902 [pool-1-thread-2] INFO c.CompletableFutureAPIDemo_2 - 【222】
21:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - 【333】
21:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - ---[进入 whenComplete() 方法]---
21:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - 【计算结果:6】
21:35:22.903 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_2 - ---[即将退出 whenComplete() 方法]---
如果你 threadPool.shutdown();
关闭的时间过早的话,也会抛出异常
java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniApply@514e386b rejected from java.util.concurrent.ThreadPoolExecutor@5f89dc32[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
... ...
Async 的意思是 “异步的”。
在 CompletableFuture 中,带了 Async 的方法都是在多个线程之间做串行化的处理。
thenApplyAsync:在前一个线程执行完成后,会开始执行,获取前一个线程的返回结果,同时也会返回结果信息
public CompletableFuture thenRun(Runnable action)
public CompletableFuture thenAccept(Consumer super T> action)
public CompletableFuture thenApply(Function super T,? extends U> fn)
public class CompletableFutureAPIDemo_3 {public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {return "整一个值 A";}).thenApply(f -> { return f + "\t再整一个值 C"; }).thenAccept(f -> System.out.println("thenAccept:" + f)).thenRun(() -> {System.out.println("thenRun():无输入值,亦无返回值");});System.out.println("===========================================");System.out.println(CompletableFuture.supplyAsync(() -> "Result").thenRun(() -> {}).join());}
}
控制台输出信息
thenAccept:整一个值 A 再整一个值 C
thenRun():无输入值,亦无返回值
===========================================
null
以 thenRun 和 thenRunAsync 为例,分析他们的区别
(这里的代码比较简单,故省略。下面只贴上控制台的输出结果)
supplyAsync(supplier) + thenRun(runnable)
supplyAsync(suppler) + thenRunAsync(runnable)
10:00:07.860 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:00:07.893 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 一号任务:ForkJoinPool.commonPool-worker-1
10:00:07.925 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:00:07.956 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 三号任务:ForkJoinPool.commonPool-worker-1
10:00:07.988 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:00:07.988 [main] INFO c.TestA - 【cf.get:null】
supplyAsync(suppler + threadPool) + thenRun(runnable)
10:02:00.847 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:02:00.868 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:02:00.898 [pool-1-thread-1] INFO c.TestA - 二号任务:pool-1-thread-1
10:02:00.930 [pool-1-thread-1] INFO c.TestA - 三号任务:pool-1-thread-1
10:02:00.960 [pool-1-thread-1] INFO c.TestA - 四号任务:pool-1-thread-1
10:02:00.960 [main] INFO c.TestA - 【cf.get:null】
supplyAsync(suppler + threadPool) + thenRunAsync(runnable)
10:07:34.691 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:07:34.713 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:07:34.744 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:07:34.776 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 三号任务:ForkJoinPool.commonPool-worker-1
10:07:34.808 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:07:34.808 [main] INFO c.TestA - 【cf.get:null】
supplyAsync(suppler + threadPool) + thenRunAsync(runnable + threadPool)
10:12:27.048 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:12:27.068 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:12:27.100 [pool-1-thread-2] INFO c.TestA - 二号任务:pool-1-thread-2
10:12:27.130 [pool-1-thread-3] INFO c.TestA - 三号任务:pool-1-thread-3
10:12:27.161 [pool-1-thread-4] INFO c.TestA - 四号任务:pool-1-thread-4
10:12:27.161 [main] INFO c.TestA - 【cf.get:null】
supplyAsync(suppler + threadPool) + thenRunAsync(runnable) + thenRunAsync(runnable + threadPool) + thenRunAsync(runnable)
10:15:02.708 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:15:02.741 [pool-1-thread-1] INFO c.TestA - 一号任务:pool-1-thread-1
10:15:02.772 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 二号任务:ForkJoinPool.commonPool-worker-1
10:15:02.804 [pool-1-thread-2] INFO c.TestA - 三号任务:pool-1-thread-2
10:15:02.835 [ForkJoinPool.commonPool-worker-1] INFO c.TestA - 四号任务:ForkJoinPool.commonPool-worker-1
10:15:02.835 [main] INFO c.TestA - 【cf.get:null】
分析
注意:有时候系统处理的速度太快,则直接使用 main 线程处理
supplyAsync(suppler + threadPool) + thenRun(runnable)
supplyAsync(suppler + threadPool) + thenRunAsync(runnable)
【一号任务:pool-1-thread-1】
【二号任务:main】
【三号任务:main】
【四号任务:main】
10:38:02.131 [main] INFO c.TestA - 【main 线程正在摸鱼中...】
10:38:02.143 [main] INFO c.TestA - 【cf.get:null】
这里再贴一下相关的源码
java/util/concurrent/CompletableFuture.java
public CompletableFuture thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);
}
public CompletableFuture thenRunAsync(Runnable action, Executor executor) {return uniRunStage(screenExecutor(executor), action);
}
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
static Executor screenExecutor(Executor e) {if (!useCommonPool && e == ForkJoinPool.commonPool())return asyncPool;if (e == null) throw new NullPointerException();return e;
}
简言之:在一般情况下,如果不给 thenRunAsync() 方法指定线程池的话,其就会使用默认的线程池
有一种特殊情况,那就是系统处理的速度太快,来不及使用线程池,就直接使用 main 线程处理完了
public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T, U> fn)
示例代码
@Slf4j(topic = "c.CompletableFutureAPIDemo_4")
public class CompletableFutureAPIDemo_4 {public static void main(String[] args) {CompletableFuture cfA = CompletableFuture.supplyAsync(() -> {log.info("【A come in】");try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }return "[This is cfA]";});CompletableFuture cfB = CompletableFuture.supplyAsync(() -> {log.info("【B come in】");try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return "[This is cfB]";});CompletableFuture result = cfA.applyToEither(cfB, f -> { return "【" + f + " is Winer" + "】"; });log.info(result.join());}
}
控制台输出信息
11:19:01.620 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_4 - 【A come in】
11:19:01.620 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_4 - 【B come in】
11:19:02.634 [main] INFO c.CompletableFutureAPIDemo_4 - 【[This is cfB] is Winer】
public CompletableFuture thenCombine(
CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
示例代码
@Slf4j(topic = "c.CompletableFutureAPIDemo_5")
public class CompletableFutureAPIDemo_5 {public static void main(String[] args) {CompletableFuture cf_1 = CompletableFuture.supplyAsync(() -> {log.info("【[cf_1] | 开始启动】");try { TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) { e.printStackTrace(); }return 10;});CompletableFuture cf_2 = CompletableFuture.supplyAsync(() -> {log.info("【[cf_2] | 开始启动】");try { TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) { e.printStackTrace(); }return 20;});CompletableFuture combineResult = cf_1.thenCombine(cf_2, (x, y) -> {log.info("【俩结果开始合并】");return x + y;});log.info("【CombineResult:{}】", combineResult.join());}
}
控制台输出信息
11:39:27.550 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5 - 【[cf_2] | 开始启动】
11:39:27.550 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5 - 【[cf_1] | 开始启动】
11:39:29.571 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5 - 【俩结果开始合并】
11:39:29.571 [main] INFO c.CompletableFutureAPIDemo_5 - 【CombineResult:30】
示例代码
@Slf4j(topic = "c.CompletableFutureAPIDemo_5Plus")
public class CompletableFutureAPIDemo_5Plus {public static void main(String[] args) {CompletableFuture combineResult = CompletableFuture.supplyAsync(() -> {log.info("【come in 1】");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {log.info("【come in 2】");return 20;}), (x, y) -> {log.info("【come in 3】");return x + y;}).thenCombine(CompletableFuture.supplyAsync(() -> {log.info("【come in 4】");return 30;}), (a, b) -> {log.info("【come in 5】");return a + b;});log.info("【[{}]线程在摸鱼】", Thread.currentThread().getName());log.info("【CombineResult:{}】", combineResult.join());}
}
输出结果
11:50:02.395 [ForkJoinPool.commonPool-worker-3] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 4】
11:50:02.395 [ForkJoinPool.commonPool-worker-2] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 2】
11:50:02.396 [main] INFO c.CompletableFutureAPIDemo_5Plus - 【[main]线程在摸鱼】
11:50:02.395 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 1】
11:50:02.406 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 3】
11:50:02.406 [ForkJoinPool.commonPool-worker-1] INFO c.CompletableFutureAPIDemo_5Plus - 【come in 5】
11:50:02.406 [main] INFO c.CompletableFutureAPIDemo_5Plus - 【CombineResult:60】
有这样一种情况:并行运行多个互不相关的任务,最后统计整理它们处理的计算结果。
public static CompletableFuture allOf(CompletableFuture>... cfs)
public static CompletableFuture
示例代码
@Slf4j(topic = "c.Demo_6")
public class CompletableFutureAPIDemo_6 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {try { TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }log.info("执行一号任务");return "111";});CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }log.info("执行二号任务");return "222";});CompletableFuture cf3 = CompletableFuture.supplyAsync(() -> {try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }log.info("执行三号任务");return "333";});// allOfTest(cf1, cf2, cf3);// anyOfTest(cf1, cf2, cf3);log.info("溜啦溜啦");}private static void anyOfTest(CompletableFuture cf1, CompletableFuture cf2, CompletableFuture cf3) {CompletableFuture
输出结果-1(CompletableFuture.allOf()
)
13:16:01.084 [ForkJoinPool.commonPool-worker-3] INFO c.Demo_6 - 执行三号任务
13:16:01.097 [ForkJoinPool.commonPool-worker-2] INFO c.Demo_6 - 执行二号任务
13:16:01.112 [ForkJoinPool.commonPool-worker-1] INFO c.Demo_6 - 执行一号任务
13:16:01.112 [main] INFO c.Demo_6 - 【[allFuture.join]:null】
13:16:01.113 [main] INFO c.Demo_6 - 【所有任务都已经完成啦】
13:16:01.114 [main] INFO c.Demo_6 - 【[allFuture]:java.util.concurrent.CompletableFuture@66cd51c3[Completed normally]】
13:16:01.114 [main] INFO c.Demo_6 - 溜啦溜啦
输出结果-2(CompletableFuture.anyOf()
)
13:14:00.829 [ForkJoinPool.commonPool-worker-3] INFO c.Demo_6 - 执行三号任务
13:14:00.829 [ForkJoinPool.commonPool-worker-2] INFO c.Demo_6 - 执行二号任务
13:14:00.842 [ForkJoinPool.commonPool-worker-1] INFO c.Demo_6 - 执行一号任务
13:14:00.841 [main] INFO c.Demo_6 - 【[anyFuture.join]:333]】
13:14:00.843 [main] INFO c.Demo_6 - 【完成了一个任务了】
13:14:00.844 [main] INFO c.Demo_6 - 【[anyFuture]:java.util.concurrent.CompletableFuture@66cd51c3[Completed normally]】
13:14:00.844 [main] INFO c.Demo_6 - 溜啦溜啦
下一篇:模块、包和异常