JDK25的结构性并发API介绍
具体为什么引入结构性并发,使用它有什么好处,请见JDK25的JEP505,地址是: https://openjdk.org/jeps/505
结构性并发
API介绍
一. StructuredTaskScope
1. StructuredTaskScope构建
以下两个问题,在>=JDK25的API里面要注意,<=JDK24的时候没有遇到
- 结构体作用域执行完join()之后后续就无法再次进行fork()了,会抛出异常。
- 作用域创建在某个线程,那么fork()调用的时候也必须在那个线程上,要不然就会抛出异常 throw new WrongThreadException("Current thread not owner")。

2. StructuredTaskScope实例方法
//创建有返回值的任务
<U extends T> Subtask<U> fork(Callable<? extends U> task);
//创建无返回值的任务
<U extends T> Subtask<U> fork(Runnable task);
//根据Joiner的效果等待任务执行
R join() throws InterruptedException;
二. Configuration
用来配置作用域
- 配置作用域的名称
- 配置这个作用域的任务执行的超时时间
- 配置这个作用域的线程工厂(虚拟线程,平台线程)
//线程工厂
Configuration withThreadFactory(ThreadFactory threadFactory);
//作用域名称
Configuration withName(String name);
//超时时间
Configuration withTimeout(Duration timeout);
三. Subtask
- state()方法,这个方法用来获取任务的状态,任务状态分为成功状态(任务被执行了),失败状态(任务抛出了异常),不可用状态(可能因为Joiner的效果导致的,详细可见Joiner)
- exception()方法,获取到你的任务执行过程当中抛出的异常
- get()方法,获取任务的结果,如果任务抛出了异常或者因为Joiner的效果可能拿不到结果还可能抛出异常。
@PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
sealed interface Subtask<T> extends Supplier<T> permits StructuredTaskScopeImpl.SubtaskImpl {
/**
* Represents the state of a subtask.
* @see Subtask#state()
* @since 21
*/
@PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
enum State {
/**
* The subtask result or exception is not available. This state indicates that
* the subtask was forked but has not completed, it completed after the scope
* was cancelled, or it was forked after the scoped was cancelled (in which
* case a thread was not created to execute the subtask).
*/
UNAVAILABLE,
/**
* The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
* method can be used to get the result. This is a terminal state.
*/
SUCCESS,
/**
* The subtask failed with an exception. The {@link Subtask#exception()
* Subtask.exception()} method can be used to get the exception. This is a
* terminal state.
*/
FAILED,
}
/**
* {@return the subtask state}
*/
State state();
/**
* Returns the result of this subtask if it completed successfully. If the subtask
* was forked with {@link #fork(Callable) fork(Callable)} then the result from the
* {@link Callable#call() call} method is returned. If the subtask was forked with
* {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
*
* <p> Code executing in the scope owner thread can use this method to get the
* result of a successful subtask only after it has {@linkplain #join() joined}.
*
* <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
* onComplete} method should test that the {@linkplain #state() subtask state} is
* {@link State#SUCCESS SUCCESS} before using this method to get the result.
*
* @return the possibly-null result
* @throws IllegalStateException if the subtask has not completed, did not complete
* successfully, or the current thread is the scope owner invoking this
* method before {@linkplain #join() joining}
* @see State#SUCCESS
*/
T get();
/**
* {@return the exception or error thrown by this subtask if it failed}
* If the subtask was forked with {@link #fork(Callable) fork(Callable)} then the
* exception or error thrown by the {@link Callable#call() call} method is returned.
* If the subtask was forked with {@link #fork(Runnable) fork(Runnable)} then the
* exception or error thrown by the {@link Runnable#run() run} method is returned.
*
* <p> Code executing in the scope owner thread can use this method to get the
* exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
*
* <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
* onComplete} method should test that the {@linkplain #state() subtask state} is
* {@link State#FAILED FAILED} before using this method to get the exception.
*
* @throws IllegalStateException if the subtask has not completed, completed with
* a result, or the current thread is the scope owner invoking this method
* before {@linkplain #join() joining}
* @see State#FAILED
*/
Throwable exception();
}
四. Joiner
1. Joiner.anySuccessfulResultOrThrow()
- 在添加的任务当中,其中有一个任务成功的话,就认为这个作用域就成功了,可通过join()返回值来获取成功的结果,如果所有的任务全部发生了异常的话,就会抛出异常
- 这些任务的返回值是一样的
void main() {
Callable<List<String>> fetchUserName = () -> {
Thread.sleep(1000);
return List.of("1");
};
Callable<List<Integer>> fetchAge = () -> {
Thread.sleep(1000);
return List.of(18);
};
Callable<Object> fetchError = () -> {
Thread.sleep(200);
throw new RuntimeException("fetch 预期异常");
};
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
//失败的任务的等待时间相对于获取其他任务较短
scope.fork(fetchUserName);
scope.fork(fetchUserName);
scope.fork(() -> {
throw new RuntimeException("任务失败");
});
try {
List<String> usernameList = (List<String>) scope.join();
IO.println(usernameList); // [1]
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (FailedException e) { //如果所有的任务都失败的话,这里就会抛出异常
IO.println(e.getCause().getMessage()); //无打印
}
}
}
2. Joiner.awaitAllSuccessfulOrThrow() | Joiner.allSuccessfulOrThrow()
相同点: 两者的下过都是等待所有任务成功,如果有一个任务失败在执行join()的时候就会抛出FailedException异常,这个异常包裹了任务执行失败的异常,后续还未执行的任务会进行取消(可通过SubTask来获取状态和异常信息)
不同点:
- 就在于join()的返回值上,Joiner.awaitAllSuccessfulOrThrow()的join返回值是void或者说是null,Joiner.allSuccessfulOrThrow()的join返回值是Stream,就是fork的任务集合
- 用于场景不同 Joiner.awaitAllSuccessfulOrThrow()用于任务结果不是同一种返回值,通过fork()得到的Subtask拿到结果,
Joiner.allSuccessfulOrThrow()用于任务结果是同一种返回值,join()返回流任务,结果直接可以转换成集合或者其他类型。
Joiner.awaitAllSuccessfulOrThrow()
void main() {
Callable<List<String>> fetchUserName = () -> {
Thread.sleep(1000);
return List.of("1");
};
Callable<List<Integer>> fetchAge = () -> {
Thread.sleep(1000);
return List.of(18);
};
Callable<Object> fetchError = () -> {
Thread.sleep(200);
throw new RuntimeException("fetch 预期异常");
};
try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
Subtask<List<Integer>> ageSupplier = scope.fork(fetchAge);
Subtask<List<String>> usernameSupplier = scope.fork(fetchUserName);
Subtask<Object> errorSupplier = scope.fork(fetchError);
try {
scope.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (FailedException e) {
//获取任务错误的异常,是否打印出 "fetch 预期异常"
IO.println(e.getCause().getMessage()); //fetch 预期异常
}
// 调用 get()拿去结果,可能会有两种情况: (但是如果出现join出现FailedException异常的话,建议后续就不要get(),因为不可预测(如果异常任务先触发的话,后续的任务就会取消,就无法get(),但是可以通过 exception(),state() 来获取到任务的状态或者任务抛出的异常))
// 1. 可能抛出 java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully
// 2. 拿到正确结果
IO.println(usernameSupplier.get());
IO.println(ageSupplier.get());
}
}
Joiner.allSuccessfulOrThrow()
void main() {
//结构性并发
Callable<List<String>> fetchUserName = () -> {
Thread.sleep(1000);
return List.of("1");
};
Callable<List<Integer>> fetchAge = () -> {
Thread.sleep(1000);
return List.of(18);
};
Callable<Object> fetchError = () -> {
Thread.sleep(200);
throw new RuntimeException("fetch 预期异常");
};
try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
//失败的任务的等待时间相对于获取其他任务较短
scope.fork(fetchUserName);
scope.fork(fetchAge);
scope.fork(fetchError);
try {
Stream<Subtask<Object>> tasks = scope.join();
System.out.println(tasks.toList().size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (FailedException e) { //有一个任务失败,抛出的异常这里就会抛出
IO.println(e.getCause().getMessage()); //fetch 预期异常
}
}
}
3. Joiner.awaitAll()
- 等待所有的任务执行,在join()的时候不在乎其中有些任务是否失败(是否有异常),join()返回是null
- 如果真的要获取到任务的状态(失败,成功)或者结果,请通过fork()返回的SubTask记录起来,通过提供的api(state(),get(),exception())来获取
- 用于不关心任务状态的使用场景,但是也可以通过SubTask来获取每个任务的情况
- 注意的点: Subtask的get()方法尽量判断状态或者try异常来处理
void main() {
//结构性并发
Callable<List<String>> fetchUserName = () -> {
Thread.sleep(1000);
return List.of("1");
};
Callable<List<Integer>> fetchAge = () -> {
Thread.sleep(1000);
return List.of(18);
};
Callable<Object> fetchError = () -> {
Thread.sleep(200);
throw new RuntimeException("fetch 预期异常");
};
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
//失败的任务的等待时间相对于获取其他任务较短
Subtask<Object> errorSupplier = scope.fork(fetchError);
Subtask<List<Integer>> ageSupplier = scope.fork(fetchAge);
Subtask<List<String>> usernameSupplier = scope.fork(fetchUserName);
try {
scope.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (FailedException e) {
IO.println(e.getCause().getMessage()); //无打印
}
//可以正确的拿去没有失败的异常
IO.println(usernameSupplier.get()); // [1]
IO.println(ageSupplier.get()); // [18]
try {
IO.println(
errorSupplier.get()); //抛出异常(但是拿不到任务实际抛出的异常) java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully
} catch (IllegalStateException illegalStateException) {
//拿到任务的异常
IO.println(errorSupplier.exception()); //fetch 预期异常
}
}
4. Joiner.allUntil(Predicate<Subtask<? extends T>> isDone)
- 用于扩展,用来表示这个scope的任务是否执行完毕(如果直接返回false的话,代表scope还可以继续执行fork)
1,返回false的话,scope还可以继续执行fork()
2,返回true的话,scope会cancel(),后续fork()任务无法执行,之前的任务获取get()因为可能会抛出java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully
cancel()的逻辑,会导致之前的任务会进行中断
private void cancel() {
if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
// prevent new threads from starting
flock.shutdown();
// interrupt all unfinished threads
interruptAll();
// wakeup join
flock.wakeup();
}
}
void main() {
//结构性并发
Callable<List<String>> fetchUserName = () -> {
Thread.sleep(1000);
return List.of("1");
};
Callable<List<Integer>> fetchAge = () -> {
Thread.sleep(1000);
return List.of(18);
};
Callable<Object> fetchError = () -> {
Thread.sleep(200);
throw new RuntimeException("fetch 预期异常");
};
try (var scope = StructuredTaskScope.open(Joiner.allUntil((_) -> false))) {
//失败的任务的等待时间相对于获取其他任务较短
scope.fork(fetchError);
scope.fork(fetchUserName);
scope.fork(fetchAge);
try {
Stream<Subtask<Object>> tasks = scope.join();
System.out.println(tasks.toList().size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (FailedException e) { //有一个任务失败,抛出的异常这里就会抛出
IO.println(e.getCause().getMessage());
}
}
}




