CompletableFuture异步回调CompletableFuture简介
CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture实现了Future,CompletionStage接口,实现了Future接口可以兼容线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。
Futrue和CompletableFuture
Future在Java里面,通过用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我么会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,该有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Futrue缺点
1.不支持手动完成。2.不支持进一步的非阻塞调用。3.不支持链式调用。4.不支持多个Future合并。5.不支持异步处理。
CompletableFuture类的使用案例CompletableFuture01
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;/** * 演示CompletableFuture * @author 长名06 * @version 1.0 */public class CompletableFuture01 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture future = new CompletableFuture(); new Thread(() -> { System.out.println("子线程开始干活"); try { //子线程沉睡3s Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //完成future任务 future.complete("success"); },"A").start(); System.out.println("主线程调用get方法获取结果为:" + future.get()); System.out.println("主线程完成,阻塞结束"); }}
CompletableFuture02
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;/** * @author 长名06 * @version 1.0 */public class CompletableFuture02 { public static void main(String[] args) throws ExecutionException, InterruptedException { //异步调用,无返回值 CompletableFuture completableFuture1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行runSync()"); }); completableFuture1.get(); //异步调用,有返回值 CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "执行supplyAsync()");// int i = 1/0; return 1024; }); completableFuture2.whenComplete((t, u) -> { System.out.println("----t=" + t);//t参数,是执行的返回值 System.out.println("----u=" + u);//异常信息 }).get();// System.out.println(Runtime.getRuntime().availableProcessors()); }}
CompletableFuture03
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;/** * 演示线程依赖,执行api thenApply() * 一个任务,依赖于另一个任务可以使用thenApply()将两个任务(线程)串行化 * 对一个数先加10 再平方 * @author 长名06 * @version 1.0 */public class CompletableFuture03 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread().getName() + "主线程开始"); CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("加10任务开启"); num += 10; return num; }).thenApply(i -> num * num); Integer integer = future.get(); System.out.println("主线程结束,子线程的结果为" + integer); }}
CompletableFuture04
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.function.Consumer;import java.util.function.Function;/** * 消费处理结果 * thenAccept()方法,接收任务的处理结果,并消费结果,不返回结果了 * @author 长名06 * @version 1.0 */public class CompletableFuture04 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread().getName() + "主线程开始"); CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("加10任务开启"); num += 10; return num; }).thenApply(new Function() { @Override public Integer apply(Integer integer) { return num * num; } }).thenAccept(new Consumer() { @Override public void accept(Integer i) { System.out.println("子线程全部处理完成,最后调用了accept方法,消费了结果" + i); } }); }}
CompletableFuture05
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.function.Consumer;import java.util.function.Function;/** * 异常处理 * @author 长名06 * @version 1.0 */public class CompletableFuture05 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread().getName() + "主线程开始"); CompletableFuture future = CompletableFuture.supplyAsync(() -> { int i = 1/0;//模拟异常 System.out.println("加10任务开启"); num += 10; return num; }).exceptionally(new Function() { @Override public Integer apply(Throwable ex) { System.out.println(ex.getMessage()); return -1; } }); }}
CompletableFuture06
package com.shaonian.juc.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.function.BiFunction;import java.util.function.Function;/** * 消费结果,同时处理异常 * handle类似与thenAccept/thenRun方法,是最后一步结果的调用,但是同时可以处理异常 * @author 长名06 * @version 1.0 */public class CompletableFuture06 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(Thread.currentThread().getName() + "主线程开始"); CompletableFuture future = CompletableFuture.supplyAsync(() -> { int i = 1/0; System.out.println("加10任务开启"); num += 10; return num; }).handle(new BiFunction() { @Override public Integer apply(Integer i, Throwable ex) { System.out.println("进入了handle方法"); if(ex != null){ System.out.println("发生了异常,内容为" + ex.getMessage()); return -1; }else{ System.out.println("正常执行,结果为" + i); return i; } } }); }}
CompletableFuture07
package com.shaonian.juc.completable;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.CompletionStage;import java.util.concurrent.ExecutionException;import java.util.function.BiFunction;import java.util.function.Function;/** * 两个CompletableFuture结果的合并 * @author 长名06 * @version 1.0 */public class CompletableFuture07 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { //有依赖关系的合并 CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("加10任务开启"); num += 10; return num; }); //合并 CompletableFuture future2 = future.thenCompose(new Function<Integer, CompletionStage>() { @Override public CompletionStage apply(Integer i) { return CompletableFuture.supplyAsync(() -> { return i + 1; }); } }); System.out.println(future.get()); System.out.println(future2.get()); //无依赖的任务合并 CompletableFuture job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加10任务开启"); num += 10; return num; }); CompletableFuture job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘10任务开启"); num *= 10; return num; }); //合并两个结果 CompletableFuture
CompletableFuture08
package com.shaonian.juc.completable;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.stream.Collectors;/** * 多个独立任务的合并 allOf * @author 长名06 * @version 1.0 */public class CompletableFuture08 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { List<CompletableFuture> list = new ArrayList(); CompletableFuture job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加10任务开启"); num += 10; return num; }); list.add(job1); CompletableFuture job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘10任务开启"); num *= 10; return num; }); list.add(job2); CompletableFuture job3 = CompletableFuture.supplyAsync(() -> { System.out.println("减10任务开启"); num -= 10; return num; }); list.add(job3); CompletableFuture job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除10任务开启"); num /= 10; return num; }); list.add(job4); //使用allOf需注意,输入也会执行任务,但是无法获取到结果 //allOf需要等所有的任务执行完毕 /** * 返回值是CompletableFuture类型 * public static CompletableFuture allOf(CompletableFuture... cfs) { * return andTree(cfs, 0, cfs.length - 1); * } */// CompletableFuture allJob = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));// System.out.println(allJob.get()); //也可以使用 join的形式,执行,可以获取结果 List allResult = list.stream().map(CompletableFuture::join) .collect(Collectors.toList()); System.out.println(allResult); }}
CompletableFuture09
package com.shaonian.juc.completable;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;/** * anyOf * @author 长名06 * @version 1.0 */public class CompletableFuture09 { public static Integer num = 10; public static void main(String[] args) throws ExecutionException, InterruptedException { List<CompletableFuture> list = new ArrayList(); CompletableFuture job1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("加10任务开启"); num += 10; return num; }); list.add(job1); CompletableFuture job2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("乘10任务开启"); num *= 10; return num; }); list.add(job2); CompletableFuture job3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("减10任务开启"); num -= 10; return num; }); list.add(job3); CompletableFuture job4 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("除10任务开启"); num /= 10; return num; }); list.add(job4); //anyOf,这里只要有一个job执行完毕,就结束所有的任务执行,不需要等待所有的job执行完毕 //但是这个很鸡肋,因为如果不要执行所有的任务,就没必要开启一个CompletableFuture了 //也可以适用于竞争的场景,先执行成功的获取结果,其他的不再竞争了 CompletableFuture
只是为了记录自己的学习历程,且本人水平有限,不对之处,请指正。