JEP 453: Structured Concurrency (Preview) | 结构化并发(预览)
摘要
通过引入 结构化并发 的 API 来简化并发编程。结构化并发将在不同线程中运行的相关任务组视为一个工作单元,从而简化错误处理和取消操作,提高可靠性并增强可观察性。这是一个 预览 API。
历史
结构化并发由 JEP 428 提出,并在 JDK 19 中作为孵化 API 交付。它在 JDK 20 中由 JEP 437 再次孵化,并进行了小的更新以继承作用域值(JEP 429)。
我们在此提议在 java.util.concurrent
包中将结构化并发设为预览 API。唯一的重大变化是,StructuredTaskScope
的 fork(...)
方法返回一个 Subtask
而不是一个 Future
,如下文 #为什么 fork 不返回 Future? 中所讨论的。
目标
推广一种并发编程风格,这种风格可以消除由取消和关闭引起的常见风险,如线程泄漏和取消延迟。
提高并发代码的可观察性。
非目标
不是目标去替换
java.util.concurrent
包中的任何并发构造,如ExecutorService
和Future
。不是目标去为 Java 平台定义最终的结构化并发 API。其他结构化并发构造可以由第三方库定义或在未来的 JDK 版本中定义。
不是目标去定义一种在线程之间共享数据流的方法(即 通道)。我们可能在未来提议这样做。
不是目标用新的线程取消机制替换现有的线程中断机制。我们可能在未来提议这样做。
动机
开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务彼此足够独立,并且如果有足够的硬件资源,那么通过并发执行子任务可以使整个任务运行得更快(即具有更低的延迟)。例如,如果每个 I/O 操作在其自己的线程中并发执行,那么组合多个 I/O 操作结果的任务将运行得更快。虚拟线程(JEP 444)使得为每个这样的 I/O 操作分配一个线程具有成本效益,但管理可能产生的大量线程仍然是一个挑战。
使用 ExecutorService
的非结构化并发
在 Java 5 中引入的 java.util.concurrent.ExecutorService
API 帮助开发人员并发执行子任务。
例如,这里有一个方法 handle()
,它在服务器应用程序中表示一个任务。它通过向一个 ExecutorService
提交两个子任务来处理传入的请求。一个子任务执行 findUser()
方法,另一个子任务执行 fetchOrder()
方法。ExecutorService
立即为每个子任务返回一个 Future
,并根据 Executor
的调度策略并发执行子任务。handle()
方法通过阻塞调用子任务的未来的 get()
方法来等待子任务的结果,因此该任务被称为 连接 它的子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // 连接 findUser
int theOrder = order.get(); // 连接 fetchOrder
return new Response(theUser, theOrder);
}
2
3
4
5
6
7
因为子任务并发执行,所以每个子任务可以独立成功或失败。(在这种情况下,失败意味着抛出异常。)通常,如果任务(如 handle()
)的任何子任务失败,该任务应该失败。当发生失败时,理解线程的生命周期可能会非常复杂:
如果
findUser()
抛出异常,那么在调用user.get()
时handle()
将抛出异常,但fetchOrder()
将在其自己的线程中继续运行。这是一个 线程泄漏,在最好的情况下,这会浪费资源;在最坏的情况下,fetchOrder()
线程会干扰其他任务。如果执行
handle()
的线程被中断,中断不会传播到子任务。findUser()
和fetchOrder()
线程都会泄漏,即使在handle()
失败后仍会继续运行。如果
findUser()
执行时间很长,但与此同时fetchOrder()
失败,那么handle()
将不必要地等待findUser()
,通过阻塞在user.get()
上,而不是取消它。只有在findUser()
完成并且user.get()
返回后,order.get()
才会抛出异常,导致handle()
失败。
在每种情况下,问题是我们的程序在逻辑上是由任务 - 子任务关系构建的,但这些关系只存在于开发人员的脑海中。
这不仅增加了出错的可能性,而且使诊断和解决此类错误更加困难。例如,像线程转储这样的可观察性工具将在不相关的线程的调用栈上显示 handle()
、findUser()
和 fetchOrder()
,而没有任务 - 子任务关系的提示。
我们可以尝试通过在发生错误时显式取消其他子任务来做得更好,例如通过用 try-finally
包装任务,并在失败任务的 catch 块中调用其他任务的未来的 cancel(boolean)
方法。我们还需要在 try
-with-resources 语句 中使用 ExecutorService
,如 JEP 425 中的示例所示,因为 Future
没有提供等待已被取消的任务的方法。但是所有这些都很难正确实现,并且它经常使代码的逻辑意图更难辨别。跟踪任务间的关系,并手动添加所需的任务间取消边,这对开发人员要求很高。
这种手动协调生命周期的需求是由于 ExecutorService
和 Future
允许无限制的并发模式。所涉及的任何线程都没有约束或顺序。一个线程可以创建一个 ExecutorService
,第二个线程可以向它提交工作,而执行工作的线程与第一个或第二个线程都没有关系。此外,在一个线程提交工作后,一个完全不同的线程可以等待执行结果。任何具有对 Future
的引用的代码都可以连接它(即通过调用 get()
等待其结果),甚至是在获得 Future
的线程以外的线程中的代码。实际上,一个任务启动的子任务不必返回到提交它的任务。它可以返回到许多任务中的任何一个——甚至没有。
因为 ExecutorService
和 Future
允许这样的非结构化使用,所以它们不强制执行甚至不跟踪任务和子任务之间的关系,即使这样的关系是常见且有用的。因此,即使子任务在同一任务中提交和连接,一个子任务的失败也不能自动导致另一个子任务的取消:在上面的 handle()
方法中,fetchOrder()
的失败不能自动导致 findUser()
的取消。fetchOrder()
的未来与 findUser()
的未来无关,并且两者都与最终通过其 get()
方法连接它的线程无关。我们不想让开发人员手动管理这样的取消,而是想可靠地自动化它。
任务结构应反映代码结构
与 ExecutorService
下的自由运行的线程集合相比,单线程代码的执行总是强制执行任务和子任务的层次结构。方法的主体块 {...}
对应一个任务,在块内调用的方法对应子任务。被调用的方法必须返回调用它的方法,或者向调用它的方法抛出异常。它不能比调用它的方法寿命更长,也不能向不同的方法返回或抛出异常。因此,所有子任务在任务之前完成,每个子任务是其父任务的子任务,并且每个子任务相对于其他子任务和任务的寿命由代码的语法块结构控制。
例如,在这个单线程版本的 handle()
中,任务 - 子任务关系从语法结构中很明显:
Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}
2
3
4
5
我们在 findUser()
子任务完成(无论成功与否)之前不会启动 fetchOrder()
子任务。如果 findUser()
失败,我们根本不会启动 fetchOrder()
,并且 handle()
任务隐式失败。子任务只能返回其父任务这一事实很重要:这意味着父任务可以隐式地将一个子任务的失败视为触发取消其他未完成子任务并随后自身失败的信号。
在单线程代码中,任务 - 子任务层次结构在运行时在调用栈中被具体化。因此,我们免费获得了相应的父子关系,这些关系控制错误传播。当观察一个单线程时,层次关系很明显:findUser()
(以及后来的 fetchOrder()
)显示为从属于 handle()
。这使得很容易回答“handle()
现在在做什么?”这个问题。
如果任务与其子任务之间的父子关系从代码的语法结构中显而易见,并且在运行时也被具体化——就像单线程代码一样,那么并发编程将更容易、更可靠且更具可观察性。语法结构将描绘子任务的寿命,并实现线程间层次结构的运行时表示,类似于单线程的调用栈。该表示将实现错误传播和取消,以及对并发程序的有意义观察。
(Java 平台已经有一个用于在并发任务上强加结构的 API,即 java.util.concurrent.ForkJoinPool
,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务而设计的,而不是涉及 I/O 的任务。)
结构化并发
结构化并发 是一种并发编程方法,它保留了任务和子任务之间的自然关系,从而产生更易读、可维护和可靠的并发代码。“结构化并发”一词由 Martin Sústrik 创造,并由 Nathaniel J. Smith 推广。来自其他语言的想法,如 Erlang 的分层监督者,为结构化并发中的错误处理设计提供了信息。
结构化并发源于一个简单的原则:
如果一个任务拆分为并发子任务,那么它们都返回同一个地方,即任务的代码块。
在结构化并发中,子任务代表一个任务工作。任务等待子任务的结果并监视它们是否失败。与单线程代码的结构化编程技术一样,多线程的结构化并发的力量来自两个想法:(1)通过代码块的执行流的明确定义的入口和出口点,以及(2)以反映其在代码中的语法嵌套的方式严格嵌套操作的寿命。
因为代码块的入口和出口点是明确定义的,所以并发子任务的寿命被限制在其父任务的语法块中。因为兄弟子任务的寿命嵌套在其父任务的寿命内,所以它们可以作为一个单元进行推理和管理。因为父任务的寿命又嵌套在其父任务的寿命内,所以运行时可以将任务层次结构具体化为一棵树,这是单线程调用栈的并发对应物。这允许代码将策略(如截止日期)应用于任务的整个子树,并允许可观察性工具将子任务显示为从属于其父任务。
结构化并发与虚拟线程非常匹配,虚拟线程是由 JDK 实现的轻量级线程。许多虚拟线程共享同一个操作系统线程,允许有非常大量的虚拟线程。除了数量丰富之外,虚拟线程足够便宜,可以表示任何并发行为单元,甚至涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发同时处理数千或数百万个传入请求:它可以为处理每个请求的任务专门分配一个新的虚拟线程,并且当一个任务通过提交子任务进行并发执行而展开时,它可以为每个子任务专门分配一个新的虚拟线程。在幕后,通过安排每个虚拟线程携带对其唯一父线程的引用来将任务 - 子任务关系具体化为一棵树,类似于调用栈中的帧如何引用其唯一调用者。
总之,虚拟线程提供了大量的线程。结构化并发可以正确且稳健地协调它们,并使可观察性工具能够按照开发人员的理解显示线程。在 JDK 中有一个用于结构化并发的 API 将使构建可维护、可靠和可观察的服务器应用程序更容易。
描述
结构化并发 API 的主要类是 java.util.concurrent
包中的 StructuredTaskScope
。这个类允许开发人员将一个任务构建为一组并发子任务的家族,并将它们作为一个单元进行协调。子任务通过单独 分叉 然后作为一个单元 连接 并可能作为一个单元取消来在它们自己的线程中执行。子任务的成功结果或异常由父任务聚合和处理。StructuredTaskScope
将子任务的寿命限制在一个明确的 词法作用域 中,在该作用域中,任务与其子任务的所有交互——分叉、连接、取消、处理错误和组合结果——都发生。
这是前面的 handle()
示例,使用 StructuredTaskScope
编写(ShutdownOnFailure
在 #关闭策略 中解释):
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> user = scope.fork(() -> findUser());
Supplier<Integer> order = scope.fork(() -> fetchOrder());
scope.join() // 连接两个子任务
.throwIfFailed(); //...并传播错误
// 在这里,两个子任务都已成功,所以组合它们的结果
return new Response(user.get(), order.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
与原始示例相比,理解这里涉及的线程的寿命很容易:在所有情况下,它们的寿命都被限制在一个词法作用域内,即 try
-with-resources 语句的主体。此外,使用 StructuredTaskScope
确保了许多有价值的属性:
带短路的错误处理——如果
findUser()
或fetchOrder()
子任务失败,如果另一个尚未完成,则将其取消。(这由ShutdownOnFailure
实现的关闭策略管理;其他策略也是可能的)。取消传播——如果在调用
join()
之前或期间运行handle()
的线程被中断,则当线程退出作用域时,两个子任务将自动取消。清晰性——上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(并且子任务已经完成,所以没有更多要清理的)。
可观察性——如下文 #可观察性 所述,线程转储清楚地显示任务层次结构,运行
findUser()
和fetchOrder()
的线程显示为作用域的子线程。
StructuredTaskScope
是一个 预览 API,默认情况下是禁用的。
要使用 StructuredTaskScope
API,你必须启用预览 API,如下所示:
使用
javac --release 21 --enable-preview Main.java
编译程序,并使用java --enable-preview Main
运行它;或者,当使用 源代码启动器 时,使用
java --source 21 --enable-preview Main.java
运行程序;或者,当使用 jshell 时,使用
jshell --enable-preview
启动它。
使用 StructuredTaskScope
StructuredTaskScope
API 如下:
public class StructuredTaskScope<T> implements AutoCloseable {
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public void shutdown();
public StructuredTaskScope<T> join() throws InterruptedException;
public StructuredTaskScope<T> joinUntil(Instant deadline)
throws InterruptedException, TimeoutException;
public void close();
protected void handleComplete(Subtask<? extends T> handle);
protected final void ensureOwnerAndJoined();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
使用 StructuredTaskScope
的代码的一般工作流程是:
创建一个作用域。创建作用域的线程是它的 所有者。
使用
fork(Callable)
方法在作用域中分叉子任务。在任何时候,任何子任务或作用域的所有者都可以调用作用域的
shutdown()
方法来取消未完成的子任务并阻止新子任务的分叉。作用域的所有者将作用域(即其所有子任务)作为一个单元进行连接。所有者可以调用作用域的
join()
方法,等待直到所有子任务都已完成(成功或不成功)或通过shutdown()
被取消。或者,它可以调用作用域的joinUntil(java.time.Instant)
方法,等待直到一个截止时间。连接后,处理子任务中的任何错误并处理它们的结果。
关闭作用域,通常通过
try
-with-resources 隐式地进行。如果作用域尚未关闭,这将关闭作用域,并等待任何已被取消但尚未完成的子任务完成。
每次调用 fork(...)
都会启动一个新线程来执行一个子任务,默认情况下子任务是一个虚拟线程。一个子任务可以创建自己的嵌套 StructuredTaskScope
来分叉自己的子任务,从而创建一个层次结构。这个层次结构反映在代码的块结构中,它限制了子任务的寿命:一旦作用域关闭,所有子任务的线程都保证已终止,并且当块退出时不会留下任何线程。
作用域中的任何子任务、嵌套作用域中的任何子子任务以及作用域的所有者可以在任何时候调用作用域的 shutdown()
方法来表示任务已完成——即使其他子任务仍在执行。shutdown()
方法 中断 仍在执行子任务的线程,并导致 join()
或 joinUntil(Instant)
方法返回。因此,所有子任务都应该以对中断有响应的方式编写。在调用 shutdown()
之后分叉的新子任务将处于 UNAVAILABLE
状态并且不会被运行。实际上,shutdown()
是顺序代码中 break
语句的并发类比。
在作用域内调用 join()
或 joinUntil(Instant)
是强制性的。如果作用域的块在连接之前退出,那么作用域将等待所有子任务终止,然后抛出异常。
作用域的所有者线程可能在连接之前或连接期间被中断。例如,它可能是已关闭的外部作用域的子任务。如果发生这种情况,join()
和 joinUntil(Instant)
将抛出异常,因为继续下去没有意义。然后 try
-with-resources 语句将关闭作用域,这将取消所有子任务并等待它们终止。这具有将任务的取消自动传播到其子任务的效果。如果 joinUntil(Instant)
方法的截止时间在子任务终止或调用 shutdown()
之前过期,那么它将抛出异常,并且再次,try
-with-resources 语句将关闭作用域。
当 join()
成功完成时,每个子任务要么已成功完成,要么失败,要么因为作用域被关闭而被取消。
连接后,作用域的所有者处理失败的子任务并处理成功完成的子任务的结果;这通常由关闭策略完成(见 下文)。成功完成的任务的结果可以使用 Subtask.get()
方法获得。get()
方法从不阻塞;如果在连接之前或子任务未成功完成时错误地调用它,它将抛出 IllegalStateException
。
在作用域中分叉的子任务继承 ScopedValue
绑定(JEP 446)。如果作用域的所有者从绑定的 ScopedValue
读取值,那么每个子任务将读取相同的值。
如果作用域的所有者本身是现有作用域的子任务,即它是作为分叉子任务创建的,那么那个作用域将成为新作用域的父作用域。作用域和子任务因此形成一棵树。
StructuredTaskScope
的结构化使用在运行时被强制执行。例如,尝试从不在作用域的树层次结构中的线程(即所有者、子任务和嵌套作用域中的子子任务)调用 fork(Callable)
将失败并抛出异常。在 try
-with-resources 块之外使用作用域并在不调用 close()
的情况下返回,或者不保持 close()
调用的正确嵌套,可能会导致作用域的方法抛出 StructureViolationException
。
StructuredTaskScope
对并发操作强制执行结构和顺序。因此,它不实现 ExecutorService
或 Executor
接口,因为这些接口的实例通常以非结构化的方式使用(见 下文)。然而,将使用 ExecutorService
但可以从结构中受益的代码迁移到使用 StructuredTaskScope
是很直接的。
在实践中,大多数使用 StructuredTaskScope
的情况不会直接使用 StructuredTaskScope
类,而是使用下一节中描述的两个实现关闭策略的子类之一。在其他情况下,用户可能会编写自己的子类来实现自定义关闭策略。
关闭策略
在处理并发子任务时,通常使用 短路模式 来避免做不必要的工作。例如,有时如果其中一个子任务失败(即 全部调用)或如果其中一个子任务成功(即 任何一个调用)就取消所有子任务是有意义的。StructuredTaskScope
的两个子类,ShutdownOnFailure
和 ShutdownOnSuccess
,通过在第一个子任务失败或成功时分别关闭作用域的策略来支持这些模式。
关闭策略还提供了集中处理异常以及可能的成功结果的方法。这符合结构化并发的精神,根据这种精神,整个作用域被视为一个单元。
这是一个具有失败时关闭策略的 StructuredTaskScope
(也在上面的 handle()
示例中使用),它并发运行一组任务,如果其中任何一个任务失败,整个任务就失败:
<T> List<T> runAll(List<Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
scope.join()
.throwIfFailed(); // 如果任何子任务失败就传播异常
// 在这里,所有任务都已成功,所以组合它们的结果
return suppliers.stream().map(Supplier::get).toList();
}
}
2
3
4
5
6
7
8
9
10
这是一个具有成功时关闭策略的 StructuredTaskScope
,它返回第一个成功子任务的结果:
<T> T race(List<Callable<T>> tasks, Instant deadline)
throws InterruptedException, ExecutionException, TimeoutException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
return scope.joinUntil(deadline)
.result(); // 如果没有子任务成功完成就抛出异常
}
}
2
3
4
5
6
7
8
9
10
一旦一个子任务成功,这个作用域就会自动关闭,取消未完成的子任务。如果所有子任务都失败或给定的截止时间过期,任务就失败。这种模式在例如需要从一组冗余服务中的任何一个获得结果的服务器应用程序中可能很有用。
虽然这两种关闭策略是现成提供的,但开发人员可以创建抽象其他模式的自定义策略(见 下文)。
处理结果
在连接并通过关闭策略集中处理异常(例如,使用 ShutdownOnFailure::throwIfFailed
)之后,作用域的所有者可以使用从对 fork(...)
的调用中返回的 Subtask
对象来处理子任务的结果,如果这些结果没有被策略处理(例如,通过 ShutdownOnSuccess::result()
)。
通常,作用域所有者将调用的唯一 Subtask
方法是 get()
方法。所有其他 Subtask
方法通常仅在自定义关闭策略的 handleComplete(...)
方法的实现中使用(见 下文)。实际上,我们建议将引用由 fork(...)
返回的 Subtask
的变量类型化为例如 Supplier<String>
而不是 Subtask<String>
(当然,除非你选择使用 var
)。如果关闭策略本身处理子任务结果——就像在 ShutdownOnSuccess
的情况下——那么应该完全避免由 fork(...)
返回的 Subtask
对象,并且将 fork(...)
方法视为返回 void
。子任务应该将作用域所有者在策略集中处理异常后应该处理的任何信息作为其结果返回。
如果作用域所有者处理子任务异常以产生复合结果,而不是使用关闭策略,那么异常可以作为值从子任务中返回。例如,这里有一个方法,它并行运行一个任务列表,并返回一个已完成的 Future
列表,其中包含每个任务各自的成功或异常结果:
<T> List<Future<T>> executeAll(List<Callable<T>> tasks)
throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<? extends Supplier<Future<T>>> futures = tasks.stream()
.map(task -> asFuture(task))
.map(scope::fork)
.toList();
scope.join();
return futures.stream().map(Supplier::get).toList();
}
}
static <T> Callable<Future<T>> asFuture(Callable<T> task) {
return () -> {
try {
return CompletableFuture.completedFuture(task.call());
} catch (Exception ex) {
return CompletableFuture.failedFuture(ex);
}
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
自定义关闭策略
可以扩展 StructuredTaskScope
,并覆盖其受保护的 handleComplete(...)
方法,以实现除 ShutdownOnSuccess
和 ShutdownOnFailure
之外的策略。子类可以,例如:
- 收集成功完成的子任务的结果并忽略失败的子任务,
- 当子任务失败时收集异常,或者
- 在出现某种情况时调用
shutdown()
方法关闭并使join()
唤醒。
当一个子任务完成时,即使在调用了 shutdown()
之后,它也会作为一个 Subtask
报告给 handleComplete(...)
方法:
public sealed interface Subtask<T> extends Supplier<T> {
enum State { SUCCESS, FAILED, UNAVAILABLE }
State state();
Callable<? extends T> task();
T get();
Throwable exception();
}
2
3
4
5
6
7
8
在调用 shutdown()
之前成功完成(SUCCESS
状态)或未成功完成(FAILED
状态)的子任务会调用 handleComplete(...)
方法。只有当子任务处于 SUCCESS
状态时才能调用 get()
方法,只有当子任务处于 FAILED
状态时才能调用 exception()
方法;在其他情况下调用 get()
或 exception()
将导致它们抛出 IllegalStateException
。UNAVAILABLE
状态表示以下情况之一:(1)子任务已分叉但尚未完成;(2)子任务在关闭后完成,或者(3)子任务在关闭后分叉,因此尚未开始。对于处于 UNAVAILABLE
状态的子任务,永远不会调用 handleComplete(...)
方法。
子类通常会定义方法,以便在 join()
方法返回后执行的代码可以使用结果、状态或其他结果。收集结果并忽略失败子任务的子类可以定义一个方法来返回结果的集合。实现当子任务失败时关闭的策略的子类可以定义一个方法来获取第一个失败子任务的异常。
这是一个 StructuredTaskScope
子类的示例,它收集成功完成的子任务的结果。它定义了 results()
方法,供主任务用来检索结果。
class MyScope<T> extends StructuredTaskScope<T> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();
MyScope() { super(null, Thread.ofVirtual().factory()); }
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS)
results.add(subtask.get());
}
@Override
public MyScope<T> join() throws InterruptedException {
super.join();
return this;
}
// 返回成功完成的子任务的结果流
public Stream<T> results() {
super.ensureOwnerAndJoined();
return results.stream();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这个自定义策略可以这样使用:
<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
try (var scope = new MyScope<T>()) {
for (var task : tasks) scope.fork(task);
return scope.join()
.results().toList();
}
}
2
3
4
5
6
7
扇入场景
上面的例子集中在 扇出 场景,它管理多个并发的传出 I/O 操作。StructuredTaskScope
在 扇入 场景中也很有用,它管理多个并发的传入 I/O 操作。在这种场景中,我们通常会响应传入请求创建未知数量的子任务。
这是一个服务器的示例,它在 StructuredTaskScope
内分叉子任务来处理传入连接:
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// 如果出现错误或我们被中断,我们停止接受
scope.shutdown(); // 关闭所有活动连接
scope.join();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
从并发的角度来看,这种场景的不同与其说是请求的方向,不如说是任务的持续时间和数量。在这里,与前面的例子不同,作用域的所有者的持续时间是无限制的——它只有在被中断时才会停止。子任务的数量也是未知的,因为它们是根据外部事件动态分叉的。
所有的连接处理子任务都是在作用域内创建的,所以在线程转储中很容易看到它们的目的,线程转储将把它们显示为作用域所有者的子线程。也很容易将整个服务作为一个单元关闭。
可观察性
我们扩展了由 JEP 444 添加的新 JSON 线程转储格式,以显示 StructuredTaskScope
将线程分组为层次结构:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域的 JSON 对象包含在该作用域中分叉的线程数组,以及它们的堆栈跟踪。作用域的所有者线程通常会在连接方法中被阻塞,等待子任务完成;线程转储通过显示结构化并发强加的树层次结构,使得很容易看到子任务的线程在做什么。作用域的 JSON 对象也有一个对其父作用域的引用,以便可以从转储中重构程序的结构。
com.sun.management.HotSpotDiagnosticsMXBean
API 也可以直接或通过平台的 MBeanServer
以及本地或远程 JMX 工具间接生成这样的线程转储。
为什么 fork(...)
不返回一个 Future
?
当 StructuredTaskScope
API 处于孵化阶段时,fork(...)
方法返回一个 Future
。这通过使 fork(...)
类似于现有的 ExecutorService::submit
方法提供了一种熟悉感。然而,鉴于 StructuredTaskScope
的使用方式与 ExecutorService
不同——以一种结构化的方式,如上文所述——使用 Future
带来的更多是困惑而不是清晰。
Future
的常见用法涉及调用其get()
方法,该方法会阻塞直到结果可用。但是在StructuredTaskScope
的上下文中,以这种方式使用Future
不仅不被鼓励,而且适得其反。结构化的Future
对象应该仅在join()
返回后查询,此时它们已知已完成或被取消,并且应该使用的方法不是熟悉的get()
,而是新引入的resultNow()
,它从不阻塞。一些开发人员想知道为什么
fork(...)
不返回更强大的CompletableFuture
对象。由于由fork(...)
返回的Future
应该仅在已知其已完成后使用,所以CompletableFuture
没有任何好处,因为它的高级功能仅对未完成的Future
有用。此外,CompletableFuture
是为异步编程范式设计的,而StructuredTaskScope
鼓励阻塞范式。
简而言之,Future
和 CompletableFuture
被设计为提供在结构化并发中适得其反的自由度。
- 结构化并发是将在不同线程中运行的多个任务视为一个工作单元,而
Future
在将多个任务视为单个任务时最有用。一个作用域应该仅阻塞一次以等待其子任务的结果,然后它应该集中处理异常。因此,在绝大多数情况下,对从fork(...)
返回的Future
应该调用的唯一方法是resultNow()
。这与Future
的普通用法有显著变化,并且Future
接口在这种情况下分散了对其正确用法的注意力。
在当前的 API 中,Subtask::get()
的行为与 API 处于孵化阶段时 Future::resultNow()
完全相同。
替代方案
- 增强
ExecutorService
接口。我们制作了这个接口的一个实现原型,它总是强制执行结构并限制哪些线程可以提交任务。然而,我们发现这是有问题的,因为在 JDK 和生态系统中,ExecutorService
(及其父接口Executor
)的大多数用法都是非结构化的。将相同的 API 用于一个限制更多的概念肯定会引起混淆。例如,在大多数情况下,将一个结构化的ExecutorService
实例传递给接受此类型的现有方法几乎肯定会抛出异常。