JDK25的结构性并发API介绍
146
类别: 
开发交流

具体为什么引入结构性并发,使用它有什么好处,请见JDK25的JEP505,地址是: https://openjdk.org/jeps/505

结构性并发

API介绍

一. StructuredTaskScope
1. StructuredTaskScope构建

以下两个问题,在>=JDK25的API里面要注意,<=JDK24的时候没有遇到

  1. 结构体作用域执行完join()之后后续就无法再次进行fork()了,会抛出异常。
  2. 作用域创建在某个线程,那么fork()调用的时候也必须在那个线程上,要不然就会抛出异常 throw new WrongThreadException("Current thread not owner")。

JDK25构造StructuredTaskScope实例的API

2. StructuredTaskScope实例方法
//创建有返回值的任务
<U extends T> Subtask<U> fork(Callable<? extends U> task);

//创建无返回值的任务
<U extends T> Subtask<U> fork(Runnable task);

//根据Joiner的效果等待任务执行
R join() throws InterruptedException;
二. Configuration

用来配置作用域

  1. 配置作用域的名称
  2. 配置这个作用域的任务执行的超时时间
  3. 配置这个作用域的线程工厂(虚拟线程,平台线程)
//线程工厂
Configuration withThreadFactory(ThreadFactory threadFactory);

//作用域名称
Configuration withName(String name);

//超时时间
Configuration withTimeout(Duration timeout);
三. Subtask
  1. state()方法,这个方法用来获取任务的状态,任务状态分为成功状态(任务被执行了),失败状态(任务抛出了异常),不可用状态(可能因为Joiner的效果导致的,详细可见Joiner)
  2. exception()方法,获取到你的任务执行过程当中抛出的异常
  3. get()方法,获取任务的结果,如果任务抛出了异常或者因为Joiner的效果可能拿不到结果还可能抛出异常。
@PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
    sealed interface Subtask<T> extends Supplier<T> permits StructuredTaskScopeImpl.SubtaskImpl {
        /**
         * Represents the state of a subtask.
         * @see Subtask#state()
         * @since 21
         */
        @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
        enum State {
            /**
             * The subtask result or exception is not available. This state indicates that
             * the subtask was forked but has not completed, it completed after the scope
             * was cancelled, or it was forked after the scoped was cancelled (in which
             * case a thread was not created to execute the subtask).
             */
            UNAVAILABLE,
            /**
             * The subtask completed successfully. The {@link Subtask#get() Subtask.get()}
             * method can be used to get the result. This is a terminal state.
             */
            SUCCESS,
            /**
             * The subtask failed with an exception. The {@link Subtask#exception()
             * Subtask.exception()} method can be used to get the exception. This is a
             * terminal state.
             */
            FAILED,
        }

        /**
         * {@return the subtask state}
         */
        State state();

        /**
         * Returns the result of this subtask if it completed successfully. If the subtask
         * was forked with {@link #fork(Callable) fork(Callable)} then the result from the
         * {@link Callable#call() call} method is returned. If the subtask was forked with
         * {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
         *
         * <p> Code executing in the scope owner thread can use this method to get the
         * result of a successful subtask only after it has {@linkplain #join() joined}.
         *
         * <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
         * onComplete} method should test that the {@linkplain #state() subtask state} is
         * {@link State#SUCCESS SUCCESS} before using this method to get the result.
         *
         * @return the possibly-null result
         * @throws IllegalStateException if the subtask has not completed, did not complete
         * successfully, or the current thread is the scope owner invoking this
         * method before {@linkplain #join() joining}
         * @see State#SUCCESS
         */
        T get();

        /**
         * {@return the exception or error thrown by this subtask if it failed}
         * If the subtask was forked with {@link #fork(Callable) fork(Callable)} then the
         * exception or error thrown by the {@link Callable#call() call} method is returned.
         * If the subtask was forked with {@link #fork(Runnable) fork(Runnable)} then the
         * exception or error thrown by the {@link Runnable#run() run} method is returned.
         *
         * <p> Code executing in the scope owner thread can use this method to get the
         * exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
         *
         * <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
         * onComplete} method should test that the {@linkplain #state() subtask state} is
         * {@link State#FAILED FAILED} before using this method to get the exception.
         *
         * @throws IllegalStateException if the subtask has not completed, completed with
         * a result, or the current thread is the scope owner invoking this method
         * before {@linkplain #join() joining}
         * @see State#FAILED
         */
        Throwable exception();
    }
