JEP 461: Stream Gatherers (Preview) | 流收集器(预览)
摘要
增强 Stream API 以支持自定义中间操作。这将使流管道能够以现有内置中间操作不容易实现的方式转换数据。这是一个 预览 API。
目标
使流管道更加灵活和富有表现力。
尽可能允许自定义中间操作处理无限大小的流。
非目标
改变 Java 编程语言以更好地促进流处理不是目标。
对使用 Stream API 的代码的编译进行特殊处理不是目标。
动机
Java 8 引入了第一个专门为 lambda 表达式设计的 API:Stream API,java.util.stream
。流是一个延迟计算的、可能无界的值序列。该 API 支持顺序或并行处理流的能力。
一个 流管道 由三部分组成:元素源、任意数量的中间操作和一个终端操作。例如:
long numberOfWords =
Stream.of("the", "", "fox", "jumps", "over", "the", "", "dog") // (1)
.filter(Predicate.not(String::isEmpty)) // (2)
.collect(Collectors.counting()); // (3)
2
3
4
这种编程风格既富有表现力又高效。使用构建器风格的 API,每个中间操作都返回一个新的流;只有在调用终端操作时才开始评估。在这个例子中,第 (1) 行创建一个流,但不进行评估,第 (2) 行设置一个中间的 filter
操作但仍然不评估流,最后第 (3) 行的终端 collect
操作评估整个流管道。
Stream API 提供了一组相当丰富的(尽管是固定的)中间操作和终端操作:映射、过滤、归约、排序等等。它还包括一个可扩展的终端操作,Stream::collect
,它允许以各种方式总结流管道的输出。
到目前为止,流在 Java 生态系统中的使用已经非常普遍,并且非常适合许多任务,但是固定的中间操作集意味着一些复杂的任务不能很容易地表示为流管道。要么所需的中间操作不存在,要么它存在但不直接支持该任务。
例如,假设任务是获取一个字符串流并使其不同,但不同性基于字符串长度而不是内容。也就是说,最多应该发出一个长度为 1 的字符串,最多一个长度为 2 的字符串,最多一个长度为 3 的字符串,依此类推。理想情况下,代码应该看起来像这样:
var result = Stream.of("foo", "bar", "baz", "quux")
.distinctBy(String::length) // 假设的
.toList();
// result ==> [foo, quux]
2
3
4
5
不幸的是,distinctBy
不是内置的中间操作。最接近的内置操作,distinct
,通过使用对象相等性比较元素来跟踪它已经看到的元素。也就是说,distinct
是有状态的,但在这种情况下使用了错误的状态:我们希望它根据字符串长度的相等性而不是字符串内容来跟踪元素。我们可以通过声明一个根据字符串长度定义对象相等性的类,将每个字符串包装在该类的一个实例中,并将 distinct
应用于这些实例来解决这个限制。然而,这种任务的表达方式不直观,并且使得代码难以维护:
record DistinctByLength(String str) {
@Override public boolean equals(Object obj) {
return obj instanceof DistinctByLength(String other)
&& str.length() == other.length();
}
@Override public int hashCode() {
return str == null? 0 : Integer.hashCode(str.length());
}
}
var result = Stream.of("foo", "bar", "baz", "quux")
.map(DistinctByLength::new)
.distinct()
.map(DistinctByLength::str)
.toList();
// result ==> [foo, quux]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
作为另一个例子,假设任务是将元素分组为固定大小为三的组,但只保留前两组:[0, 1, 2, 3, 4, 5, 6,...]
应该产生 [[0, 1, 2], [3, 4, 5]]
。理想情况下,代码应该看起来像这样:
var result = Stream.iterate(0, i -> i + 1)
.windowFixed(3) // 假设的
.limit(2)
.toList();
// result ==> [[0, 1, 2], [3, 4, 5]]
2
3
4
5
6
不幸的是,没有内置的中间操作支持这个任务。最好的选择是通过使用自定义的 Collector
调用 collect
将固定窗口分组逻辑放在终端操作中。然而,我们必须在 collect
操作之前使用固定大小的 limit
操作,因为收集器不能在新元素出现时向 collect
发出它已完成的信号——对于无限流,这种情况会永远发生。此外,任务本质上是关于有序数据的,所以让收集器并行执行分组是不可行的,并且如果调用了它的组合器,它必须通过抛出异常来表明这一事实。结果代码难以理解:
var result
= Stream.iterate(0, i -> i + 1)
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("Cannot be parallelized");
}
));
// result ==> [[0, 1, 2], [3, 4, 5]]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
多年来,已经为 Stream API 提出了许多新的中间操作。大多数在单独考虑时是有意义的,但是添加所有这些操作会使(已经很大的)Stream API 更难学习,因为它的操作将更难发现。
Stream API 的设计者明白,有一个 扩展点 是很理想的,这样任何人都可以定义中间流操作。然而,当时他们不知道那个扩展点应该是什么样子。最终很明显,终端操作的扩展点,即 Stream::collect(Collector)
,是有效的。现在我们可以对中间操作采取类似的方法。
总之,更多的中间操作创造了更多的情境价值,使流更适合更多的任务。我们应该为自定义中间操作提供一个 API,允许开发人员以他们喜欢的方式转换有限和无限流。
描述
Stream::gather(Gatherer)
是一种新的中间流操作,它通过应用一个称为 收集器 的用户定义实体来处理流中的元素。使用 gather
操作,我们可以构建高效、可并行的流,实现几乎任何中间操作。Stream::gather(Gatherer)
对于中间操作来说就像 Stream::collect(Collector)
对于终端操作一样。
收集器 表示流中元素的一种转换;它是 java.util.stream.Gatherer
接口的一个实例。收集器可以以一对一、一对多、多对一或多对多的方式转换元素。它们可以跟踪先前看到的元素以影响后续元素的转换,它们可以短路以将无限流转换为有限流,并且它们可以启用并行执行。例如,一个收集器可以将一个输入元素转换为一个输出元素,直到某个条件变为真,此时它开始将一个输入元素转换为两个输出元素。
收集器由四个协同工作的函数定义:
可选的 初始化器 函数提供一个在处理流元素时维护私有状态的对象。例如,一个收集器可以存储当前元素,以便下次应用它时,它可以将新元素与现在的前一个元素进行比较,并比如说只发出两者中较大的一个。实际上,这样的收集器将两个输入元素转换为一个输出元素。
集成器 函数将来自输入流的新元素进行集成,可能检查私有状态对象并可能向输出流发出元素。它也可以在到达输入流的末尾之前终止处理;例如,一个在整数流中搜索最大整数的收集器如果检测到
Integer.MAX_VALUE
就可以终止。可选的 组合器 函数可用于在输入流被标记为并行时并行评估收集器。如果一个收集器不支持并行,那么它仍然可以是并行流管道的一部分,但它将按顺序进行评估。这对于操作本质上是有序的并且因此不能并行化的情况很有用。
可选的 完成器 函数在没有更多输入元素要消耗时被调用。这个函数可以检查私有状态对象,并可能发出额外的输出元素。例如,一个在其输入元素中搜索特定元素的收集器可以在其完成器被调用时报告失败,比如说通过抛出异常。
当被调用时,Stream::gather
执行以下等效步骤:
创建一个
Downstream
对象,当给定收集器输出类型的一个元素时,将其传递给管道中的下一个阶段。通过调用其 初始化器 的
get()
方法获得收集器的私有状态对象。通过调用其
integrator()
方法获得收集器的 集成器。当还有更多输入元素时,调用集成器的
integrate(...)
方法,将状态对象、下一个元素和下游对象传递给它。如果该方法返回false
,则终止。获得收集器的 完成器,并使用状态和下游对象调用它。
Stream
接口中声明的每个现有中间操作都可以通过调用带有实现该操作的收集器的 gather
来实现。例如,给定一个 T
类型元素的流,Stream::map
通过应用一个函数将每个 T
元素转换为一个 U
元素,然后将 U
元素传递到下游;这只是一个无状态的一对一收集器。再举一个例子,Stream::filter
接受一个谓词,该谓词决定是否应将输入元素传递到下游;这只是一个无状态的一对多收集器。实际上,每个流管道在概念上等同于
source.gather(...).gather(...).gather(...).collect(...)
内置收集器
我们在 java.util.stream.Gatherers
类中引入以下内置收集器:
fold
是一个有状态的多对一收集器,它逐步构建一个聚合,并在没有更多输入元素时发出该聚合。mapConcurrent
是一个有状态的一对一收集器,它对每个输入元素并发地调用一个提供的函数,最多达到一个提供的限制。scan
是一个有状态的一对一收集器,它将一个提供的函数应用于当前状态和当前元素以产生下一个元素,并将其传递到下游。windowFixed
是一个有状态的多对多收集器,它将输入元素分组到给定大小的列表中,并在窗口满时将窗口传递到下游。windowSliding
是一个有状态的多对多收集器,它将输入元素分组到给定大小的列表中。在第一个窗口之后,每个后续窗口都是通过复制其前一个窗口,删除第一个元素并从输入流中附加下一个元素来创建的。
并行评估
收集器的并行评估分为两种不同的模式。当没有提供组合器时,流库仍然可以通过并行执行上游和下游操作来提取并行性,类似于可短路的 parallel().forEachOrdered()
操作。当提供了组合器时,并行评估类似于可短路的 parallel().reduce()
操作。
组合收集器
收集器通过 andThen(Gatherer)
方法支持组合,该方法连接两个收集器,其中第一个收集器产生第二个收集器可以消耗的元素。这使得通过组合更简单的收集器来创建复杂的收集器成为可能,就像 函数组合 一样。从语义上讲,
source.gather(a).gather(b).gather(c).collect(...)
等同于
source.gather(a.andThen(b).andThen(c)).collect(...)
收集器与收集器
Gatherer
接口的设计深受 Collector
设计的影响。主要区别是:
Gatherer
使用一个Integrator
而不是一个BiConsumer
进行每个元素的处理,因为它需要一个额外的输入参数用于Downstream
对象,并且因为它需要返回一个boolean
来指示处理是否应该继续。Gatherer
使用一个BiConsumer
作为其完成器而不是一个Function
,因为它需要一个额外的输入参数用于其Downstream
对象,并且因为它不能返回结果,因此是void
。
示例:拥抱流
有时,缺乏适当的中间操作会迫使我们将流评估为一个列表,并在循环中运行我们的分析逻辑。例如,假设我们有一个按时间顺序排列的温度读数流:
record Reading(Instant obtainedAt, int kelvins) {
Reading(String time, int kelvins) {
this(Instant.parse(time), kelvins);
}
static Stream<Reading> loadRecentReadings() {
// 在现实中,这些可以从文件、数据库、服务或其他地方读取
return Stream.of(
new Reading("2023-09-21T10:15:30.00Z", 310),
new Reading("2023-09-21T10:15:31.00Z", 312),
new Reading("2023-09-21T10:15:32.00Z", 350),
new Reading("2023-09-21T10:15:33.00Z", 310)
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
进一步假设,我们想要检测这个流中的可疑变化,定义为在五秒的时间窗口内,连续两个读数之间的温度变化超过 30 开尔文:
boolean isSuspicious(Reading previous, Reading next) {
return next.obtainedAt().isBefore(previous.obtainedAt().plusSeconds(5))
&& (next.kelvins() > previous.kelvins() + 30
|| next.kelvins() < previous.kelvins() - 30);
}
2
3
4
5
这需要对输入流进行顺序扫描,所以我们必须避免声明式流处理,并强制实现我们的分析:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
var suspicious = new ArrayList<List<Reading>>();
Reading previous = null;
boolean hasPrevious = false;
for (Reading next : source.toList()) {
if (!hasPrevious) {
hasPrevious = true;
previous = next;
} else {
if (isSuspicious(previous, next))
suspicious.add(List.of(previous, next));
previous = next;
}
}
return suspicious;
}
var result = findSuspicious(Reading.loadRecentReadings());
// result ==> [[Reading[obtainedAt=2023-09-21T10:15:31Z, kelvins=312],
// Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350]],
// [Reading[obtainedAt=2023-09-21T10:15:32Z, kelvins=350],
// Reading[obtainedAt=2023-09-21T10:15:33Z, kelvins=310]]]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
然而,使用收集器,我们可以更简洁地表达这一点:
List<List<Reading>> findSuspicious(Stream<Reading> source) {
return source.gather(Gatherers.windowSliding(2))
.filter(window -> (window.size() == 2
&& isSuspicious(window.get(0),
window.get(1))))
.toList();
}
2
3
4
5
6
7
示例:定义收集器
在 Gatherers
类中声明的 windowFixed
收集器可以作为 Gatherer
接口的直接实现来编写:
record WindowFixed<TR>(int windowSize)
implements Gatherer<TR, ArrayList<TR>, List<TR>>
{
public WindowFixed {
// 验证输入
if (windowSize < 1)
throw new IllegalArgumentException("窗口大小必须为正整数");
}
@Override
public Supplier<ArrayList<TR>> initializer() {
// 创建一个 ArrayList 来保存当前打开的窗口
return () -> new ArrayList<>(windowSize);
}
@Override
public Integrator<ArrayList<TR>, TR, List<TR>> integrator() {
// 集成器在消耗每个元素时被调用
return Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// 将元素添加到当前打开的窗口中
window.add(element);
// 在未达到所需窗口大小时,返回 true 表示需要更多元素
if (window.size() < windowSize)
return true;
// 当窗口已满时,通过创建副本关闭它
var result = new ArrayList<TR>(window);
// 清空窗口以便开始下一个
window.clear();
// 将关闭的窗口发送到下游
return downstream.push(result);
});
}
// 由于此操作本质上是顺序的,因此省略了组合器,不能并行化
@Override
public BiConsumer<ArrayList<TR>, Downstream<? super List<TR>>> finisher() {
// 完成器在没有更多元素从上游传递时运行
return (window, downstream) -> {
// 如果下游仍然接受更多元素且当前打开的窗口非空,则将其副本发送到下游
if(!downstream.isRejecting() &&!window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
示例用法:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(new WindowFixed(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
2
示例:临时收集器
windowFixed
收集器也可以通过 Gatherer.ofSequential(...)
工厂方法以临时方式编写:
/**
* 将元素收集到固定大小的组中。最后一组可能包含较少的元素。
* @param windowSize 组的最大大小
* @return 一个新的收集器,将元素收集到固定大小的组中
* @param <TR> 返回的收集器消耗和产生的元素类型
*/
static <TR> Gatherer<TR,?, List<TR>> fixedWindow(int windowSize) {
// 验证输入
if (windowSize < 1)
throw new IllegalArgumentException("窗口大小必须为非零值");
// 这个收集器本质上依赖于顺序,因此不应并行化
return Gatherer.ofSequential(
// 初始化器创建一个 ArrayList 来保存当前打开的窗口
() -> new ArrayList<TR>(windowSize),
// 集成器在消耗每个元素时被调用
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// 将元素添加到当前打开的窗口中
window.add(element);
// 在未达到所需窗口大小时,返回 true 表示需要更多元素
if (window.size() < windowSize)
return true;
// 当窗口已满时,通过创建副本关闭它
var result = new ArrayList<TR>(window);
// 清空窗口以便开始下一个
window.clear();
// 将关闭的窗口发送到下游
return downstream.push(result);
}),
// 由于此操作本质上是顺序的,因此省略了组合器,不能并行化
// 完成器在没有更多元素从上游传递时运行
(window, downstream) -> {
// 如果下游仍然接受更多元素且当前打开的窗口非空,则将其副本发送到下游
if(!downstream.isRejecting() &&!window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}
);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
示例用法:
jshell> Stream.of(1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).toList()
$1 ==> [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
2
示例:可并行化的收集器
在并行流中使用时,只有当收集器提供组合器函数时才会并行评估。例如,这个可并行化的收集器根据提供的选择器函数最多发出一个元素:
static <TR> Gatherer<TR,?, TR> selectOne(BinaryOperator<TR> selector) {
// 验证输入
Objects.requireNonNull(selector, "选择器不能为空");
// 用于跨元素跟踪信息的私有状态
class State {
TR value; // 当前最佳值
boolean hasValue; // 当 value 持有有效值时为 true
}
// 使用 `of` 工厂方法构建一个收集器,给定一组用于 `initializer`、`integrator`、`combiner` 和 `finisher` 的函数
return Gatherer.of(
// 初始化器创建一个新的 State 实例
State::new,
// 集成器;在这种情况下,我们使用 `ofGreedy` 表示这个集成器永远不会短路
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
// 第一个元素,只需保存它
state.value = element;
state.hasValue = true;
} else {
// 选择两个值中的哪一个要保存,并保存它
state.value = selector.apply(state.value, element);
}
return true;
}),
// 组合器,在并行评估期间使用
(leftState, rightState) -> {
if (!leftState.hasValue) {
// 如果左边没有值,返回右边
return rightState;
} else if (!rightState.hasValue) {
// 如果右边没有值,返回左边
return leftState;
} else {
// 如果两边都有值,选择其中一个保留并将其存储在 leftState 中,因为它将被返回
leftState.value = selector.apply(leftState.value,
rightState.value);
return leftState;
}
},
// 完成器
(state, downstream) -> {
// 如果有值,则将选定的值发送到下游
if (state.hasValue)
downstream.push(state.value);
}
);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
示例用法,在随机整数流上:
jshell> Stream.generate(() -> ThreadLocalRandom.current().nextInt())
.limit(1000) // 取前 1000 个元素
.gather(selectOne(Math::max)) // 选择看到的最大值
.parallel() // 并行执行
.findFirst() // 提取最大值
$1 ==> Optional[99822]
2
3
4
5
6
替代方案
我们在 单独的设计文档 中探索了替代方案。
风险和假设
使用自定义收集器以及在
Gatherers
类中声明的内置收集器,不会像使用在Stream
类中声明的内置中间操作那样简洁。然而,自定义收集器的定义在复杂性上与用于终端collect
操作的自定义收集器的定义相似。此外,使用自定义和内置收集器在复杂性上与使用自定义收集器和在Collectors
类中声明的内置收集器相似。在预览此功能的过程中,我们可能会修改内置收集器的集合,并且在未来的版本中可能会修改内置收集器的集合。
我们不会为在
Gatherers
类中定义的每个内置收集器向Stream
类添加新的中间操作,尽管为了一致性很想这样做。为了保持Stream
类的可学习性,我们只会在经验表明它们广泛有用后才考虑向其添加新的中间操作。我们可能会在后续的预览中添加这样的方法,甚至在这个功能最终确定后添加。现在公开新的内置收集器并不排除以后添加专门的Stream
方法。