Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 文件上传
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • 请求拦截器
    • Controller拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • 接口访问统计
    • 接口请求和响应数据记录
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 跨域
    • 添加 Controller
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 在 Tio-boot 中使用零拷贝发送大文件
    • Tio Boot 分片上传服务设计与实现
    • WebJars
    • JProtobuf
    • Tio-Boot HTTP Speed Test
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • 生成器与 Model
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • TQL(Table SQL)前端输入规范
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Enjoy 使用示例
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 使用 tio-core 在 tio-boot 中构建独立的 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • 邮箱
    • JSON
    • 读取文件
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils
    • 系统监控
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 使用 AWS S3 存储文件并整合到 Tio-Boot 项目中
    • 存储文件到 腾讯 COS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • tio-boot 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • Tio Boot 整合 Spring Boot Starter
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_mysql

    • 使用 Docker 运行 MySQL
    • /zh/42_mysql/02.html
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • PostgreSQL 全文检索
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • /zh/43_oceanbase/05.html
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • /zh/50_media/03.html
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
  • 55_telegram4j

    • 数据库设计
    • /zh/55_telegram4j/02.html
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 56_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • AI 问答
    • /zh/60_LLM/03.html
    • /zh/60_LLM/04.html
    • 增强检索(RAG)
    • 结构化数据检索
    • 搜索+AI
    • 集成第三方 API
    • 后置处理
    • 推荐问题生成
    • 连接代码执行器
    • 避免 GPT 混乱
    • /zh/60_LLM/13.html
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 通用搜索
    • /zh/61_ai_agent/15.html
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 62_translator

    • 简介
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • MCP 协议
    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 执行 Python (Manim) 代码
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • manimgl
    • EGL
    • /zh/66_manim/30.html
    • /zh/66_manim/31.html
    • /zh/66_manim/32.html
    • /zh/66_manim/33.html
  • 68_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 图片管理
    • /zh/70_tio-boot-admin/08.html
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 71_tio-boot

    • /zh/71_tio-boot/01.html
    • Swagger 整合到 Tio-Boot 中的指南
    • HTTP/1.1 Pipelining 性能测试报告
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

Tio Boot 分片上传服务设计与实现

1. 概述

本文档详细介绍了基于 Tio Boot 框架实现的文件分片上传服务。该服务旨在解决大文件上传过程中可能遇到的网络不稳定、超时等问题,通过将大文件分割成多个小块(分片)进行上传,提高上传的稳定性和效率。

1.1. 核心思想

  • 分片上传 (Chunked Upload):客户端将待上传的大文件按照预设大小(如 5MB)切分为多个小块。
  • 服务端协调:服务端负责管理整个上传流程,包括初始化上传任务、接收并存储分片、以及最终合并所有分片成完整文件。
  • 断点续传基础:通过记录已上传的分片信息,为后续实现断点续传功能奠定基础(当前版本未完全实现断点续传逻辑,但架构支持)。

2. 整体流程

分片上传流程主要分为三个步骤:

  1. 初始化上传 (initUpload):

    • 客户端向服务端发送一个初始化请求,携带文件的基本信息(文件名、总大小、分片总数、仓库名等)。
    • 服务端接收到请求后,生成一个唯一的 uploadId 作为本次上传任务的标识符。
    • 服务端创建一个临时目录用于存放分片文件。
    • 服务端将本次上传任务的关键信息(如 uploadId, 文件名, 分片总数, 临时目录路径等)存储在一个内存映射 (uploadMap) 中(生产环境建议使用 Redis 等持久化存储)。
    • 服务端将生成的 uploadId 返回给客户端。
  2. 上传分片 (uploadChunk):

    • 客户端根据初始化时获得的分片总数,依次上传每个分片。
    • 对于每个分片,客户端发送一个请求,携带 uploadId、当前分片的索引 (partIndex) 以及分片的二进制数据。
    • 服务端接收到分片后,根据 uploadId 从 uploadMap 中找到对应的上传任务信息。
    • 服务端验证 partIndex 的有效性。
    • 服务端将该分片数据写入到临时目录下的一个特定文件中(文件名通常与 uploadId 和 partIndex 相关)。
    • 服务端在内存中将该 partIndex 标记为“已接收”。
    • 服务端返回确认信息给客户端,表示该分片上传成功。
  3. 完成上传 (completeUpload):

    • 当客户端上传完所有分片后,向服务端发送一个完成请求,携带 uploadId。
    • 服务端根据 uploadId 获取上传任务信息。
    • 服务端检查所有分片是否都已标记为“已接收”。
    • 如果所有分片均已上传,服务端按照分片顺序,将临时目录下的所有分片文件按顺序读取并合并成最终的完整文件,并保存到指定的目标路径(如用户仓库目录下)。
    • (可选)服务端尝试设置合并后文件的最后修改时间,以匹配原始文件的时间戳。
    • 服务端清理临时目录中的分片文件和内存中的上传任务信息。
    • 服务端返回最终文件的路径或成功信息给客户端。

