Java Stream 详解

目录 [−]

  1. 介绍
    1. 并行 Parallelism
    2. Non-interference
    3. 无状态 Stateless behaviors
    4. 副作用 Side-effects
    5. 排序 Ordering
    6. 结合性 Associativity
  2. 创建Stream
  3. 中间操作 intermediate operations
    1. distinct
    2. filter
    3. map
    4. flatmap
    5. limit
    6. peek
    7. sorted
    8. skip
  4. 终点操作 terminal operations
    1. Match
    2. count
    3. collect
    4. find
    5. forEach、forEachOrdered
    6. max、min
    7. reduce
    8. toArray()
  5. 组合
  6. 转换
  7. 更进一步
  8. 参考文档

Stream是 Java 8新增加的类,用来补充集合类。

Stream代表数据流,流中的数据元素的数量可能是有限的,也可能是无限的。

Stream和其它集合类的区别在于:其它集合类主要关注与有限数量的数据的访问和有效管理(增删改),而Stream并没有提供访问和管理元素的方式,而是通过声明数据源的方式,利用可计算的操作在数据源上执行,当然BaseStream.iterator()BaseStream.spliterator()操作提供了遍历元素的方法。

Java Stream提供了提供了串行和并行两种类型的流,保持一致的接口,提供函数式编程方式,以管道方式提供中间操作和最终执行操作,为Java语言的集合提供了现代语言提供的类似的高阶函数操作,简化和提高了Java集合的功能。

本文首先介绍Java Stream的特点,然后按照功能分类逐个介绍流的中间操作和终点操作,最后会介绍第三方为Java Stream做的扩展。

前年年底的时候我写了一些关于Java 8 Lambda和Stream的文章,本文应该在那个时候完成。后来忙于项目和写《Scala集合技术手册》(Scala Collections Cookbook)这本书,一直没来得及写Java Stream的文章,现在这篇文章算是对 Java Stream的一个总结吧。

介绍

本节翻译整理自Javadoc,并对流的这些特性做了进一步的解释。

Stream接口还包含几个基本类型的子接口如IntStream, LongStream 和 DoubleStream。

关于流和其它集合具体的区别,可以参照下面的列表:

  1. 不存储数据。流是基于数据源的对象,它本身不存储数据元素,而是通过管道将数据源的元素传递给操作。
  2. 函数式编程。流的操作不会修改数据源,例如filter不会将数据源中的数据删除。
  3. 延迟操作。流的很多操作如filter,map等中间操作是延迟执行的,只有到终点操作才会将操作顺序执行。
  4. 可以解绑。对于无限数量的流,有些操作是可以在有限的时间完成的,比如limit(n)findFirst(),这些操作可是实现"短路"(Short-circuiting),访问到有限的元素后就可以返回。
  5. 纯消费。流的元素只能访问一次,类似Iterator,操作没有回头路,如果你想从头重新访问流的元素,对不起,你得重新生成一个新的流。

流的操作是以管道的方式串起来的。流管道包含一个数据源,接着包含零到N个中间操作,最后以一个终点操作结束。

并行 Parallelism

所有的流操作都可以串行执行或者并行执行。
除非显示地创建并行流,否则Java库中创建的都是串行流。 Collection.stream()为集合创建串行流而Collection.parallelStream()为集合创建并行流。IntStream.range(int, int)创建的是串行流。通过parallel()方法可以将串行流转换成并行流,sequential()方法将流转换成串行流。

除非方法的Javadoc中指明了方法在并行执行的时候结果是不确定(比如findAny、forEach),否则串行和并行执行的结果应该是一样的。

Non-interference

流可以从非线程安全的集合中创建,当流的管道执行的时候,非concurrent数据源不应该被改变。下面的代码会抛出java.util.ConcurrentModificationException异常:

1
2
3
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
sl.forEach(s -> l.add("three"));

在设置中间操作的时候,可以更改数据源,只有在执行终点操作的时候,才有可能出现并发问题(抛出异常,或者不期望的结果),比如下面的代码不会抛出异常:

1
2
3
4
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
sl.forEach(System.out::println);

对于concurrent数据源,不会有这样的问题,比如下面的代码很正常:

