JDK21虚拟线程

虚拟线程是JDK21最重要的、可能会成为JDK8的lambda这样标志性的特性。这里对虚拟线程做原理级别的系统性分析。

虚拟线程的设计动机

对于后台服务器开发,一个请求对应一条处理线程,是最容易理解的服务器应用编程思路。但是,线程的数量会受到物理资源限制:比如默认Java线程会占用1MB内存,就会受到物理内存大小的限制;比如线程切换涉及内核态-用户态转换,需要频繁保存上下文,浪费CPU时间进行数据复制,受到CPU和带宽的限制。在海量请求到达时,线程资源会很快耗尽,成为主要的性能瓶颈。

早期的一个通常解法是:提高线程的利用率。一方面利用池化技术,减少线程创建销毁的开销,多余任务排队执行减少线程的数量;另一方面利用异步编程API(比如Future),在等待I/O时让出资源。但这样的后果是,一个请求的处理逻辑会被切分成很多段,执行-阻塞-回调-执行-阻塞-回调……这些段可能运行在不同的线程上,导致代码理解困难、调试困难。即使利用JDK8的CompletableFuture对并发编程逻辑做了编排,也只是治标而已。

而解决“线程数量限制”、“线程切换开销大”等问题的一个常用方案,就是构建用户态运行的“协程”。因此,Project Loom项目发起和推进Java的并发编程模型优化,其愿景为“write sync run async”,即“写同步代码,跑异步逻辑”。Loom提出了“纤程(Fiber)“的概念,并认为“纤程”由“续体(Continuation)”和“调度器(Scheduler)”两部分组成。而“纤程”在提供给用户的操作入口,称为“虚拟线程(Virtual Thread)”。“虚拟线程”资源几乎是无限的,每个请求都可以使用一个“虚拟线程”来执行业务逻辑。

loom.png

虚拟线程的基本原理

基本概念

virtual_thread.png

  • Fiber:纤程。类似“协程”的概念,由JVM调度,用户态。
  • Continuation:续体。纤程的基石之一,纤程调度的基本单位,可以随时挂起、唤醒。其挂起等底层逻辑由JVM实现,Java类Continuation只是一层浅包装。
  • Scheduler:调度器。纤程的基石之一,通常都是基于Thread的通用设计,既可以调度纤程,又可以调度线程。
  • Virtual Thread:虚拟线程,实现类是VirtualThread。是“纤程”提供给用户的操作入口。它是java.lang.Thread的实例,但是没有调用传统的native方法,不绑定到任何操作系统线程,而是被JVM调度,默认的调度器是ForkJoinPool。
  • Native Thread:本地线程/原生线程。浅包装了一层操作系统线程,借助Thread来实现,关键方法都是JNI本地方法调用,底层是C/C++。
  • Platform Thread:平台线程。Thread的非虚拟线程实例,被操作系统调度。只是为了区别于虚拟线程的逻辑概念,不存在实际的类,实现依赖本地线程。
  • Carrier Thread:载体线程。载体线程也是逻辑概念,不存在实际的类,是指虚拟线程执行时所依附的平台线程。虚拟线程没有系统资源可以去执行,因此需要依附在一个平台线程上去执行,执行过程中,虚拟线程不断加载(mount)、卸载(unmount),可能会更换多个不同的载体线程,并不强行绑定到某一个平台线程载体之上。虚拟线程:载体线程=M:N,其中M远大于N。
  • OS Thread:操作系统线程。真实的windows、linux操作系统的线程。

创建虚拟线程非常容易,Java虚拟线程VirtualThread继承了Thread,因此几乎和Thread是同样的使用方法,本质都是用VirtualThreadBuildernewVirtualThread方法创建VirtualThread,调度器默认都是ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 使用ofVirtual
// newVirtualThread(scheduler, nextThreadName(), characteristics(), task)
public static void main(String[] args) throws InterruptedException {
Thread vt = Thread.ofVirtual().start(() -> System.out.println(Thread.currentThread().threadId()));
vt.join();
}

