Skip to content

JEP 428: Structured Concurrency (Incubator) | 结构化并发(孵化器)

摘要

通过引入用于 结构化并发 的 API 来简化多线程编程。结构化并发将运行在不同线程中的多个任务视为单一工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观察性。这是一个 孵化中的 API

目标

  • 提高多线程代码的可维护性、可靠性和可观察性。
  • 推广一种并发编程风格,可以消除由取消和关闭操作引起的常见风险,如线程泄漏和取消延迟。

非目标

  • 不旨在替换 java.util.concurrent 包中的任何并发构造,如 ExecutorServiceFuture
  • 不旨在定义 Java 的确定性结构化并发 API。其他结构化并发构造可以由第三方库或在未来的 JDK 版本中定义。
  • 不旨在定义线程间共享数据流的方法(即 通道)。我们可能会在未来提出此类建议。
  • 不旨在用新的线程取消机制替换现有的线程中断机制。我们可能会在未来提出此类建议。

动机

开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务之间具有足够的独立性,并且硬件资源充足,则可以通过并发执行子任务来加快整体任务的执行速度(即降低延迟)。例如,一个组合多个 I/O 操作结果的任务,如果每个 I/O 操作都在自己的线程中并发执行,则执行速度会更快。虚拟线程(JEP 425)使得为每个此类 I/O 操作分配一个线程变得经济高效,但管理可能产生的大量线程仍然是一个挑战。

使用 ExecutorService 的非结构化并发

Java 5 中引入的 java.util.concurrent.ExecutorService API,帮助开发者并发地执行子任务。

以下是一个名为 handle() 的方法示例,它代表服务器应用程序中的一个任务。它通过向 ExecutorService 提交两个子任务来处理一个传入的请求。一个子任务执行 findUser() 方法,另一个子任务执行 fetchOrder() 方法。ExecutorService 会立即为每个子任务返回一个 Future 对象,并在各自的线程中执行每个子任务。handle() 方法通过对其 Future 对象的 get() 方法的阻塞调用等待子任务的结果,因此称该任务 连接 其子任务。

java
Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser  = user.get();   // 连接 findUser
    int    theOrder = order.get();  // 连接 fetchOrder
    return new Response(theUser, theOrder);
}

由于子任务并发执行,每个子任务都可以独立地成功或失败。(在此上下文中,失败意味着抛出异常。)通常,如果 handle() 这样的任务有任何子任务失败,则它也应该失败。当失败发生时,理解线程的生存期可能会变得异常复杂:

  • 如果 findUser() 抛出异常,则 handle() 在调用 user.get() 时将抛出异常,但 fetchOrder() 将继续在其自己的线程中运行。这是一个 线程泄漏,它最好的情况是浪费资源,最坏的情况是 fetchOrder() 线程会干扰其他任务。

  • 如果执行 handle() 的线程被中断,则中断不会传播到子任务。findUser()fetchOrder() 线程都将泄漏,甚至在 handle() 失败后也会继续运行。

  • 如果 findUser() 执行时间较长,但在此期间 fetchOrder() 失败,则 handle() 将不必要地在 user.get() 上阻塞等待 findUser() 的完成,而不是取消它。只有在 findUser() 完成后 user.get() 返回,order.get() 才会抛出异常,导致 handle() 失败。

在每种情况下,问题都在于我们的程序在逻辑上是按任务 - 子任务关系结构化的,但这些关系仅存在于开发者的脑海中。这不仅增加了出错的可能性,而且使得诊断和解决此类错误变得更加困难。例如,像线程转储这样的可观察性工具将在不相关线程的调用堆栈上显示 handle()findUser()fetchOrder(),而不会提示任务 - 子任务关系。

当发生错误时,我们可能会尝试通过显式取消其他子任务来做得更好,例如,使用 try-finally 块包装任务,并在失败任务的 catch 块中调用其他任务 Future 对象的 cancel(boolean) 方法。我们还需要在 try-with-resources 语句 中使用 ExecutorService,如 JEP 425 中的示例所示,因为 Future 没有提供等待已取消任务的方法。但要做到这一切并不容易,而且常常会使代码的逻辑意图更难以辨别。跟踪任务间的关系,并手动添加所需的任务间取消边,这对开发者来说是一个很大的要求。