1
2
3
List<String> l = new CopyOnWriteArrayList<>(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
sl.forEach(s -> l.add("three"));

虽然我们上面例子是在终点操作中对非并发数据源进行修改,但是非并发数据源也可能在其它线程中修改,同样会有并发问题。

无状态 Stateless behaviors

大部分流的操作的参数都是函数式接口,可以使用Lambda表达式实现。它们用来描述用户的行为,称之为行为参数(behavioral parameters)。

如果这些行为参数有状态,则流的操作的结果可能是不确定的,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
List<String> l = new ArrayList(Arrays.asList("one", "two", ……));
class State {
boolean s;
}
final State state = new State();
Stream<String> sl = l.stream().map(e -> {
if (state.s)
return "OK";
else {
state.s = true;
return e;
}
});
sl.forEach(System.out::println);

上面的代码在并行执行时多次的执行结果可能是不同的。这是因为这个lambda表达式是有状态的。

副作用 Side-effects

有副作用的行为参数是被不鼓励使用的。

副作用指的是行为参数在执行的时候有输入输入,比如网络输入输出等。

这是因为Java不保证这些副作用对其它线程可见,也不保证相同流管道上的同样的元素的不同的操作运行在同一个线程中。

很多有副作用的行为参数可以被转换成无副作用的实现。一般来说println()这样的副作用代码不会有害。

1
2
3
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // 副作用代码

上面的代码可以改成无副作用的。

1
2
3
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!

排序 Ordering

某些流的返回的元素是有确定顺序的,我们称之为 encounter order。这个顺序是流提供它的元素的顺序,比如数组的encounter order是它的元素的排序顺序,List是它的迭代顺序(iteration order),对于HashSet,它本身就没有encounter order。

一个流是否是encounter order主要依赖数据源和它的中间操作,比如数据源List和Array上创建的流是有序的(ordered),但是在HashSet创建的流不是有序的。

sorted()方法可以将流转换成encounter order的,unordered可以将流转换成encounter order的。

注意,这个方法并不是对元素进行排序或者打散,而是返回一个是否encounter order的流。

https://stackoverflow.com/questions/21350195/stream-ordered-unordered-problems
If a stream is ordered, repeated execution of identical stream pipelines on an identical source will produce an identical result; if it is not ordered, repeated execution might produce different results.
谢谢 imekaku lee 指出

除此之外,一个操作可能会影响流的有序,比如map方法,它会用不同的值甚至类型替换流中的元素,所以输入元素的有序性已经变得没有意义了,但是对于filter方法来说,它只是丢弃掉一些值而已,输入元素的有序性还是保障的。

对于串行流,流有序与否不会影响其性能,只是会影响确定性(determinism),无序流在多次执行的时候结果可能是不一样的。

对于并行流,去掉有序这个约束可能会提供性能,比如distinctgroupingBy这些聚合操作。

结合性 Associativity

一个操作或者函数op满足结合性意味着它满足下面的条件:

1
(a op b) op c == a op (b op c)

对于并发流来说,如果操作满足结合性,我们就可以并行计算:

1
a op b op c op d == (a op b) op (c op d)

比如minmax以及字符串连接都是满足结合性的。

创建Stream

可以通过多种方式创建流:

1、通过集合的stream()方法或者parallelStream(),比如Arrays.asList(1,2,3).stream()
2、通过Arrays.stream(Object[])方法, 比如Arrays.stream(new int[]{1,2,3})
3、使用流的静态方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator),如Stream.iterate(0, n -> n * 2),或者generate(Supplier<T> s)Stream.generate(Math::random)
4、BufferedReader.lines()从文件中获得行的流。
5、Files类的操作路径的方法,如listfindwalk等。
6、随机数流Random.ints()
7、其它一些类提供了创建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()
8、更底层的使用StreamSupport,它提供了将Spliterator转换成流的方法。

中间操作 intermediate operations

中间操作会返回一个新的流,并且操作是延迟执行的(lazy),它不会修改原始的数据源,而且是由在终点操作开始的时候才真正开始执行。
这个Scala集合的转换操作不同,Scala集合转换操作会生成一个新的中间集合,显而易见Java的这种设计会减少中间对象的生成。

下面介绍流的这些中间操作:

distinct

distinct保证输出的流中包含唯一的元素,它是通过Object.equals(Object)来检查是否包含相同的元素。