四. Joiner
1. Joiner.anySuccessfulResultOrThrow()
  1. 在添加的任务当中,其中有一个任务成功的话,就认为这个作用域就成功了,可通过join()返回值来获取成功的结果,如果所有的任务全部发生了异常的话,就会抛出异常
  2. 这些任务的返回值是一样的
void main() {
 Callable<List<String>> fetchUserName = () -> {
      Thread.sleep(1000);
      return List.of("1");
    };

    Callable<List<Integer>> fetchAge = () -> {
      Thread.sleep(1000);
      return List.of(18);
    };

    Callable<Object> fetchError = () -> {
      Thread.sleep(200);
      throw new RuntimeException("fetch 预期异常");
    };

    try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
      //失败的任务的等待时间相对于获取其他任务较短
      scope.fork(fetchUserName);
      scope.fork(fetchUserName);
      scope.fork(() -> {
        throw new RuntimeException("任务失败");
      });
      try {
        List<String> usernameList = (List<String>) scope.join();
        IO.println(usernameList); // [1]
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (FailedException e) { //如果所有的任务都失败的话,这里就会抛出异常
        IO.println(e.getCause().getMessage()); //无打印
      }
    }
}


2. Joiner.awaitAllSuccessfulOrThrow() | Joiner.allSuccessfulOrThrow()

相同点: 两者的下过都是等待所有任务成功,如果有一个任务失败在执行join()的时候就会抛出FailedException异常,这个异常包裹了任务执行失败的异常,后续还未执行的任务会进行取消(可通过SubTask来获取状态和异常信息)
不同点:

  1. 就在于join()的返回值上,Joiner.awaitAllSuccessfulOrThrow()的join返回值是void或者说是null,Joiner.allSuccessfulOrThrow()的join返回值是Stream,就是fork的任务集合
  2. 用于场景不同 Joiner.awaitAllSuccessfulOrThrow()用于任务结果不是同一种返回值,通过fork()得到的Subtask拿到结果,
    Joiner.allSuccessfulOrThrow()用于任务结果是同一种返回值,join()返回流任务,结果直接可以转换成集合或者其他类型。

Joiner.awaitAllSuccessfulOrThrow()

void main() {
    Callable<List<String>> fetchUserName = () -> {
      Thread.sleep(1000);
      return List.of("1");
    };

    Callable<List<Integer>> fetchAge = () -> {
      Thread.sleep(1000);
      return List.of(18);
    };

    Callable<Object> fetchError = () -> {
      Thread.sleep(200);
      throw new RuntimeException("fetch 预期异常");
    };

 try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow())) {
      Subtask<List<Integer>> ageSupplier = scope.fork(fetchAge);
      Subtask<List<String>> usernameSupplier = scope.fork(fetchUserName);
      Subtask<Object> errorSupplier = scope.fork(fetchError);
      try {
        scope.join();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (FailedException e) {
        //获取任务错误的异常,是否打印出 "fetch 预期异常"
        IO.println(e.getCause().getMessage()); //fetch 预期异常
      }
      // 调用 get()拿去结果,可能会有两种情况: (但是如果出现join出现FailedException异常的话,建议后续就不要get(),因为不可预测(如果异常任务先触发的话,后续的任务就会取消,就无法get(),但是可以通过 exception(),state() 来获取到任务的状态或者任务抛出的异常))
      // 1. 可能抛出 java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully
      // 2. 拿到正确结果
      IO.println(usernameSupplier.get());
      IO.println(ageSupplier.get());
    }
}

Joiner.allSuccessfulOrThrow()