这种需要手动协调生命周期的原因在于 ExecutorServiceFuture 允许无限制的并发模式。对涉及的任何线程都没有约束或排序。一个线程可以创建 ExecutorService,第二个线程可以向其提交工作,而执行工作的线程与第一个或第二个线程都没有关系。此外,在线程提交工作之后,完全不同的线程可以等待执行结果。任何具有 Future 引用的代码都可以加入它(即,通过调用 get() 等待其结果),甚至是在获得 Future 的线程之外的其他线程中的代码。实际上,一个任务启动的子任务不必返回给提交它的任务。它可以返回给多个任务中的任何一个——或者甚至不返回给任何任务。

由于 ExecutorServiceFuture 允许这种非结构化的使用,因此它们不会强制或甚至跟踪任务和子任务之间的关系,尽管这种关系很常见且有用。因此,即使子任务是在同一个任务中提交和连接的,一个子任务的失败也不能自动导致另一个子任务的取消:在上面的 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder()FuturefindUser()Future 无关,并且它们都与最终将通过其 get() 方法连接它们的线程无关。与其要求开发者手动管理这种取消,我们更希望可靠地自动化它。

任务结构应反映代码结构

ExecutorService 下自由组合的线程相比,单线程代码的执行总是强制执行任务和子任务的层次结构。方法的体块 {...} 对应于一个任务,而在该块内调用的方法对应于子任务。被调用的方法必须返回给调用它的方法,或者向它抛出异常。它不能比调用它的方法更长寿,也不能返回或向不同的方法抛出异常。因此,所有子任务都在任务之前完成,每个子任务都是其父任务的子任务,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构控制。

例如,在 handle() 的这个单线程版本中,任务和子任务的关系从语法结构中显而易见:

java
Response handle() throws IOException {
    String theUser  = findUser();
    int    theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

findUser() 子任务成功或失败完成之前,我们不会启动 fetchOrder() 子任务。如果 findUser() 失败,则我们根本不会启动 fetchOrder(),且 handle() 任务会隐式失败。子任务只能向其父任务返回结果这一事实具有重要意义:它意味着父任务可以隐式地将一个子任务的失败视为触发取消所有剩余子任务并随后使自身失败的触发器。

在单线程代码中,任务 - 子任务层次结构在运行时通过调用栈具体化。因此,我们免费获得了相应的父子关系,这些关系控制着错误的传播。在观察单个线程时,层次关系显而易见:findUser()(以及稍后的 fetchOrder())似乎从属于 handle()

如果任务和其子任务之间的父子关系能够在语法上表达并在运行时具体化,就像单线程代码那样,那么多线程编程将变得更加简单、可靠和可观察。这种语法结构将界定子任务的生命周期,并能够在运行时表示线程间层次结构,类似于线程内调用栈。这种表示将支持错误传播和取消,以及对并发程序的有意义观察。

(Java 已经有一个用于对并发任务施加结构的 API,即 java.util.concurrent.ForkJoinPool,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务设计的,而不是涉及 I/O 的任务。)

结构化并发

结构化并发 是一种多线程编程方法,它保留了单线程代码的可读性、可维护性和可观察性。它体现了以下原则:

如果一个任务分解为并发子任务,那么这些子任务都会返回到同一个地方,即任务的代码块。

“结构化并发”这一术语由 Martin Sústrik 首创,并由 Nathaniel J. Smith 推广。来自其他语言(如 Erlang 的分层监视器)的思想为结构化并发中的错误处理设计提供了信息。

在结构化并发中,子任务代表任务执行。任务等待子任务的结果并监视它们是否失败。与单线程代码中结构化编程技术类似,结构化并发对多线程的强大功能来自两个想法:(1)通过代码块执行流的明确定义进入和退出点;(2)操作生命周期的严格嵌套,以反映它们在代码中的语法嵌套。

由于代码块的进入和退出点定义明确,因此并发子任务的生命周期被限制在其父任务的语法块内。由于同级子任务的生命周期嵌套在其父任务的生命周期内,因此可以将它们视为一个单元进行推理和管理。由于父任务的生命周期又嵌套在其父任务的生命周期内,因此运行时可以将任务层次结构实体化为树状结构。这棵树是单线程调用栈的并发对应物,可观察性工具可以使用它来表示子任务从属于其父任务。

结构化并发与 JDK 实现的轻量级线程(即虚拟线程)非常匹配。许多虚拟线程共享同一个操作系统线程,从而允许存在大量的虚拟线程。除了数量众多之外,虚拟线程的成本也足够低,可以表示任何并发行为单元,甚至是涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发同时处理成千上万或数百万个传入请求:它可以为处理每个请求的任务分配一个新的虚拟线程,当任务通过提交子任务进行并发执行时,它可以为每个子任务分配一个新的虚拟线程。在幕后,通过为每个虚拟线程安排一个对其唯一父级的引用来实现任务 - 子任务关系的树状实体化,这类似于调用栈中的帧引用其唯一调用者的方式。

总之,虚拟线程提供了丰富的线程资源。结构化并发确保它们得到正确且稳健的协调,并使可观察性工具能够按照开发人员的理解显示线程。在 JDK 中提供结构化并发的 API 将提高服务器应用程序的可维护性、可靠性和可观察性。

描述

结构化并发 API 的主要类是 StructuredTaskScope。这个类允许开发者将任务组织为一组并发子任务,并将它们作为一个单元进行协调。子任务通过单独“分叉”并在之后作为一个单元“合并”来在它们自己的线程中执行,而且可能作为一个单元被取消。子任务的成功结果或异常会被父任务聚合并处理。StructuredTaskScope 将子任务(或“分叉”)的生命周期限制在一个明确的 词法作用域 内,其中任务与其子任务的所有交互(分叉、合并、取消、错误处理和结果组合)都发生在这个作用域内。

以下是之前 handle() 方法的示例,现在使用 StructuredTaskScopeShutdownOnFailure 策略的解释见 下文):

