异步

使用场景

大文件上传及上传后进行语音识别的应用场景

  • 大文件上传: 企业员工上传完整的会议录音或录像文件
  • 上传后进行语音识别: 对会议内容进行自动转写,生成会议纪要,便于后续查阅和分析讨论点

如果上传完成后不采用异步任务对文件进行语音识别,可能会出现以下问题:

  • 响应延迟: 语音识别是一个计算密集型的操作,特别是当处理大文件时。如果在同一请求中直接进行语音识别,服务器响应时间将大幅增加,导致用户体验变差。
  • 超时错误: 许多 Web 服务器和 HTTP 客户端都有请求超时的限制。长时间的同步处理可能导致请求超时,从而使得语音识别任务失败。
  • 资源占用: 大量的同步语音识别任务可能会占用大量服务器资源,包括 CPU 和内存,影响服务器处理其他请求的能力,降低整体服务的可用性和稳定性。
  • 用户界面冻结: 对于基于 Web 的应用,如果在前端直接等待语音识别的结果,可能导致用户界面冻结,无法进行其他操作,影响用户体验。

目前比较通用的做法是采用异步任务

  1. 文件上传
  • 用户通过客户端(如 Web 页面、移动应用)上传文件。
  • 服务器接收到文件后,存储在服务器上或云存储服务中,并立即返回一个响应告诉用户文件已成功上传。
  1. 异步任务创建
  • 在文件上传成功后,服务器不直接进行语音识别处理,而是创建一个异步任务。
  • 这个任务被发送到消息队列中,这样可以在系统资源允许的情况下按顺序或并行处理。
  1. 任务处理
  • 后台工作进程监听消息队列。一旦发现新任务,就开始处理该任务。
  • 在处理过程中,工作进程执行语音识别操作,将音频文件转写为文本。
  1. 更新状态和通知
  • 一旦异步任务完成,系统会更新任务的状态(例如,从“处理中”变为“已完成”)。
  • 系统可以通过不同的方式通知用户任务已完成,比如电子邮件通知、短信、应用内通知等。
  • 用户可以通过客户端查询任务状态,或者直接获取任务结果(如转写好的文本)。

异步线程池

TioThreadUtils

  • tio-boot 内置了线程池工具类 com.litongjava.tio.utils.thread.TioThreadUtils,并且 tio-boot 在关闭是 会自动释放线程资源 示例
    TioThreadUtils.submit(() -> {
      try {
        RespVo result = Aop.get(AsrSubmitService.class).submit(uploadFile, email);
        log.info("异步任务执行完成, 结果: {}", result);
      } catch (Exception e) {
        log.error("异步任务执行异常", e);
      }
    });

CompletableFuture.runAsync

CompletableFuture.runAsync 是 Java CompletableFuture 类的一部分,用于在异步执行一个无返回值的任务。它通常用于需要在后台线程中执行一些操作,而不阻塞主线程的场景。runAsync 方法有两个主要重载形式:

  1. runAsync(Runnable runnable):使用默认的 ForkJoinPool 作为线程池执行任务。
  2. runAsync(Runnable runnable, Executor executor):使用指定的 Executor 执行任务。

用法

CompletableFuture.runAsync(() -> {
    // 在异步线程中执行的代码
});

底层运行机制

CompletableFuture.runAsync 的底层运行机制主要涉及以下几个方面:

  1. 线程池管理

    • 当使用 runAsync(Runnable runnable) 时,CompletableFuture 默认使用全局的 ForkJoinPool.commonPool() 线程池来执行异步任务。
    • ForkJoinPool 是一个适合并行任务处理的线程池,支持任务分割和工作窃取机制,以提高线程利用率和执行效率。
    • 如果使用 runAsync(Runnable runnable, Executor executor),任务会由你提供的自定义 Executor 执行,这给了你更大的灵活性来控制线程的使用。
  2. 任务提交和执行

    • runAsync 被调用时,传入的 Runnable 任务会被提交到线程池中,线程池会在适当的时候执行这个任务。
    • 任务被执行时,runAsync 方法立即返回一个 CompletableFuture 对象,而不等待任务执行完毕。
  3. 非阻塞操作

    • CompletableFuture 的异步方法(如 runAsync)不会阻塞调用线程,这意味着主线程可以继续执行后续操作。
    • 异步操作完成后,可以通过 thenRun, thenAccept, thenApply 等方法对结果进行处理。
  4. 任务的生命周期

    • CompletableFuture 的状态在任务的执行过程中会发生变化,从未完成状态转变为完成状态。
    • 你可以在 CompletableFuture 完成后设置回调函数,如 .thenRun(...),这使得异步任务的结果可以在完成后被处理。

ForkJoinPool 的运行机制

默认情况下,CompletableFuture 使用的 ForkJoinPool.commonPool() 是一个并行的工作窃取线程池,适合处理大量的小任务。其运行机制包括:

  1. 任务分割ForkJoinPool 可以将任务分解成更小的子任务并行处理。
  2. 工作窃取:线程池中的每个线程都有自己的双端队列,线程首先从自己的队列中获取任务执行。如果自己的队列为空,它会尝试从其他线程的队列末尾"窃取"任务以保持高效利用。
  3. 线程调度ForkJoinPool 动态管理线程数,以平衡任务执行和资源消耗。

注意事项

  • 线程池的选择:默认的 ForkJoinPool.commonPool() 适用于 CPU 密集型任务。如果任务是 IO 密集型或需要更严格的线程管理,最好提供自定义的 Executor
  • 任务的阻塞:在异步任务中避免长时间的阻塞操作(如等待网络或文件 IO),以免耗尽线程池中的线程。

示例

以下是一个使用 CompletableFuture.runAsync 执行异步任务的示例:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running in a separate thread: " + Thread.currentThread().getName());
    // 模拟长时间运行的任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

这个代码片段会在 ForkJoinPool 中的一个线程上运行,而不会阻塞主线程。

tio-boot TioThreadUtils 实现异步示例

在 Controller 中使用线程池,开启异步任务

import com.litongjava.apps.asrgpt.services.AsrSubmitService;
import com.litongjava.apps.asrgpt.validator.AsrSubmitValidator;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.UploadFile;
import com.litongjava.tio.http.server.util.Resps;
import com.litongjava.tio.utils.resp.RespVo;
import com.litongjava.tio.utils.thread.TioThreadUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsrSubmitController {

  public HttpResponse submit(HttpRequest request) {
    UploadFile uploadFile = request.getUploadFile("file");
    String email = request.getParam("email");
    log.info("upload file size:{}", uploadFile.getData().length);
    log.info("email:{}", email);

    //validator
    RespVo respVo = Aop.get(AsrSubmitValidator.class).submit(email);
    if (respVo != null) {
      return Resps.json(request, respVo);
    }

    TioThreadUtils.submit(() -> {
      try {
        RespVo result = Aop.get(AsrSubmitService.class).submit(uploadFile, email);
        log.info("异步任务执行完成, 结果: {}", result);
      } catch (Exception e) {
        log.error("异步任务执行异常", e);
      }
    });

    // 立即响应客户端,表示文件上传请求已接收,正在处理中
    RespVo ok = RespVo.ok();
    ok.setMsg("文件上传成功,正在处理中...");
    return Resps.json(request, ok);
  }
}