[Java 8 New Features] Java CompletableFuture supplyAsync()

supplyAsync() is a static method of CompletableFuture, introduced in Java 8.

1.supplyAsync(Supplier supplier)

supplyAsync() runs a task asynchronously. By default the task runs in ForkJoinPool.commonPool(); if an Executor is supplied, it runs in that Executor instead.

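Method declaration: public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

The task is passed to supplyAsync() as a Supplier.

By default, the task runs asynchronously in ForkJoinPool.commonPool(). supplyAsync() itself returns immediately with a new CompletableFuture, which is later completed with the value obtained by calling the given Supplier.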

Code example:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture");
System.out.println(completableFuture.get());
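To make the "returns immediately" behavior visible, here is a minimal self-contained sketch. The class name SupplyAsyncBasics and the helper slowGreeting() are hypothetical, introduced only for illustration; the 200 ms sleep merely simulates slow work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SupplyAsyncBasics {

    // Hypothetical helper: simulates a slow computation.
    static String slowGreeting() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Hello CompletableFuture";
    }

    public static void main(String[] args) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(SupplyAsyncBasics::slowGreeting);

        // supplyAsync() has already returned; the Supplier is most likely still running.
        System.out.println("done right after submit? " + future.isDone());

        // join() blocks until the value is available.
        // Unlike get(), it declares no checked exceptions.
        System.out.println(future.join());
    }
}
```

Whether to use get() or join() is mostly a matter of exception handling: get() declares InterruptedException and ExecutionException, while join() throws an unchecked CompletionException on failure.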

2.supplyAsync(Supplier supplier, Executor executor)

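Method declaration: public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

The task is passed to supplyAsync() as a Supplier together with an Executor, and it runs asynchronously in that Executor. supplyAsync() returns a new CompletableFuture that is completed with the value obtained by calling the given Supplier.

Example code: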

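ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!", executorService);
System.out.println(cf.get());
// Shut the executor down; otherwise its non-daemon thread keeps the JVM alive.
executorService.shutdown();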
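One way to confirm that the Supplier really runs in the supplied Executor is to give the executor's thread a recognizable name. The sketch below assumes a hypothetical helper runOnNamedThread() and the thread name "my-worker"; neither comes from the original example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncWithExecutor {

    // Runs a Supplier on a single-thread executor whose thread carries the given name
    // and returns the name of the thread that actually executed the Supplier.
    static String runOnNamedThread(String threadName) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            // Always release the executor's thread.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnNamedThread("my-worker")); // prints "my-worker"
    }
}
```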

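3.Using thenApply()

thenApply() executes a function on the result of the previous stage. When it is chained after supplyAsync(), the value produced by supplyAsync() is passed as the argument to the given function.

thenApply() takes a function as its parameter, applies it to the result of the preceding CompletableFuture, and returns a new CompletableFuture that holds the function's result.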
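Because each thenApply() returns a new CompletableFuture, calls can be chained, and the result type may change from stage to stage. A minimal sketch, in which the class ThenApplyChain and the method lengthOfGreeting() are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;

public class ThenApplyChain {

    // String -> String -> Integer: every thenApply() receives the previous stage's result.
    static int lengthOfGreeting(String name) {
        return CompletableFuture
                .supplyAsync(() -> "Hello " + name)   // CompletableFuture<String>
                .thenApply(String::toUpperCase)       // still CompletableFuture<String>
                .thenApply(String::length)            // now CompletableFuture<Integer>
                .join();
    }

    public static void main(String[] args) {
        // "HELLO JAVA" has 10 characters.
        System.out.println(lengthOfGreeting("Java")); // prints 10
    }
}
```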

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> getDataById(1))
                .thenApply(data -> sendData(data));
        cf.get();
    }

    private static String getDataById(int id) {
        System.out.println("getDataById: " + Thread.currentThread().getName());
        return "Data:" + id;
    }

    private static String sendData(String data) {
        System.out.println("sendData: " + Thread.currentThread().getName());
        System.out.println(data);
        return data;
    }
}

Output:

getDataById: ForkJoinPool.commonPool-worker-1
sendData: main
Data:1

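The main thread starts executing the code. When it reaches supplyAsync(), supplyAsync() obtains a thread from ForkJoinPool.commonPool() to run its Supplier asynchronously.

thenApply() is executed either by the calling thread (here the main thread) or by the thread that supplyAsync() used. If the future has already completed when thenApply() is called, the function runs immediately in the calling thread; otherwise it runs in the thread that completes the future.

If the Supplier passed to supplyAsync() takes longer to finish, thenApply() is therefore executed by the thread that supplyAsync() used, and the main thread is not tied up running the function.

Code example: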

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> getDataById(1))
                .thenApply(data -> sendData(data));
        cf.get();
    }

    private static String getDataById(int id) {
        System.out.println("getDataById: " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Data:" + id;
    }

    private static String sendData(String data) {
        System.out.println("sendData: " + Thread.currentThread().getName());
        System.out.println(data);
        return data;
    }
}

Output:

getDataById: ForkJoinPool.commonPool-worker-1
sendData: ForkJoinPool.commonPool-worker-1
Data:1
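The two outcomes above depend on timing, which makes them hard to reproduce reliably. The following sketch forces each case deterministically. It is an illustration under stated assumptions: the class ThenApplyThreadDemo and its methods are hypothetical, and a CountDownLatch is used (instead of Thread.sleep) to hold the Supplier back until thenApply() has been registered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class ThenApplyThreadDemo {

    // Case 1: the source future is still pending when thenApply() is called,
    // so the function runs on the thread that completes the source, not on the caller.
    static boolean runsOffCallerWhenPending() {
        Thread caller = Thread.currentThread();
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<Thread> cf = CompletableFuture
                .supplyAsync(() -> {
                    awaitQuietly(registered); // hold the Supplier until thenApply() is attached
                    return "data";
                })
                .thenApply(data -> Thread.currentThread());
        registered.countDown();
        return cf.join() != caller;
    }

    // Case 2: the source future is already complete when thenApply() is called,
    // so the function runs immediately on the calling thread.
    static boolean runsOnCallerWhenAlreadyDone() {
        Thread caller = Thread.currentThread();
        CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> "data");
        source.join(); // wait until the source has completed
        return source.thenApply(data -> Thread.currentThread()).join() == caller;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("pending source, function ran off the caller: "
                + runsOffCallerWhenPending());
        System.out.println("completed source, function ran on the caller: "
                + runsOnCallerWhenAlreadyDone());
    }
}
```

If the function must never run on the calling thread, thenApplyAsync() is the variant designed for that; it always hands the function to an executor.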

4.Custom Executor

An Executor can be passed to supplyAsync() as an argument.

The Supplier passed to supplyAsync() is then executed by that Executor rather than by ForkJoinPool.commonPool().

package com.example.supplyAsyncExample;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncExample02 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<String> completableFuture = CompletableFuture
                .supplyAsync(() -> getById(1), executorService)
                .thenApply(data -> sendData(data));
        completableFuture.get();
        executorService.shutdown();
    }

    private static String getById(int id) {
        System.out.println("getById:" + Thread.currentThread().getName());
        return "Data:" + id;
    }

    private static String sendData(String data) {
        System.out.println("sendData:" + Thread.currentThread().getName());
        System.out.println(data);
        return data;
    }
}

Output:

getById:pool-1-thread-1
sendData:pool-1-thread-1
Data:1

The output may also be:

getById:pool-1-thread-1
sendData:main
Data:1

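5.Using whenComplete()

This section uses whenComplete() together with supplyAsync().

After the given action has run, whenComplete() returns a new CompletionStage with the same result or exception as the stage it was called on.

The whenComplete() callback is invoked when the CompletableFuture's task finishes, whether it completed normally or with an exception.

Normal completion: the callback receives the upstream task's result, and the exception argument is null.

Exceptional completion: the result argument is null, and the exception argument carries the upstream task's exception (when a Supplier passed to supplyAsync() throws, the exception arrives wrapped in a CompletionException).

Consequently, calling get() returns the result on normal completion; on failure it throws an ExecutionException whose cause is the original exception, and the caller has to handle it.

Example code: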

package com.example.supplyAsyncExample;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample03 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> completableFuture = CompletableFuture
                .supplyAsync(() -> getById(1))
                .whenComplete((data, exception) -> {
                    // data is null if the upstream task failed
                    consumeData(data);
                    if (exception != null) {
                        System.out.println(exception);
                    }
                });
        completableFuture.get();
    }

    private static String getById(int id) {
        System.out.println("getById:" + Thread.currentThread().getName());
        return "Data:" + id;
    }

    private static String consumeData(String data) {
        System.out.println("consumeData:" + Thread.currentThread().getName());
        System.out.println(data);
        return data;
    }
}
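The example above only exercises the success path. The sketch below shows the failure path described earlier: the callback sees a null result and a non-null exception, and get() then throws an ExecutionException. The class WhenCompleteFailureDemo, the method failureMessage(), and the message "lookup failed" are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteFailureDemo {

    // Returns the message of the original exception that get() reports for a failing Supplier.
    static String failureMessage() throws InterruptedException {
        CompletableFuture<String> cf = CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("lookup failed");
                })
                .whenComplete((data, exception) -> {
                    // On failure, data is null and exception is non-null
                    // (here a CompletionException wrapping the IllegalStateException).
                    System.out.println("data=" + data + ", failed=" + (exception != null));
                });
        try {
            cf.get();
            return "no failure";
        } catch (ExecutionException e) {
            // whenComplete() does not swallow the failure; get() still throws,
            // and the cause is the exception thrown by the Supplier.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(failureMessage()); // prints "lookup failed"
    }
}
```

Note that whenComplete() only observes the outcome. To replace a failure with a fallback value, handle() or exceptionally() would be the methods to look at.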

6.Using Stream

package com.example.supplyAsyncExample;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample04 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Integer> list = Arrays.asList(1, 2, 3);
        // Note: on Java 9 and later, count() may skip the map() stages entirely when the
        // size can be computed from the source, so the output below assumes Java 8.
        long count = list.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> getById(n)))
                .map(completableFuture -> completableFuture.thenApply(data -> sendData(data)))
                .map(t -> t.join())
                .count();
        System.out.println("count: " + count);
    }

    private static String getById(int id) {
        System.out.println("getById:" + Thread.currentThread().getName());
        return "Data:" + id;
    }

    private static String sendData(String data) {
        System.out.println("sendData:" + Thread.currentThread().getName());
        System.out.println(data);
        return data;
    }
}

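Output:

getById:ForkJoinPool.commonPool-worker-1
sendData:ForkJoinPool.commonPool-worker-1
Data:1
getById:ForkJoinPool.commonPool-worker-1
sendData:ForkJoinPool.commonPool-worker-1
Data:2
getById:ForkJoinPool.commonPool-worker-1
sendData:mainData:3 is not printed on one line; the last three lines are:
getById:ForkJoinPool.commonPool-worker-1
sendData:main
Data:3
count: 3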
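As the output shows, the tasks run one after another: a sequential stream is lazy and pushes each element through every stage, including join(), before it starts the next element. If the goal is to let the tasks run concurrently, one common approach is to start all futures first and join them in a second pass. A minimal sketch, assuming a hypothetical class SupplyAsyncAllThenJoin and method fetchAll():

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SupplyAsyncAllThenJoin {

    static List<String> fetchAll(List<Integer> ids) {
        // Pass 1: start every task. Collecting into a list forces all
        // supplyAsync() calls to happen before any join().
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> "Data:" + id))
                .collect(Collectors.toList());

        // Pass 2: wait for the results, in the original order of the ids.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList(1, 2, 3))); // prints [Data:1, Data:2, Data:3]
    }
}
```

Collecting to a list also avoids relying on count(), whose pipeline may be skipped on newer Java versions.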