【Java 8 新特性】Java CompletableFuture supplyAsync()
supplyAsync()是Java 8
引入的CompletableFuture
静态方法。
1.supplyAsync(Supplier supplier)
supplyAsync()
默认完成在ForkJoinPool.commonPool()
或指定Executor
中异步执行的任务。
方法声明:supplyAsync(Supplier supplier)
需要将Supplier作为任务传递给supplyAsync()
方法。
默认情况下,此任务将在ForkJoinPool.commonPool()
中异步完成执行,最后supplyAsync()
将返回新的CompletableFuture
,其值是通过调用给定的Supplier所获得的值。
代码示例:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture");System.out.println(completableFuture.get());
2.supplyAsync(Supplier supplier, Executor executor)
方法声明:supplyAsync(Supplier supplier, Executor executor)
需要将Supplier作为任务传递给supplyAsync()
方法,并且指定Executor
,任务将在给定的Executor中异步完成。最后supplyAsyncl()
将返回具有通过调用给定Supplier
所获得的值的新CompletableFuture
。
示例代码:
ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> "Hello CompletableFuture!", executorService );System.out.println(cf.get());
3.使用thenApply()
thenApply()
通过传递阶段结果来执行一个函数。当supplyAsync
和thenApply()
一起使用,thenApply()
将从supplyAsync()
获得的参数传递来执行给定的函数。
thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class SupplyAsyncExample {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> getDataById(1)).thenApply(data -> sendData(data));cf.get();}private static String getDataById(int id) {System.out.println("getDataById: "+ Thread.currentThread().getName());return "Data:"+ id;}private static String sendData(String data) {System.out.println("sendData: "+ Thread.currentThread().getName());System.out.println(data);return data;}}
输出结果:
getDataById: ForkJoinPool.commonPool-worker-1sendData: mainData:1
主线程开始执行代码,当代码到达supplyAsync()
时,supplyAsync()
从ForkJoinPool.commonPool()
获取新线程以异步执行其功能。
thenApply()
将由主线程或supplyAsync()
使用的线程执行。
如果supplyAsync()
的Supplier花费的时间更长,则thenApply()
将由supplyAsync()
所使用的线程执行,因此主线程将不会被阻塞。
代码示例:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class SupplyAsyncExample {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> getDataById(1)).thenApply(data -> sendData(data));cf.get();}private static String getDataById(int id) {System.out.println("getDataById: "+ Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "Data:"+ id;} private static String sendData(String data) {System.out.println("sendData: "+ Thread.currentThread().getName());System.out.println(data);return data;}}
输出:
getDataById: ForkJoinPool.commonPool-worker-1sendData: ForkJoinPool.commonPool-worker-1Data:1
4.自定义Executor
将Executor
作为入参传递给supplyAsync()
。
传递给supplyAsync()
的Supplier
将由上面传入的Executor
执行,而不是由ForkJoinPool.commonPool()
执行。
package com.example.supplyAsyncExample;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SupplyAsyncExample02 {public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> getById(1),executorService).thenApply(data -> sendData(data));completableFuture.get();executorService.shutdown();}private static String getById(int id){System.out.println("getById:" + Thread.currentThread().getName());return "Data:" + id;}private static String sendData(String data){System.out.println("sendData:" + Thread.currentThread().getName());System.out.println(data);return data;}}
输出:
getById:pool-1-thread-1sendData:pool-1-thread-1Data:1也有可能是:getById:pool-1-thread-1sendData:mainData:1
5.使用whenComplete()
whenComplete()
方法创建supplyAsync()
。
完成指定操作之后,wenComplete()
返回具有相同结构或异常的新CompletionStage
当CompletableFuture的任务不论是正常完成还是出现异常它都会调用whenComplete这回调函数。
正常完成:whenComplete返回结果和上级任务一致,异常为null;
出现异常:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。
示例代码:
package com.example.supplyAsyncExample;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SupplyAsyncExample03 {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> getById(1)).whenComplete((data, exception) -> {consumeData(data);if (exception != null){System.out.println(exception);}});completableFuture.get();}private static String getById(int id){System.out.println("getById:" + Thread.currentThread().getName());return "Data:" + id;}private static String consumeData(String data){System.out.println("sendData:" + Thread.currentThread().getName());System.out.println(data);return data;}}
6.stream流
package com.example.supplyAsyncExample;import java.util.Arrays;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class SupplyAsyncExample04 {public static void main(String[] args) throws InterruptedException, ExecutionException {List<Integer> list = Arrays.asList(1, 2, 3);long count = list.stream().map(n -> CompletableFuture.supplyAsync(() -> getById(n))).map(completableFuture -> completableFuture.thenApply(data -> sendData(data))).map(t -> t.join()).count();System.out.println("count: " + count);}private static String getById(int id) {System.out.println("getById:" + Thread.currentThread().getName());return "Data:" + id;}private static String sendData(String data) {System.out.println("sendData:" + Thread.currentThread().getName());System.out.println(data);return data;}}
输出结果:
getById:ForkJoinPool.commonPool-worker-1sendData:ForkJoinPool.commonPool-worker-1Data:1getById:ForkJoinPool.commonPool-worker-1sendData:ForkJoinPool.commonPool-worker-1Data:2getById:ForkJoinPool.commonPool-worker-1sendData:mainData:3count: 3