在现代的Java应用程序中很少不用到集合类和数组。 可以对集合进行增,删,改,插, 统计(聚合aggregate)。 这些操作的概念在SQL操作上也会用到。 但是对集合的操作却没有像SQL那样方便简捷。 为什么我们不能实现一种类似SQL语句一样方便的编程方式呢, 去取代一遍又一遍loop遍历的方式处理集合和数组中的数据?
另外,对于大数据量的集合, 能不能充分利用多核的优势, 并行的处理?
Stream是就是这种处理数据的风格, 一种流式风格。 这种风格在其它语言中也有实现, 比如Javascript (Node.js stream)。
这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。
元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。
|
|
一个简单的例子:
|
|
应用Stream
首先,我们先了解一些Stream处理的概念。
什么是流 Stream
流是一个来自数据源的元素队列并支持聚合操作
- 元素队列 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
- 数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
- 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。
注意 这里的流和Java I/O操作的流如InputStream/OutputStream不是一个概念。
和以前的Collection操作不同, Stream操作还有两个基础的特征:
- Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
- 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。
除了操作不同, 从实现角度比较, Stream和Collection也有众多不同:
- 不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
- 功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter只是生成一个筛选后的stream,不会删除源里的元素。
- 延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
- Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
- 可消费的(Consumable)。 不是太好翻译, 意思流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream
几种流生成的方式:
- 集合类的stream() 和 parallelStream()方法;
- 数组Arrays.stream(Object[]);
- Stream类的静态工厂方法: Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator);
- 文件行 BufferedReader.lines();
- Files类的获取文件路径列表: find(), lines(), list(), walk();
- Random.ints() 随机数流, 无界的;
- 其它一些产生流的方法:BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence),JarFile.stream().
- 通过StreamSupport辅助类从spliterator产生流
前面提到过, 流操作分为中间操作(Intermediate operation)和最终操作(Terminal operation)。 中间操作是lazy的, 不会立即执行,只不过是返回一个记录操作的新的流。 最终操作会最终使用流管道,使用后不能在被使用。 大部分情况下, 最终操作都是eager的。
中间操作又进一步分为无状态的操作和有状态的操作。 像filter,map都是无状态的操作, 处理一个新的元素时不需要获得先前遍历过的元素的状态。 而有状态的操作,像distinct, sorted, 需要得到先前访问的元素的状态。
有状态的操作在产生结果前需要获得完整的输入。 因此有状态的操作一个并行流时, 可能需要多次传入数据或者需要缓存数据。 而无状态的操作只需传入一次数据。
Collection.stream() 和 Collection.parallelStream() 分别产生序列化流(普通流)和并行流。 注意并行(parallel)和并发(concurrency)是有区别的。 并发是指多线程有竞争关系,在单核的情况下只有一个线程运行。 而并行是指在多核的情况下同时运行, 单核谈并行是无意义的。
记住,并行不一定快,尤其在数据量很小的情况下,可能比普通流更慢。 只有在大数据量和多核的情况下才考虑并行流。
除非操作明显是不确定的,比如findAny, 否则普通流和并行流应该返回一致的结果。
尽管可以从非线程安全的集合如ArrayList生成流,如果在流管道执行过程中修改了源,可能会抛出java.util.ConcurrentModificationException异常。 下面一个简单的例子演示这种情况。
|
|
concurrent下的并发集合不会抛出ConcurrentModificationException异常,在中间操作对源数据的修改会反射到最终结果:
|
|
如果传递给中间操作的lambda表达式是有状态, 并行流的最终结果可能会不一样:
|
|
"Side-effect"不鼓励使用。Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。 例如:
|
|
可以改为
|
|
如果源是有序的,则相应的流也是有序的。 这里有序是顺序的意思,不是排序。 比如Array和List都是有序的。 HashSet则不是。 有序流上的操作的记过基本上也是有序的, 比如[1,2,3]通过map(x -> x*2)的结果必然是[2,4,6], 如果是无序流, [6,2,4], [4,6,2]等都是合法的结果。
一些reduction方法(也就做fold方法)处理一系列的数据得到一个唯一的结果。 比如reduce, collect,sum,max,count等。
比如以前我们计算数组的sum值:
|
|
现在使用reduce:
|
|
揭秘Stream的实现
在我们进入Stream接口的代码实现之前,我们先看两个Iterator,Spliterator:
Iterator
Java 8中为Iterator新增加一个缺省方法forEachRemaining(Consumer<? super E> action)
。这个缺省方法的实现很简单, 对未处理的元素执行action
, 直到处理完或者action抛出异常。
|
|
Spliterator
正如其名,Spliterator可以看作一个“splittable Iterator”。 在单线程情况下使用没有问题,但是它提供了trySplit()
,为多线程提供处理数据片。
它是为了并行处理流而新增的一个迭代类。
它依然实现了顺序迭代方法default void forEachRemaining(Consumer<? super T> action)
。 内部用一个循环执行:
|
|
而tryAdvance
方法则对下一个为处理的操作执行action并返回true, 如果没有下一个元素,返回false。
看一个trySplit()
的例子, ArrayListSpliterator的trySplit采用二分法,将前一半数据返回, 如果数据太小不能分了,返回null。
|
|
而ConcurrentLinkedQueue和ConcurrentLinkedDeque的相应的Spliterator处理稍微复杂一点, 第一次取一个,第二个取两个,不超过MAX_BATCH.
Stream
事实上Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline
,而它的一些具体实现又是由AbstractPipeline
完成的。
下面我们看一下这两个类。
|
|
AbstractPipeline类实现了所有的 Stream的中间操作和最终操作。 我们重点的挑一些来分析。
首先这个类本身没有定义field, 所有我们只需关注它的每一个具体方法即可,简化了我们的分析。
看一个无状态的操作filter:
|
|
可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink
对象链表。Sink
代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。
而map类似。
|
|
那么问题来了stream.filter(....).map(...)
怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2.
在调用StatelessOp1.map时, StatelessOp2是这样生成的:
|
|
管道中的每一个阶段的stream都保留前一个流(upstream)的引用。
有状态的操作比较复杂,有专门的类来处理:
|
|
最终操作基本由父类的final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)
完成。
count由mapToLong实现。
前面讲到, 只有最终操作才对源数据进行操作,中间操作都是lazy的。 怎么实现的呢?
沿着这个调用terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()))
-> helper.wrapAndCopyInto(this, spliterator).get()
-> copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
找到wrapSink
。
|
|
首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept
将触发链式操作, 将管道中的操作在一个迭代中执行一次。
代码稍显乱而复杂,简单化的将,事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。
并行操作是通过ForkJoinTask框架实现。
模拟流的处理链
Java Stream的实现比较复杂,因为它需要处理各种操作已经并行计算。 我们可以用一个简单的例子来模拟它的过程, 这个例子将Stream简化,但是处理方法是一致。
这个例子模拟使用Sink建立链表,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。
|
|
##参考
- http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
- http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
- http://java.dzone.com/articles/understanding-java-8-streams-1
- http://stackoverflow.com/questions/224648/external-iterator-vs-internal-iterator
- http://codereview.stackexchange.com/questions/52050/spliterator-implementation
- https://www.inkling.com/read/schildt-java-complete-reference-9th/chapter-18/spliterators