java
Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String>  user  = scope.fork(() -> findUser());
        Future<Integer> order = scope.fork(() -> fetchOrder());

        scope.join();           // 合并两个分叉
        scope.throwIfFailed();  // ... 并传播错误

        // 在这里,两个分叉都已成功完成,所以组合它们的结果
        return new Response(user.resultNow(), order.resultNow());
    }
}

与原始示例相比,理解这里涉及的线程的生命周期变得很容易:在所有情况下,它们的生命周期都被限制在一个词法作用域内,即 try-with-resources 语句的主体。此外,使用 StructuredTaskScope 确保了一系列有价值的特性:

  • 带短路功能的错误处理 —— 如果 findUser()fetchOrder() 子任务中的任何一个失败,另一个如果尚未完成则会被取消。(这是通过 ShutdownOnFailure 实现的取消策略来管理的;其他策略也是可能的)。

  • 取消传播 —— 如果在调用 join() 之前或期间运行 handle() 的线程被中断,当线程退出作用域时,两个分叉都会自动被取消。

  • 清晰性 —— 上面的代码结构清晰:设置子任务,等待它们完成或被取消,然后决定是继续执行(并处理已完成子任务的结果)还是失败(子任务已完成,因此无需进一步清理)。

  • 可观察性 —— 如 下文 所述,线程转储会清楚地显示任务层次结构,其中运行 findUser()fetchOrder() 的线程会显示为作用域的子节点。

ExecutorService.submit(...)类似,StructuredTaskScope.fork(...)方法接受一个Callable并返回一个Future。然而,与ExecutorService不同的是,返回的Future对象并不是为了通过其get()方法加入或通过其cancel()方法取消。相反,作用域内的所有分叉都是作为一个单元来加入或取消的。为此,设计了两个新的Future方法:resultNow()exceptionNow(),它们用于子任务完成后,例如在调用scope.join()之后。

使用 StructuredTaskScope

