虚拟线程的底层原理,续体,jdk.internal.vm.Continuation,并且打造自己的虚拟线程
275
类别: 
开发交流

我们将介绍虚拟线程的底层原理,续体的代码介绍,同时也会演示他的执行流程,同时通过续体打造一个自己的轻量级虚拟线程,包含调度器和虚拟线程类

续体的使用

1. 续体在jdk里面是不提供直接调用的,需要添加jvm参数

--enable-preview
--add-exports java.base/jdk.internal.vm=ALL-UNNAMED

2. 续体的基本代码使用

import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;


public static void main(String[] args) {
    //续体 (虚拟线程的底层实现原理)
    Continuation continuation = getContinuation();
    doSomething("main执行1", continuation);
    doSomething("main执行2", continuation);
    doSomething("main执行3", continuation);
  }


  private static void doSomething(String opt, Continuation continuation) {
    continuation.run();
    System.out.println(opt + "," + Thread.currentThread().getName());
  }

  public static Continuation getContinuation() {
    return new Continuation(scope, ThreadConfig::runTask);
  }

  public static void runTask() {
    a();
    b();
    c();
  }

  public static void a() {
    System.out.println("a," + Thread.currentThread().getName());
    Continuation.yield(scope);
  }

  public static void b() {
    System.out.println("b," + Thread.currentThread().getName());
    Continuation.yield(scope);
  }

  public static void c() {
    System.out.println("c," + Thread.currentThread().getName());
    Continuation.yield(scope);
  }

执行效果如下:

a,main
main执行1,main
b,main
main执行2,main
c,main
main执行3,main

对于以上的执行的结果你是否有诧异,感觉不可思议,按理来说应该是

a,main
b,main
c,main
main执行1,main
main执行2,main
main执行3,main

3. 流程原理, 原理在于堆栈来回拷贝

虚拟线程底层续体的执行流程.png

实现自己的虚拟线程

1. 虚拟线程类 VirtualThread

package com.huyu.thread;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;

public class VirtualThread {

  private final static AtomicLong ID = new AtomicLong(1);

  public final static VirtualThreadSchedule DEFAULT_SCHEDULE = new VirtualThreadSchedule();

  private final ContinuationScope scope;

  private final Continuation continuation;

  private final Long id;

  private final String name;

  private final AtomicBoolean isRunning = new AtomicBoolean(false);

  private final VirtualThreadSchedule schedule;


  public VirtualThread(String startName, Runnable runnable) {
    this(startName, runnable, DEFAULT_SCHEDULE);
  }


  public VirtualThread(String startName, Runnable runnable, VirtualThreadSchedule schedule) {
    this.schedule = Objects.isNull(schedule) ? DEFAULT_SCHEDULE : schedule;
    this.id = ID.getAndIncrement();
    this.name = startName + id;
    this.scope = new ContinuationScope(name);
    this.continuation = new Continuation(scope, runnable);
  }

  public static VirtualThread currentThread() {
    return VirtualThreadSchedule.SCOPE_VALUE.get();
  }

  public String getName() {
    return name;
  }

  @Override
  public String toString() {
    return "VirThread{" + "id=" + id + ", name='" + name + '\'' + '}';
  }


  public void start() {
    if (isRunning.compareAndSet(false, true)) {
      DEFAULT_SCHEDULE.schedule(this);
      return;
    }
    throw new IllegalStateException("虚拟线程已经启动");
  }


  /**
   * 不允许用户手动调用
   */
  protected void yield() {
    Continuation.yield(scope);
  }


  /**
   * 不允许用户手动调用
   */
  protected void schedule() {
    schedule.schedule(this);
  }

  /**
   * 不允许用户手动调佣
   */
  protected void run() {
    continuation.run();
  }
}

2. 调度器(底层是一个普通的平台线程池)

package com.huyu.thread;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;

/**
 * 调度器
 */
public class VirtualThreadSchedule {

  public final static ScopedValue<VirtualThread> SCOPE_VALUE = ScopedValue.newInstance();

  private final Queue<VirtualThread> QUEUE = new LinkedBlockingQueue<>();

  private final ThreadPoolExecutor executor;

  private final AtomicInteger id = new AtomicInteger(1);