// 使用startVirtualThread
// ThreadBuilders.newVirtualThread(null, null, 0, task);
public static void main(String[] args) throws InterruptedException {
Thread vt = Thread.startVirtualThread(() -> System.out.println(Thread.currentThread().threadId()));
vt.join();
}

// 使用工厂方法,可以配置线程名字
// newVirtualThread(scheduler, name, characteristics(), task)
// VirtualThreadFactory(scheduler, name(), counter(), characteristics(), uncaughtExceptionHandler());
public static void main(String[] args) throws InterruptedException {
ThreadFactory vtFactory = Thread.ofVirtual().name("vt-", 0).factory();
Thread vt = vtFactory.newThread(() -> System.out.println(Thread.currentThread().getName()));
vt.start();
vt.join();
}

// 使用ExecutorService
// 本质是传入了一个VirtualThreadFactory去创建线程
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService vtExecutorService = Executors.newVirtualThreadPerTaskExecutor();
vtExecutorService.submit(() -> System.out.println(Thread.currentThread().threadId())).get();
}

整体架构设计

overview.png

  • 从用户视角上看,用户通过VirtualThreadFactory创建了虚拟线程VirtualThread,提交到了执行器ThreadPerTaskExecutor去执行。这个执行器有个特点是,每次执行都会调用factory去创建一个新的线程来执行,可以无限创建线程。由于传入的是VirtualThreadFactory,所以只会创建虚拟线程。
  • 从任务视角上看,每个新的虚拟线程,默认的调度器是VirtualThread类中的ForkJoinPool实例;任务会被包装成VThreadContinuation这个续体,最后包装成一个runContinuation的Runnable方法提交给调度器ForkJoinPool来调度执行。
  • 从调度视角上看,提交的任务会在FIFO的工作队列排队,然后执行runContinuation会做载体线程的mount、unmount等操作,其中的cont.run会触发执行vthread.run做一些上下文的处理以及通知,最后执行task.run执行真正的业务逻辑。期间续体可能反复被挂起、唤醒,并重新提交至工作队列,直至任务执行完毕。

Continuation 续体

续体是协程的一个经典概念,在Loom项目中,Continuation是Java中的“协程(coroutine)”的抽象,它是一个具有入口点(entry point)的连续的子程序,可能会在某些点挂起(suspend)或让出资源(yield),当恢复(resume)时,会继续执行,且执行上下文保持不变。

Java中的Continuation是有栈协程(stackful),会保存/恢复调用JVM虚拟机调用栈,不支持保存/恢复本地调用栈(JNI stack),因此调用native方法过程中,续体是无法被挂起的。不支持的原因是,存储native堆栈需要保持连续内存,轻量和灵活受到限制,因此放弃以获得更大的灵活性,从而减少内存使用。

Continuation示例、栈帧上下文、层级Scope

我们运行一个Continuation的示例:

注意:JDK9开始,Java平台模块化系统引入了更严格的访问控制,以提高代码的封装性和安全性。默认情况下,java.base模块(即Java的核心类库)不允许向未命名模块开放其内部API(如jdk.internal.vm),而Continuation正是jdk.internal.vm模块的类,因此需要在启动应用程序时导出。对于idea启动,只需要编辑Run/Debug Configurations,然后增加VM options--add-exports java.base/jdk.internal.vm=ALL-UNNAMED,是VM Options,不是Program Arguments,也不是Environment varaiables。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;

/**
* @author bewindoweb <bewindoweb1995@foxmail.com>
* @date 2024/11/4 08:23
*/
public class Main {
public static void main(String[] args) {
ContinuationScope parentScope = new ContinuationScope("parentScope");
ContinuationScope childScope = new ContinuationScope("childScope");

Continuation child = new Continuation(childScope, () -> {
System.out.println("[3] before child");
Continuation.yield(parentScope);
System.out.println("[5] after child");
});

Continuation parent = new Continuation(parentScope, () -> {
System.out.println("[2] before parent");
child.run();
System.out.println("[6] after parent");
});

System.out.println("[1] start");
parent.run();
System.out.println("[4] yielding");
parent.run();
}
}

对应的结果是:

1
2
3
4
5
6
[1] start
[2] before parent
[3] before child
[4] yielding
[5] after child
[6] after parent

可以通过这个例子很好地体会Continuation的具体含义:

  • 当yield时会让JVM记录栈帧上下文,让出控制权;
  • 当再次run时,会恢复栈帧上下文,从yield的代码点继续执行。

栈帧的记录和恢复,是Continuation调用StackChunk实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Continuation {
static {
……
StackChunk.init(); // ensure StackChunk class is initialized
……
}

private boolean isEmpty() {
for (StackChunk c = tail; c != null; c = c.parent()) {
if (!c.isEmpty())
return false;
}
return true;
}

StackChunk在JDK只留了一个壳子,实现是在JVM里面,不再深入。

默认的Continuation是支持嵌套的(nested continuations),根据Scope来确认层级,所以挂起parentScope,就可以挂起parentScope最里层的闭包续体(innermost enclosing continuation)。不过对于VirtualThread的实现,传入和使用的是固定Scope:

1
2
3
4
5
final class VirtualThread extends BaseVirtualThread {
……
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
……
}

Continuation的Pin问题

Pin表面含义就是“钉住”,本质是由于某种原因导致Continuation不能被挂起,间接导致VirtualThread在了某个PlatformThread之上了。这样的后果是,VirtualThreadPlatformThread变回了1:1的数量关系,且平台线程一直被占用,这会导致海量虚拟线程得不到执行,终将因线程资源耗尽导致崩溃。

Continuation类清晰地定义了导致pin的原因:

1
2
3
4
5
6
7
8
private static Pinned pinnedReason(int reason) {
return switch (reason) {
case 2 -> Pinned.CRITICAL_SECTION;
case 3 -> Pinned.NATIVE;
case 4 -> Pinned.MONITOR;
default -> throw new AssertionError("Unknown pinned reason: " + reason);
};
}
  1. Pinned.CRITICAL_SECTION:执行临界区代码
    对于Java,在执行类加载(classloading)等过程时,就处于“执行临界区代码”的状态
  2. Pinned.NATIVE:执行本地方法
    Continuation没有记录本地调用栈,因此不支持在本地方法(native method)执行过程中挂起。
  3. Pinned.MONITOR:利用监视器的同步方法
    比如synchronized关键字和Object.wait,都用到了监视器。synchronized锁的owner是当前的载体线程,因此虚拟线程池持锁会导致同步语义混乱。

那么Loom为什么不去解决这个问题呢,理由是:

  1. Pinned.CRITICAL_SECTION:类加载大部分情况都是启动时,之后就很少执行了,因此调度器去调度一下即可,不处理。
  2. Pinned.NATIVE:为了灵活性和减少内存使用,放弃支持本地调用栈。
  3. Pinned.MONITOR:synchronized等关键字,大部分场景只是在极短时间内保护内存访问,所以可以忽略。如果要长时间保持的(比如synchronized里面等I/O),建议改代码,改为使用并发包java.util.concurrent中的同步功能。而Object.wait,在现代代码中并不常见,如果有,也建议改为并发包java.util.concurrent。

调度器

对于“纤程”之“调度器”,Loom项目没有继续做深入的研究,认为ForkJoinPool已经是一个很好的纤程调度器了。

为什么是ForkJoinPool?

ForkJoinPool是java并发包java.util.concurrent的一个经典调度器,它有个2个最重要的特性:

  1. 分治任务和工作队列

forkjoinpool_workqueue.png

正如其名,通过“fork”将大任务拆分为小任务并发执行,然后通过“join”将小任务结果合并为大任务结果。和普通scheduler共用一个FIFO任务队列的设计不同,ForkJoinPool默认每个CPU核心1条工作线程,每个工作线程都会有自己独立的WorkQueue双端队列存储待执行任务。默认async=false,采用LIFO类似栈的方式来处理任务,新创建的子任务会被优先执行,适合“分治”。“纤程”做了特殊的配置:

  • 工作线程数量最高256而不是CPU核心数,主要为了在发生Pin时能够有额外的线程顶上
  • “纤程”不需要“分治”,所以采用了async=true的FIFO类似队列方式处理任务,也就是”异步模式“,先创建就先执行,会更加高效。
1
2
3
4
5
6
7
8
// VitualThread类,createDefaultScheduler方法
private static ForkJoinPool createDefaultScheduler() {
……
maxPoolSize = Integer.max(parallelism, 256);
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode, 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
……
}

虽然“纤程”利用不上“分治”,但独立的工作队列给“纤程”运行提供了方便。“纤程”一个经典的设计课题是——“续体和调度器”应该分开实现吗,比如“续体”在应用层,“调度器”在内核层?

答案是否定的,不应该分开:

  • 第一、调度器处理“纤程”切换,需要用户态-内核态切换开销。
  • 第二,OS调度器通常需要通用,但通用调度算法不一定在所有业务上好用,不如ForkJoinPool。比如对于服务器常见的设计模式“A线程负责连接,B线程负责处理后续数据交互,A和B共享数据x”,对于OS调度器,很可能把A和B分配到不同CPU核心上运行,造成x需要做一些数据同步的开销;而使用ForkJoinPool,只需要把A和B的任务都添加到同一个工作队列里,就可以在同一个CPU核心上运行,充分利用CPU cache。
  1. 工作窃取算法(work-stealing)

forkjoinpool_workstealing.png

ForkJoinPool希望最大化利用CPU资源,因此当某个工作队列没有任务时,说明CPU核心空闲,就会“窃取”其他CPU核心的工作线程的工作队列任务来执行。“纤程”是异步模式async=true(FIFO),所以窃取方式采用LIFO的形式,也就是从队尾获取任务,减少和FIFO的工作队列正常逻辑产生的并发冲突,如果发生冲突会利用CAS来解决并发问题。

很明显,ForkJoinPool非常适合CPU密集型的任务,Java的“纤程”本质上也是希望尽可能让CPU资源都跑满,来支持海量“纤程”的执行。如果熟悉Go的GPM模型,可以发现Goroutine也是采用的“本地队列”+“工作窃取”的调度策略来最大化CPU使用率,提高吞吐量。

综上,对Java,ForkJoinPool已经是一个设计优秀的“纤程”调度器了。

调度器应该做时间片抢占调度吗?

OS用时间片抢占调度,主要是为了解决执行的公平性问题,避免长任务一直执行,导致其他任务饿死。那么“纤程“的调度器应该使用时间片抢占调度吗?

不应该。虽然对于Java,时间片抢占调度可以利用SafePoint安全点机制很容易地实现,但拥有这个功能并没有什么好处,理由:

  • 大量“纤程”,都在频繁执行长任务:瓶颈在CPU,有无时间片抢占调度,都无济于事。
  • 大量“纤程”,偶尔执行长任务:只需要一个优秀的调度器能够将“纤程”调度到可用的CPU核上去运行即可,也就是ForkJoinPool的工作窃取机制就可以办到。
  • 少量“纤程”,都在频繁地执行长任务。这种情况,从程序设计上就不应该使用“纤程”,应该用“线程”。

虚拟线程调度状态

virtual_thread_state.png
(图中小方框是每个虚拟线程状态对应的线程状态,这里只关注虚拟线程状态)

  1. 正常情况
    NEW -> STARTED -> RUNNING -> TERMINATED
  2. 发生park
    RUNNING -> PARKING/TIMED_PARKING
  • 挂起成功则PARKED/TIMED_PARKED,成功后进行unpark可以解除挂起状态进入UNPARKED,进而可以进入RUNNING状态
  • 挂起失败则PINNED/TIMED_PINNED,进行unpark,可以进入RUNNING状态继续执行
  1. 发生yield
    RUNNING -> YIELDING
  • 让步成功则YIELDED,再次被调度即可进入RUNNING继续执行
  • 让步失败则直接回退到RUNNING状态

yield和park的区别:

  • yield是直接让步等待调度,park是等待解锁后才能被调度
  • yield和park底层都是调用Continuation续体的yield方法
  • yield的入口是Thread.yield,让出控制权后,runContinuation方法会继续执行afterYield(),然后通过submitRunContinuation或者externalSubmitRunContinuation再次提交任务到ForkJoinPool。
  • park的入口是LockSupport.park/parkNanos,调用Continuation续体的yield方法之前:
    • 如果是parkNanos(),会利用定时器ScheduledExecutorService UNPARKER定时unpark
    • 如果是park(),那么由其他线程进行unpark
  • park在调用Continuation续体的yield方法之后,等待unpark()调用:
    • 如果是PARKED/TIMED_PARKED,设置状态为UNPARKED,然后submitRunContinuation再次提交任务到ForkJoinPool
    • 如果是PINNED/TIMED_PINNED,则会对载体线程进行unpark

关键代码:VirtualThread#runContinuation, VirtualThread#tryYield(), VirtualThread#park(), VirtualThread#unpark(), VirtualThread#afterYield

内存模型

虚拟线程和GC

虚拟线程的堆栈作为堆栈块对象,存储在Java的堆中,堆栈随着应用程序运行增长和收缩,可以容纳任意深度的堆栈,使得虚拟线程的“海量”得以实现。

虚拟线程本身不会被用于GC Roots,因为:

  • 如果虚拟线程正在运行,那么它被scheduler持有
  • 如果虚拟线程block,那么它被block的对象持有

由于堆栈块对象分配在堆上,会被垃圾收集器管理。虚拟线程最好不要使用G1作为垃圾收集器,G1不支持巨大的堆栈块对象,当堆栈块大小达到G1 Region的一半时,可能导致StackOverflowError。因此JDK21使用了分代的ZGC来减少GC压力。

虚拟线程和ThreadLocal

ThreadLocal有2个用途:

  • 关联线程上下文。虚拟线程也需要。
  • 避免并发数据访问。Loom认为这是ThreadLocal的滥用,这会导致线程数和CPU资源绑定,不适合虚拟线程。

海量虚拟线程如果都配置ThreadLocal,肯定会爆炸,但是虚拟线程并没有禁止使用ThreadLocal,只是提示谨慎使用。在实际操作中,对于大量使用ThreadLocal的中间件,比如经典的Netty,就会需要修改大量代码。

目前正在开发ScopedValue这个新功能来替代ThreadLocal,在JDK21暂时还是preview特性,核心思路就是只实现“关联线程上下文”的功能,共享不可变数据。

虚拟线程生态

jcmd/JFR/MBeans

由于虚拟线程数量太多,传统的jstack和jcmd不太方便观测,因此在jcmd中引入一种新的线程转储:

1
$ jcmd <pid> Thread.dump_to_file -format=json <file>

我们新建一个测试类,用try-with-resource的方式执行虚拟线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
* @author bewindoweb <bewindoweb1995@foxmail.com>
* @date 2024/11/4 08:23
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println("PID: " + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 100_0000).forEach(i -> {
executor.submit(() -> {
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (Exception ignore){}
});
});
}
while (true) {
Thread.sleep(Duration.ofSeconds(1));
}
}
}