3. 服务端实现 (Java/Tio Boot)

3.1. 数据模型 (ChunkedUploadInfo)

用于在服务端存储和管理单个分片上传任务的信息。

package com.litongjava.http.file.server.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 分片上传信息类
 * 用于存储和管理单次分片上传任务的状态和元数据
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChunkedUploadInfo {
  private String uploadId;           // 唯一标识本次上传任务的ID
  private String fileName;           // 原始文件名
  private Long fileSize;             // 原始文件总大小 (字节)
  private Integer totalParts;        // 总分片数量
  private String repo;               // 目标仓库名称
  private String username;           // 上传用户的用户名
  private Long originalModTime;      // 原始文件的最后修改时间戳 (秒)
  private String tempDirPath;        // 存放临时分片文件的目录路径
  private boolean[] receivedParts;   // 布尔数组,标记每个分片是否已接收
}

3.2. 处理器 (FileChunkedUploadHandler)

核心处理逻辑,包含初始化、上传分片、完成上传三个主要方法。

package com.litongjava.http.file.server.handler;

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.fastjson2.JSONObject;
import com.litongjava.http.file.server.consts.FileConst;
import com.litongjava.http.file.server.model.ChunkedUploadInfo;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.model.body.RespBodyVo;
import com.litongjava.tio.boot.admin.services.AppUserService;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.UploadFile;
import com.litongjava.tio.utils.json.FastJson2Utils;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FileChunkedUploadHandler {

  // 临时存储上传信息,实际生产环境建议使用Redis等持久化存储
  // 使用 ConcurrentHashMap 保证线程安全
  private static final Map<String, ChunkedUploadInfo> uploadMap = new ConcurrentHashMap<>();

  /**
   * 初始化分片上传
   * 客户端调用此接口开始一个新的分片上传任务
   * @param request HTTP请求,包含JSON格式的初始化参数
   * @return HttpResponse 返回包含 upload_id 的JSON响应
   */
  public HttpResponse initUpload(HttpRequest request) {
    HttpResponse response = TioRequestContext.getResponse();
    String bodyString = request.getBodyString();
    JSONObject jsonObject = FastJson2Utils.parseObject(bodyString);
    try {
      String repo = jsonObject.getString("repo");
      String fileName = jsonObject.getString("file_name");
      Long fileSize = jsonObject.getLong("file_size");
      Integer totalParts = jsonObject.getInteger("total_parts");
      Long originalModTime = jsonObject.getLong("original_mod_time"); // 可选

      // 参数校验
      if (repo == null || fileName == null || fileSize == null || totalParts == null) {
        return response.setJson(RespBodyVo.fail("Missing required parameters"));
      }

      // 获取用户信息
      String userIdString = request.getUserIdString();
      String username = Aop.get(AppUserService.class).getUsernameById(userIdString);

      // 生成唯一上传ID
      String uploadId = UUID.randomUUID().toString().replace("-", "");

      // 创建临时目录用于存放分片
      String tempDirPath = FileConst.DATA_DIR + File.separator + username + File.separator + ".temp";
      File tempDir = new File(tempDirPath);
      if (!tempDir.exists()) {
        tempDir.mkdirs(); // 创建目录(包括父目录)
      }

      // 保存上传信息到内存Map
      ChunkedUploadInfo uploadInfo = new ChunkedUploadInfo();
      uploadInfo.setUploadId(uploadId);
      uploadInfo.setFileName(fileName);
      uploadInfo.setFileSize(fileSize);
      uploadInfo.setTotalParts(totalParts);
      uploadInfo.setRepo(repo);
      uploadInfo.setUsername(username);
      uploadInfo.setOriginalModTime(originalModTime);
      uploadInfo.setTempDirPath(tempDirPath);
      uploadInfo.setReceivedParts(new boolean[totalParts]); // 初始化为false

      uploadMap.put(uploadId, uploadInfo);

      // 返回成功响应和 upload_id
      Map<String, Object> data = new HashMap<>();
      data.put("upload_id", uploadId);
      return response.setJson(RespBodyVo.ok(data));
    } catch (Exception e) {
      log.error("Init chunked upload failed", e);
      return response.setJson(RespBodyVo.fail("Init chunked upload failed: " + e.getMessage()));
    }
  }

  /**
   * 上传单个分片
   * 客户端调用此接口上传一个具体的分片数据
   * @param request HTTP请求,包含 upload_id, part_index 和文件数据
   * @return HttpResponse 返回确认信息的JSON响应
   */
  public HttpResponse uploadChunk(HttpRequest request) {
    HttpResponse response = TioRequestContext.getResponse();
    try {
      String uploadId = request.getParam("upload_id");
      Integer partIndex = request.getInt("part_index");

      // 参数校验
      if (uploadId == null || partIndex == null) {
        return response.setJson(RespBodyVo.fail("Missing required parameters"));
      }

      // 获取上传任务信息
      ChunkedUploadInfo uploadInfo = uploadMap.get(uploadId);
      if (uploadInfo == null) {
        return response.setJson(RespBodyVo.fail("Invalid upload_id"));
      }

      // 检查分片索引是否有效
      if (partIndex < 0 || partIndex >= uploadInfo.getTotalParts()) {
        return response.setJson(RespBodyVo.fail("Invalid part_index"));
      }

      // 获取上传的分片文件数据
      UploadFile uploadFile = request.getUploadFile("file");
      if (uploadFile == null) {
        return response.setJson(RespBodyVo.fail("Missing file data"));
      }

      // 保存分片到临时文件
      String chunkFileName = uploadId + "_part_" + partIndex;
      File chunkFile = new File(uploadInfo.getTempDirPath() + File.separator + chunkFileName);
      Files.write(chunkFile.toPath(), uploadFile.getData()); // 写入分片数据

      // 标记该分片已接收
      uploadInfo.getReceivedParts()[partIndex] = true;

      // 返回成功响应
      Map<String, Object> data = new HashMap<>();
      data.put("part_index", partIndex);
      data.put("upload_id", uploadId);
      return response.setJson(RespBodyVo.ok(data));
    } catch (Exception e) {
      log.error("Upload chunk failed", e);
      return response.setJson(RespBodyVo.fail("Upload chunk failed: " + e.getMessage()));
    }
  }

  /**
   * 完成分片上传
   * 客户端在所有分片上传完成后调用此接口,触发服务端合并文件
   * @param request HTTP请求,包含 upload_id
   * @return HttpResponse 返回最终文件信息或错误信息的JSON响应
   */
  public HttpResponse completeUpload(HttpRequest request) {
    HttpResponse response = TioRequestContext.getResponse();
    String bodyString = request.getBodyString();
    JSONObject jsonObject = FastJson2Utils.parseObject(bodyString);
    try {
      String uploadId = jsonObject.getString("upload_id");

      // 参数校验
      if (uploadId == null) {
        return response.setJson(RespBodyVo.fail("Missing upload_id"));
      }

      // 获取上传任务信息
      ChunkedUploadInfo uploadInfo = uploadMap.get(uploadId);
      if (uploadInfo == null) {
        return response.setJson(RespBodyVo.fail("Invalid upload_id"));
      }

      // 检查所有分片是否都已上传
      boolean allReceived = true;
      for (boolean received : uploadInfo.getReceivedParts()) {
        if (!received) {
          allReceived = false;
          break;
        }
      }
      if (!allReceived) {
        return response.setJson(RespBodyVo.fail("Not all chunks have been uploaded"));
      }

      // 确定最终文件路径并创建父目录
      String finalFilePath = FileConst.DATA_DIR + File.separator + uploadInfo.getUsername() + File.separator
          + uploadInfo.getRepo() + File.separator + uploadInfo.getFileName();
      File finalFile = new File(finalFilePath);
      File parentFile = finalFile.getParentFile();
      if (!parentFile.exists()) {
        parentFile.mkdirs();
      }

      // 使用 RandomAccessFile 按顺序合并所有分片
      try (RandomAccessFile raf = new RandomAccessFile(finalFile, "rw")) {
        for (int i = 0; i < uploadInfo.getTotalParts(); i++) {
          String chunkFileName = uploadId + "_part_" + i;
          File chunkFile = new File(uploadInfo.getTempDirPath() + File.separator + chunkFileName);
          if (chunkFile.exists()) {
            byte[] chunkData = Files.readAllBytes(chunkFile.toPath()); // 读取分片
            raf.write(chunkData); // 写入最终文件
            chunkFile.delete(); // 删除临时分片文件
          }
        }
      }

      // (可选)设置最终文件的最后修改时间
      if (uploadInfo.getOriginalModTime() != null && uploadInfo.getOriginalModTime() > 0) {
        FileTime fileTime = FileTime.fromMillis(uploadInfo.getOriginalModTime() * 1000); // 假设前端传的是秒
        Files.setLastModifiedTime(finalFile.toPath(), fileTime);
      }

      // 清理资源:从内存Map移除任务信息
      uploadMap.remove(uploadId);

      // 尝试删除临时目录(如果为空)
      File tempDir = new File(uploadInfo.getTempDirPath());
      if (tempDir.exists() && tempDir.list().length == 0) {
        tempDir.delete();
      }

      // 返回成功响应和最终文件信息
      Map<String, Object> data = new HashMap<>();
      data.put("file_path", finalFilePath);
      data.put("file_name", uploadInfo.getFileName());
      return response.setJson(RespBodyVo.ok(data));
    } catch (Exception e) {
      log.error("Complete chunked upload failed", e);
      return response.setJson(RespBodyVo.fail("Complete chunked upload failed: " + e.getMessage()));
    }
  }
}