使用 StructuredTaskScope 的代码的一般工作流程如下:

  1. 创建作用域。创建作用域的线程是其所有者

  2. 在作用域内分叉并发子任务

  3. 作用域内的任何分叉或作用域的所有者 都可以调用作用域的 shutdown() 方法来请求取消所有剩余的子任务。

  4. 作用域的所有者将作用域(即其所有分叉)作为一个单元加入。所有者可以调用作用域的 join() 方法,该方法会阻塞,直到所有分叉都已完成(无论成功与否)或通过 shutdown() 被取消。或者,所有者可以调用作用域的 joinUntil(java.time.Instant) 方法,该方法接受一个截止时间。

  5. 加入后,处理分叉中的任何错误并处理其结果

  6. 关闭作用域,这通常通过 try-with-resources 隐式完成。这会关闭作用域并等待任何滞后的分叉完成。

如果所有者是现有作用域(即作为某个作用域的分叉创建的)的成员,则该作用域成为新作用域的父作用域。因此,任务形成一棵树,作用域作为中间节点,线程作为叶子节点。

每个分叉都在其新创建的线程中运行,该线程默认为虚拟线程。分叉的线程由作用域拥有,作用域又由其创建线程拥有,从而形成了一个层次结构。任何分叉都可以创建自己的嵌套 StructuredTaskScope 来分叉自己的子任务,从而扩展这个层次结构。这个层次结构反映在代码的块结构中,它限制了分叉的生命周期:一旦作用域被关闭,所有分叉的线程都将终止,块退出时不会留下任何线程。

作用域中的任何分叉、嵌套作用域中的任何分叉以及作用域的所有者都可以随时调用作用域的 shutdown() 方法来表示任务已完成——即使其他分叉仍在运行。shutdown() 方法会 中断 作用域中仍处于活动状态的所有分叉的线程。因此,所有分叉都应该以响应中断的方式编写。实际上,shutdown() 是顺序代码中 break 语句的并发对应物。

join() 方法返回时,所有的 fork(分支任务)都已经完成(无论成功或失败)或被取消。它们的结果或异常可以通过它们对应的 future 对象的 resultNow()exceptionNow() 方法获取,而无需任何额外的阻塞操作。(如果这些方法在 future 完成之前被调用,将抛出 IllegalStateException。)

在作用域(scope)内调用 join()joinUntil() 方法是强制性的。如果作用域的代码块在 join 操作之前退出,则作用域将等待所有 fork 终止,然后抛出一个异常。

作用域的所有线程可能在 join 操作之前或过程中被中断。例如,这可能是由于封闭作用域的一个 fork 被关闭导致的。如果发生这种情况,join()joinUntil(Instant) 将抛出异常,因为继续执行没有意义。try-with-resources 语句将关闭作用域,这将取消所有 fork 并等待它们终止。这实际上是将任务的取消自动传播到其子任务。如果 joinUntil(Instant) 方法的截止时间在 fork 终止或调用 shutdown() 之前到期,则它将抛出异常,并且 try-with-resources 语句将再次关闭作用域。

StructuredTaskScope 的结构化使用在运行时得到强制执行。例如,尝试从不在作用域树层次结构(即所有者、fork 以及嵌套作用域中的 fork)中的线程调用 fork(Callable) 将失败并抛出异常。在 try-with-resources 块外部使用作用域而不调用 close() 方法返回,或不保持 close() 调用的正确嵌套,可能会导致作用域的方法抛出 StructureViolationException

StructuredTaskScope 对并发操作施加了结构和顺序。因此,它不实现 ExecutorServiceExecutor 接口,因为这些接口的实例通常以非结构化的方式使用(见 下文)。但是,很容易将使用 ExecutorService 但受益于结构的代码迁移到使用 StructuredTaskScope

StructuredTaskScope 位于 孵化器模块 中,默认情况下被排除

上面的示例使用了 StructuredTaskScope API,因此要在 JDK XX 上运行它们,您必须添加 jdk.incubator.concurrent 模块,并启用预览功能以启用虚拟线程:

  • 使用 javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java 编译程序,并使用 java --enable-preview --add-modules jdk.incubator.concurrent Main 运行它;或者,

  • 使用 源代码启动器 时,使用 java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java 运行程序;或者,

  • 使用 jshell 时,使用 jshell --enable-preview --add-modules jdk.incubator.concurrent 启动它。

关闭策略

