虚拟线程
使用虚拟线程并发执行数学与安全判定
本文介绍如何在 ApiMathController#determine 接口中,使用 MvThreadUtils(基于虚拟线程)并发执行两个独立的判定逻辑:determineTextMath 与 determineTextSafe。目标是在保持原有“判定顺序与返回语义不变”的前提下,减少总体等待时间。
背景
/api/v1/math/determine 接口需要对用户输入的 question 做两类校验:
- 是否为数学问题:
determineTextMath(question) - 是否为安全问题:
determineTextSafe(question)
原始实现为串行调用:先执行 math 判定,再执行 safe 判定。若两个判定内部均包含网络调用、模型推理、IO 等耗时操作,则串行执行会把耗时累加,导致接口整体响应变慢。
由于这两个判定逻辑相互独立,可以并发执行,从而将总耗时从“二者之和”降低到“二者的最大值”(忽略少量线程调度开销)。
设计目标
并发执行两次判定(两条虚拟线程同时开始)
仍然保持原本的业务判定顺序:
- 先判断是否为数学问题(失败优先返回
10001) - 再判断是否安全(失败返回
10002)
- 先判断是否为数学问题(失败优先返回
保持返回结构不变:最终
result字段沿用 safe 判定结果正确处理中断与执行异常,避免吞掉异常或破坏线程中断语义
关键点说明
1. 为什么并发但仍“串行消费结果”
做法是:
提交两个任务立即开始执行(并发开始)
按照原逻辑顺序调用
Future#get():- 先取
mathFuture.get(),保持 math 失败时优先返回 - 再取
safeFuture.get(),继续安全校验
- 先取
这样既能利用并发缩短总耗时,又不改变原来的业务优先级与错误码语义。
2. 虚拟线程的意义
虚拟线程适合大量阻塞型任务(IO、远程调用、等待模型响应等),可以显著降低平台线程占用压力,让并发更轻量、更经济。
3. 异常处理原则
InterruptedException:- 必须恢复中断标记:
Thread.currentThread().interrupt(); - 返回一个可识别的错误响应
- 必须恢复中断标记:
ExecutionException:- 代表任务内部抛出了异常
- 记录日志并返回内部错误响应
代码实现
下面是修改后的完整控制器实现(使用虚拟线程并发执行两个判定任务):
package com.litongjava.manim.controller;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.jfinal.kit.Kv;
import com.litongjava.annotation.Get;
import com.litongjava.annotation.RequestPath;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.manim.services.MathQuesitonService;
import com.litongjava.manim.utils.MvThreadUtils;
import com.litongjava.model.body.RespBodyVo;
import lombok.extern.slf4j.Slf4j;
@RequestPath("/api/v1/math")
@Slf4j
public class ApiMathController {
private MathQuesitonService mathQuesitonService = Aop.get(MathQuesitonService.class);
@Get
public RespBodyVo determine(String question) {
if (question == null) {
return RespBodyVo.fail("question can not be null");
}
// 两个虚拟线程同时执行
Future<Boolean> mathFuture = MvThreadUtils.submit(() -> mathQuesitonService.determineTextMath(question));
Future<Boolean> safeFuture = MvThreadUtils.submit(() -> mathQuesitonService.determineTextSafe(question));
try {
// 按原来的串行判定顺序消费结果:先 math,后 safe
Boolean isMath = mathFuture.get();
if (isMath != null && !isMath) {
return RespBodyVo.fail(10001, "only support math question");
}
Boolean isSafe = safeFuture.get();
if (isSafe != null && !isSafe) {
return RespBodyVo.fail(10002, "only support safe question");
}
// 保持与原实现一致:最终返回 safe 的判定结果
Kv kv = Kv.by("result", isSafe);
return RespBodyVo.ok(kv);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("determine interrupted, question={}", question, e);
return RespBodyVo.fail(10003, "request interrupted");
} catch (ExecutionException e) {
log.error("determine execute failed, question={}", question, e);
return RespBodyVo.fail(10004, "internal error");
}
}
}
返回结果语义
question == null- 返回失败:
question can not be null
- 返回失败:
determineTextMath(question)返回false(且不为null)- 返回失败:
10001 only support math question
- 返回失败:
determineTextSafe(question)返回false(且不为null)- 返回失败:
10002 only support safe question
- 返回失败:
两者均通过(或返回
null)- 返回成功:
{"result": isSafe}(保持与原逻辑一致)
- 返回成功:
性能收益预期
在两次判定耗时分别为 T_math、T_safe 的情况下:
- 串行:
T_total ≈ T_math + T_safe - 并发:
T_total ≈ max(T_math, T_safe)(外加少量调度开销)
当两个判定都存在明显 IO 或等待时间时,并发通常能带来直观收益。
注意事项与可选优化
提前返回时的取消
- 当前实现如果 math 判定失败会直接返回,但 safe 任务可能仍在运行。
- 若希望进一步节省资源,可以在 math 失败时尝试
safeFuture.cancel(true)(是否能中断取决于任务内部是否响应中断)。
超时控制
- 如果担心外部调用卡死,可使用带超时的
get(timeout, unit),并返回特定错误码。
- 如果担心外部调用卡死,可使用带超时的
日志与监控
- 可增加耗时统计,便于验证并发带来的收益与定位慢点。
如果你希望我再补一版“带取消与超时”的增强实现(同时保持原错误码优先级不变),我也可以直接给出完整代码。
其他代码
TioVirtualThreadUtils
package com.litongjava.manim.utils;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Virtual thread utils (JDK 21+).
*
* @author Tong Li
*/
public class TioVirtualThreadUtils {
private static volatile ExecutorService virtualThreadExecutor;
static {
start();
}
private static ThreadFactory namedVirtualThreadFactory(String baseName) {
AtomicInteger threadNumber = new AtomicInteger(1);
return r -> Thread.ofVirtual().name(baseName + "-" + threadNumber.getAndIncrement()).unstarted(r);
}
public static ExecutorService getExecutor() {
return virtualThreadExecutor;
}
public static <T> Future<T> submit(Callable<T> task) {
return virtualThreadExecutor.submit(task);
}
public static <T> Future<T> submit(Runnable task, T result) {
return virtualThreadExecutor.submit(task, result);
}
public static Future<?> submit(Runnable task) {
return virtualThreadExecutor.submit(task);
}
public static void execute(Runnable runnable) {
virtualThreadExecutor.execute(runnable);
}
/**
* Start executor if not started. Uses a per-task virtual thread executor,
* suitable for IO-bound workloads.
*/
public static void start() {
if (virtualThreadExecutor == null) {
// Two equivalent options:
// 1) with custom name factory:
virtualThreadExecutor = Executors.newThreadPerTaskExecutor(namedVirtualThreadFactory("tio-vt"));
// 2) simplest:
// virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
}
public static void restart() {
stop();
start();
}
/**
* Stop executor immediately. Note: shutdownNow() will attempt to interrupt
* running tasks.
*/
public static void stop() {
if (virtualThreadExecutor != null) {
virtualThreadExecutor.shutdownNow();
virtualThreadExecutor = null;
}
}
}
MvThreadUtils
package com.litongjava.manim.utils;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
public class MvThreadUtils {
public static void execute(Runnable runable) {
TioVirtualThreadUtils.execute(runable);
}
public static <T> Future<T> submit(Callable<T> task) {
return TioVirtualThreadUtils.submit(task);
}
public static <T> Future<T> submit(Runnable task, T result) {
return TioVirtualThreadUtils.submit(task, result);
}
public static Future<?> submit(Runnable task) {
return TioVirtualThreadUtils.submit(task);
}
}