4. 客户端实现 (Golang)

客户端负责执行具体的分片逻辑并与服务端交互。

4.1. 响应结构体

定义了服务端返回的响应数据结构。

// ChunkUploadResponse 定义了单个分片上传或初始化的响应结构
type ChunkUploadResponse struct {
	PartIndex  int    `json:"part_index"`  // 已上传的分片索引
	UploadID   string `json:"upload_id,omitempty"` // 初始化时返回的唯一上传ID
	ETag       string `json:"etag,omitempty"` // (示例中未使用) 对象存储的ETag
	IsComplete bool   `json:"is_complete,omitempty"` // (示例中未使用) 是否完成
}

4.2. 核心函数 (client 包)

package client

import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/cloudwego/hertz/pkg/common/hlog" // 或使用标准 log
	"github.com/litongjava/hfile/model" // 假设包含 APIResponse 结构
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"strconv"
)

// 假设 ChunkSize 在包级别定义,例如 5 * 1024 * 1024 (5MB)
const ChunkSize = 5 * 1024 * 1024

// UploadInChunks 执行完整的分片上传流程
// serverURL: 服务端基础URL
// token: 用户认证令牌
// repo: 目标仓库名
// filePath: 本地待上传文件的路径
func UploadInChunks(serverURL, token, repo, filePath string) error {
	// 1. 打开本地文件
	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close() // 确保函数结束时关闭文件

	// 2. 获取文件信息
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}
	fileSize := fileInfo.Size()
	// 计算分片总数
	totalParts := int((fileSize + ChunkSize - 1) / ChunkSize) // 向上取整
	modTime := fileInfo.ModTime().Unix() // 获取文件修改时间

	hlog.Infof("Start chunk upload: file=%s, size=%d, chunks=%d", filePath, fileSize, totalParts)

	// 3. 初始化分片上传,获取 upload_id
	uploadID, err := initChunkedUpload(serverURL, token, repo, fileInfo.Name(), fileSize, totalParts, modTime)
	if err != nil {
		return fmt.Errorf("failed to init chunked upload: %w", err)
	}

	// 4. 循环上传每个分片
	for partIndex := 0; partIndex < totalParts; partIndex++ {
		start := int64(partIndex) * ChunkSize
		end := start + ChunkSize
		if end > fileSize {
			end = fileSize // 最后一个分片可能小于 ChunkSize
		}

		// 读取分片数据
		chunk := make([]byte, end-start)
		_, err := file.ReadAt(chunk, start) // 使用 ReadAt 精确定位读取
		if err != nil && err != io.EOF {
			return fmt.Errorf("failed to read chunk %d: %w", partIndex, err)
		}

		// 上传单个分片
		err = uploadChunk(serverURL, token, repo, uploadID, partIndex, chunk, fileInfo.Name())
		if err != nil {
			return fmt.Errorf("failed to upload chunk %d: %w", partIndex, err)
		}

		hlog.Infof("Chunk %d/%d uploaded successfully", partIndex+1, totalParts)
	}

	// 5. 通知服务端完成上传并合并文件
	err = completeChunkedUpload(serverURL, token, repo, uploadID)
	if err != nil {
		return fmt.Errorf("failed to complete chunked upload: %w", err)
	}

	hlog.Infof("All chunks uploaded and merged successfully for file: %s", filePath)
	return nil
}