假设pid是6324,我们打印线程:

1
jcmd 6324 Thread.dump_to_file -format=json test.json

就可以得到完整的线程堆栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
{
"threadDump": {
"processId": "6324",
"time": "2024-11-04T11:59:19.436406Z",
"runtimeVersion": "21.0.5+11-LTS",
"threadContainers": [
……
{
"container": "ForkJoinPool.commonPool\/jdk.internal.vm.SharedThreadContainer@28251d01",
"parent": "<root>",
"owner": null,
"threads": [
],
"threadCount": "0"
},
{
"container": "java.util.concurrent.ThreadPoolExecutor@43c82e61",
"parent": "<root>",
"owner": null,
"threads": [
],
"threadCount": "0"
},
{
"container": "java.util.concurrent.ScheduledThreadPoolExecutor@184f6be2",
"parent": "<root>",
"owner": null,
"threads": [
{
"tid": "127",
"name": "VirtualThread-unparker",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(AbstractQueuedSynchronizer.java:519)",
"java.base\/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780)",
"java.base\/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725)",
"java.base\/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1712)",
"java.base\/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)",
"java.base\/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)",
"java.base\/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1070)",
"java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)",
"java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)",
"java.base\/java.lang.Thread.run(Thread.java:1583)",
"java.base\/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:186)"
]
}
],
"threadCount": "1"
},
{
"container": "ForkJoinPool-1\/jdk.internal.vm.SharedThreadContainer@47f27ba7",
"parent": "<root>",
"owner": null,
"threads": [
{
"tid": "24",
"name": "ForkJoinPool-1-worker-1",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "31",
"name": "ForkJoinPool-1-worker-2",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "52",
"name": "ForkJoinPool-1-worker-3",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "80",
"name": "ForkJoinPool-1-worker-4",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "94",
"name": "ForkJoinPool-1-worker-5",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:449)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1891)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "95",
"name": "ForkJoinPool-1-worker-6",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "103",
"name": "ForkJoinPool-1-worker-7",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
},
{
"tid": "111",
"name": "ForkJoinPool-1-worker-8",
"stack": [
"java.base\/jdk.internal.misc.Unsafe.park(Native Method)",
"java.base\/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)",
"java.base\/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1893)",
"java.base\/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1809)",
"java.base\/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)"
]
}
],
"threadCount": "8"
}
]
}
}

