Java 8 Stream探秘

在现代的Java应用程序中很少不用到集合类和数组。 可以对集合进行增,删,改,插, 统计(聚合aggregate)。 这些操作的概念在SQL操作上也会用到。 但是对集合的操作却没有像SQL那样方便简捷。 为什么我们不能实现一种类似SQL语句一样方便的编程方式呢, 去取代一遍又一遍loop遍历的方式处理集合和数组中的数据?
另外,对于大数据量的集合, 能不能充分利用多核的优势, 并行的处理?
Stream是就是这种处理数据的风格, 一种流式风格。 这种风格在其它语言中也有实现, 比如Javascript (Node.js stream)。
这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。
元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

1
2
3
+--------------------+ +------+ +------+ +---+ +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+ +------+ +------+ +---+ +-------+

一个简单的例子:

1
2
3
4
5
6
List<Integer> transactionsIds =
widgets.stream()
.filter(b -> b.getColor() == RED)
.sorted((x,y) -> x.getWeight() - y.getWeight())
.mapToInt(Widget::getWeight)
.sum();

应用Stream

首先,我们先了解一些Stream处理的概念。

什么是流 Stream
流是一个来自数据源的元素队列并支持聚合操作

  • 元素队列 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
  • 数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
  • 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。

注意 这里的流和Java I/O操作的流如InputStream/OutputStream不是一个概念。

和以前的Collection操作不同, Stream操作还有两个基础的特征:

  1. Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
  2. 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。

除了操作不同, 从实现角度比较, Stream和Collection也有众多不同:

  • 不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
  • 功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter只是生成一个筛选后的stream,不会删除源里的元素。
  • 延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
  • Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
  • 可消费的(Consumable)。 不是太好翻译, 意思流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream

几种流生成的方式:

  • 集合类的stream() 和 parallelStream()方法;
  • 数组Arrays.stream(Object[]);
  • Stream类的静态工厂方法: Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator);
  • 文件行 BufferedReader.lines();
  • Files类的获取文件路径列表: find(), lines(), list(), walk();
  • Random.ints() 随机数流, 无界的;
  • 其它一些产生流的方法:BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence),JarFile.stream().
  • 通过StreamSupport辅助类从spliterator产生流

前面提到过, 流操作分为中间操作(Intermediate operation)和最终操作(Terminal operation)。 中间操作是lazy的, 不会立即执行,只不过是返回一个记录操作的新的流。 最终操作会最终使用流管道,使用后不能在被使用。 大部分情况下, 最终操作都是eager的。

中间操作又进一步分为无状态的操作和有状态的操作。 像filter,map都是无状态的操作, 处理一个新的元素时不需要获得先前遍历过的元素的状态。 而有状态的操作,像distinct, sorted, 需要得到先前访问的元素的状态。

有状态的操作在产生结果前需要获得完整的输入。 因此有状态的操作一个并行流时, 可能需要多次传入数据或者需要缓存数据。 而无状态的操作只需传入一次数据。

Collection.stream() 和 Collection.parallelStream() 分别产生序列化流(普通流)和并行流。 注意并行(parallel)和并发(concurrency)是有区别的。 并发是指多线程有竞争关系,在单核的情况下只有一个线程运行。 而并行是指在多核的情况下同时运行, 单核谈并行是无意义的。
记住,并行不一定快,尤其在数据量很小的情况下,可能比普通流更慢。 只有在大数据量和多核的情况下才考虑并行流。

除非操作明显是不确定的,比如findAny, 否则普通流和并行流应该返回一致的结果。

尽管可以从非线程安全的集合如ArrayList生成流,如果在流管道执行过程中修改了源,可能会抛出java.util.ConcurrentModificationException异常。 下面一个简单的例子演示这种情况。

1
2
3
4
List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
long count = list.stream().filter(x -> {list.remove(0);return true;})
.count();
System.out.println(count);

concurrent下的并发集合不会抛出ConcurrentModificationException异常,在中间操作对源数据的修改会反射到最终结果:

1
2
3
4
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));

如果传递给中间操作的lambda表达式是有状态, 并行流的最终结果可能会不一样:

1
2
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...

"Side-effect"不鼓励使用。Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。 例如:

1
2
3
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!

可以改为

1
2
3
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!

如果源是有序的,则相应的流也是有序的。 这里有序是顺序的意思,不是排序。 比如Array和List都是有序的。 HashSet则不是。 有序流上的操作的记过基本上也是有序的, 比如[1,2,3]通过map(x -> x*2)的结果必然是[2,4,6], 如果是无序流, [6,2,4], [4,6,2]等都是合法的结果。

一些reduction方法(也就做fold方法)处理一系列的数据得到一个唯一的结果。 比如reduce, collect,sum,max,count等。
比如以前我们计算数组的sum值:

1
2
3
4
5
int sum = 0;
for (int x : numbers) {
sum += x;
}

现在使用reduce:

1
2
3
4
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
int sum = numbers.stream().reduce(0, Integer::sum);
int sum = numbers.parallelStream().reduce(0, Integer::sum);

揭秘Stream的实现

在我们进入Stream接口的代码实现之前,我们先看两个Iterator,Spliterator:

Iterator

Java 8中为Iterator新增加一个缺省方法forEachRemaining(Consumer<? super E> action)。这个缺省方法的实现很简单, 对未处理的元素执行action, 直到处理完或者action抛出异常。

1
2
3
4
5
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}

Spliterator

正如其名,Spliterator可以看作一个“splittable Iterator”。 在单线程情况下使用没有问题,但是它提供了trySplit(),为多线程提供处理数据片。
它是为了并行处理流而新增的一个迭代类。
它依然实现了顺序迭代方法default void forEachRemaining(Consumer<? super T> action)。 内部用一个循环执行:

1
2
3
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}

tryAdvance方法则对下一个为处理的操作执行action并返回true, 如果没有下一个元素,返回false。
看一个trySplit()的例子, ArrayListSpliterator的trySplit采用二分法,将前一半数据返回, 如果数据太小不能分了,返回null。

1
2
3
4
5
6
public ArrayListSpliterator<E> trySplit() {
int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
return (lo >= mid) ? null : // divide range in half unless too small
new ArrayListSpliterator<E>(list, lo, index = mid,
expectedModCount);
}

而ConcurrentLinkedQueue和ConcurrentLinkedDeque的相应的Spliterator处理稍微复杂一点, 第一次取一个,第二个取两个,不超过MAX_BATCH.

Stream

事实上Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline,而它的一些具体实现又是由AbstractPipeline完成的。
下面我们看一下这两个类。

1
2
3
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT>

AbstractPipeline类实现了所有的 Stream的中间操作和最终操作。 我们重点的挑一些来分析。
首先这个类本身没有定义field, 所有我们只需关注它的每一个具体方法即可,简化了我们的分析。
看一个无状态的操作filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}

可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink对象链表。
Sink代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。

而map类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}

那么问题来了stream.filter(....).map(...)怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2.
在调用StatelessOp1.map时, StatelessOp2是这样生成的:

1
return new StatelessOp<P_OUT, R>(StatelessOp1,......);

管道中的每一个阶段的stream都保留前一个流(upstream)的引用。

有状态的操作比较复杂,有专门的类来处理:

1
2
3
4
5
6
7
8
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}

最终操作基本由父类的final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)完成。
count由mapToLong实现。

前面讲到, 只有最终操作才对源数据进行操作,中间操作都是lazy的。 怎么实现的呢?
沿着这个调用
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())) -> helper.wrapAndCopyInto(this, spliterator).get() -> copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);找到wrapSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}

首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept将触发链式操作, 将管道中的操作在一个迭代中执行一次。
代码稍显乱而复杂,简单化的将,事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。

并行操作是通过ForkJoinTask框架实现。

模拟流的处理链

Java Stream的实现比较复杂,因为它需要处理各种操作已经并行计算。 我们可以用一个简单的例子来模拟它的过程, 这个例子将Stream简化,但是处理方法是一致。
这个例子模拟使用Sink建立链表,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
public class SinkChain {
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) {
int[] source = {1,2,3,4,5};
Spliterator spliterator = Arrays.stream(source).spliterator();
//setup upstream
Sink<Integer> sink0 = new Sink<Integer>("source sink", null);
Sink<Integer> sink4 = sink0.op("sink1").op("sink2").op("sink3").op("terminal sink");
//setup downstream chain
Sink wrappedSink = wrapSink(sink4);
assert(wrappedSink == sink0); //now get the first (source) stage
//in one loop, handle elements: 1,2,3,4,5
spliterator.forEachRemaining(wrappedSink);
}
public static Sink wrapSink(Sink sink) {
while(sink.upstream != null) {
sink.upstream.downstream = sink;
sink = sink.upstream;
}
return sink;
}
static class Sink<T> implements Consumer<T>{
private Sink upstream;
private Sink downstream;
private String name;
public Sink(String name, Sink upstream) {
this.name = name;
this.upstream = upstream;
}
public Sink op(String name) {
return new Sink(name, this);
}
@Override
public void accept(T t) {
System.out.println(name + " handles " + t);
if (downstream != null)
downstream.accept(t);
}
}
}

##参考

  1. http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
  2. http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
  3. http://java.dzone.com/articles/understanding-java-8-streams-1
  4. http://stackoverflow.com/questions/224648/external-iterator-vs-internal-iterator
  5. http://codereview.stackexchange.com/questions/52050/spliterator-implementation
  6. https://www.inkling.com/read/schildt-java-complete-reference-9th/chapter-18/spliterators