// initChunkedUpload 向服务端发送初始化请求
func initChunkedUpload(serverURL, token, repo, fileName string, fileSize int64, totalParts int, modTime int64) (string, error) {
	url := fmt.Sprintf("%s/file/upload/init?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配
	reqBody := map[string]interface{}{
		"repo":              repo,
		"file_name":         fileName,
		"file_size":         fileSize,
		"total_parts":       totalParts,
		"original_mod_time": modTime,
	}
	jsonData, _ := json.Marshal(reqBody) // 错误处理可更完善

	// 构造 HTTP 请求
	req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token) // 设置认证头

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("init failed with status %d: %s", resp.StatusCode, string(body))
	}

	// 解析服务端响应
	var apiResp model.APIResponse // 假设 model.APIResponse 是通用的响应结构
	err = json.Unmarshal(body, &apiResp)
	if err != nil {
		return "", fmt.Errorf("failed to parse init response: %w", err)
	}
	if !apiResp.Ok {
		return "", fmt.Errorf("init failed: %s", *apiResp.Msg)
	}

	// 从响应数据中提取 upload_id
	data, ok := apiResp.Data.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("invalid init response format")
	}
	uploadID, ok := data["upload_id"].(string)
	if !ok {
		return "", fmt.Errorf("upload_id not found in response")
	}

	return uploadID, nil
}