在处理并发子任务时,通常会使用 短路模式 来避免不必要的工作。例如,如果其中一个子任务失败,则取消所有子任务(即 全部调用)是有意义的;或者,如果其中一个子任务成功,则取消其他子任务(即 任一调用)。StructuredTaskScope 的两个子类 ShutdownOnFailureShutdownOnSuccess 分别支持在第一个 fork 失败或成功时关闭作用域的策略,并提供了处理异常和成功结果的方法。

以下是一个带有失败时关闭策略的 StructuredTaskScope(也在上面的 handle() 示例中使用),它并发运行一系列任务,如果其中任何一个任务失败,则整个操作失败:

java
<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
        scope.join(); // 等待所有任务完成
        scope.throwIfFailed(e -> e); // 如果有任何分支失败,则原样抛出异常
        // 此时,所有任务都已成功完成,因此组合它们的结果
        return futures.stream().map(Future::resultNow).toList();
    }
}

以下是一个带有成功时关闭策略的 StructuredTaskScope,它返回第一个成功子任务的结果:

java
<T> T race(List<Callable<T>> tasks, Instant deadline) throws ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task); // 分叉执行每个任务
        }
        scope.joinUntil(deadline); // 等待直到截止时间或所有任务完成
        return scope.result(); // 如果没有任何分支成功完成,则抛出异常
    }
}

一旦有一个分支成功,这个作用域就会自动关闭,取消其余活跃的分支。如果所有分支都失败或达到给定的截止时间,则任务失败。这种模式在例如服务器应用程序中非常有用,这些应用程序需要从一组冗余服务中的任何一个获取结果。

虽然这两种关闭策略是开箱即用的,但开发人员可以通过扩展 StructuredTaskScope 并重写 handleComplete(Future) 方法来创建自定义策略,以抽象其他模式。

扇入场景

上面的示例主要关注 扇出 场景,这些场景管理多个并发的出站 I/O 操作。StructuredTaskScope扇入 场景中也非常有用,这些场景管理多个并发的入站 I/O 操作。在这种场景中,我们通常会根据传入的请求创建未知数量的分支。以下是一个在 StructuredTaskScope 内分叉子任务以处理传入连接的服务器示例:

java
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept(); // 接受连接
                scope.fork(() -> handle(socket)); // 分叉处理连接
            }
        } finally {
            // 如果发生错误或我们被中断,则停止接受连接
            scope.shutdown(); // 关闭所有活跃的连接
            scope.join(); // 等待所有子任务完成
        }
    }
}

由于所有处理连接的子任务都是在作用域内创建的,因此线程转储会将它们显示为作用域所有者的子线程。

观测性

我们扩展了由 JEP 425 添加的新 JSON 线程转储格式,以展示 StructuredTaskScope 将线程组织成层次结构的方式:

shell
$ jcmd <pid> Thread.dump_to_file -format=json <file>

每个作用域的 JSON 对象包含在该作用域中分叉的线程数组及其堆栈跟踪。一个作用域的所有者线程通常会阻塞在 join 方法中等待子任务完成;线程转储使得通过显示由结构化并发强加的树形层次结构来查看子任务线程正在做什么变得容易。作用域的 JSON 对象还有一个对其父级的引用,因此可以从转储中重构程序的结构。

com.sun.management.HotSpotDiagnosticsMXBean API 也可以用来生成这样的线程转储,无论是直接使用还是通过平台的 MBeanServer 和本地或远程 JMX 工具间接使用。

替代方案

  • 什么都不做。让开发者继续使用现有的低级别的 java.util.concurrent API,并继续仔细考虑并发代码中出现的所有异常情况和生命周期协调问题。

  • 增强 ExecutorService 接口。我们原型实现了一个该接口的版本,该版本总是强制执行结构化并限制哪些线程可以提交任务。然而,我们发现这样做存在问题,因为在 JDK 和生态系统中对 ExecutorService(及其父接口 Executor)的大多数使用都不是结构化的。将同一个 API 用于一个更为受限的概念必定会引起混淆。例如,将一个结构化的 ExecutorService 实例传递给现有接受这种类型的函数,在大多数情况下几乎肯定会抛出异常。

依赖项