void main() {
    //结构性并发
    Callable<List<String>> fetchUserName = () -> {
      Thread.sleep(1000);
      return List.of("1");
    };

    Callable<List<Integer>> fetchAge = () -> {
      Thread.sleep(1000);
      return List.of(18);
    };

    Callable<Object> fetchError = () -> {
      Thread.sleep(200);
      throw new RuntimeException("fetch 预期异常");
    };

    try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
      //失败的任务的等待时间相对于获取其他任务较短
      scope.fork(fetchUserName);
      scope.fork(fetchAge);
      scope.fork(fetchError);
      try {
        Stream<Subtask<Object>> tasks = scope.join();
        System.out.println(tasks.toList().size());
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (FailedException e) { //有一个任务失败,抛出的异常这里就会抛出
        IO.println(e.getCause().getMessage()); //fetch 预期异常
      }
    }
  }
3. Joiner.awaitAll()
  1. 等待所有的任务执行,在join()的时候不在乎其中有些任务是否失败(是否有异常),join()返回是null
  2. 如果真的要获取到任务的状态(失败,成功)或者结果,请通过fork()返回的SubTask记录起来,通过提供的api(state(),get(),exception())来获取
  3. 用于不关心任务状态的使用场景,但是也可以通过SubTask来获取每个任务的情况
  4. 注意的点: Subtask的get()方法尽量判断状态或者try异常来处理
void main() {
    //结构性并发
    Callable<List<String>> fetchUserName = () -> {
      Thread.sleep(1000);
      return List.of("1");
    };

    Callable<List<Integer>> fetchAge = () -> {
      Thread.sleep(1000);
      return List.of(18);
    };

    Callable<Object> fetchError = () -> {
      Thread.sleep(200);
      throw new RuntimeException("fetch 预期异常");
    };

    try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
      //失败的任务的等待时间相对于获取其他任务较短
      Subtask<Object> errorSupplier = scope.fork(fetchError);
      Subtask<List<Integer>> ageSupplier = scope.fork(fetchAge);
      Subtask<List<String>> usernameSupplier = scope.fork(fetchUserName);
      try {
        scope.join();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (FailedException e) {
        IO.println(e.getCause().getMessage()); //无打印
      }
      //可以正确的拿去没有失败的异常
      IO.println(usernameSupplier.get()); // [1]
      IO.println(ageSupplier.get()); // [18]
      try {
        IO.println(
            errorSupplier.get()); //抛出异常(但是拿不到任务实际抛出的异常) java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully
      } catch (IllegalStateException illegalStateException) {
        //拿到任务的异常
        IO.println(errorSupplier.exception()); //fetch 预期异常
      }
    }

4. Joiner.allUntil(Predicate<Subtask<? extends T>> isDone)
  1. 用于扩展,用来表示这个scope的任务是否执行完毕(如果直接返回false的话,代表scope还可以继续执行fork)

1,返回false的话,scope还可以继续执行fork()
2,返回true的话,scope会cancel(),后续fork()任务无法执行,之前的任务获取get()因为可能会抛出java.lang.IllegalStateException: Result is unavailable or subtask did not complete successfully

cancel()的逻辑,会导致之前的任务会进行中断

 private void cancel() {
        if (!cancelled && CANCELLED.compareAndSet(this, false, true)) {
            // prevent new threads from starting
            flock.shutdown();

            // interrupt all unfinished threads
            interruptAll();

            // wakeup join
            flock.wakeup();
        }
    }
void main() {
    //结构性并发
    Callable<List<String>> fetchUserName = () -> {
      Thread.sleep(1000);
      return List.of("1");
    };

    Callable<List<Integer>> fetchAge = () -> {
      Thread.sleep(1000);
      return List.of(18);
    };

    Callable<Object> fetchError = () -> {
      Thread.sleep(200);
      throw new RuntimeException("fetch 预期异常");
    };

  try (var scope = StructuredTaskScope.open(Joiner.allUntil((_) -> false))) {
      //失败的任务的等待时间相对于获取其他任务较短
      scope.fork(fetchError);
      scope.fork(fetchUserName);
      scope.fork(fetchAge);
      try {
        Stream<Subtask<Object>> tasks = scope.join();
        System.out.println(tasks.toList().size());
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (FailedException e) { //有一个任务失败,抛出的异常这里就会抛出
        IO.println(e.getCause().getMessage());
      }
    }
}
标签:
评论 0
/ 1000
0
0
收藏