zl程序教程

您现在的位置是:首页 >  后端

当前栏目

[零]java8 函数式编程入门官方文档中文版 java.util.stream 中文版 流处理的相关概念详解编程语言

2023-06-13 09:20:45 时间
然后在流上执行一个filter-map-reduce 来获得红色widgets重量的总和。(总和是一个归约(reduce)操作的例子)
类 Stream、IntStream、LongStream和DoubleStream分别是在对象Object和基本类型int、long和double类型上的流。
不存储数据  流不是存储元素的数据结构;相反,它通过一个哥哥计算操作组合而成的管道,从一个数据源,如数据结构、数组、生成器函数或i/o通道  来传递元素  函数特性  一个流上的操作产生一个结果,但是不会修改它的源。例如,过滤集合 获得的流会产生一个没有被过滤元素的新流,而不是从源集合中删除元素 延迟搜索    许多流操作,如过滤、映射或重复删除,都可以延迟实现,从而提供出优化的机会。    例如,“找到带有三个连续元音的第一个字符串”不需要检查所有的输入字符串。   流操作分为中间(流生成)操作和终端(值或副作用生成)操作。许多的中间操作, 如filter,map等,都是延迟执行。   中间操作总是惰性的的。 Stream可能是无限的  虽然集合的大小是有限的,但流不需要。诸如limit(n)或findFirst()这样的短路操作可以允许在有限时间内完成无限流的计算。  消耗的  流的元素只在流的生命周期中访问一次。就像迭代器一样,必须生成一个新的流来重新访问源的相同元素 
Collection 提供的stream  parallelStream  从数组 Arrays.stream(Object[]) 静态方法  Stream类的静态工厂方法 比如  Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator)  Stream.generate  BufferedReader.lines(); 文件行 获取文件路径的流: Files类的find(), lines(), list(), walk(); Random.ints()  随机数流 JDK中的许多其他流载方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
一条流管道由一个源(如一个集合、一个数组、一个生成器函数或一个i/o通道)组成; 然后是零个或更多的中间操作,例如stream.filter 或者stream.map; 还有一个终端操作,如stream.forEach或Stream.reduce
执行诸如filter()之类的中间操作实际上并不会立即执行任何过滤操作,而是创建了一个新流,当遍历时,它包含与给定谓词相匹配的初始流的元素。直到管道的终端操作被执行,管道源的遍历才会开始  
在几乎所有情况下,终端操作都很迫切,在返回之前完成了数据源的遍历和管道的处理。只有终端操作iterator() 和 spliterator() 不是;
这些都是作为一个“逃生舱口”提供的,以便在现有操作不足以完成任务的情况下,启用任意客户控制的管道遍历
在像上面的filer-map-sum例子这样的管道中,过滤、映射和求和可以被融合到数据的单个传递中,并且具有最小的中间状态。
惰性还允许在没有必要的情况下避免检查所有数据;对于诸如“查找第一个超过1000个字符的字符串”这样的操作,只需要检查足够的字符串,就可以找到具有所需特征的字符串,而不需要检查源的所有字符串。(当输入流是无限的而不仅仅是大的时候,这种行为就变得更加重要了。)
无状态操作,如filter和map,在处理新元素时不保留以前处理的元素的状态——每个元素都可以独立于其他元素的操作处理。
因此,在并行计算下,一些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。
包含完全无状态的中间操作的管道可以在单次传递过程中进行处理,无论是顺序的还是并行的,只有最少的数据缓冲
此外,一些操作被认为是短路操作。一个中间操作,如果在提供无限流输入时,它可能会产生一个有限的流,那么他就是短路的。
流通过将计算重新定义为聚合操作的管道,而不是在每个单独元素上立即执行操作,从而促进并行执行。
例如,Collection有方法Collection.stream()和Collection.parallelstream(),它们分别产生串行和并行流;
其他的流方法比如  IntStream.range(int, int) 产生串行的流,但是可以通过调用BaseStream.parallel()方法设置为 并行化
这个例子的串行和并行版本的唯一区别是初始时创建流,使用parallelStream()而不是stream()
除了被确定为显式非确定性的操作之外,如findAny(),无论是顺序执行还是并行执行,都不应该改变计算的结果。
为了保持正确的行为,这些行为参数必须是不干涉non-interfering的,并且在大多数情况下必须是无状态的。
Streams允许您在各种数据源上执行可能并行的聚合操作,甚至包括ArrayList之类的非线程安全集合。
除了逃脱舱口iterator()和spliterator()之外,都是在调用终端操作时开始执行,并在终端操作完成时结束。
这方面的一个显著的例外是源是并发集合的流,它们是专门设计用来处理并发修改的。并发流源是那些Spliterator 设置了并发特性(CONCURRENT characteristic)
一个行为参数将被称之为干扰的(interfere) 如果对于一个非并发数据源来说如果它修改或导致被修改数据源被修改.
除非流数据源是并发的,否则在执行流管道时修改stream的数据源可能会导致异常、错误的答案或不一致的行为。
对于表现良好的stream,数据源是可以修改的,只要是在终端操作开始之前,并且所有的修改都会包含在内
最后,流的元素被collect 以及joining在一起。由于该列表在终端收集操作开始之前被修改,结果将是一串“one two three”。
对于其他库生成的流,请参阅 Low-level stream construction,以满足构建行为良好的流的需求。
有状态的lambda(或实现适当的功函数接口的其他对象)是一个其结果依赖于任何可能在流水线执行过程中发生变化的状态。
在这里,如果映射操作是并行执行的,那么相同输入的结果可能因线程调度差异而变化,而对于无状态lambda表达式,结果总是相同的
最好的方法是在流操作中完全地避免有状态的行为参数; 通常总会有种方法可以重构流以避免状态性
一般来说,对流操作的行为参数的副作用是不鼓励的,因为它们通常会导致不知情的违反无状态要求的行为,以及其他线程安全隐患
如果行为参数确实有副作用,除非显式地声明,否则就无法保证这些副作用对其他线程的可见性,也不能保证在同一条管道内的“相同”元素上的不同操作在相同的线程中执行。此外,这些影响的排序可能出乎意料。即使管道被限制生成一个与stream源的处理顺序一致的结果(例如,IntStream.range(0,5).parallel().map(x - x*2).toArray() 必须生成0、2、4、6、8),对于将mapper函数应用于个别元素的顺序,或者对于给定元素执行任何行为参数的顺序,都没有保证
对许多可能会被尝试使用于副作用的计算中,可以替换为无副作用的,更安全更有效的表达,比如使用归约而不是可变的累积器。
然而,使用println()来进行调试的副作用通常是无害的。少部分的流操作,如forEach()和peek(),用的就是他们的副作用;这些应该小心使用。
这段代码不必要地使用了副作用。如果并行执行,ArrayList的非线程安全将导致不正确的结果,并且添加所需的同步将导致竞争,从而破坏并行性的好处。
此外,在这里使用副作用是完全没有必要的;forEach()可以简单地被替换为更安全、更高效、更适合并行化的reduce操作。
流可能有也可能没有定义好的顺序。流是否有顺序取决于源和中间操作。(所谓定义好的顺序,就是说原始数据源是否有序)
一些中间操作,比如sorted(),可以在无序的流中强加一个顺序,而其他的操作可能会使一个有序的流变成无序,例如BaseStream.unordered(). 
如果元素的排序不是很重要,那么可以更有效地实现某些聚合操作,如过滤重复元素(distinct()  )或分组归约(Collectors.groupingBy())。
类似地,与顺序相关的操作,如limit(),可能需要缓冲以确保正确的排序,从而破坏并行性的好处。
在流有顺序的情况下,但是用户并不特别关心这个顺序,显式地通过unordered()方法调用取消排序, 可能会改善一些有状态或终端操作的并行性能。
然而,大多数的流管道,例如上面的“blocks的重量总和”,即使在排序约束下仍然有效地并行化。
一个归约操作(也称为折叠)接受一系列的输入元素,并通过重复应用组合操作将它们组合成一个简单的结果,例如查找一组数字的总和或最大值,或者将元素累积到一个列表中。streams类有多种形式的通用归约reduce操作,称为reduce()和collect(),以及多个专门化的简化形式,如sum()、max()或count()
它不仅是一个“更抽象的”——它在流上把流作为一个整体运行而不是作用于单独的元素——但是一个适当构造的reduce操作本质上是可并行的,只要用于处理元素的函数(s)是结合的associative和无状态stateless的。举个例子,给定一个数字流,我们想要找到和,我们可以写:
之所以归约操作可以很好地并行,是因为实现可以并行地操作数据的子集,然后将中间结果组合在一起,得到最终的正确答案。(即使该语言有一个“ parallel for-each ”构造,迭代累计运算方法仍然需要开发人员提供对共享累积变量sum的线程安全更新以及所需的同步,这可能会消除并行性带来的任何性能收益。)
使用reduce()代替了归约操作的并行化的所有负担,并且库可以提供一个高效的并行实现,不需要额外的同步
在更通用的形式中  对类型为T的元素,并且返回结果类型为U的reduce操作  需要三个参数:
(在并行减少的情况下,组合是必要的,在这个过程中,输入被分区,每个分区都计算出部分的累积,然后将部分结果组合起来产生最终的结果。)
更准确地说,identity必须是组合函数的恒等式。这意味着对所有的u,combiner.apply(identity, u)等于u,
一个可变的归约操作在处理流中的元素时,将输入元素积累到一个可变的结果容器中,例如一个Collection或StringBuilder,
如果我们想要获取一串字符串的流并将它们连接成一个长字符串,我们可以通过普通的reduce来实现这个目标:
我们可以并行地累计运算部分结果,然后将它们组合起来,只要积累和组合功能满足适当的需求。
在这里,我们的supplier只是ArrayList的构造器,累加器将string  element元素添加到ArrayList中,组合器简单地使用addAll将字符串从一个容器复制到另一个容器中
类Collectors包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。
(对于第二个类型的参数  ?  ,仅仅表明我们不关心收集器所使用的中间类型。 )如果我们想要创建一个收集器来按部门计算工资的总和,我们可以使用groupingBy来重用summingSalaries 薪水:
对于任何部分累计运算的结果,将其与空结果容器相结合combiner  必须产生一个等效的结果
也就是说,对于任意一个部分累计运算的结果p,累计运算或者组合调用的结果,p必须等于  combiner.apply(p, supplier.get()).
而且,无论计算是否分割,它必须产生一个等价的结果。对于任何输入元素t1和t2,下面计算的结果r1和r2必须是等价的
并行执行操作可能实际上会产生反效果。这是因为组合步骤(通过键将一个Map合并到另一个Map)对于某些Map实现来说可能代价很大
然而,假设在这个reduce中使用的结果容器是一个可修改的集合——例如ConcurrentHashMap。在这种情况下,对迭代累计运算器的并行调用实际上可以将它们的结果并发地放到相同的共享结果容器中,从而将不再需要组合器合并不同的结果容器。这可能会促进并行执行性能的提升。我们称之为并行reduce
支持并发reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性为标志。并发特性。然而,并发集合也有缺点。
收集器有Collector.Characteristics.CONCURRENT 特性 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性
(Collectors.groupingByConcurrent(java.util.function.Function ? super T, ? extends K ) 等同于 groupingBy). 
这样我们就可以把(a op b)  和 (c op d) 进行并行计算  最后在对他们进行  op  运算
到目前为止,所有的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法来获得一个stream。这些处理流的方法是如何实现的?
类StreamSupport提供了许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。
它描述了一个(可能是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另一个可并行处理的Spliterator。
在实现Spliterator时,有许多实现选择,几乎所有的实现都是在简单的实现和运行时性能之间进行权衡。
创建Spliterator的最简单、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从一个iterator中创建spliterator  。
虽然这样的spliterator 可以工作,但它可能会提供糟糕的并行性能,因为我们已经丢失了容量信息(底层数据集有多大),以及被限制为一个简单的分割算法。
一个高质量的spliterator 将提供平衡的和已知大小的分割,精确的容量信息,以及一些可用于实现优化执行的spliterator 或数据的其他特征  (特征见spliterator characteristics)
可变数据源的Spliterators 有一个额外的挑战;绑定到数据的时间,因为数据可能在创建Spliterators 后和开始执行流管道的期间,发生变化。
理想情况下,一个流的spliterator将报告一个IMMUTABLE or CONCURRENT;如果不是,应该是后期绑定(late-binding)。
如果一个源不能直接提供一个推荐的spliterator,它可能会通过Supplier 间接地提供一个spliterator,并通过接收Supplier作为参数的stream()版本构造一个stream。只有在流管道的终端操作之后,才从Supplier处获得spliterator
基于具有所需特性的spliterators ,或者使用 Supplier-based 的工厂的形式的流,在终端操作开始之前对数据源的修改是不受影响的(如果流操作的行为参数满足不干涉和无状态的要求标准)。参见不干涉 Non-Interference的细节。