虚拟线程的底层原理,续体,jdk.internal.vm.Continuation,并且打造自己的虚拟线程
我们将介绍虚拟线程的底层原理,续体的代码介绍,同时也会演示他的执行流程,同时通过续体打造一个自己的轻量级虚拟线程,包含调度器和虚拟线程类
续体的使用
1. 续体在jdk里面是不提供直接调用的,需要添加jvm参数
--enable-preview
--add-exports java.base/jdk.internal.vm=ALL-UNNAMED
2. 续体的基本代码使用
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
public static void main(String[] args) {
//续体 (虚拟线程的底层实现原理)
Continuation continuation = getContinuation();
doSomething("main执行1", continuation);
doSomething("main执行2", continuation);
doSomething("main执行3", continuation);
}
private static void doSomething(String opt, Continuation continuation) {
continuation.run();
System.out.println(opt + "," + Thread.currentThread().getName());
}
public static Continuation getContinuation() {
return new Continuation(scope, ThreadConfig::runTask);
}
public static void runTask() {
a();
b();
c();
}
public static void a() {
System.out.println("a," + Thread.currentThread().getName());
Continuation.yield(scope);
}
public static void b() {
System.out.println("b," + Thread.currentThread().getName());
Continuation.yield(scope);
}
public static void c() {
System.out.println("c," + Thread.currentThread().getName());
Continuation.yield(scope);
}
执行效果如下:
a,main
main执行1,main
b,main
main执行2,main
c,main
main执行3,main
对于以上的执行的结果你是否有诧异,感觉不可思议,按理来说应该是
a,main
b,main
c,main
main执行1,main
main执行2,main
main执行3,main
3. 流程原理, 原理在于堆栈来回拷贝

实现自己的虚拟线程
1. 虚拟线程类 VirtualThread
package com.huyu.thread;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;
public class VirtualThread {
private final static AtomicLong ID = new AtomicLong(1);
public final static VirtualThreadSchedule DEFAULT_SCHEDULE = new VirtualThreadSchedule();
private final ContinuationScope scope;
private final Continuation continuation;
private final Long id;
private final String name;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final VirtualThreadSchedule schedule;
public VirtualThread(String startName, Runnable runnable) {
this(startName, runnable, DEFAULT_SCHEDULE);
}
public VirtualThread(String startName, Runnable runnable, VirtualThreadSchedule schedule) {
this.schedule = Objects.isNull(schedule) ? DEFAULT_SCHEDULE : schedule;
this.id = ID.getAndIncrement();
this.name = startName + id;
this.scope = new ContinuationScope(name);
this.continuation = new Continuation(scope, runnable);
}
public static VirtualThread currentThread() {
return VirtualThreadSchedule.SCOPE_VALUE.get();
}
public String getName() {
return name;
}
@Override
public String toString() {
return "VirThread{" + "id=" + id + ", name='" + name + '\'' + '}';
}
public void start() {
if (isRunning.compareAndSet(false, true)) {
DEFAULT_SCHEDULE.schedule(this);
return;
}
throw new IllegalStateException("虚拟线程已经启动");
}
/**
* 不允许用户手动调用
*/
protected void yield() {
Continuation.yield(scope);
}
/**
* 不允许用户手动调用
*/
protected void schedule() {
schedule.schedule(this);
}
/**
* 不允许用户手动调佣
*/
protected void run() {
continuation.run();
}
}
2. 调度器(底层是一个普通的平台线程池)
package com.huyu.thread;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
/**
* 调度器
*/
public class VirtualThreadSchedule {
public final static ScopedValue<VirtualThread> SCOPE_VALUE = ScopedValue.newInstance();
private final Queue<VirtualThread> QUEUE = new LinkedBlockingQueue<>();
private final ThreadPoolExecutor executor;
private final AtomicInteger id = new AtomicInteger(1);
public VirtualThreadSchedule() {
this(Runtime.getRuntime().availableProcessors());
}
public VirtualThreadSchedule(int schedulerThreadCount) {
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(schedulerThreadCount,
new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r, "#" + id.getAndIncrement() + ",虚拟线程调度器");
// thread.setDaemon(true);
return thread;
}
});
Thread thread = new Thread(this::start);
// thread.setDaemon(true);
thread.start();
}
private void start() {
while (true) {
VirtualThread virtualThread = QUEUE.poll();
if (Objects.nonNull(virtualThread)) {
executor.submit(() -> {
ScopedValue.where(SCOPE_VALUE, virtualThread).run(virtualThread::run);
});
}
Thread.onSpinWait();
}
}
/**
* 调度虚拟线程
*
* @param virtualThread
*/
public void schedule(VirtualThread virtualThread) {
QUEUE.add(virtualThread);
}
}
3. 模拟阻塞或者网络IO调用(判断如果是虚拟线程的话就让出执行权利),并且Java提供的虚拟线程在进行阻塞或者网络IO调用之前思想也是一样
package com.huyu.thread;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
public class WaitOpt {
/**
* 用来模拟阻塞时间之后再次进行调度该虚拟线程
*/
public final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(
2);
/**
* 阻塞等待
*
* @param time
*/
@SneakyThrows
public static void await(Long time, boolean isPrint) {
if (isPrint) {
System.out.println("调用阻塞等待函数,让出CPU时间片");
}
//阻塞多长时间,让出时间片
VirtualThread virtualThread = VirtualThreadSchedule.SCOPE_VALUE.get();
if (Objects.isNull(virtualThread)) {
//执行的不是自定义的虚拟线程
Thread.sleep(time);
} else {
//执行的是自定义虚拟线程
executorService.schedule(() -> {
virtualThread.schedule();
}, time, TimeUnit.MILLISECONDS);
virtualThread.yield();
}
}
}
4. 测试代码
1. 简单测试代码
package com.huyu.thread;
public class SimpleCreateVirtualDemo2 {
public static void main(String[] args) {
VirtualThread virtualThread = new VirtualThread("virtualThread", () -> {
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
WaitOpt.await(1000L, true);
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
});
virtualThread.start();
VirtualThread virtualThread1 = new VirtualThread("virtualThread", () -> {
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
WaitOpt.await(1000L, true);
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
});
virtualThread1.start();
}
}
执行效果如下:

2. 复杂创建百万虚拟线程测试代码
package com.huyu.thread;
import java.util.concurrent.CountDownLatch;
public class CreateMoreVirtualThreadDemo {
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
int skipPrintSize = 100000;
int size = 1000000;
boolean skipPrint = skipPrintSize <= size;
CountDownLatch countDownLatch = new CountDownLatch(size);
for (int i = 1; i <= size; i++) {
VirtualThread virtualThread = new VirtualThread("virtualThread", () -> {
if (!skipPrint) {
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",正在执行当中......");
}
//休眠一秒钟,这个用来模拟 LockSupport.park()或者是等待网络数据, 在Java提供的虚拟线程即将调用阻塞的函数
//的时候会判断是否是虚拟线程,然后就会调用虚拟线程的阻塞方法
WaitOpt.await(1000L, !skipPrint);
if (!skipPrint) {
System.out.println(
"自定义虚拟线程: " + VirtualThread.currentThread().getName() + ",执行完毕");
}
countDownLatch.countDown();
});
virtualThread.start();
}
countDownLatch.await();
System.out.println(String.format("启动虚拟线程size %s , 消耗时间: %s ms", size,
System.currentTimeMillis() - startTime));
}
}
得出结论: 在Mac M2的机器上,执行100万虚拟线程需要2s时间,如图所示:





