结构化并发在 Java 19 中是孵化功能,比预览功能的成熟度更低。想要尝鲜的朋友可以试试。想要在 LTS 版本中使用该功能,估计得等到 java 25了。
结构化并发并不是一个新鲜的名词,这个概念已经出现很久了。Kotlin 的协程(Coroutines)就已经实现了结构化并发。Java 19 中引入了结构化并发,可以作为一个新的多线程编程模式。结构化并发和虚拟线程(说一说 Java 19 中的虚拟线程)都来自 OpenJDK 的 Loom 项目。
多线程编程的难点
多线程编程一直以来都是 Java 开发中比较复杂难懂的部分。这一方面是多线程编程自身的复杂性,另外一方面则是由于语言和标准库的限制。如果给你一个开发任务,让你在单线程中完成,那肯定会简单很多。比如下面的一段代码,在 takeAction 方法里面,分别调用 callOp1 和 callOp2 方法来得到两个值,最后再把用得到的结果值来调用 callOp3。
int takeAction(String input) {
int result1 = callOp1(input);
int result2 = callOp2(input);
return callOp3(result1, result2);
}
如果 takeAction 方法在同一个线程中执行,那么整个方法内部的调用流程是很清晰的。当 takeAction 方法返回时,callOp1、callOp2 和 callOp3 方法必定已经完成,或者由于之前的错误还未被调用。也就是说,这些方法的状态是确定的,只能处于执行成功、执行失败和未执行这三种状态之一。
如果 callOp1、callOp2 和 callOp3 这3个方法都是在各自的线程中运行的,那么很多在单线程情况下理所应当的结论,就不会成立了。这些方法的状态会变得更复杂:
- 由于执行方法的线程池的负载过大,方法处于等待执行的状态。
- 执行方法的线程可能被中断,导致方法的执行被取消。
- callOp1 和 callOp2 是并行执行的,如果 callOp1 在执行中产生了错误,而此时 callOp2 如果还在执行中,应该取消 callOp2 的执行,因为 takeAction 会以错误的结果返回,callOp2 的结果不再需要了。
- 当 takeAction 方法以错误的结果返回时,callOp2 可能仍然还在执行中,直到得到最后的结果,但是这个结果会被丢弃。这是因为对 callOp2 的取消请求可能未得到及时的响应。
- 任何一个任务的执行时间可能都过长,比如它可能需要等待别的任务完成。这就要求妥善处理超时的问题。
这是多线程开发难以掌握的原因之一,因为它要求你考虑各种复杂的情况,拉高了开发的门槛。在实际的开发中,我们会使用 Java 标准库和第三方库提供的 API 来进行多线程开发。这可以在一定程度上简化开发。
任务执行
在多线程开发中,绝大部分的工作都可以抽象成任务执行。这些任务的执行有下面这些特点:
- 执行流程从主任务开始。主任务会被划分成多个子任务。子任务可以被进一步划分。全部的任务会组成一个树形结构。
- 每个任务在独立的线程中执行。任务的执行可能成功或失败,也可能被取消。
- 父任务负责管理子任务。根据子任务的执行状态,已有的正在运行的子任务可能被取消,新的子任务可能被创建。
- 任务之间通过线程安全的数据结构来共享数据,或者使用消息传递机制来进行通信。
在目前的 Java 开发中,对于这样多线程任务的执行,一般使用的是 CompletableFuture 或 ListenableFuture 的级联方式。每个任务返回一个 CompletableFuture 对象来表示执行的结果。如果执行成功,CompletableFuture 表示得到的结果值;如果执行失败,CompletableFuture 表示产生的异常。使用 CompletableFuture 时并不是直接获取结果,而是添加一个回调方法,表示当前任务执行之后的下一步操作。
同样的 takeAction 方法,如果用 CompletableFuture 来实现,会是下面这样的代码。与单线程的代码相比,CompletableFuture 的代码并不直观。
CompletableFuture<Integer> takeAction(String input) {
var result1 = callOp1(input);
var result2 = callOp2(input);
return result1
.thenCombine(result2, (v1, v2) -> new int[] {v1, v2})
.thenCompose(values -> callOp3(values[0], values[1]));
}
结构化并发
对于结构化并发(Structured Concurrency),并没有一个清晰的解释。结构化并发要解决的问题是,如何让开发人员更加容易地编写并发代码,尽可能地对开发人员屏蔽多线程相关的细节。具体来说,结构化并发让你以类似单线程的方式来编写多线程代码。
由于结构化并发在 Java 19 中是孵化功能,相关的模块需要被显式地启用。这是通过添加 javac 和 java 的参数 “–add-modules jdk.incubator.concurrent” 来实现的。
结构化并发使用了前面提到的任务执行的概念。在使用结构化并发之前,首先需要创建一个结构化任务作用域(scope),然后在这个作用域中创建并执行任务。这些任务的生命周期由作用域负责管理。 当作用域被关闭时,其中所包含的任务都会结束。这就意味着开发人员不用处理任务的失败、取消和超时等情况,完全由底层平台提供支持。
在一个作用域中,其中创建的任务中也可以创建自己的作用域,用来管理该任务的子任务。这就形成了一个任务组成的树形结构。
使用结构化并发的基础类是 jdk.incubator.concurrent.StructuredTaskScope。StructuredTaskScope 表示的是一个使用结构化并发的作用域。
下表列出了 StructuredTaskScope 中的方法。
方法 |
说明 |
fork(Callable<? extends U> task) |
启动一个线程来执行任务 |
join() |
等待所有任务执行完成,或当前作用域被关闭 |
joinUntil(Instant deadline) |
与 join() 相同,只不过设置了终止时间 |
Shutdown() |
结束任务作用域 |
close() |
关闭任务作用域 |
下面的代码给出了 StructuredTaskScope 的使用示例,用来实现 takeAction 方法。在 takeAction 方法中的作用域中创建了调用 callOp1 和 callOp2 的子任务。等这两个子任务完成之后,使用得到的结果调用 combine 方法。combine 方法中也创建了作用域,调用了 callOp3 一个子任务。
public class StructuredWay {
int takeAction(String input) throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.shutdownOnFailure()) {
Future<Integer> v1 = scope.fork(() -> callOp1(input));
Future<Integer> v2 = scope.fork(() -> callOp2(input));
scope.join();
scope.throwIfFailed();
return combine(v1.resultNow(), v2.resultNow());
}
}
int combine(int result1, int result2) throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<Integer> r = scope.fork(() -> callOp3(result1, result2));
scope.join();
scope.throwIfFailed();
return r.resultNow();
}
}
}
使用 StructuredTaskScope 的基本流程如下:
- 主任务创建一个 StructuredTaskScope 对象。
- 使用 fork 方法来创建子任务。
- 使用 join 或 joinUntil 方法来等待子任务完成或取消。
- 在 join 或 joinUntil 方法返回之后,处理子任务中可能出现的错误,并使用子任务产生的结果。
- 关闭作用域对象,一般使用 try-with-resources 可以自动进行关闭。
看到上面的代码,你的第一反应可能是这样的代码也很繁琐。不过这里的重点在于思维方式的变化。结构化并发的思维方式,更加类似传统的单线程应用,因此更容易理解。
在上述的代码中,用到的是作用域的实现 ShutdownOnFailure。与它作用类似的是 ShutdownOnSuccess。
- ShutdownOnFailure 适用的是作用域中的所有任务都必须成功的场景。只要有一个任务失败,该作用域的 shutdown 方法会被调用,其他未完成的任务线程也会被中断。
- ShutdownOnSuccess 适用的是作用域中的任意任务成功即可的场景。只要有一个任务成功,该作用域的 shutdown 方法会被调用,其他未完成的任务线程也会被中断。
对于作用域的任务,如果在执行中出现错误,可以调用 StructuredTaskScope 的 shutdown 方法来终止执行。调用 shutdown 方法会阻止新任务的执行,同时取消正在运行中的任务。
关于结构化并发的基本介绍就到这里。由于结构化并发的实现还处于非常早期的阶段,API 可能发生在后续版本中产生变化。大家并不需要关注过多的细节,但是有必要关注这种即将到来的新的多线程编程模式,应该会让以后的多线程编程更简单。

如若转载,请注明出处:https://www.yiheng8.com/176605.html