另外还可以通过Java自带的JFR(Java Flight Recorder,Java飞行记录器)、MBeans来监控虚拟线程状态。

springboot

springboot 3.x已经支持虚拟线程:

1
2
3
threads:
virtual:
enabled: true

并且支持FlightRecorderApplicationStartup来支持spring生命周期的JVM监控,基于Java Flight Recorder

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
public class MyApplication {

public static void main(String[] args) {
SpringApplication application = new SpringApplication(MyApplication.class);
application.setApplicationStartup(new FlightRecorderApplicationStartup(2048));
application.run(args);
}

}

详见:https://docs.spring.io/spring-boot/reference/features/spring-application.html#features.spring-application.virtual-threads

注意:如果使用Undertow作为Servlet容器来使用虚拟线程,可能会有内存溢出问题,目前SpringBoot正在修复,如果不能解决可能会考虑移除Undertow对虚拟线程的支持,因此推荐使用Tomcat。

总结

虚拟线程是一种不需要池化、不适合CPU密集型任务、可以大量创建、被JVM调度的纤程入口,底层由调度器ForkJoinPool和续体Continuation组成,能够实现“写同步代码,跑异步逻辑”的愿景。目前比较明显的缺点是在执行类加载、本地方法调用、synchronized时会退化为平台线程,大量使用synchronized的中间件或应用需要用j.u.c并发包中的组件进行重写。生态方面java基本的调试器支持监控、最核心的springboot也支持其功能,足够使用了。

参考文档

Project Loom: Fibers and Continuations for the Java Virtual Machine

作者

香蕉微波炉

发布于

2024-11-03

更新于

2024-11-03

许可协议