第 4 章 引入流
流可以认为是遍历数据集的高级迭代器。
流还可以透明地并行处理,无须写任何多线程代码
代码是以声明性方式写
可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在filter后面接上sorted、map和collect操作
filter、sorted、map和collect等操作是与具体线程模型无关的高层次构件
内部实现可以是单线程的,也可能透明地充分利用你的多核架构
Java 8中的Stream API可以让你写出这样的代码:
- 声明性——更简洁,更易读;
- 可复合——更灵活;
- 可并行——性能更好
流到底是什么?
- 简短的定义就是“从支持数据处理操作的源生成的元素序列”。
- 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如
ArrayList
与LinkedList
)。但流的目的在于表达计算,比如前面的filter
、sorted
和map
。集合讲的是数据,流讲的是计算。 - 源——流会使用一个提供数据的源,比如集合、数组或I/O资源。请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
- 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,比如
filter
、map
、reduce
、find
、match
、sort
等。流操作可以顺序执行,也可以并行执行。
流操作有两个重要的特点。
- 流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,构成一个更大的流水线。这使得下一章中将要讨论的一些优化成为可能,比如处理延迟和短路。流水线的操作可以看作类似对数据源进行数据库查询。
- 内部迭代——与集合使用迭代器进行显式迭代不同,流的迭代操作是在后台进行的。
集合与流之间的差异就在于什么时候进行计算。
集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中(你可以往集合里加东西或者删东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分)。
相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素是按需计算的。这对编程有很大的好处。
是一种生产者–消费者的关系。
从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值(用管理学的话说这就是需求驱动,甚至是实时制造)。
与此相反,集合则是急切创建的(供应商驱动:先把仓库装满,再开始卖,就像那些昙花一现的圣诞新玩意儿一样)。以质数为例,要是想创建一个包含所有质数的集合,那这个程序算起来就没完没了了,因为总有新的质数要算,然后把它加到集合里面。当然这个集合是永远也创建不完的,消费者这辈子都见不着了。
用DVD对比在线流媒体的例子展示了流和集合之间的差异
和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。你可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就“没戏”了)。例如,以下代码会抛出一个异常,说流已被消费掉了:
List<String> title = Arrays.asList("Modern", "Java", "In", "Action");Stream<String> s = title.stream();s.forEach(System.out::println); ←---- 打印标题中的每个单词s.forEach(System.out::println); ←---- java.lang.IllegalStateException:流已被操作或关闭
流只能消费一次!
哲学上:
流看作在时间中分布的一组值。相反,集合则是空间(这里就是计算机内存)中分布的一组值,在一个时间点上全体存在——你可以使用迭代器来访问for-each循环中的内部成员
集合和流的另一个关键区别在于它们遍历数据的方式
使用Collection接口需要用户去做迭代(比如用for-each),这称为外部迭代。
Stream库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方
for-each还隐藏了迭代中的一些复杂性。for-each结构是一个语法糖,它背后的东西用Iterator对象表达不够优雅
List<String> names = new ArrayList<>();Iterator<String> iterator = menu.iterator();while(iterator.hasNext()) { ←---- 显式迭代 Dish dish = iterator.next(); names.add(dish.getName());}
List<String> names = menu.stream().map(Dish::getName) ←---- 用getName方法参数化map,提取菜名.collect(toList()); ←---- 开始执行操作流水线;没有迭代!
Java 8引入流的理由
Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。
与此相反,一旦选择了for-each这样的外部迭代,那你基本上就要自己管理所有的并行问题了(自己管理实际上意味着“某个良辰吉日我们会把它并行化”或“开始了关于任务和synchronized的漫长而艰苦的斗争”)。Java 8需要一个类似于Collection却没有迭代器的接口,于是就有了Stream
流(内部迭代)与集合(外部迭代)之间的差异
流操作
可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作
中间操作
filter或sorted等中间操作会返回另一个流。这让多个操作可以连接起来形成一个查询。重要的是,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理中间操作
一般都可以合并起来,在终端操作时一次性全部处理
filter和map是两个独立的操作,但它们合并到同一次遍历中了(循环合并:loop fusion)
终端操作
终端操作会从流的流水线生成结果,其结果是任何不是流的值,比如List、Integer,甚至void。例如,在下面的流水线中,forEach是一个返回void的终端操作
流的使用一般包括三件事:
- 一个数据源(如集合)来执行一个查询;
- 一个中间操作链,形成一条流的流水线;
- 一个终端操作,执行流水线,并能生成结果。
5.2流的切片
Java 9引入了两个新方法,可以高效地选择流中的元素,这两个方法分别是:takeWhile和dropWhile
List<Dish> specialMenu = Arrays.asList(new Dish("seasonal fruit", true, 120, Dish.Type.OTHER),new Dish("prawns", false, 300, Dish.Type.FISH),new Dish("rice", true, 350, Dish.Type.OTHER),new Dish("chicken", false, 400, Dish.Type.MEAT),new Dish("french fries", true, 530, Dish.Type.OTHER));
List<Dish> filteredMenu= specialMenu.stream() .filter(dish -> dish.getCalories() < 320) .collect(toList()); ←---- 由季节性的水果、虾构成的列表
filter 缺点。遍历所有。对每个都执行谓词操作,但可以第一个大于热量或者等于就停止
takeWhile
操作 谓词分片 第一个不符合就停止。类似break。
List<Dish> slicedMenu1= specialMenu.stream() .takeWhile(dish -> dish.getCalories() < 320) .collect(toList());
dropWhile
List<Dish> slicedMenu2= specialMenu.stream() .dropWhile(dish -> dish.getCalories() < 320) .collect(toList());
dropWhile操作是对takeWhile操作的补充。它会从头开始,丢弃所有谓词结果为false的元素。一旦遭遇谓词计算的结果为true,它就停止处理,并返回所有剩余的元素,即便要处理的对象是一个由无限数量元素构成的流,它也能工作得很好。
区别:
takewhile 保留true之前的结果。drop保留true之后的结果
截短流
limit(n),返回不超过给定长度的流。
List<Dish> dishes = specialMenu.stream().filter(dish -> dish.getCalories() > 300).limit(3).collect(toList());
类似SQL中
SELECT * FROM table LIMIT n
limit也可以用在无序流上,比如源是一个Set。
这种情况下,limit的结果不会以任何顺序排列
跳过元素
流还支持skip(n)方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流。
注意
limit(n)和skip(n)是互补的!例如,下面的代码将跳过热量超过300卡路里的头两道菜,并返回剩下的
List<Dish> dishes = menu.stream().filter(d -> d.getCalories() > 300).skip(2).collect(toList());
映射
在SQL里,你可以从表中选择一列。Stream API也通过map和flatMap方法提供了类似的工具
SELECT Column,Column,... FROM table
映射一词,是因为它和转换类似,但其中的细微差别在于它是“创建一个新版本”而不是去“修改”
List<String> dishNames = menu.stream().map(Dish::getName).collect(toList());
返回结果为一个String 类型的集合
给定单词列表[“Hello”,“World”],你想要返回列表[“H”,“e”,“l”, “o”,“W”,“r”,“d”]
words.stream() .map(word -> word.split("")) .distinct() .collect(toList());
使用map和Arrays.stream()
String[] arrayOfWords = {"Goodbye", "World"};Stream<String> streamOfwords = Arrays.stream(arrayOfWords);words.stream() .map(word -> word.split("")) ←---- 将每个单词转换为由其字母构成的数组 .map(Arrays::stream) ←---- 让每个数组变成一个单独的流 .distinct() .collect(toList());
使用flatMap
List<String> uniqueCharacters =words.stream() .map(word -> word.split("")) ←---- 将每个单词转换为由其字母构成的数组 .flatMap(Arrays::stream) ←---- 将各个生成流扁平化为单个流 .distinct() .collect(toList());
flatMap总结
flatMap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流
类似双重for循环
测试
//给定[1, 2, 3, 4, 5],应该返回[1, 4, 9, 16, 25]List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);List<Integer> squares =numbers.stream() .map(n -> n * n) .collect(toList());
给定两个数字列表,如何返回所有的数对
//列表[1, 2, 3]和列表[3, 4],应该返回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]List<Integer> numbers1 = Arrays.asList(1, 2, 3);List<Integer> numbers2 = Arrays.asList(3, 4);List<int[]> pairs =numbers1.stream().flatMap(i -> numbers2.stream().map(j -> new int[]{i, j})).collect(toList());
返回总和能被3整除的数对
//filter可以配合谓词使用来筛选流中的元素。因为在flatMap操作后,//你有了一个代表数对的int[]流,所以只需要一个谓词来检查总和是否能被3整除List<Integer> numbers1 = Arrays.asList(1, 2, 3);List<Integer> numbers2 = Arrays.asList(3, 4);List<int[]> pairs =numbers1.stream().flatMap(i -> numbers2.stream() .filter(j -> (i + j) % 3 == 0) .map(j -> new int[]{i, j})).collect(toList());
查找和匹配
匹配
allMatch、anyMatch、noneMatch、findFirst和findAny
类似SQL
SELECT * FROM table LIKE '[]';SELECT * FROM table LIKE '%xx%';SELECT * FROM table LIKE '[^]';SELECT * FROM table LIKE 'x%';SELECT * FROM table LIKE '%x%';
anyMatch方法可以回答“流中是否有一个元素能匹配给定的谓词”。
比如,你可以用它来看看菜单里面是否有素食可选择:
//SELECT * FROM table LIKE '%x%';if(menu.stream().anyMatch(Dish::isVegetarian)){System.out.println("The menu is (somewhat) vegetarian friendly!!");}
allMatch方法的工作原理和anyMatch类似,但它会看看流中的元素是否都能匹配给定的谓词。比如,你可以用它来看看菜品是否有利健康(即所有菜的热量都低于1000卡路里)
//SELECT * FROM table LIKE 'x';boolean isHealthy = menu.stream().allMatch(dish -> dish.getCalories() < 1000);
noneMatch
和allMatch相对的是noneMatch。它可以确保流中没有任何元素与给定的谓词匹配。比如,你可以用noneMatch重写前面的例子
//SELECT * FROM table LIKE '[^x]';boolean isHealthy = menu.stream().noneMatch(dish -> dish.getCalories() >= 1000);
anyMatch、allMatch和noneMatch这三个操作都用到了
所谓的短路,
这就是大家熟悉的Java中&&和||运算符短路在流中的版本
短路求值
有些操作不需要处理整个流就能得到结果。
例如,假设你需要对一个用and连起来的大布尔表达式求值。
不管表达式有多长,你只需找到一个表达式为false,就可以推断整个表达式将返回false,所以用不着计算整个表达式。这就是短路。
对于流而言,某些操作(例如allMatch、anyMatch、noneMatch、findFirst和findAny)不用处理整个流就能得到结果。只要找到一个元素,就可以有结果了。同样,limit也是一个短路操作:它只需要创建一个给定大小的流,而用不着处理流中所有的元素。在碰到无限大小的流的时候,这种操作就有用了:它们可以把无限流变成有限流
查找元素
findAny方法将返回当前流中的任意元素。
它可以与其他流操作结合使用。
比如,你可能想找到一道素食菜肴。可以结合使用filter和findAny方法来实现这个查询
Optional<Dish> dish =menu.stream().filter(Dish::isVegetarian).findAny();
流水线将在后台进行优化使其只需走一遍,并在利用短路找到结果时立即结束。
Optional简介
Optional类(java.util.Optional)是一个容器类,代表一个值存在或不存在。在上面的代码中,findAny可能什么元素都没找到。Java 8的库设计人员引入了Optional,这样就不用返回众所周知容易出问题的null了
- isPresent()将在Optional包含值的时候返回true, 否则返回false。
- ifPresent(Consumer block)会在值存在的时候执行给定的代码块。Consumer函数式接口,它让你传递一个接受T类型参数,并返回void的Lambda表达式。
- T get()会在值存在时返回值,否则抛出一个NoSuchElement异常。
- T orElse(T other)会在值存在时返回值,否则返回一个默认值
menu.stream().filter(Dish::isVegetarian).findAny() ←---- 返回一个Optional<Dish>.ifPresent(dish -> System.out.println(dish.getName());
查找第一个元素
有些流由一个**出现顺序(encounter order)**来指定流中项目出现的逻辑顺序(比如由List或排序好的数据列生成的流)。对于这种流,你可能想要找到第一个元素。为此有一个findFirst方法,它的工作方式类似于findAny。
例如,给定一个数字列表,下面的代码能找出第一个平方能被3整除的数
List<Integer> someNumbers = Arrays.asList(1, 2, 3, 4, 5);Optional<Integer> firstSquareDivisibleByThree =someNumbers.stream() .map(n -> n * n) .filter(n -> n % 3 == 0) .findFirst(); // 9
何时使用findFirst和findAny?
你可能会想,为什么会同时有findFirst和findAny呢?答案是并行。
找到第一个元素在并行上限制更多。如果你不关心返回的元素是哪个,请使用findAny,因为它在使用并行流时限制较少。
规约
终端操作都是返回一个boolean(allMatch之类的)、void(forEach)或Optional对象(findAny等)、collect来将流中的所有元素组合成一个List
流中的元素组合起来,使用reduce操作来表达更复杂的查询
假设计算菜单中的总卡路里”或“菜单中卡路里最高的菜是哪一个”。
此类查询需要将流中所有元素反复结合起来,得到一个值,比如一个Integer。这样的查询可以被归类为归约操作(将流归约成一个值)。用函数式编程语言的术语来说,这称为折叠(fold),因为你可以将这个操作看成把一张长长的纸(你的流)反复折叠成一个小方块,而这就是折叠操作的结果
int sum = 0;for (int x : numbers) {sum += x;}
numbers中的每个元素都用加法运算符反复迭代来得到结果。通过反复使用加法,把一个数字列表归约成了一个数字。这段代码中有两个参数:
- 总和变量的初始值,在这里是0;
- 将列表中所有元素结合在一起的操作,在这里是+。
//等同于int sum = numbers.stream().reduce(0, (a, b) -> a + b);int product = numbers.stream().reduce(1, (a, b) -> a * b);
reduce接受两个参数:
- 一个初始值,这里是0;
- 一个BinaryOperator来将两个元素结合起来产生一个新值,这里用的是lambda (a, b) -> a + b。
深入研究一下reduce操作是如何对一个数字流求和的。
首先,0作为Lambda的第一个参数(a),从流中获得4作为第二个参数(b)。0 + 4得到4,它成了新的累积值。然后再用累积值和流中下一个元素5调用Lambda,产生新的累积值9。
接下来,再用累积值和下一个元素3调用Lambda,得到12。最后,用12和流中最后一个元素9调用Lambda,得到最终结果21。
Integer类现在有了一个静态的sum方法来对两个数求和,这恰好是我们想要的,用不着反复用Lambda写同一段代码了
int sum = numbers.stream().reduce(0, Integer::sum);
reduce还有一个重载的变体,它不接受初始值,但是会返回一个Optional对象:
Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));
为什么它返回一个Optional呢?
考虑流中没有任何元素的情况。reduce操作无法返回其和,因为它没有初始值。这就是为什么结果被包裹在一个Optional对象里,以表明和可能不存在
最大值和最小值
需要一个给定两个元素能够返回最大值的Lambda。reduce操作会考虑新值和流中下一个元素,并产生一个新的最大值,直到整个流消耗完!你可以像下面这样使用reduce来计算流中的最大值
Optional<Integer> max = numbers.stream().reduce(Integer::max);//最小值Optional<Integer> min = numbers.stream().reduce(Integer::min);//等同于Lambda (x, y) -> x < y " />: y而不是Integer::min
int count = menu.stream().map(d -> 1).reduce(0, (a, b) -> a + b);//等同于long count = menu.stream().count();
相比于前面写的逐步迭代求和,使用reduce的好处在于,这里的迭代被内部迭代抽象掉了,这让内部实现得以选择并行执行reduce操作。
而迭代式求和例子要更新共享变量sum,这不是那么容易并行化的。如果你加入了同步,很可能会发现线程竞争抵消了并行本应带来的性能提升!
这种计算的并行化需要另一种办法:将输入分块,分块求和,最后再合并起来。但这样的话代码看起来就完全不一样了
可变的累加器模式对于并行化来说是死路一条
int sum = numbers.parallelStream().reduce(0, Integer::sum);//传递给reduce的Lambda不能更改状态(如实例变量),而且操作必须满足结合律才可以按任意顺序执行。
流操作:无状态和有状态
诸如map或filter等操作会从输入流中获取每一个元素,并在输出流中得到0或1个结果。这些操作一般都是无状态的:它们没有内部状态(假设用户提供的Lambda或方法引用没有内部可变状态)。
但诸如reduce、sum、max等操作需要内部状态来累积结果。在上面的情况下,内部状态很小。在我们的例子里就是一个int或double。不管流中有多少元素要处理,内部状态都是有界的。
相反,诸如sort或distinct等操作一开始都与filter和map差不多——都是接受一个流,再生成一个流(中间操作),但有一个关键的区别。从流中排序和删除重复项时都需要知道先前的历史。
例如,排序要求所有元素都放入缓冲区后才能给输出流加入一个项目,这一操作的存储要求是无界的。要是流比较大或是无限的,就可能会有问题(把质数流倒序会做什么呢?它应当返回最大的质数,但数学告诉我们它不存在)。我们把这些操作叫作有状态操作
中间操作和终端操作
操作 | 类型 | 返回类型 | 使用的类型/函数式接口 | 函数描述符 |
---|---|---|---|---|
filter | 中间 | Stream | Predicate | T -> boolean |
distinct | 中间(有状态–无界) | Stream | ||
takeWhile | 中间 | Stream | Predicate | T -> boolean |
dropWhile | 中间 | Stream | Predicate | T -> boolean |
skip | 中间(有状态–无界) | Stream | long | |
limit | 中间(有状态–无界) | Stream | long | |
map | 中间 | Stream | Function | T -> R |
flatMap | 中间 | Stream | Function | T -> Stream |
sorted | 中间(有状态–无界) | Stream | Comparator | (T, T) -> int |
anyMatch | 终端 | boolean | Predicate | T -> boolean |
noneMatch | 终端 | boolean | Predicate | T -> boolean |
allMatch | 终端 | boolean | Predicate | T -> boolean |
findAny | 终端 | Optional | ||
findFirst | 终端 | Optional | ||
forEach | 终端 | void | Consumer | T -> void |
collect | 终端 | R | Collector | |
reduce | 终端(有状态–有界) | Optional | BinaryOperator | (T, T) -> T |
count | 终端 | long |
实践
执行交易的交易员
(1) 找出2011年发生的所有交易,并按交易额排序(从低到高)。(2) 交易员都在哪些不同的城市工作过?(3) 查找所有来自于剑桥的交易员,并按姓名排序。(4) 返回所有交易员的姓名字符串,按字母顺序排序。(5) 有没有交易员是在米兰工作的?(6) 打印生活在剑桥的交易员的所有交易额。(7) 所有交易中,最高的交易额是多少?(8) 找到交易额最小的交易。
@Datapublic class Trader{private final String name;private final String city;public Trader(String n, String c){this.name = n;this.city = c;}}// 另外一个类@Datapublic class Transaction{private final Trader trader;private final int year;private final int value;}
Trader raoul = new Trader("Raoul", "Cambridge");Trader mario = new Trader("Mario","Milan");Trader alan = new Trader("Alan","Cambridge");Trader brian = new Trader("Brian","Cambridge");List<Transaction> transactions = Arrays.asList(new Transaction(brian, 2011, 300),new Transaction(raoul, 2012, 1000),new Transaction(raoul, 2011, 400),new Transaction(mario, 2012, 710),new Transaction(mario, 2012, 700),new Transaction(alan, 2012, 950));
List<Transaction> tr2011 =transactions.stream().filter(transaction -> transaction.getYear() == 2011) ←---- 给filter传递一个谓词来选择2011年的交易.sorted(comparing(Transaction::getValue)) ←---- 按照交易额进行排序.collect(toList()); ←---- 将生成的Stream中的所有元素收集到一个List中
List<String> cities =transactions.stream().map(transaction -> transaction.getTrader().getCity())←---- 提取与交易相关的每位交易员的所在城市.distinct() ←---- 只选择互不相同的城市.collect(toList());//你可以去掉distinct(),改用toSet(),这样就会把流转换为集合Set<String> cities =transactions.stream().map(transaction -> transaction.getTrader().getCity()).collect(toSet());
List<Trader> traders =transactions.stream().map(Transaction::getTrader) ←---- 从交易中提取所有交易员.filter(trader -> trader.getCity().equals("Cambridge")) ←---- 仅选择位于剑桥的交易员.distinct() ←---- 确保没有任何重复.sorted(comparing(Trader::getName)) ←---- 对生成的交易员流按照姓名进行排序.collect(toList());
String traderStr =transactions.stream().map(transaction -> transaction.getTrader().getName()) ←---- 提取所有交易员姓名,生成一个Strings构成的Stream.distinct() ←---- 只选择不相同的姓名.sorted() ←---- 对姓名按字母顺序排序.reduce("", (n1, n2) -> n1 + n2); ←---- 逐个拼接每个名字,得到一个将所有名字连接起来的String//此解决方案效率不高(所有字符串都被反复连接,每次迭代的时候都要建立一个新的String对象)//高效的解决方案,它像下面这样使用joining(其内部会用到StringBuilder)String traderStr =transactions.stream().map(transaction -> transaction.getTrader().getName()).distinct().sorted().collect(joining());
boolean milanBased =transactions.stream().anyMatch(transaction -> transaction.getTrader().getCity().equals("Milan")); ←---- 把一个谓词传递给anyMatch,检查是否有交易员在米兰工作
transactions.stream().filter(t -> "Cambridge".equals(t.getTrader().getCity())) ←---- 选择住在剑桥的交易员所进行的交易.map(Transaction::getValue) ←---- 提取这些交易的交易额.forEach(System.out::println); ←---- 打印每个值
Optional<Integer> highestValue =transactions.stream().map(Transaction::getValue) ←---- 提取每项交易的交易额.reduce(Integer::max); ←---- 计算生成的流中的最大值
Optional<Transaction> smallestTransaction =transactions.stream().reduce((t1, t2) ->t1.getValue() < t2.getValue() " />: t2); ←---- 通过反复比较每个交易的交易额,找出最小的交易
//流支持min和max方法,//它们可以接受一个Comparator作为参数,指定计算最小或最大值时要比较哪个键值:Optional<Transaction> smallestTransaction =transactions.stream().min(comparing(Transaction::getValue));
数值流
int calories = menu.stream() .map(Dish::getCalories) .reduce(0, Integer::sum);// 这段代码的问题是,它有一个暗含的装箱成本。// 每个Integer都必须拆箱成一个原始类型,// 再进行求和。要是可以直接像下面这样调用sum方法,岂不是更好?int calories = menu.stream() .map(Dish::getCalories) .sum();// 但这是不可能的。问题在于map方法会生成一个Stream。// 虽然流中的元素是Integer类型,但Stream接口没有定义sum方法
Java 8引入了三个原始类型特化流接口来解决这个问题:
IntStream、DoubleStream和LongStream,
分别将流中的元素特化为int、long和double从而避免了暗含的装箱成本。每个接口都带来了进行常用数值归约的新方法,比如对数值流求和的sum,找到最大元素的max。此外还有在必要时再把它们转换回对象流的方法。要记住的是,这些特化的原因并不在于流的复杂性,而是装箱造成的复杂性——即类似int和Integer之间的效率差异。
int calories = menu.stream() ←---- 返回一个Stream<Dish> .mapToInt(Dish::getCalories) ←---- 返回一个IntStream .sum();
如果流是空的,sum则默认返回0。
IntStream还支持其他的方便方法,如max、min、average等
转换回对象流
IntStream intStream = menu.stream().mapToInt(Dish::getCalories);←---- 将Stream转换为数值流Stream<Integer> stream = intStream.boxed(); ←---- 将数值流转换为Stream
Optional可以用Integer、String等参考类型来参数化。对于三种原始流特化,也分别有一个Optional原始类型特化版本:OptionalInt、OptionalDouble和OptionalLong。
Optional 是为了清晰地表达返回值中没有结果的可能性
OptionalInt maxCalories = menu.stream().mapToInt(Dish::getCalories).max();int max = maxCalories.orElse(1); ←---- 如果没有最大值的话,显式提供一个默认最大值
IntStream evenNumbers = IntStream.rangeClosed(1, 100) ←---- 表示范围[1, 100].filter(n -> n % 2 == 0); ←---- 一个从1到100的偶数流System.out.println(evenNumbers.count()); ←---- 从1到100有50个偶数
数值流应用:勾股数(毕达哥拉斯三元数)
new int[]{3, 4, 5},来表示勾股数(3, 4, 5)//Java中可以这么表述:Math.sqrt(a*a + b*b) % 1 == 0//(对于浮点数x,它的分数部分在Java中可以使用x % 1.0表示,//譬如5.0这样的整数,它的分数部分是0)//filter过滤filter(b -> Math.sqrt(a*a + b*b) % 1 == 0)//生成三元组stream.filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).map(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});//Stream.rangeClosed让你可以在给定区间内生成一个数值流。//可以用它来给b提供数值,这里是1到100IntStream.rangeClosed(1, 100) .filter(b -> Math.sqrt(a*a + b*b) % 1 == 0) .boxed() .map(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});
filter之后调用boxed,从rangeClosed返回的IntStream生成一个Stream。这是因为你的map会为流中的每个元素返回一个int数组。而IntStream中的map方法只能为流中的每个元素返回另一个int
//用IntStream的mapToObj方法改写它,这个方法会返回一个对象值流:IntStream.rangeClosed(1, 100) .filter(b -> Math.sqrt(a*a + b*b) % 1 == 0) .mapToObj(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)});
Stream<int[]> pythagoreanTriples =IntStream.rangeClosed(1, 100).boxed() .flatMap(a -> IntStream.rangeClosed(a, 100).filter(b -> Math.sqrt(a*a + b*b) % 1 == 0).mapToObj(b ->new int[]{a, b, (int)Math.sqrt(a * a + b * b)}) );
flatMap又是怎么回事呢?首先,创建一个从1到100的数值范围来生成a的值。对每个给定的a值,创建一个三元数流。要是把a的值映射到三元数流的话,就会得到一个由流构成的流。flatMap方法在做映射的同时,还会把所有生成的三元数流扁平化成一个流。这样你就得到了一个三元数流。还要注意,我们把b的范围改成了a到100。没有必要再从1开始了,否则就会造成重复的三元数,例如(3,4,5)和(4,3,5)。
pythagoreanTriples.limit(5).forEach(t -> System.out.println(t[0] + ", " + t[1] + ", " + t[2]));//结果3, 4, 55, 12, 136, 8, 107, 24, 258, 15, 17
//先 生成所有的三元数(a*a, b*b, a*a+b*b),然后再筛选符合条件Stream<double[]> pythagoreanTriples2 =IntStream.rangeClosed(1, 100).boxed() .flatMap(a -> IntStream.rangeClosed(a, 100).mapToObj(b -> new double[]{a, b, Math.sqrt(a*a + b*b)}) ←---- 产生三元数.filter(t -> t[2] % 1 == 0)); ←---- 元组中的第三个元素必须是整数
值创建流
使用静态方法Stream.of,通过显式值创建一个流。
它可以接受任意数量的参数。例如,以下代码直接使用Stream.of创建了一个字符串流。
Stream<String> stream = Stream.of("Modern ", "Java ", "In ", "Action");stream.map(String::toUpperCase).forEach(System.out::println);//使用empty得到一个空流,如下所示:Stream<String> emptyStream = Stream.empty();
Java 9提供了一个新方法可以由一个可空对象创建流。
使用流的过程中,即处理的对象有可能为空,而又需要把它们转换成流(或者由null构成的空的流)进行处理。
譬如,如果对象不存在指定键对应的属性,方法System.getProperty就会返回一个null。为了使用流处理它,需要显式地检查对象值是否为空.
String homeValue = System.getProperty("home");Stream<String> homeValueStream= homeValue == null " />Stream.empty() : Stream.of(value);//借助于Stream.ofNullable,这段代码可以改写得更加简洁:Stream<String> homeValueStream= Stream.ofNullable(System.getProperty("home"));//这种模式搭配flatMap处理由可空对象构成的流时尤其方便:Stream<String> values =Stream.of("config", "home", "user").flatMap(key -> Stream.ofNullable(System.getProperty(key)));
数组创建流
静态方法Arrays.stream从数组创建一个流。它接受一个数组作为参数。
将一个原始类型int的数组转换成一个IntStream,然后对IntStream求和以生成int
int[] numbers = {2, 3, 5, 7, 11, 13};int sum = Arrays.stream(numbers).sum(); ←---- 总和是41
文件生成流
Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。
java.nio.file.Files中的很多静态方法都会返回一个流。例如,一个很有用的方法是Files.lines,它会返回一个由指定文件中的各行构成的字符串流。stream流方式一个文件中有多少各不相同的词:
long uniqueWords = 0;try(Stream<String> lines =Files.lines(Paths.get("data.txt"), Charset.defaultCharset())){ ←---- 流会自动关闭,因此不需要执行额外的try-finally操作uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" "))) ←---- 生成单词流 .distinct() ←---- 删除重复项 .count(); ←---- 数一数有多少不重复的单词}catch(IOException e){ ←---- 如果打开文件时出现异常则加以处理}//data.txtThe quick brown fox jumped over the lazy dogThe lazy dog jumped over the quick brown fox
使用Files.lines得到一个流,其中的每个元素都是给定文件中的一行。因为流的源头是一个I/O资源,所以这个调用环绕在一个try/catch块中。事实上,调用Files.lines会打开一个I/O资源,这些I/O资源使用完毕后必须被关闭,否则会发生资源泄漏。
在过去,需要显式地声明一个finally块来完成这些回收工作。现在Stream接口通过实现AutoCloseable接口,资源的管理都由try代码块全权负责。
flatMap是如何生成一个扁平单词流的,而不是生成多个流,每一行一个单词流。最后通过串接distinct和count方法,统计了流中有多少不重复的单词。
由函数生成流:创建无限流
Stream API提供了两个静态方法来从函数生成流:
Stream.iterate和Stream.generate。
这两个操作可以创建所谓的无限流:不像从固定集合创建的流那样有固定大小的流。
由iterate和generate产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!一般来说,应该使用limit(n)来对这种流加以限制,以避免打印无穷多个值。
Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);iterate方法接受一个初始值(在这里是0),还有一个依次应用在每个产生的新值上的Lambda(UnaryOperator<t>类型)。//0+2,2+2,4+2...这里,使用Lambda n -> n + 2,返回的是前一个元素加上2。因此,iterate方法生成了一个所有正偶数的流:流的第一个元素是初始值0。然后加上2来生成新的值2,再加上2来得到新的值4,以此类推。这种iterate操作基本上是顺序的,因为结果取决于前一次应用。请注意,此操作将生成一个无限流——这个流没有结尾,因为值是按需计算的,可以永远计算下去。我们说这个流是无界的。正如前面所讨论的,这是流和集合之间的一个关键区别。我们使用limit方法来显式限制流的大小。这里只选择了前10个偶数
一般来说,在需要依次生成一系列值的时候应该使用iterate,比如一系列日期:1月31日,2月1日,以此类推。
斐波那契数列的一部分:0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55…数列中开始的两个数字是0和1,后续的每个数字都是前两个数字之和
元组构成的序列:(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21) …
iterate方法要接受一个UnaryOperator<t>作为参数iterate会按顺序应用给定的LambdaStream.iterate(new int[]{0, 1}, t -> new int[]{t[1], t[0]+t[1]}).limit(20).forEach(t -> System.out.println("(" + t[0] + "," + t[1] +")"));iterate需要一个Lambda来确定后续的元素。对于元组(3, 5),其后续元素是(5, 3+5) = (5, 8)。下一个是(8, 5+8)。给定一个元组,其后续的元素是(t[1],t[0]+t[1])。这可以用这个Lambda来计算:t->new int[]{t[1], t[0]+t[1]}。运行这段代码,(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21)
只是打印
Stream.iterate(new int[]{0, 1}, t -> new int[]{t[1],t[0] + t[1]}).limit(10).map(t -> t[0]).forEach(System.out::println);这段代码将生成斐波那契数列:0, 1, 1, 2, 3, 5, 8, 13, 21, 34…
Java9增强
Java 9对iterate方法进行了增强,它现在可以支持谓词操作了。譬如,你可以由0开始生成一个数字序列,一旦数字大于100就停下来:iterate方法的第二个参数是一个谓词,它决定了迭代调用何时终止IntStream.iterate(0, n -> n < 100, n -> n + 4) .forEach(System.out::println);使用filter操作完全能实现同样的效果:IntStream.iterate(0, n -> n + 4) .filter(n -> n < 100) .forEach(System.out::println);实际上,这段代码根本停不下来!原因在于,filter根本无法了解数字是否需要持续递增,因此它只能不停地执行过滤操作!takeWhile解决这个问题,它能对流执行短路操作:IntStream.iterate(0, n -> n + 4) .takeWhile(n -> n < 100) .forEach(System.out::println);
生成
与iterate方法类似,generate方法也可让你按需生成一个无限流。
但generate不是依次对每个新生成的值应用函数的。
它接受一个Supplier类型的Lambda提供新的值。
Stream.generate(Math::random).limit(5).forEach(System.out::println);这段代码将生成一个流,其中有五个0到1之间的随机双精度数。例如,运行一次得到了下面的结果:0.94108102941061290.65862707556345920.95928591172668730.137433966594870060.3942776037651241Math.Random静态方法被用作新值生成器。同样,你可以用limit方法显式限制流的大小,否则流将会无限长。使用IntStream说明避免装箱操作的代码。IntStream的generate方法会接受一个IntSupplier,而不是Supplier<t>。例如,可以这样来生成一个全是1的无限流:IntStream ones = IntStream.generate(() -> 1);
错误示范
Lambda允许你创建函数式接口的实例,只要直接内联提供方法的实现就可以。你也可以像下面这样,通过实现IntSupplier接口中定义的getAsInt方法显式传递一个对象:IntStream twos = IntStream.generate(new IntSupplier(){public int getAsInt(){return 2;}});generate方法将使用给定的供应源,并反复调用getAsInt方法,而这个方法总是返回2。但这里使用的匿名类和Lambda的区别在于,匿名类可以通过字段定义状态,而状态又可以用getAsInt方法来修改。这是一个副作用的例子。大部分常用的lambda表达式没有副作用
回到斐波那契数列的任务上,现在需要做的是建立一个IntSupplier,它要把前一项的值保存在状态中,以便getAsInt用它来计算下一项。此外,在下一次调用它的时候,还要更新IntSupplier的状态。下面的代码就是如何创建一个在调用时返回下一个斐波那契项的IntSupplier:IntSupplier fib = new IntSupplier(){private int previous = 0;private int current = 1;public int getAsInt(){int oldPrevious = this.previous;int nextValue = this.previous + this.current;this.previous = this.current;this.current = nextValue;return oldPrevious;}};IntStream.generate(fib).limit(10).forEach(System.out::println);
处理的是一个无限流,所以必须使用limit操作来显式限制它的大小。否则,终端操作将永远计算下去。同样,不能对无限流做排序或归约,因为所有元素都需要处理,而这永远也完不成!
总结
Stream API可以表达复杂的数据处理查询。常用的流操作总结在表5-1中。
- filter、distinct、takeWhile (Java 9)、dropWhile (Java 9)、skip和limit对流做筛选和切片。
- 明确地知道数据源是排序的,那么用takeWhile和dropWhile方法通常比filter高效得多。
- map和flatMap提取或转换流中的元素。
- findFirst和findAny方法查找流中的元素。
- allMatch、noneMatch和anyMatch方法让流匹配给定的谓词。这些方法都利用了短路:找到结果就立即停止计算;没有必要处理整个流。
- 你可以利用reduce方法将流中所有的元素迭代合并成一个结果,例如求和或查找最大元素。
- filter和map等操作是无状态的,它们并不存储任何状态。reduce等操作要存储状态才能计算出一个值。sorted和distinct等操作也要存储状态,因为它们需要把流中的所有元素缓存起来才能返回一个新的流。这种操作称为有状态操作。
- 流有三种基本的原始类型特化:IntStream、DoubleStream和LongStream。它们的操作也有相应的特化。
- 流不仅可以从集合创建,也可从值、数组、文件以及iterate与generate等特定方法创建。
- 无限流所包含的元素数量是无限的(想象一下所有可能的字符串构成的流)。这种情况是有可能的,因为流中的元素大多数都是即时产生的。使用limit方法,你可以由一个无限流创建一个有限流。