分场景运行代码及流式播放支持
本篇文档旨在详细说明如何分场景运行代码,以及如何利用流式传输(Stream)技术实现视频播放。文档内容包含运行过程说明、接口调用示例以及完整的 Java 代码实现,并对各个模块进行了详细解释,帮助读者全面理解整个流程。
1. 运行过程说明
在之前的工作中,我们已经实现了分场景生成代码,本节主要介绍如何分场景运行代码以及如何实现流式播放。
1.1 首次场景渲染
调用接口 /manim/start
后,会返回如下 JSON 格式响应:
{
"sessionId": "500972768364683264",
"sessionIdPrt": "105553151428672",
"exitCode": 0,
"executeCode": null,
"viode_length": 0.0,
"taskId": "0",
"stdOut": null,
"stdErr": null,
"output": "/data/session/500972768364683264/main.m3u8",
"images": null,
"videos": null
}
在首次场景渲染成功后,生成的 main.m3u8
文件即可用于播放,播放地址例如:
http://xxx/data/session/500972768364683264/main.m3u8
1.2 后续场景代码提交
后续的场景执行需要通过提交代码的方式运行,调用示例为:
POST /manim?session_prt=105553141509952&m3u8_path=./data/hls/500960829722333184/main.m3u8
{CODE}
每次提交代码执行后,成功返回对应场景的地址,例如:
- Sence 01
/data/hls/500972882755936256/main.m3u8
- Sence 02
/data/hls/500973222591029248/main.m3u8
- Sence 03
/data/hls/500973399972339712/main.m3u8
- Sence 04
/data/hls/500973577777274880/main.m3u8
- Sence 05
/data/hls/500973739706769408/main.m3u8
1.3 场景合并与结束
在所有场景提交并执行完毕后,调用 /manim/finish
接口完成整个视频的合并工作。调用示例如下:
/manim/finish?session_prt=105553151428672&m3u8_path=./data/session/500972768364683264/main.m3u8&videos=/data/hls/500972882755936256/main.m3u8,/data/hls/500973222591029248/main.m3u8,/data/hls/500973399972339712/main.m3u8,/data/hls/500973577777274880/main.m3u8,/data/hls/500973739706769408/main.m3u8
响应示例如下:
{
"sessionId": "0",
"sessionIdPrt": "0",
"exitCode": 0,
"executeCode": null,
"viode_length": 87.666667,
"taskId": "0",
"stdOut": null,
"stdErr": null,
"output": null,
"images": null,
"videos": null
}
在该步骤中,后端会合并各个场景的视频片段,生成最终的 MP4 文件,并通过 HLS 分段生成新的 .m3u8
文件,同时获取视频的时长信息。
2. 代码实现
接下来的代码实现主要分为两部分:HTTP 请求处理的 ManimHanlder 和具体业务逻辑处理的 ManimService。下面详细介绍各个模块的实现细节。
2.1 ManimHanlder.java
该类用于处理 HTTP 请求,并调用业务服务层执行相关操作。主要包含以下几个方法:
start
用于初始化 HLS 渲染场景,创建新的会话目录,并调用NativeMedia.initPersistentHls
初始化持久化 HLS 播放。返回的 JSON 中包含sessionId
、sessionIdPrt
(对应持久化会话的标识)以及生成的 m3u8 文件路径。finish
用于结束会话。根据传入的m3u8_path
判断对应的 HLS 文件是否存在,如果存在则调用NativeMedia.finishPersistentHls
完成会话合并,否则调用NativeMedia.freeHlsSession
释放会话。
同时,在传入videos
参数时会对视频文件进行合并操作,利用NativeMedia.merge
方法合并各个分场景生成的 MP4 文件,并计算视频时长。index
用于接受代码执行请求。当请求带有stream
参数时,会添加 Server-Sent Events(SSE)的响应头,实现流式数据返回;否则直接执行代码并返回最终结果。
以下是 ManimHanlder.java 的完整代码实现:
package com.litongjava.linux.handler;
import java.io.File;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.linux.ProcessResult;
import com.litongjava.linux.service.ManimService;
import com.litongjava.media.NativeMedia;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.server.util.CORSUtils;
import com.litongjava.tio.utils.hutool.FilenameUtils;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.utils.snowflake.SnowflakeIdUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ManimHanlder {
ManimService manimService = Aop.get(ManimService.class);
public HttpResponse start(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
CORSUtils.enableCORS(response);
long sessionId = SnowflakeIdUtils.id();
String subPath = "./data/session/" + sessionId;
new File(subPath).mkdirs();
String m3u8Path = subPath + "/main.m3u8";
String tsPattern = subPath + "/segment_video_%03d.ts";
int startNumber = 0;
int segmentDuration = 10; // 每个分段时长(秒)
long initPersistentHls = NativeMedia.initPersistentHls(m3u8Path, tsPattern, startNumber, segmentDuration);
ProcessResult processResult = new ProcessResult();
processResult.setSessionId(sessionId);
processResult.setSessionIdPrt(initPersistentHls);
processResult.setOutput(m3u8Path.replace("./", "/"));
response.setJson(processResult);
return response;
}
public HttpResponse finish(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
CORSUtils.enableCORS(response);
Long session_prt = request.getLong("session_prt");
ProcessResult processResult = new ProcessResult();
if (session_prt == null) {
processResult.setStdErr("m3u8_path can not be empty");
return response.setJson(processResult);
}
String m3u8Path = request.getString("m3u8_path");
if (m3u8Path == null) {
processResult.setStdErr("m3u8_path can not be empty");
return response.setJson(processResult);
} else {
m3u8Path = "." + m3u8Path;
}
String videos = request.getString("videos");
log.info("session_prt:{},m3u8Path:{}", session_prt, m3u8Path, videos);
File file = new File(m3u8Path);
if (file.exists()) {
log.info("finishPersistentHls");
NativeMedia.finishPersistentHls(session_prt, m3u8Path);
} else {
log.info("freeHlsSession");
NativeMedia.freeHlsSession(session_prt);
}
if (videos != null) {
String subPath = FilenameUtils.getSubPath(m3u8Path);
String outputPath = subPath + "/main.mp4";
String[] split = videos.split(",");
String[] mp4FileList = new String[split.length];
for (int i = 0; i < split.length; i++) {
String string = "." + split[i].replace(".m3u8", ".mp4");
mp4FileList[i] = string;
}
log.info("merge:{}", JsonUtils.toJson(mp4FileList));
boolean merged = NativeMedia.merge(mp4FileList, outputPath);
log.info("merged:{}", merged);
if (merged) {
double videoLength = NativeMedia.getVideoLength(outputPath);
processResult.setViode_length(videoLength);
}
}
return response.setJson(processResult);
}
public HttpResponse index(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
CORSUtils.enableCORS(response);
ChannelContext channelContext = request.getChannelContext();
Boolean stream = request.getBoolean("stream");
Long session_prt = request.getLong("session_prt");
String m3u8Path = request.getString("m3u8_path");
log.info("{},{}", session_prt, m3u8Path);
if (stream == null) {
stream = false;
}
String code = request.getBodyString();
if (stream) {
response.addServerSentEventsHeader();
Tio.send(channelContext, response);
response.setSend(false);
}
try {
ProcessResult executeScript = manimService.executeCode(code, stream, session_prt, m3u8Path, channelContext);
if (executeScript != null) {
response.setJson(executeScript);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
response.setStatus(500);
response.setString(e.getMessage());
}
return response;
}
}
2.2 ManimService.java 实现
该服务类负责处理脚本代码的执行、文件监控以及视频合并等核心逻辑。下面对主要方法和功能做详细介绍:
2.2.1 executeCode 方法
目录创建与代码预处理
在executeCode
方法中,首先会创建缓存目录cache
,并为每次提交生成唯一 ID 作为子文件夹的标识;同时将传入的代码字符串中占位符#(output_path)
替换为实际的输出目录。脚本文件生成
在scripts
目录下创建对应的子目录,并将预处理后的 Python 脚本保存为script.py
文件。视频渲染与目录监控
根据是否需要流式返回结果:流式发送(stream 为 true)
创建需要监控的文件夹(存放部分视频片段),启动一个独立线程来监控目录中文件的创建(使用 Java WatchService),以便实时将新生成的文件路径通知客户端。接下来通过异步线程执行 Python 脚本,执行结束后将结果(包括最终生成的视频文件路径)以 SSE 形式发送回客户端。非流式执行
直接调用execute
方法执行 Python 脚本,等待脚本执行结束后获取结果,并进一步调用NativeMedia.splitVideoToHLS
将生成的视频文件切分为 HLS 格式,再调用NativeMedia.appendVideoSegmentToHls
将当前场景的视频追加到整体会话中。
视频合并与 HLS 分段
若脚本执行生成的视频存在,则将视频文件拷贝至目标文件夹,并进行 HLS 分段处理,生成新的 m3u8 播放列表供播放使用。
2.2.2 execute 方法
该方法利用 Java 的 ProcessBuilder 来调用 Python 解释器执行脚本,并将标准输出和错误输出重定向到日志文件中。待脚本执行完毕后,读取日志内容,并封装在 ProcessResult
对象中返回。
2.2.3 watchFolder 方法
该方法使用 WatchService 监控指定目录中新创建的文件,并通过 Tio 的 SSE(Server-Sent Events)功能,将每个新文件的全路径发送给客户端,实现实时流式数据传输。
下面分别展示 ManimService.java 的完整代码实现,以下代码块均原样保留,未做任何省略。
2.3 ManimService.java
package com.litongjava.linux.service;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import com.litongjava.linux.ProcessResult;
import com.litongjava.media.NativeMedia;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.sse.SsePacket;
import com.litongjava.tio.http.server.util.SseEmitter;
import com.litongjava.tio.utils.hutool.FileUtil;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.utils.snowflake.SnowflakeIdUtils;
import com.litongjava.tio.utils.thread.TioThreadUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ManimService {
public ProcessResult executeCode(String code, Boolean stream, Long sessionId, String m3u8Path, ChannelContext channelContext) throws IOException, InterruptedException {
new File("cache").mkdirs();
long id = SnowflakeIdUtils.id();
String subFolder = "cache" + File.separator + id;
code = code.replace("#(output_path)", subFolder);
String folder = "scripts" + File.separator + id;
File fileFolder = new File(folder);
if (!fileFolder.exists()) {
fileFolder.mkdirs();
}
String scriptPath = folder + File.separator + "script.py";
FileUtil.writeString(code, scriptPath, StandardCharsets.UTF_8.toString());
// 执行脚本
String videoFolder = subFolder + File.separator + "videos" + File.separator + "1080p30";
log.info("videoFolder:{}", videoFolder);
// 定义需要监控的文件夹,注意此处为绝对路径或根据实际情况调整
String partVideoFolder = videoFolder + File.separator + "partial_movie_files" + File.separator + "CombinedScene";
// 如果需要流式发送,则启动文件夹监控线程
ProcessResult execute = null;
if (stream) {
File file = new File(partVideoFolder);
file.mkdirs();
log.info("watch:{}", file.getAbsolutePath());
Thread watcherThread = new Thread(() -> watchFolder(partVideoFolder, channelContext));
watcherThread.start();
TioThreadUtils.execute(() -> {
try {
ProcessResult execute2 = execute(scriptPath);
execute2.setTaskId(id);
String filePath = videoFolder + File.separator + "CombinedScene.mp4";
File combinedScenefile = new File(filePath);
if (combinedScenefile.exists()) {
execute2.setOutput(filePath.replace("\\", "/"));
} else {
log.info("file is not exists:{}", filePath);
}
// 可以等待一段时间,以确保监控期间捕获到文件创建事件
Thread.sleep(2000);
if (watcherThread != null && watcherThread.isAlive()) {
watcherThread.interrupt();
}
String skipNullJson = JsonUtils.toSkipNullJson(execute2);
Tio.send(channelContext, new SsePacket("result", skipNullJson));
SseEmitter.closeSeeConnection(channelContext);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} else {
execute = execute(scriptPath);
execute.setTaskId(id);
String filePath = videoFolder + File.separator + "CombinedScene.mp4";
File file = new File(filePath);
if (file.exists()) {
execute.setOutput(filePath.replace("\\", "/"));
String subPath = "./data/hls/" + id + "/";
String name = "main";
String relPath = subPath + name + ".mp4";
File relPathFile = new File(relPath);
relPathFile.getParentFile().mkdirs();
Files.copy(file.toPath(), relPathFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
String hlsPath = subPath + name + ".m3u8";
log.info("to hls:{}", hlsPath);
NativeMedia.splitVideoToHLS(hlsPath, relPath, subPath + "/" + name + "_%03d.ts", 10);
execute.setOutput(hlsPath.replace("./", "/"));
if (sessionId != null) {
log.info("merge into:{},{}", sessionId, m3u8Path);
String appendVideoSegmentToHls = NativeMedia.appendVideoSegmentToHls(sessionId, filePath);
log.info("merge result:{}", appendVideoSegmentToHls);
} else {
log.info("skip merge to hls");
}
} else {
log.info("file is not exists:{}", filePath);
}
}
return execute;
}
public static ProcessResult execute(String scriptPath) throws IOException, InterruptedException {
String osName = System.getProperty("os.name").toLowerCase();
log.info("osName: {} scriptPath: {}", osName, scriptPath);
ProcessBuilder pb;
if (osName.contains("windows") || osName.startsWith("mac")) {
pb = new ProcessBuilder("python", scriptPath);
} else {
pb = new ProcessBuilder("python3", scriptPath);
}
pb.environment().put("PYTHONIOENCODING", "utf-8");
// 获取脚本所在目录
File scriptFile = new File(scriptPath);
File scriptDir = scriptFile.getParentFile();
if (scriptDir != null && !scriptDir.exists()) {
scriptDir.mkdirs();
}
// 定义日志文件路径,存放在与 scriptPath 相同的目录
File stdoutFile = new File(scriptDir, "stdout.log");
File stderrFile = new File(scriptDir, "stderr.log");
// 将输出和错误流重定向到对应的日志文件
pb.redirectOutput(stdoutFile);
pb.redirectError(stderrFile);
Process process = pb.start();
int exitCode = process.waitFor();
// 读取日志文件内容,返回给客户端(如果需要实时返回,可用其他方案监控文件变化)
String stdoutContent = new String(Files.readAllBytes(stdoutFile.toPath()), StandardCharsets.UTF_8);
String stderrContent = new String(Files.readAllBytes(stderrFile.toPath()), StandardCharsets.UTF_8);
ProcessResult result = new ProcessResult();
result.setExitCode(exitCode);
result.setStdOut(stdoutContent);
result.setStdErr(stderrContent);
return result;
}
/**
* 监控指定目录中新建文件的变化,并发送给客户端
*/
private void watchFolder(String folderPath, ChannelContext channelContext) {
Path path = Paths.get(folderPath);
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
while (!Thread.currentThread().isInterrupted()) {
WatchKey key = null;
try {
key = watchService.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
if (key == null) {
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
String newFileName = event.context().toString();
String fullPath = folderPath + File.separator + newFileName;
// 通过 Tio.send 发送新文件路径到客户端
Tio.send(channelContext, new SsePacket("part", fullPath));
}
}
key.reset();
}
} catch (IOException e) {
log.error("Error watching folder {}", folderPath, e);
}
}
}
3. 总结
本文档详细描述了如何通过分场景提交代码执行和流式监控视频生成过程。
- 运行流程:先通过
/manim/start
初始化场景,再依次提交场景代码,通过接口/manim/index
实时返回部分生成的视频文件(利用 SSE),最终利用/manim/finish
接口合并生成完整的视频。 - 代码实现:文档中包含了 HTTP 请求处理(ManimHanlder)以及核心业务逻辑(ManimService)的完整 Java 源码。
- 关键技术点:
- 使用 Java WatchService 实现文件目录实时监控。
- 利用 ProcessBuilder 执行 Python 脚本,并重定向脚本输出。
- 通过 HLS 分段生成流媒体播放列表,实现流式播放。
通过对代码实现与运行流程的详细解释,可以帮助开发者快速理解并扩展基于此方案的视频分场景运行与流式播放系统。