1
2
3
4
List<String> l = Stream.of("a","b","c","b")
.distinct()
.collect(Collectors.toList());
System.out.println(l); //[a, b, c]

filter

filter返回的流中只包含满足断言(predicate)的数据。

下面的代码返回流中的偶数集合。

1
2
3
4
5
List<Integer> l = IntStream.range(1,10)
.filter( i -> i % 2 == 0)
.boxed()
.collect(Collectors.toList());
System.out.println(l); //[2, 4, 6, 8]

map

map方法将流中的元素映射成另外的值,新的值类型可以和原来的元素的类型不同。

下面的代码中将字符元素映射成它的哈希码(ASCII值)。

1
2
3
4
List<Integer> l = Stream.of('a','b','c')
.map( c -> c.hashCode())
.collect(Collectors.toList());
System.out.println(l); //[97, 98, 99]

flatmap

flatmap方法混合了map + flattern的功能,它将映射后的流的元素全部放入到一个新的流中。它的方法定义如下:

1
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)

可以看到mapper函数会将每一个元素转换成一个流对象,而flatMap方法返回的流包含的元素为mapper生成的所有流中的元素。

下面这个例子中将一首唐诗生成一个按行分割的流,然后在这个流上调用flatmap得到单词的小写形式的集合,去掉重复的单词然后打印出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
String poetry = "Where, before me, are the ages that have gone?\n" +
"And where, behind me, are the coming generations?\n" +
"I think of heaven and earth, without limit, without end,\n" +
"And I am all alone and my tears fall down.";
Stream<String> lines = Arrays.stream(poetry.split("\n"));
Stream<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")));
List<String> l = words.map( w -> {
if (w.endsWith(",") || w.endsWith(".") || w.endsWith("?"))
return w.substring(0,w.length() -1).trim().toLowerCase();
else
return w.trim().toLowerCase();
}).distinct().sorted().collect(Collectors.toList());
System.out.println(l); //[ages, all, alone, am, and, are, before, behind, coming, down, earth, end, fall, generations, gone, have, heaven, i, limit, me, my, of, tears, that, the, think, where, without]

flatMapToDoubleflatMapToIntflatMapToLong提供了转换成特定流的方法。

limit

limit方法指定数量的元素的流。对于串行流,这个方法是有效的,这是因为它只需返回前n个元素即可,但是对于有序的并行流,它可能花费相对较长的时间,如果你不在意有序,可以将有序并行流转换为无序的,可以提高性能。

1
2
3
4
List<Integer> l = IntStream.range(1,100).limit(5)
.boxed()
.collect(Collectors.toList());
System.out.println(l);//[1, 2, 3, 4, 5]

peek

peek方法方法会使用一个Consumer消费流中的元素,但是返回的流还是包含原来的流中的元素。

1
2
3
4
String[] arr = new String[]{"a","b","c","d"};
Arrays.stream(arr)
.peek(System.out::println) //a,b,c,d
.count();

sorted

sorted()将流中的元素按照自然排序方式进行排序,如果元素没有实现Comparable,则终点操作执行时会抛出java.lang.ClassCastException异常。
sorted(Comparator<? super T> comparator)可以指定排序的方式。

对于有序流,排序是稳定的。对于非有序流,不保证排序稳定。

1
2
3
4
5
6
7
8
9
10
String[] arr = new String[]{"b_123","c+342","b#632","d_123"};
List<String> l = Arrays.stream(arr)
.sorted((s1,s2) -> {
if (s1.charAt(0) == s2.charAt(0))
return s1.substring(2).compareTo(s2.substring(2));
else
return s1.charAt(0) - s2.charAt(0);
})
.collect(Collectors.toList());
System.out.println(l); //[b_123, b#632, c+342, d_123]

skip

skip返回丢弃了前n个元素的流,如果流中的元素小于或者等于n,则返回空的流。

终点操作 terminal operations

Match

1
2
3
public boolean allMatch(Predicate<? super T> predicate)
public boolean anyMatch(Predicate<? super T> predicate)
public boolean noneMatch(Predicate<? super T> predicate)

这一组方法用来检查流中的元素是否满足断言。
allMatch只有在所有的元素都满足断言时才返回true,否则flase,流为空时总是返回true