  public VirtualThreadSchedule() {
    this(Runtime.getRuntime().availableProcessors());
  }

  public VirtualThreadSchedule(int schedulerThreadCount) {
    executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(schedulerThreadCount,
        new ThreadFactory() {
          @Override
          public Thread newThread(@NotNull Runnable r) {
            Thread thread = new Thread(r, "#" + id.getAndIncrement() + ",虚拟线程调度器");
//            thread.setDaemon(true);
            return thread;
          }
        });

    Thread thread = new Thread(this::start);
//    thread.setDaemon(true);
    thread.start();
  }

  private void start() {
    while (true) {
      VirtualThread virtualThread = QUEUE.poll();
      if (Objects.nonNull(virtualThread)) {
        executor.submit(() -> {
          ScopedValue.where(SCOPE_VALUE, virtualThread).run(virtualThread::run);
        });
      }
      Thread.onSpinWait();
    }
  }

  /**
   * 调度虚拟线程
   *
   * @param virtualThread
   */
  public void schedule(VirtualThread virtualThread) {
    QUEUE.add(virtualThread);
  }
}

3. 模拟阻塞或者网络IO调用(判断如果是虚拟线程的话就让出执行权利),并且Java提供的虚拟线程在进行阻塞或者网络IO调用之前思想也是一样

package com.huyu.thread;

import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;

public class WaitOpt {

  /**
   * 用来模拟阻塞时间之后再次进行调度该虚拟线程
   */
  public final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
      2);

  /**
   * 阻塞等待
   *
   * @param time
   */
  @SneakyThrows
  public static void await(Long time, boolean isPrint) {
    if (isPrint) {
      System.out.println("调用阻塞等待函数,让出CPU时间片");
    }
    //阻塞多长时间,让出时间片
    VirtualThread virtualThread = VirtualThreadSchedule.SCOPE_VALUE.get();
    if (Objects.isNull(virtualThread)) {
      //执行的不是自定义的虚拟线程
      Thread.sleep(time);
    } else {
      //执行的是自定义虚拟线程
      executorService.schedule(() -> {
        virtualThread.schedule();
      }, time, TimeUnit.MILLISECONDS);
      virtualThread.yield();
    }
  }

}

4. 测试代码

1. 简单测试代码
package com.huyu.thread;

public class SimpleCreateVirtualDemo2 {

  public static void main(String[] args) {

    VirtualThread virtualThread = new VirtualThread("virtualThread", () -> {
      System.out.println(
          "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
      WaitOpt.await(1000L, true);
      System.out.println(
          "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
    });
    virtualThread.start();

    VirtualThread virtualThread1 = new VirtualThread("virtualThread", () -> {
      System.out.println(
          "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
      WaitOpt.await(1000L, true);
      System.out.println(
          "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
    });
    virtualThread1.start();
  }
}

执行效果如下:
简单虚拟线程调用.png

2. 复杂创建百万虚拟线程测试代码
package com.huyu.thread;

import java.util.concurrent.CountDownLatch;

public class CreateMoreVirtualThreadDemo {

  public static void main(String[] args) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    int skipPrintSize = 100000;
    int size = 1000000;
    boolean skipPrint = skipPrintSize <= size;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 1; i <= size; i++) {
      VirtualThread virtualThread = new VirtualThread("virtualThread", () -> {
        if (!skipPrint) {
          System.out.println(
              "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
        }
        //休眠一秒钟,这个用来模拟 LockSupport.park()或者是等待网络数据, 在Java提供的虚拟线程即将调用阻塞的函数
        //的时候会判断是否是虚拟线程,然后就会调用虚拟线程的阻塞方法
        WaitOpt.await(1000L, !skipPrint);
        if (!skipPrint) {
          System.out.println(
              "自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
        }
        countDownLatch.countDown();
      });
      virtualThread.start();
    }
    countDownLatch.await();
    System.out.println(String.format("启动虚拟线程size %s , 消耗时间: %s ms", size,
        System.currentTimeMillis() - startTime));
  }
}

得出结论: 在Mac M2的机器上,执行100万虚拟线程需要2s时间,如图所示:
创建百万虚拟线程.png

标签:
评论 0
/ 1000
0
0
收藏