Tio Boot 分片上传服务设计与实现
1. 概述
本文档详细介绍了基于 Tio Boot 框架实现的文件分片上传服务。该服务旨在解决大文件上传过程中可能遇到的网络不稳定、超时等问题,通过将大文件分割成多个小块(分片)进行上传,提高上传的稳定性和效率。
1.1. 核心思想
- 分片上传 (Chunked Upload):客户端将待上传的大文件按照预设大小(如 5MB)切分为多个小块。
- 服务端协调:服务端负责管理整个上传流程,包括初始化上传任务、接收并存储分片、以及最终合并所有分片成完整文件。
- 断点续传基础:通过记录已上传的分片信息,为后续实现断点续传功能奠定基础(当前版本未完全实现断点续传逻辑,但架构支持)。
2. 整体流程
分片上传流程主要分为三个步骤:
初始化上传 (
initUpload
):- 客户端向服务端发送一个初始化请求,携带文件的基本信息(文件名、总大小、分片总数、仓库名等)。
- 服务端接收到请求后,生成一个唯一的
uploadId
作为本次上传任务的标识符。 - 服务端创建一个临时目录用于存放分片文件。
- 服务端将本次上传任务的关键信息(如
uploadId
, 文件名, 分片总数, 临时目录路径等)存储在一个内存映射 (uploadMap
) 中(生产环境建议使用 Redis 等持久化存储)。 - 服务端将生成的
uploadId
返回给客户端。
上传分片 (
uploadChunk
):- 客户端根据初始化时获得的分片总数,依次上传每个分片。
- 对于每个分片,客户端发送一个请求,携带
uploadId
、当前分片的索引 (partIndex
) 以及分片的二进制数据。 - 服务端接收到分片后,根据
uploadId
从uploadMap
中找到对应的上传任务信息。 - 服务端验证
partIndex
的有效性。 - 服务端将该分片数据写入到临时目录下的一个特定文件中(文件名通常与
uploadId
和partIndex
相关)。 - 服务端在内存中将该
partIndex
标记为“已接收”。 - 服务端返回确认信息给客户端,表示该分片上传成功。
完成上传 (
completeUpload
):- 当客户端上传完所有分片后,向服务端发送一个完成请求,携带
uploadId
。 - 服务端根据
uploadId
获取上传任务信息。 - 服务端检查所有分片是否都已标记为“已接收”。
- 如果所有分片均已上传,服务端按照分片顺序,将临时目录下的所有分片文件按顺序读取并合并成最终的完整文件,并保存到指定的目标路径(如用户仓库目录下)。
- (可选)服务端尝试设置合并后文件的最后修改时间,以匹配原始文件的时间戳。
- 服务端清理临时目录中的分片文件和内存中的上传任务信息。
- 服务端返回最终文件的路径或成功信息给客户端。
- 当客户端上传完所有分片后,向服务端发送一个完成请求,携带
3. 服务端实现 (Java/Tio Boot)
3.1. 数据模型 (ChunkedUploadInfo
)
用于在服务端存储和管理单个分片上传任务的信息。
package com.litongjava.http.file.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 分片上传信息类
* 用于存储和管理单次分片上传任务的状态和元数据
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChunkedUploadInfo {
private String uploadId; // 唯一标识本次上传任务的ID
private String fileName; // 原始文件名
private Long fileSize; // 原始文件总大小 (字节)
private Integer totalParts; // 总分片数量
private String repo; // 目标仓库名称
private String username; // 上传用户的用户名
private Long originalModTime; // 原始文件的最后修改时间戳 (秒)
private String tempDirPath; // 存放临时分片文件的目录路径
private boolean[] receivedParts; // 布尔数组,标记每个分片是否已接收
}
3.2. 处理器 (FileChunkedUploadHandler
)
核心处理逻辑,包含初始化、上传分片、完成上传三个主要方法。
package com.litongjava.http.file.server.handler;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.fastjson2.JSONObject;
import com.litongjava.http.file.server.consts.FileConst;
import com.litongjava.http.file.server.model.ChunkedUploadInfo;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.model.body.RespBodyVo;
import com.litongjava.tio.boot.admin.services.AppUserService;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.UploadFile;
import com.litongjava.tio.utils.json.FastJson2Utils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FileChunkedUploadHandler {
// 临时存储上传信息,实际生产环境建议使用Redis等持久化存储
// 使用 ConcurrentHashMap 保证线程安全
private static final Map<String, ChunkedUploadInfo> uploadMap = new ConcurrentHashMap<>();
/**
* 初始化分片上传
* 客户端调用此接口开始一个新的分片上传任务
* @param request HTTP请求,包含JSON格式的初始化参数
* @return HttpResponse 返回包含 upload_id 的JSON响应
*/
public HttpResponse initUpload(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
String bodyString = request.getBodyString();
JSONObject jsonObject = FastJson2Utils.parseObject(bodyString);
try {
String repo = jsonObject.getString("repo");
String fileName = jsonObject.getString("file_name");
Long fileSize = jsonObject.getLong("file_size");
Integer totalParts = jsonObject.getInteger("total_parts");
Long originalModTime = jsonObject.getLong("original_mod_time"); // 可选
// 参数校验
if (repo == null || fileName == null || fileSize == null || totalParts == null) {
return response.setJson(RespBodyVo.fail("Missing required parameters"));
}
// 获取用户信息
String userIdString = request.getUserIdString();
String username = Aop.get(AppUserService.class).getUsernameById(userIdString);
// 生成唯一上传ID
String uploadId = UUID.randomUUID().toString().replace("-", "");
// 创建临时目录用于存放分片
String tempDirPath = FileConst.DATA_DIR + File.separator + username + File.separator + ".temp";
File tempDir = new File(tempDirPath);
if (!tempDir.exists()) {
tempDir.mkdirs(); // 创建目录(包括父目录)
}
// 保存上传信息到内存Map
ChunkedUploadInfo uploadInfo = new ChunkedUploadInfo();
uploadInfo.setUploadId(uploadId);
uploadInfo.setFileName(fileName);
uploadInfo.setFileSize(fileSize);
uploadInfo.setTotalParts(totalParts);
uploadInfo.setRepo(repo);
uploadInfo.setUsername(username);
uploadInfo.setOriginalModTime(originalModTime);
uploadInfo.setTempDirPath(tempDirPath);
uploadInfo.setReceivedParts(new boolean[totalParts]); // 初始化为false
uploadMap.put(uploadId, uploadInfo);
// 返回成功响应和 upload_id
Map<String, Object> data = new HashMap<>();
data.put("upload_id", uploadId);
return response.setJson(RespBodyVo.ok(data));
} catch (Exception e) {
log.error("Init chunked upload failed", e);
return response.setJson(RespBodyVo.fail("Init chunked upload failed: " + e.getMessage()));
}
}
/**
* 上传单个分片
* 客户端调用此接口上传一个具体的分片数据
* @param request HTTP请求,包含 upload_id, part_index 和文件数据
* @return HttpResponse 返回确认信息的JSON响应
*/
public HttpResponse uploadChunk(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
try {
String uploadId = request.getParam("upload_id");
Integer partIndex = request.getInt("part_index");
// 参数校验
if (uploadId == null || partIndex == null) {
return response.setJson(RespBodyVo.fail("Missing required parameters"));
}
// 获取上传任务信息
ChunkedUploadInfo uploadInfo = uploadMap.get(uploadId);
if (uploadInfo == null) {
return response.setJson(RespBodyVo.fail("Invalid upload_id"));
}
// 检查分片索引是否有效
if (partIndex < 0 || partIndex >= uploadInfo.getTotalParts()) {
return response.setJson(RespBodyVo.fail("Invalid part_index"));
}
// 获取上传的分片文件数据
UploadFile uploadFile = request.getUploadFile("file");
if (uploadFile == null) {
return response.setJson(RespBodyVo.fail("Missing file data"));
}
// 保存分片到临时文件
String chunkFileName = uploadId + "_part_" + partIndex;
File chunkFile = new File(uploadInfo.getTempDirPath() + File.separator + chunkFileName);
Files.write(chunkFile.toPath(), uploadFile.getData()); // 写入分片数据
// 标记该分片已接收
uploadInfo.getReceivedParts()[partIndex] = true;
// 返回成功响应
Map<String, Object> data = new HashMap<>();
data.put("part_index", partIndex);
data.put("upload_id", uploadId);
return response.setJson(RespBodyVo.ok(data));
} catch (Exception e) {
log.error("Upload chunk failed", e);
return response.setJson(RespBodyVo.fail("Upload chunk failed: " + e.getMessage()));
}
}
/**
* 完成分片上传
* 客户端在所有分片上传完成后调用此接口,触发服务端合并文件
* @param request HTTP请求,包含 upload_id
* @return HttpResponse 返回最终文件信息或错误信息的JSON响应
*/
public HttpResponse completeUpload(HttpRequest request) {
HttpResponse response = TioRequestContext.getResponse();
String bodyString = request.getBodyString();
JSONObject jsonObject = FastJson2Utils.parseObject(bodyString);
try {
String uploadId = jsonObject.getString("upload_id");
// 参数校验
if (uploadId == null) {
return response.setJson(RespBodyVo.fail("Missing upload_id"));
}
// 获取上传任务信息
ChunkedUploadInfo uploadInfo = uploadMap.get(uploadId);
if (uploadInfo == null) {
return response.setJson(RespBodyVo.fail("Invalid upload_id"));
}
// 检查所有分片是否都已上传
boolean allReceived = true;
for (boolean received : uploadInfo.getReceivedParts()) {
if (!received) {
allReceived = false;
break;
}
}
if (!allReceived) {
return response.setJson(RespBodyVo.fail("Not all chunks have been uploaded"));
}
// 确定最终文件路径并创建父目录
String finalFilePath = FileConst.DATA_DIR + File.separator + uploadInfo.getUsername() + File.separator
+ uploadInfo.getRepo() + File.separator + uploadInfo.getFileName();
File finalFile = new File(finalFilePath);
File parentFile = finalFile.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
// 使用 RandomAccessFile 按顺序合并所有分片
try (RandomAccessFile raf = new RandomAccessFile(finalFile, "rw")) {
for (int i = 0; i < uploadInfo.getTotalParts(); i++) {
String chunkFileName = uploadId + "_part_" + i;
File chunkFile = new File(uploadInfo.getTempDirPath() + File.separator + chunkFileName);
if (chunkFile.exists()) {
byte[] chunkData = Files.readAllBytes(chunkFile.toPath()); // 读取分片
raf.write(chunkData); // 写入最终文件
chunkFile.delete(); // 删除临时分片文件
}
}
}
// (可选)设置最终文件的最后修改时间
if (uploadInfo.getOriginalModTime() != null && uploadInfo.getOriginalModTime() > 0) {
FileTime fileTime = FileTime.fromMillis(uploadInfo.getOriginalModTime() * 1000); // 假设前端传的是秒
Files.setLastModifiedTime(finalFile.toPath(), fileTime);
}
// 清理资源:从内存Map移除任务信息
uploadMap.remove(uploadId);
// 尝试删除临时目录(如果为空)
File tempDir = new File(uploadInfo.getTempDirPath());
if (tempDir.exists() && tempDir.list().length == 0) {
tempDir.delete();
}
// 返回成功响应和最终文件信息
Map<String, Object> data = new HashMap<>();
data.put("file_path", finalFilePath);
data.put("file_name", uploadInfo.getFileName());
return response.setJson(RespBodyVo.ok(data));
} catch (Exception e) {
log.error("Complete chunked upload failed", e);
return response.setJson(RespBodyVo.fail("Complete chunked upload failed: " + e.getMessage()));
}
}
}
4. 客户端实现 (Golang)
客户端负责执行具体的分片逻辑并与服务端交互。
4.1. 响应结构体
定义了服务端返回的响应数据结构。
// ChunkUploadResponse 定义了单个分片上传或初始化的响应结构
type ChunkUploadResponse struct {
PartIndex int `json:"part_index"` // 已上传的分片索引
UploadID string `json:"upload_id,omitempty"` // 初始化时返回的唯一上传ID
ETag string `json:"etag,omitempty"` // (示例中未使用) 对象存储的ETag
IsComplete bool `json:"is_complete,omitempty"` // (示例中未使用) 是否完成
}
4.2. 核心函数 (client
包)
package client
import (
"bytes"
"encoding/json"
"fmt"
"github.com/cloudwego/hertz/pkg/common/hlog" // 或使用标准 log
"github.com/litongjava/hfile/model" // 假设包含 APIResponse 结构
"io"
"mime/multipart"
"net/http"
"os"
"strconv"
)
// 假设 ChunkSize 在包级别定义,例如 5 * 1024 * 1024 (5MB)
const ChunkSize = 5 * 1024 * 1024
// UploadInChunks 执行完整的分片上传流程
// serverURL: 服务端基础URL
// token: 用户认证令牌
// repo: 目标仓库名
// filePath: 本地待上传文件的路径
func UploadInChunks(serverURL, token, repo, filePath string) error {
// 1. 打开本地文件
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer file.Close() // 确保函数结束时关闭文件
// 2. 获取文件信息
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to get file info: %w", err)
}
fileSize := fileInfo.Size()
// 计算分片总数
totalParts := int((fileSize + ChunkSize - 1) / ChunkSize) // 向上取整
modTime := fileInfo.ModTime().Unix() // 获取文件修改时间
hlog.Infof("Start chunk upload: file=%s, size=%d, chunks=%d", filePath, fileSize, totalParts)
// 3. 初始化分片上传,获取 upload_id
uploadID, err := initChunkedUpload(serverURL, token, repo, fileInfo.Name(), fileSize, totalParts, modTime)
if err != nil {
return fmt.Errorf("failed to init chunked upload: %w", err)
}
// 4. 循环上传每个分片
for partIndex := 0; partIndex < totalParts; partIndex++ {
start := int64(partIndex) * ChunkSize
end := start + ChunkSize
if end > fileSize {
end = fileSize // 最后一个分片可能小于 ChunkSize
}
// 读取分片数据
chunk := make([]byte, end-start)
_, err := file.ReadAt(chunk, start) // 使用 ReadAt 精确定位读取
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read chunk %d: %w", partIndex, err)
}
// 上传单个分片
err = uploadChunk(serverURL, token, repo, uploadID, partIndex, chunk, fileInfo.Name())
if err != nil {
return fmt.Errorf("failed to upload chunk %d: %w", partIndex, err)
}
hlog.Infof("Chunk %d/%d uploaded successfully", partIndex+1, totalParts)
}
// 5. 通知服务端完成上传并合并文件
err = completeChunkedUpload(serverURL, token, repo, uploadID)
if err != nil {
return fmt.Errorf("failed to complete chunked upload: %w", err)
}
hlog.Infof("All chunks uploaded and merged successfully for file: %s", filePath)
return nil
}
// initChunkedUpload 向服务端发送初始化请求
func initChunkedUpload(serverURL, token, repo, fileName string, fileSize int64, totalParts int, modTime int64) (string, error) {
url := fmt.Sprintf("%s/file/upload/init?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配
reqBody := map[string]interface{}{
"repo": repo,
"file_name": fileName,
"file_size": fileSize,
"total_parts": totalParts,
"original_mod_time": modTime,
}
jsonData, _ := json.Marshal(reqBody) // 错误处理可更完善
// 构造 HTTP 请求
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token) // 设置认证头
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("init failed with status %d: %s", resp.StatusCode, string(body))
}
// 解析服务端响应
var apiResp model.APIResponse // 假设 model.APIResponse 是通用的响应结构
err = json.Unmarshal(body, &apiResp)
if err != nil {
return "", fmt.Errorf("failed to parse init response: %w", err)
}
if !apiResp.Ok {
return "", fmt.Errorf("init failed: %s", *apiResp.Msg)
}
// 从响应数据中提取 upload_id
data, ok := apiResp.Data.(map[string]interface{})
if !ok {
return "", fmt.Errorf("invalid init response format")
}
uploadID, ok := data["upload_id"].(string)
if !ok {
return "", fmt.Errorf("upload_id not found in response")
}
return uploadID, nil
}
// uploadChunk 上传单个分片
func uploadChunk(serverURL, token, repo, uploadID string, partIndex int, chunk []byte, fileName string) error {
url := fmt.Sprintf("%s/file/upload/chunk?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配
// 构造 multipart/form-data 请求体
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
// 添加文件数据
part, err := writer.CreateFormFile("file", fileName) // "file" 是服务端期望的字段名
if err != nil {
return err
}
_, err = part.Write(chunk)
if err != nil {
return err
}
// 添加其他字段 (upload_id, part_index)
_ = writer.WriteField("upload_id", uploadID)
_ = writer.WriteField("part_index", strconv.Itoa(partIndex)) // 转换为字符串
err = writer.Close() // 关闭 writer 以写入结尾 boundary
if err != nil {
return err
}
// 构造 HTTP 请求
req, _ := http.NewRequest("POST", url, body)
req.Header.Set("Content-Type", writer.FormDataContentType()) // 设置正确的 Content-Type
req.Header.Set("Authorization", "Bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("chunk upload failed with status %d: %s", resp.StatusCode, string(respBody))
}
// 解析响应
var apiResp model.APIResponse
err = json.Unmarshal(respBody, &apiResp)
if err != nil {
return fmt.Errorf("failed to parse chunk upload response: %w", err)
}
if !apiResp.Ok {
return fmt.Errorf("chunk upload failed: %s", *apiResp.Msg)
}
return nil
}
// completeChunkedUpload 通知服务端完成上传
func completeChunkedUpload(serverURL, token, repo, uploadID string) error {
url := fmt.Sprintf("%s/file/upload/complete?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配
reqBody := map[string]interface{}{
"upload_id": uploadID,
}
jsonData, _ := json.Marshal(reqBody)
// 构造 HTTP 请求
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("complete failed with status %d: %s", resp.StatusCode, string(body))
}
// 解析响应
var apiResp model.APIResponse
err = json.Unmarshal(body, &apiResp)
if err != nil {
return fmt.Errorf("failed to parse complete response: %w", err)
}
if !apiResp.Ok {
return fmt.Errorf("complete failed: %s", *apiResp.Msg)
}
return nil
}
5. 注意事项与改进建议
- 内存存储 (
uploadMap
): 当前实现使用ConcurrentHashMap
存储上传信息,这在单机、短期任务下可行,但在分布式或长时间运行的生产环境中,强烈建议替换为 Redis、数据库等持久化存储方案,以防止服务重启导致上传信息丢失。 - 并发与幂等性: 当前服务端逻辑没有处理同一
uploadId
下同一partIndex
的并发上传或重复上传。在实际应用中,应考虑增加校验机制,确保幂等性,避免数据混乱。 - 错误处理与重试: 客户端和服务端都应有更健壮的错误处理和重试机制,以应对网络波动或临时性错误。
- 安全性: 确保
Authorization
头的正确使用和验证,防止未授权访问。对文件名、路径等用户输入进行严格校验,防止路径遍历等安全漏洞。 - 性能优化: 对于大文件合并,可以考虑使用更高效的 I/O 模式(如
FileChannel.transferFrom
)。临时文件的存储路径和最终文件的存储路径应考虑磁盘 I/O 性能。