anyMatch只有在任意一个元素满足断言时就返回true,否则flase,

noneMatch只有在所有的元素都不满足断言时才返回true,否则flase,

1
2
3
4
5
6
7
System.out.println(Stream.of(1,2,3,4,5).allMatch( i -> i > 0)); //true
System.out.println(Stream.of(1,2,3,4,5).anyMatch( i -> i > 0)); //true
System.out.println(Stream.of(1,2,3,4,5).noneMatch( i -> i > 0)); //false
System.out.println(Stream.<Integer>empty().allMatch( i -> i > 0)); //true
System.out.println(Stream.<Integer>empty().anyMatch( i -> i > 0)); //false
System.out.println(Stream.<Integer>empty().noneMatch( i -> i > 0)); //true

count

count方法返回流中的元素的数量。它实现为:

1
mapToLong(e -> 1L).sum();

collect

1
2
<R,A> R collect(Collector<? super T,A,R> collector)
<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)

使用一个collector执行mutable reduction操作。辅助类Collectors提供了很多的collector,可以满足我们日常的需求,你也可以创建新的collector实现特定的需求。它是一个值得关注的类,你需要熟悉这些特定的收集器,如聚合类averagingInt、最大最小值maxBy minBy、计数counting、分组groupingBy、字符串连接joining、分区partitioningBy、汇总summarizingInt、化简reducing、转换toXXX等。

第二个提供了更底层的功能,它的逻辑类似下面的伪代码:

1
2
3
4
R result = supplier.get();
for (T element : this stream)
accumulator.accept(result, element);
return result;

例子:

1
2
3
4
5
List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,
ArrayList::addAll);
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,
StringBuilder::append)
.toString();

find

findAny()返回任意一个元素,如果流为空,返回空的Optional,对于并行流来说,它只需要返回任意一个元素即可,所以性能可能要好于findFirst(),但是有可能多次执行的时候返回的结果不一样。
findFirst()返回第一个元素,如果流为空,返回空的Optional。

forEach、forEachOrdered

forEach遍历流的每一个元素,执行指定的action。它是一个终点操作,和peek方法不同。这个方法不担保按照流的encounter order顺序执行,如果对于有序流按照它的encounter order顺序执行,你可以使用forEachOrdered方法。

1
Stream.of(1,2,3,4,5).forEach(System.out::println);

max、min

max返回流中的最大值,
min返回流中的最小值。

reduce

reduce是常用的一个方法,事实上很多操作都是基于它实现的。
它有几个重载方法:

1
2
3
pubic Optional<T> reduce(BinaryOperator<T> accumulator)
pubic T reduce(T identity, BinaryOperator<T> accumulator)
pubic <U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)

第一个方法使用流中的第一个值作为初始值,后面两个方法则使用一个提供的初始值。

1
2
Optional<Integer> total = Stream.of(1,2,3,4,5).reduce( (x, y) -> x +y);
Integer total2 = Stream.of(1,2,3,4,5).reduce(0, (x, y) -> x +y);

值得注意的是accumulator应该满足结合性(associative)。

toArray()

将流中的元素放入到一个数组中。

组合

concat用来连接类型一样的两个流。

1
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)

转换

toArray方法将一个流转换成数组,而如果想转换成其它集合类型,西需要调用collect方法,利用Collectors.toXXX方法进行转换:

1
2
3
4
5
public static <T,C extends Collection<T>> Collector<T,?,C> toCollection(Supplier<C> collectionFactory)
public static …… toConcurrentMap(……)
public static <T> Collector<T,?,List<T>> toList()
public static …… toMap(……)
public static <T> Collector<T,?,Set<T>> toSet()

更进一步

虽然Stream提供了很多的操作,但是相对于Scala等语言,似乎还少了一些。一些开源项目提供了额外的一些操作,比如protonpack项目提供了下列方法:

  • takeWhile and takeUntil
  • skipWhile and skipUntil
  • zip and zipWithIndex
  • unfold
  • MapStream
  • aggregate
  • Streamable
  • unique collector

java8-utils 也提供了一些有益的辅助方法。

参考文档

  1. https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
  2. http://www.leveluplunch.com/java/examples/
  3. https://github.com/poetix/protonpack
  4. https://github.com/NitorCreations/java8-utils