// uploadChunk 上传单个分片
func uploadChunk(serverURL, token, repo, uploadID string, partIndex int, chunk []byte, fileName string) error {
	url := fmt.Sprintf("%s/file/upload/chunk?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配

	// 构造 multipart/form-data 请求体
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)

	// 添加文件数据
	part, err := writer.CreateFormFile("file", fileName) // "file" 是服务端期望的字段名
	if err != nil {
		return err
	}
	_, err = part.Write(chunk)
	if err != nil {
		return err
	}

	// 添加其他字段 (upload_id, part_index)
	_ = writer.WriteField("upload_id", uploadID)
	_ = writer.WriteField("part_index", strconv.Itoa(partIndex)) // 转换为字符串

	err = writer.Close() // 关闭 writer 以写入结尾 boundary
	if err != nil {
		return err
	}

	// 构造 HTTP 请求
	req, _ := http.NewRequest("POST", url, body)
	req.Header.Set("Content-Type", writer.FormDataContentType()) // 设置正确的 Content-Type
	req.Header.Set("Authorization", "Bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	respBody, _ := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("chunk upload failed with status %d: %s", resp.StatusCode, string(respBody))
	}

	// 解析响应
	var apiResp model.APIResponse
	err = json.Unmarshal(respBody, &apiResp)
	if err != nil {
		return fmt.Errorf("failed to parse chunk upload response: %w", err)
	}
	if !apiResp.Ok {
		return fmt.Errorf("chunk upload failed: %s", *apiResp.Msg)
	}

	return nil
}

// completeChunkedUpload 通知服务端完成上传
func completeChunkedUpload(serverURL, token, repo, uploadID string) error {
	url := fmt.Sprintf("%s/file/upload/complete?repo=%s", serverURL, repo) // 注意URL路径需与服务端路由匹配
	reqBody := map[string]interface{}{
		"upload_id": uploadID,
	}
	jsonData, _ := json.Marshal(reqBody)

	// 构造 HTTP 请求
	req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("complete failed with status %d: %s", resp.StatusCode, string(body))
	}

	// 解析响应
	var apiResp model.APIResponse
	err = json.Unmarshal(body, &apiResp)
	if err != nil {
		return fmt.Errorf("failed to parse complete response: %w", err)
	}
	if !apiResp.Ok {
		return fmt.Errorf("complete failed: %s", *apiResp.Msg)
	}

	return nil
}

5. 注意事项与改进建议

  • 内存存储 (uploadMap): 当前实现使用 ConcurrentHashMap 存储上传信息,这在单机、短期任务下可行,但在分布式或长时间运行的生产环境中,强烈建议替换为 Redis、数据库等持久化存储方案,以防止服务重启导致上传信息丢失。
  • 并发与幂等性: 当前服务端逻辑没有处理同一 uploadId 下同一 partIndex 的并发上传或重复上传。在实际应用中,应考虑增加校验机制,确保幂等性,避免数据混乱。
  • 错误处理与重试: 客户端和服务端都应有更健壮的错误处理和重试机制,以应对网络波动或临时性错误。
  • 安全性: 确保 Authorization 头的正确使用和验证,防止未授权访问。对文件名、路径等用户输入进行严格校验,防止路径遍历等安全漏洞。
  • 性能优化: 对于大文件合并,可以考虑使用更高效的 I/O 模式(如 FileChannel.transferFrom)。临时文件的存储路径和最终文件的存储路径应考虑磁盘 I/O 性能。
Edit this page
Last Updated:
Contributors: Tong Li
Prev
在 Tio-boot 中使用零拷贝发送大文件
Next
WebJars