CompletableFuture(异步任务编排)
FutureTask的缺点
当我们需要异步调用任务并且需要异步任务的返回值时我们就需要使用FutureTask的get方法来获取返回值.
public static ExecutorService executorService = Executors.newFixedThreadPool(20);public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> integerFutureTask = new FutureTask<>(() -> {//业务逻辑return 1;});executorService.submit(integerFutureTask);Integer res = integerFutureTask.get();System.out.println(res);System.out.println(123);}
程序输出
123
1
加入我们的异步任务非常的耗时由于get方法会阻塞主线程就会导致程序的吞吐量下降
public static ExecutorService executorService = Executors.newFixedThreadPool(20);public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> integerFutureTask = new FutureTask<>(() -> {//业务逻辑Thread.sleep(2000);return 1;});executorService.submit(integerFutureTask);Integer res = integerFutureTask.get();System.out.println(res);System.out.println(123);}
程序输出
1
123
必须要等异步任务完成才能继续执行主线程,不仅如此,当我们在碰到一下业务场景的时候,单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务.
CompletableFuture
CompletableFuture它可以完成复杂的异步任务编程功能,异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。通过这种方式我们的主线程就不需要阻塞等待返回结果。或者以下的业务逻辑也是需要CompletableFuture来完成的。
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
- 等待Future集合种的所有任务都完成。
- 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。
- 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
- 应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)
创建异步任务
CompletableFuture有两种方法创建异步任务,分别是
- supplyAsync执行CompletableFuture任务,支持返回值, 同时支持使用自己的线程池。
- runAsync执行CompletableFuture任务,没有返回值,同时支持使用自己的线程池。
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(ASYNC_POOL, runnable);}public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(ASYNC_POOL, supplier);}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}
异步任务回调
任务串行化
比如我们一个任务需要另一个任务的返回值,或者必须要等上一个任务执行完成我们才能执行当前任务,我们就需要对任务进行串行化。有三种方法。
- thenRun 或 thenRunAsync 不能接受上一个任务的返回值,自己本身也没有返回值。
- thenAccept 或 thenAcceptAsync 能接受上一个任务的返回值,但自己本身没有返回值。
- thenApply 或 thenApplyAsync 能接受上一个任务的返回值,自己本身也有返回值。
这些方法后面的Async代表当前的方法也要异步执行,也就是新开一个线程执行,并且也可以指定线程池,没有Async就使用当前线程继续执行。
public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(defaultExecutor(), action);}public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);}public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {return uniAcceptStage(defaultExecutor(), action);}public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {return uniAcceptStage(screenExecutor(executor), action);}public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(defaultExecutor(), fn);}public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}
当然这些方法并不能解决上面所说的FutureTask的问题,但是CompletableFuture为我们提供了whenComplete方法
任务监听(whenComplete和handle)
- whenComplete
whenComplete接收两个参数,第一个是上一个任务的返回值,第二个参数是上一个任务出现的异常。但是并没有返回值。
加Async的和上面的同理
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);}public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(defaultExecutor(), action);}public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {return uniWhenCompleteStage(screenExecutor(executor), action);}
一个例子
public static ExecutorService executorService = Executors.newFixedThreadPool(20);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("主线程");CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "123";}, executorService).whenComplete((res, e) -> {System.out.println("异步任务完成了" + res);});// System.out.println(stringCompletableFuture.get());System.out.println("主线程end");}
程序结果
主线程
主线程end
异步任务完成了123
可以看出我们使用了whenComplete即使我们的异步任务非常的慢,但并不影响我们主线程的运行,提高了程序的吞吐量
- handle
handle 接收两个参数,第一个是上一个任务的返回值,第二个参数是上一个任务出现的异常。并且可以有返回值。
加Async的和上面的同理
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);}public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(defaultExecutor(), fn);}public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);}
多任务组合
- 两个任务组合
- thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
- thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
- runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
加Async的和上面的同理
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) {return biRunStage(null, other, action);}public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {return biRunStage(defaultExecutor(), other, action);}public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {return biRunStage(screenExecutor(executor), other, action);}public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);}public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(defaultExecutor(), other, action);}public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) {return biAcceptStage(screenExecutor(executor), other, action);}public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);}public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(defaultExecutor(), other, fn);}public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {return biApplyStage(screenExecutor(executor), other, fn);}
- 多个任务组合
多任务组合是CompletableFuture下的静态方法allOf是所有都完成anyOf是只要有一个完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {}public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {}
使用CompletableFuture注意事项
- 默认线程池的注意点
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。