JEP 437: Structured Concurrency (Second Incubator) | 结构化并发(第二次孵化)
摘要
通过引入一个用于“结构化并发”的 API 来简化多线程编程。结构化并发将在不同线程中运行的多个任务视为一个工作单元,从而简化错误处理和取消操作,提高可靠性并增强可观察性。这是一个 孵化中的 API。
历史
结构化并发由 JEP 428 提出,并在 JDK 19 中作为孵化中的 API 交付。这个 JEP 提议在 JDK 20 中重新孵化这个 API(不做任何改变),以便有更多时间收集反馈并获得更多关于这个特性的经验。
重新孵化的 API 中唯一的变化是 “StructuredTaskScope” 被更新为支持在任务范围内创建的线程继承作用域值(JEP 429)。这简化了跨线程共享不可变数据的过程。
目标
- 提高多线程代码的可维护性、可靠性和可观察性。
- 推广一种并发编程风格,这种风格可以消除由取消和关闭引起的常见风险,如线程泄漏和取消延迟。
非目标
- 不是要替换“java.util.concurrent”包中的任何并发构造,如“ExecutorService”和“Future”。
- 不是要为 Java 定义最终的结构化并发 API。其他结构化并发构造可以由第三方库定义或在未来的 JDK 版本中定义。
- 不是要定义一种在线程之间共享数据流的方法(即“通道”)。我们可能会在未来提议这样做。
- 不是要用新的线程取消机制替换现有的线程中断机制。我们可能会在未来提议这样做。
动机
开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务彼此足够独立,并且如果有足够的硬件资源,那么通过并发执行子任务可以使整个任务运行得更快(即具有更低的延迟)。例如,如果每个 I/O 操作在自己的线程中并发执行,那么一个由多个 I/O 操作结果组成的任务将运行得更快。虚拟线程(JEP 425)使得为每个这样的 I/O 操作专门分配一个线程变得经济高效,但管理可能由此产生的大量线程仍然是一个挑战。
使用“ExecutorService”的非结构化并发
在 Java 5 中引入的 “java.util.concurrent.ExecutorService” API 帮助开发人员并发执行子任务。
例如,这里有一个方法“handle()”,它代表服务器应用程序中的一个任务。它通过向“ExecutorService”提交两个子任务来处理传入的请求。一个子任务执行“findUser()”方法,另一个子任务执行“fetchOrder()”方法。“ExecutorService”立即为每个子任务返回一个 “Future”,并在自己的线程中执行每个子任务。“handle()”方法通过阻塞调用子任务的 Future 的 “get()” 方法等待子任务的结果,因此该任务被称为“连接”它的子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // 连接 findUser
int theOrder = order.get(); // 连接 fetchOrder
return new Response(theUser, theOrder);
}
2
3
4
5
6
7
因为子任务并发执行,所以每个子任务可以独立成功或失败。(在这种情况下,失败意味着抛出异常。)通常,如果“handle()”这样的任务的任何子任务失败,该任务应该失败。当发生失败时,理解线程的生命周期可能会非常复杂:
- 如果“findUser()”抛出异常,那么在调用“user.get()”时“handle()”将抛出异常,但“fetchOrder()”将继续在自己的线程中运行。这是一个“线程泄漏”,在最好的情况下,这会浪费资源;在最坏的情况下,“fetchOrder()”线程会干扰其他任务。
- 如果执行“handle()”的线程被中断,中断不会传播到子任务。“findUser()”和“fetchOrder()”线程都会泄漏,即使“handle()”已经失败,它们也会继续运行。
- 如果“findUser()”执行时间很长,但与此同时“fetchOrder()”失败,那么“handle()”将不必要地等待“findUser()”,通过阻塞在“user.get()”上而不是取消它。只有在“findUser()”完成并且“user.get()”返回后,“order.get()”才会抛出异常,导致“handle()”失败。
在每种情况下,问题是我们的程序在逻辑上是由任务 - 子任务关系构建的,但这些关系只存在于开发人员的脑海中。这不仅增加了出错的可能性,而且使诊断和解决此类错误更加困难。例如,像线程转储这样的可观察性工具将在不相关线程的调用栈上显示“handle()”、“findUser()”和“fetchOrder()”,而没有任务 - 子任务关系的提示。
我们可以尝试通过在发生错误时显式取消其他子任务来做得更好,例如通过用“try-finally”包装任务,并在失败任务的 catch 块中调用其他任务的 Future 的 “cancel(boolean)” 方法。我们还需要在 “try-with-resources 语句” 中使用“ExecutorService”,如 JEP 425 中的示例所示,因为“Future”没有提供一种等待已被取消的任务的方法。但所有这些都很难正确实现,而且它常常使代码的逻辑意图更难辨别。跟踪任务间的关系,并手动添加所需的任务间取消边,这对开发人员要求很高。
这种需要手动协调生命周期的原因是“ExecutorService”和“Future”允许无限制的并发模式。所涉及的任何线程都没有约束或顺序。一个线程可以创建一个“ExecutorService”,第二个线程可以向它提交工作,而执行工作的线程与第一个或第二个线程都没有关系。此外,在一个线程提交工作后,一个完全不同的线程可以等待执行结果。任何具有对“Future”引用的代码都可以连接它(即通过调用“get()”等待其结果),甚至是在获得“Future”的线程以外的线程中的代码。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回到许多任务中的任何一个——甚至没有任务。
因为“ExecutorService”和“Future”允许这样的非结构化使用,所以它们不强制执行甚至不跟踪任务和子任务之间的关系,尽管这样的关系很常见且很有用。因此,即使子任务在同一任务中提交和连接,一个子任务的失败也不能自动导致另一个子任务的取消:在上面的“handle()”方法中,“fetchOrder()”的失败不能自动导致“findUser()”的取消。“fetchOrder()”的 Future 与“findUser()”的 Future 无关,并且两者都与最终将通过其“get()”方法连接它的线程无关。我们不想让开发人员手动管理这样的取消,而是想可靠地自动化它。
任务结构应反映代码结构
与“ExecutorService”下随意的线程集合相比,单线程代码的执行始终强制执行任务和子任务的层次结构。方法的主体块“{...}”对应一个任务,在块内调用的方法对应子任务。被调用的方法必须返回到调用它的方法,或者向调用它的方法抛出异常。它不能比调用它的方法存活时间更长,也不能返回到或向不同的方法抛出异常。因此,所有子任务在任务之前完成,每个子任务都是其父任务的子任务,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构控制。
例如,在这个单线程版本的“handle()”中,任务 - 子任务关系从语法结构中显而易见:
Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}
2
3
4
5
在“findUser()”子任务完成(无论成功与否)之前,我们不会启动“fetchOrder()”子任务。如果“findUser()”失败,那么我们根本不会启动“fetchOrder()”,并且“handle()”任务隐式失败。子任务只能返回到其父任务这一事实很重要:这意味着父任务可以隐式地将一个子任务的失败视为触发取消所有剩余子任务然后自身失败的信号。
在单线程代码中,任务 - 子任务层次结构在运行时在调用栈中被具体化。因此,我们免费获得了相应的父子关系,这些关系控制错误传播。当观察单个线程时,层次关系很明显:“findUser()”(以及稍后的“fetchOrder()”)看起来从属于“handle()”。
如果任务与其子任务之间的父子关系在语法上得到表达并在运行时被具体化,那么多线程编程将更容易、更可靠且更具可观察性——就像单线程代码一样。语法结构将描绘子任务的生命周期,并实现线程间层次结构的运行时表示,类似于线程内的调用栈。该表示将实现错误传播和取消,以及对并发程序的有意义观察。
(Java 已经有一个用于在并发任务上强加结构的 API,即“java.util.concurrent.ForkJoinPool”,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务而设计的,而不是涉及 I/O 的任务。)
结构化并发
“结构化并发”是一种多线程编程方法,它保留了单线程代码的可读性、可维护性和可观察性。它体现了以下原则:
“如果一个任务拆分为并发子任务,那么它们都返回到同一个地方,即任务的代码块。”
“结构化并发”一词由 Martin Sústrik 创造,并由 Nathaniel J. Smith 推广。来自其他语言的想法,如 Erlang 的分层监督者,为结构化并发中的错误处理设计提供了信息。
在结构化并发中,子任务代表一个任务工作。任务等待子任务的结果并监视它们是否失败。与单线程代码的结构化编程技术一样,多线程的结构化并发的强大之处来自两个想法:(1)为通过代码块的执行流定义明确的入口和出口点,以及(2)以反映其在代码中的语法嵌套的方式严格嵌套操作的生命周期。
因为代码块的入口和出口点是明确定义的,所以并发子任务的生命周期被限制在其父任务的语法块中。因为兄弟子任务的生命周期嵌套在其父任务的生命周期内,所以它们可以作为一个单元进行推理和管理。因为父任务的生命周期又嵌套在其父任务的生命周期内,所以运行时可以将任务层次结构具体化为一棵树。这棵树是单个线程调用栈的并发对应物,可观察性工具可以使用它将子任务显示为从属于其父任务。
结构化并发与虚拟线程非常匹配,虚拟线程是由 JDK 实现的轻量级线程。许多虚拟线程共享同一个操作系统线程,允许有非常大量的虚拟线程。除了数量丰富之外,虚拟线程足够便宜,可以表示任何并发行为单元,甚至涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发同时处理数千或数百万个传入请求:它可以为处理每个请求的任务专门分配一个新的虚拟线程,并且当一个任务通过提交子任务进行并发执行时,它可以为每个子任务专门分配一个新的虚拟线程。在幕后,任务 - 子任务关系通过为每个虚拟线程携带对其唯一父线程的引用来具体化为一棵树,类似于调用栈中的帧如何引用其唯一调用者。
总之,虚拟线程提供了大量的线程。结构化并发确保它们被正确且稳健地协调,并使可观察性工具能够按照开发人员的理解显示线程。在 JDK 中拥有一个用于结构化并发的 API 将提高服务器应用程序的可维护性、可靠性和可观察性。
描述
结构化并发 API 的主要类是 “StructuredTaskScope”。这个类允许开发人员将一个任务构建为一个并发子任务的家族,并将它们作为一个单元进行协调。子任务通过单独“分叉”在自己的线程中执行,然后作为一个单元“连接”,并可能作为一个单元取消。子任务的成功结果或异常由父任务聚合和处理。“StructuredTaskScope”将子任务(或“分叉”)的生命周期限制在一个明确的 词法作用域 中,在这个作用域中,任务与其子任务的所有交互——分叉、连接、取消、处理错误和组合结果——都发生。
这是前面的“handle()”示例,使用“StructuredTaskScope”编写(“ShutdownOnFailure”在 下面 解释):
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> findUser());
Future<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // 连接两个分叉
scope.throwIfFailed(); //...并传播错误
// 在这里,两个分叉都已成功,所以组合它们的结果
return new Response(user.resultNow(), order.resultNow());
}
}
2
3
4
5
6
7
8
9
10
11
12
与原始示例相比,理解这里涉及的线程的生命周期很容易:在所有情况下,它们的生命周期都被限制在一个词法作用域内,即“try-with-resources”语句的主体。此外,使用“StructuredTaskScope”确保了许多有价值的属性:
“带短路的错误处理”——如果“findUser()”或“fetchOrder()”子任务失败,如果另一个子任务尚未完成,则将其取消。(这由“ShutdownOnFailure”实现的取消策略管理;其他策略也是可能的)。
“取消传播”——如果在调用“join()”之前或期间运行“handle()”的线程被中断,当线程退出作用域时,两个分叉将自动取消。
“清晰性”——上面的代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(并且子任务已经完成,所以没有更多需要清理的)。
“可观察性”——如 下面 所述,线程转储清楚地显示任务层次结构,运行“findUser()”和“fetchOrder()”的线程显示为作用域的子线程。
与“ExecutorService.submit(...)”一样,“StructuredTaskScope.fork(...)”方法接受一个“Callable”并返回一个“Future”。然而,与“ExecutorService”不同,返回的 Future 不是通过其“get()”方法连接或通过其“cancel()”方法取消的。相反,作用域中的所有分叉都是作为一个单元进行连接或取消的。两个新的“Future”方法,“resultNow()” 和 “exceptionNow()”,设计用于子任务完成后使用,例如在调用“scope.join()”之后。
使用“StructuredTaskScope”
使用“StructuredTaskScope”的代码的一般工作流程如下:
- 创建一个作用域。创建作用域的线程是其“所有者”。
- 在作用域中分叉并发子任务。
- 作用域中的任何分叉或作用域的所有者可以调用作用域的 “shutdown()” 方法来请求取消所有剩余的子任务。
- 作用域的所有者将作用域(即其所有分叉)作为一个单元进行连接。所有者可以调用作用域的 “join()” 方法,该方法会阻塞直到所有分叉都已完成(无论成功与否)或通过“shutdown()”被取消。或者,所有者可以调用作用域的 “joinUntil(java.time.Instant)” 方法,该方法接受一个截止时间。
- 连接后,处理分叉中的任何错误并处理它们的结果。
- 关闭作用域,通常通过“try-with-resources”隐式地进行。这将关闭作用域并等待任何滞后的分叉完成。
如果所有者是现有作用域的成员(即作为其中的一个分叉创建),那么那个作用域将成为新作用域的父作用域。任务因此形成一棵树,作用域作为中间节点,线程作为叶子。
每个分叉在其自己新创建的线程中运行,默认情况下是一个虚拟线程。分叉的线程由作用域拥有,而作用域又由其创建线程拥有,从而形成一个层次结构。任何分叉都可以创建自己嵌套的“StructuredTaskScope”来分叉自己的子任务,从而扩展层次结构。该层次结构反映在代码的块结构中,它限制了分叉的生命周期:一旦作用域关闭,所有分叉的线程都保证已终止,并且当块退出时不会留下任何线程。
作用域中的任何分叉、嵌套作用域中的任何分叉以及作用域的所有者可以随时调用作用域的“shutdown()”方法来表示任务已完成——即使其他分叉仍在运行。“shutdown()”方法会 中断 作用域中仍处于活动状态的所有分叉的线程。因此,所有分叉都应该以对中断有响应的方式编写。实际上,“shutdown()”是顺序代码中“break”语句的并发类比。
当“join()”返回时,所有分叉都已完成(无论成功与否)或已被取消。它们的结果或异常可以通过其 Future 的 “resultNow()” 或 “exceptionNow()” 方法在没有任何额外阻塞的情况下获得。(如果在 Future 完成之前调用这些方法,它们将抛出“IllegalStateException”。)
在作用域内调用“join()”或“joinUntil()”是强制性的。如果作用域的块在连接之前退出,那么作用域将等待所有分叉终止,然后抛出异常。
作用域的所有者线程可能在连接之前或连接期间被中断。例如,它可能是已关闭的外部作用域的一个分叉。如果发生这种情况,那么“join()”和“joinUntil(Instant)”将抛出异常,因为继续下去没有意义。然后“try-with-resources”语句将关闭作用域,这将取消所有分叉并等待它们终止。这具有将任务的取消自动传播到其子任务的效果。如果“joinUntil(Instant)”方法的截止时间在分叉终止或调用“shutdown()”之前过期,那么它将抛出异常,并且再次,“try-with-resources”语句将关闭作用域。
“StructuredTaskScope”的结构化使用在运行时被强制执行。例如,尝试从不在作用域的树层次结构中的线程(即所有者、分叉以及嵌套作用域中的分叉)调用“fork(Callable)”将失败并抛出异常。在“try-with-resources”块之外使用作用域并且在不调用“close()”或不保持正确嵌套的“close()”调用的情况下返回,可能会导致作用域的方法抛出“StructureViolationException”。
“StructuredTaskScope”对并发操作强制执行结构和顺序。因此,它不实现“ExecutorService”或“Executor”接口,因为这些接口的实例通常以非结构化的方式使用(见 下面)。然而,将使用“ExecutorService”但可以从结构中受益的代码迁移到使用“StructuredTaskScope”是很直接的。
“StructuredTaskScope”位于一个 孵化模块 中,默认情况下被排除
上面的例子使用了“StructuredTaskScope” API,所以要在 JDK XX 上运行它们,你必须添加“jdk.incubator.concurrent”模块,并且还要启用预览功能以启用虚拟线程:
- 使用“javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java”编译程序,并使用“java --enable-preview --add-modules jdk.incubator.concurrent Main”运行它;或者,
- 当使用 源代码启动器 时,使用“java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java”运行程序;或者,
- 当使用 jshell 时,使用“jshell --enable-preview --add-modules jdk.incubator.concurrent”启动它。
关闭策略
在处理并发子任务时,通常使用“短路模式”来避免做不必要的工作。例如,有时如果其中一个子任务失败(即“全部调用”)或如果其中一个子任务成功(即“任何调用”)就取消所有子任务是有意义的。“StructuredTaskScope”的两个子类,“ShutdownOnFailure” 和 “ShutdownOnSuccess”,通过分别在第一个分叉失败或成功时关闭作用域的策略来支持这些模式。它们还提供了处理异常和成功结果的方法。
这是一个具有失败时关闭策略的“StructuredTaskScope”(也在上面的“handle()”示例中使用),它并发运行一组任务,如果其中任何一个任务失败,则整个任务失败:
<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
scope.join();
scope.throwIfFailed(e -> e); // 如果任何分叉失败,则按原样传播异常
// 在这里,所有任务都已成功,所以组合它们的结果
return futures.stream().map(Future::resultNow).toList();
}
}
2
3
4
5
6
7
8
9
这是一个具有成功时关闭策略的“StructuredTaskScope”,它返回第一个成功子任务的结果:
<T> T race(List<Callable<T>> tasks, Instant deadline) throws ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
scope.joinUntil(deadline);
return scope.result(); // 如果没有分叉成功完成,则抛出异常
}
}
2
3
4
5
6
7
8
9
一旦一个分叉成功,这个作用域就会自动关闭,取消剩余的活动分叉。如果所有分叉都失败或给定的截止时间过期,任务就会失败。这种模式在例如需要从一组冗余服务中的任何一个获得结果的服务器应用程序中很有用。
虽然这两种关闭策略是现成提供的,但开发人员可以通过扩展“StructuredTaskScope”并覆盖 “handleComplete(Future)” 方法来创建抽象其他模式的自定义策略。
扇入场景
上面的例子集中在“扇出”场景,它管理多个并发的传出 I/O 操作。“StructuredTaskScope”在“扇入”场景中也很有用,它管理多个并发的传入 I/O 操作。在这种场景中,我们通常会响应传入请求创建未知数量的分叉。这是一个在“StructuredTaskScope”内分叉子任务以处理传入连接的服务器的示例:
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
try (var scope = new StructuredTaskScope<Void>()) {
try {
while (true) {
var socket = serverSocket.accept();
scope.fork(() -> handle(socket));
}
} finally {
// 如果发生错误或我们被中断,我们停止接受连接
scope.shutdown(); // 关闭所有活动连接
scope.join();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
因为所有连接处理子任务都是在作用域内创建的,所以线程转储将把它们显示为作用域所有者的子线程。
可观察性
我们扩展了由 JEP 425 添加的新 JSON 线程转储格式,以显示“StructuredTaskScope”将线程分组为层次结构:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域的 JSON 对象包含在该作用域中分叉的线程数组以及它们的堆栈跟踪。作用域的所有者线程通常会在连接方法中被阻塞,等待子任务完成;线程转储通过显示结构化并发强加的树层次结构,使得很容易看到子任务的线程在做什么。作用域的 JSON 对象也有一个对其父作用域的引用,以便可以从转储中重构程序的结构。
“com.sun.management.HotSpotDiagnosticsMXBean” API 也可以直接或通过平台 “MBeanServer” 以及本地或远程 JMX 工具间接生成这样的线程转储。
替代方案
- 什么都不做。让开发人员继续使用现有的低级“java.util.concurrent” API,并继续不得不仔细考虑并发代码中出现的所有异常情况和生命周期协调问题。
- 增强“ExecutorService”接口。我们制作了这个接口的一个实现原型,它始终强制执行结构并限制哪些线程可以提交任务。然而,我们发现这是有问题的,因为在 JDK 和生态系统中,“ExecutorService”(及其父接口“Executor”)的大多数使用都是非结构化的。将相同的 API 用于一个限制多得多的概念肯定会引起混淆。例如,在大多数情况下,将一个结构化的“ExecutorService”实例传递给接受此类型的现有方法几乎肯定会抛出异常。