当我第一次遇到Stream
API时,内心是困惑的,因为我立马就会联想到Java I/O的InputStream
and OutputStream
。然而,实际上Java 8 的Stream API是完全不同的事物,它为Java 提供函数式编程方面发挥了重要作用。
函数式编程有一个重要概念,叫做Monad。推荐阅读阮一峰大师的相关博客<<图解 Monad>>
本篇主要讲解如何使用Java 8 Stream API以及其相关的各种操作。例如对一个数据流的顺序操作,以及强大的流操作reduce
,collect
和flatMap
,当然还要深入研究一下涉及到程序运行性能的并行流(parallel streams)。
初探Streams
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
流操作分为中间操作和终端操作,并组合成流管道。流管道由源(例如集合,数组,生成器函数或I / O通道)组成;然后是零个或多个中间操作,例如Stream.filter或Stream.map;最后是一个终端操作,如Stream.forEach或Stream.reduce。在上述例子中filter
,map
和sorted
是中间操作,而forEach
是一个终端的操作。有关所有可用流操作的完整列表,请参阅Stream Javadoc。
中间操作进一步分为无状态操作和有状态操作。无状态操作(例如过滤器和映射)在处理新元素时不保留先前看到的元素的状态 - 每个元素都可以独立于其他元素上的操作进行处理。有状态操作(例如,distinct和sorted)可以在处理新元素时包含先前看到的元素的状态。
终端操作(例如Stream.forEach或IntStream.sum)可以遍历流以产生结果或副作用。在执行终端操作之后,流管道被认为已消耗,并且不能再使用;如果需要再次遍历同一数据源,则必须返回数据源以获取新流。
各种类型的流
可以从各种数据源(尤其是集合)创建流。Lists and Sets支持新方法stream()
并parallelStream()
创建顺序流或并行流。并行流能够在多个线程上运行,本教程的后续部分将对此进行介绍。我们现在专注于顺序流:
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
在对象列表上调用stream()
方法将返回常规对象流。当然我们也可以不创建集合就可以使用流计算,如下面的代码示例中看到的那样:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
只需使用 Stream.of()
方法从一堆对象引用中创建一个流。
除了常规对象流之外,Java 8还附带了特殊类型的流操作类,用于处理原始数据类型int
,long
以及double
。你可能已经猜到了IntStream
,LongStream
而且DoubleStream
。
IntStreams可以使用IntStream.range()
方法替换常规for循环:
IntStream.range(1, 4)
.forEach(System.out::println);
// 1
// 2
// 3
所有这些原始流都像常规对象流一样工作,但有以下不同之处:原始流使用专门的lambda表达式,例如IntFunction
代替Function
或IntPredicate
代替Predicate
。原始流支持额外的终端聚合操作,例如sum()
和average()
:
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
有时将常规对象流转换为基本流是有用的,反之亦然。为此,对象流支持特殊的映射操作,例如mapToInt()
,mapToLong()
和mapToDouble
:
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3
可以通过mapToObj()
方法将原始流转换为对象流:
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
下面是一个组合示例:双精度流首先映射到int流,然后映射到字符串类型的对象流:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
流操作的执行流程
现在我们已经学会了如何创建和使用不同类型的流,让我们深入了解流操作的内部执行流程。
中间操作的一个重要特征是懒惰。如下是缺少终端操作的示例:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
执行此代码段时,不会向控制台打印任何内容。这是因为只有存在终端操作时才执行中间操作。
让我们通过终端操作forEach
扩展上面的例子:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
执行此代码段,会在控制台上产生的输出如下所示:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
请注意结果输出的顺序!是否感到惊讶?!可能有些人会以为流中所有的元素一起沿着各个操作水平移动,然而实际上,每个元素都沿着链垂直移动。也就是说,只有流中的第一个元素d2通过filter
和forEach
后,第二个元素a2才会开始被处理。
这种机制可以减少对每个元素执行的实际操作数,如下一个示例所示:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
在anyMatch
操作中,匹配以A
开头的元素。由于每个元素都沿着链垂直移动,map
在这种情况下只需要执行两次。因此,map
操作得到了尽可能少的调用次数。
执行链的顺序很重要
下面的示例包括两个中间操作map
和filter
,以及一个终端操作forEach
。让我们再次检查这些操作是如何执行的:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
As you might have guessed both map
and filter
are called five times for every string in the underlying collection whereas forEach
is only called once.
你可能已经猜到了结果,map
并且filter
对于底层集合中的每个字符串调用了五次,而forEach
只调用一次。
如果我们改变操作的顺序,移动filter
到链的开头,我们可以大大减少实际的执行次数:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
现在,map
只调用一次,因此操作管道对大量输入元素的执行速度要快得多。在编写复杂的方法链时要记住这一点。
让我们通过一个额外的sorted
操作来扩展上面的例子:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
排序是一种特殊的中间操作。也被叫做有状态操作,因为为了在排序期间必须维护元素集合的状态。
执行此示例将产生以下控制台输出:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
首先,对整个输入集合执行排序操作。换句话说,sorted
是水平执行的。因此,在这种情况下,sorted
方法对输入集合中的每个元素的多个组合总共调用了8次。
我们可以通过重新排序执行链来优化性能:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
在此示例中,sorted
从未调用过,因为filter
将输入集合减少到只有一个元素。因此,对于较大的输入集合,性能会大大提高。
流不可重复使用
Java 8流无法重用。只要您调用任何终端操作,流就会关闭:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
对同一个流结果调用anyMatch
后,再调用noneMatch
方法,就会很遗憾的得到异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
为了克服这个限制,我们必须为我们想要执行的每个终端操作创建一个新的流链,例如我们可以创建一个流提供者来构建一个新的流,其中已经设置了中间操作filter
:
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
每次调用get()
构造一个我们保存的新流,以调用所需的终端操作。
高级操作
Streams支持大量不同的操作。我们已经了解了最重要的操作,如filter
或map
。我并不会亲自带你研究其他所有可用的基本操作(参见Stream Javadoc),相反,让我们深入研究更复杂的操作collect
,flatMap
和reduce
。
本节中的大多数代码示例使用以下人员列表进行演示:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
Collect
Collect是一个非常有用的终端操作,将流的元素转变成其他不同的结果,例如一个List
,Set
或Map
。Collect方法接受Collector
,Collector
包含四种不同操作的操作:供应者(supplier),累加器(accumulator),组合器(combiner)和修整器(finisher)。这听起来非常复杂,但好消息是Java 8通过Collectors
类支持各种内置收集器。因此,对于最常见的操作,您不必自己实现收集器。
让我们从一个常见的用例开始:
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered); // [Peter, Pamela]
正如你所看到的,从流的元素构造列表非常简单。需要一个集合而不是列表 - 只需使用Collectors.toSet()
。
下面示例按年龄对所有人进行分组:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
Collectors非常多才多艺。还可以在流的元素上进行聚合操作,例如,确定所有人的平均年龄:
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
如果您对更全面的统计信息感兴趣,汇总收集器将返回一个特殊的内置摘要统计信息对象。因此,我们可以简单地确定人的最小,最大和平均年龄,以及年龄总和,并统计参与计算的元素数量。
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
下一个示例将所有人连接成一个字符串:
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
如上代码所示,联接收集器接受分隔符以及可选的前缀和后缀。
为了将流元素转换为映射,我们必须指定如何映射键和值。请记住,映射的键必须是唯一的,否则抛出IllegalStateException
异常。当然您可以选择将合并函数作为附加参数以绕过异常:
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));
System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
现在我们知道了一些最强大的内置收集器,让我们尝试构建我们自己的特殊收集器。我们希望将流的所有人转换为单个字符串,该字符串由|
字符分隔的大写字母组成。为了实现这一目标,我们创建了一个新的收集器Collector.of()
。我们必须通过收集器的四个要素:供应者,累加器,组合器和修整器。
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
由于Java中的字符串是不可变的,我们需要一个帮助类StringJoiner
,让收集器构造我们的字符串。供应者最初使用适当的分隔符构造这样的StringJoiner。累加器用于将每个人的大写名称添加到StringJoiner。组合器知道如何将两个StringJoiners合并为一个。在最后一步中,整理器从StringJoiner构造所需的String。
FlatMap
我们已经学会了如何利用该map
操作将流的对象转换为另一种类型的对象。Map有点受限,因为每个对象只能映射到另一个对象。但是,如果我们想将一个对象转换为多个其他对象或根本不转换,该怎么办?flatMap
为解决此问题而生。
FlatMap将流的每个元素转换为其他对象的流。因此,每个对象将被转换为由流支持的零个,一个或多个其他对象。然后将这些流的内容放入返回的flatMap
操作流中。
在我们看到flatMap
实际操作之前,我们需要一个适当的类型层
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
接下来,我们利用有关流的知识来实例化几个对象:
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
Now we have a list of three foos each consisting of three bars.
现在我们有一个列表,列表中有三个foo,每个foo由三个bars组成。
FlatMap接受一个必须返回对象流的函数。所以为了解决每个foo的bar对象,我们只需传递相应的函数:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
如你所见,我们已成功将三个foo对象的流转换为九个bar对象的流。
最后,上面的代码示例可以简化为流操作的单个管道:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
FlatMap也可用于Java 8中引入的类Optional
, flatMap
操作返回optional类型的对象。所以它可以用来防止讨厌的null
检查。
想想这样一个高度分层的结构:
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
为了解析foo
外部实例的内部字符串,您必须添加多个空值检查以防止可能的NullPointerExceptions:
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}
利用flatMap
操作可以获得相同的行为:
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
每次调用flatMap
返回一个Optional
包装所需对象(如果存在)或null
(不存在)。
Reduce
reduce操作将流的所有元素组合成单个结果。
Java 8支持三种不同的reduce
方法。
- 第一种是将元素流简化为流的一个元素。让我们看看我们如何使用这种方法来确定年龄最大的人:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
reduce
方法接受BinaryOperator
累加器函数。 这实际上是一个BiFunction
,其中两个操作数共享相同的类型,在上面示例代码中共享的是Person类型。 BiFunction类似于Function但接受两个参数。 示例函数比较两个人的年龄,以便返回具有最大年龄的人。
- 第二种
reduce
方法接受标识值和BinaryOperator
累加器。此方法可用于构造一个新的Person,其中包含来自流中所有其他人的聚合名称和年龄:
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
- 第三种
reduce
方法接受三个参数:标识值,BiFunction
累加器和BinaryOperator
类型的组合函数。 由于标识值类型不限于Person类型,我们可以利用此reduce
来确定所有人的年龄总和:
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
如你所见,结果是76,但是究竟发生了什么?让我们通过一些调试输出扩展上面的代码:
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
结果显示,累加器功能完成了所有工作。首先使用初始标识值0和第一个人Max调用累加器。在接下来的三个步骤sum
中,总和不断增加最后一步人的年龄,直至总年龄为76岁。
等一下!组合器永远不会被调用?
接下来,并行执行相同的流将解除秘密:
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
并行执行此流会导致完全不同的执行行为。现在实际上调用了组合器。由于并行调用累加器,因此需要组合器来对单独的累加值求和。
让我们在下一章深入探讨并行流。
并行流
流可以并行执行,以增加大量输入元素的运行时性能。并行流ForkJoinPool
通过静态方法ForkJoinPool.commonPool()
使用公共可用的流。底层线程池最多使用五个线程 - 具体取决于可用物理CPU核心的数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
通过设置以下JVM参数可以减小或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持parallelStream()
方法来创建并行元素流。或者,您可以在给定流上调用中间方法parallel()
,以将顺序流转换为并行流。
为了低估并行流的并行执行行为,下一个示例将当前线程的信息打印到控制台:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
根据这些调试的输出结果,我们应该更好地了解哪些线程实际用于执行流操作:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
如你所见,并行流利用公共中的所有可用线程ForkJoinPool
来执行流操作。多次运行的结果可能不同,因为实际使用的线程是非确定性的。
让我们通过一个额外的sort
操作扩展该示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
乍一看执行结果,看起来很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
似乎sort
只在主线程上顺序执行。实际上,sort
在并行流上使用新的Java 8方法Arrays.parallelSort()
。其运行机制在Javadoc中对此进行了详细的阐述:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.(如果指定数组的长度小于最小粒度,则使用Arrays.sort方法对其进行排序。)
回到上一节reduce
的例子。我们已经发现组合器函数只是在并行计算的时候被调用。让我们看看实际涉及哪些线程:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台输出结果显示累加器和组合器函数在所有可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,可以说并行流可以为具有大量输入元素的流带来良好的性能提升。但请记住,某些并行流操作,比如reduce
和collect
需要额外的计算(组合操作),这在顺序执行时是不需要的。
此外,我们了解到所有并行流操作共享VM内相同的J的ForkJoinPool
。因此,尽量避免执行阻塞流的操作,因为这会减慢应用程序中其他依赖并行流的部分。
Happy coding!