Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FastJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 胖包与瘦包的打包与部署
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 文件上传
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • /zh/06_web/19.html
    • 全局异常处理器
    • 异步
    • 动态 返回 CSS 实现
    • 返回图片
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • 接口访问统计
    • 接口请求和响应数据记录
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 跨域
    • 添加 Controller
    • 常用工具类
    • HTTP Basic 认证
    • WebJars
    • JProtobuf
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板
    • 数据源配置与使用
    • ActiveRecord
    • Model
    • 生成器与 Model
    • Db 工具类
    • 批量操作
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • Java-DB 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • TQL(Table SQL)前端输入规范
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • hutool-JWT
    • FixedTokenInterceptor
    • 使用内置 TokenManager 实现登录
    • 用户系统
    • 重置密码
    • 匿名登录
    • Google 登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
    • 短信登录
    • 移动端微信登录实现指南
    • 移动端重置密码
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Enjoy 使用示例
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • tio-core
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • ssl
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • 邮箱
    • JSON
    • 读取文件
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 使用文档
    • 系统监控
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 整合 magic-script
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 使用 AWS S3 存储文件并整合到 Tio-Boot 项目中
    • 存储文件到 腾讯 COS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • tio-boot 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • Tio Boot 整合 Spring Boot Starter
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_mysql

    • 使用 Docker 运行 MySQL
    • /zh/42_mysql/02.html
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • PostgreSQL 全文检索
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • /zh/43_oceanbase/05.html
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • /zh/50_media/03.html
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
  • 55_telegram4j

    • 数据库设计
    • /zh/55_telegram4j/02.html
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 56_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • AI 问答
    • /zh/60_LLM/03.html
    • /zh/60_LLM/04.html
    • 增强检索(RAG)
    • 结构化数据检索
    • 搜索+AI
    • 集成第三方 API
    • 后置处理
    • 推荐问题生成
    • 连接代码执行器
    • 避免 GPT 混乱
    • /zh/60_LLM/13.html
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • 对接 Perplexity API
    • 意图识别与生成提示词
    • 智能问答模块设计与实现
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 通用搜索
    • /zh/61_ai_agent/15.html
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 62_translator

    • 简介
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_java-linux

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • MCP 协议
    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_manim

    • 简介
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 执行 Python (Manim) 代码
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • manimgl
    • EGL
    • /zh/66_manim/30.html
    • /zh/66_manim/31.html
    • 成本核算
    • /zh/66_manim/33.html
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 图片管理
    • /zh/70_tio-boot-admin/08.html
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 71_tio-boot

    • /zh/71_tio-boot/01.html
    • Swagger 整合到 Tio-Boot 中的指南
    • HTTP/1.1 Pipelining 性能测试报告
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

tio-运行原理详解

t-io 是一个高效、轻量级的异步 I/O 框架,广泛用于处理高并发网络通信场景,支持 TCP、UDP、WebSocket 和 Http 等协议。本文将详细解析 t-io 的运行机制,包括其核心组件、数据处理流程、线程池设计和连接管理机制。通过全面解析这些内容,可以帮助开发者更好地理解 t-io 的工作原理,进而在实际项目中灵活运用。

一、t-io 核心组件

t-io 框架中有几个重要的核心组件,这些组件各自负责不同的功能模块,并协同工作来保证系统的高效运行。本文详细介绍以下核心组件:

  1. TioServer:用于启动服务器,处理客户端连接请求和管理整个 I/O 通道。
  2. TioConfig 和 ServerTioConfig:全局配置类,管理 I/O 操作的处理器、监听器、线程池等核心配置。
  3. Threads:线程池管理类,提供专用线程池以应对不同任务类型(如 I/O 处理和业务逻辑处理)。
  4. AcceptCompletionHandler:负责接受新的客户端连接并为其分配资源。
  5. ReadCompletionHandler:负责从客户端异步读取数据,并将数据传递给后续的解码和处理环节。
  6. DecodeRunnable:负责将读取到的原始字节数据解码为具体的业务包(Packet),解码成功后将进入处理阶段。
  7. CloseRunnable:负责关闭连接并释放相关资源,保证连接正常关闭和数据安全传输。
  8. ChannelContext:代表一个具体的连接上下文,管理连接的状态和数据传输

二、t-io 数据流处理流程

t-io 的数据处理采用异步非阻塞 I/O 模型,整个数据流从客户端请求到服务器响应的处理大致包括以下几个阶段:

  1. 接收连接:通过 AcceptCompletionHandler 异步接收客户端的连接请求。每当有客户端发起连接时,t-io 通过 AsynchronousServerSocketChannel.accept() 异步处理新连接,并为其分配 ChannelContext 对象来维护连接状态。

  2. 读取数据:新连接建立后,ReadCompletionHandler 负责从客户端异步读取数据。该操作是非阻塞的,读取的数据会存储到 ByteBuffer 中,之后传递给解码器处理。

  3. 解码数据:读取的数据传递给 DecodeRunnable 进行解码处理。DecodeRunnable 会将 ByteBuffer 中的原始数据转换为业务数据包 Packet,并交由下游进行处理。

  4. 业务处理:解码后的数据包会通过 HandlerRunnable 进行业务逻辑处理,处理的结果最终通过 SendRunnable 发送响应数据回客户端。

  5. 响应数据发送:在处理完业务逻辑后,SendRunnable 负责将响应数据打包,并异步发送到客户端,完成整个数据流的闭环。

这种异步非阻塞模型使得 t-io 能够在高并发场景下有效地处理大量连接和请求,保证了系统的高效和稳定性。

三、t-io 线程池设计

t-io 的高效运行离不开合理的线程池设计,它通过不同的线程池来应对不同类型的任务,确保系统的负载均衡和性能优化。主要有以下两个核心线程池:

  1. tioExecutor:处理数据解码和业务逻辑的线程池。该线程池专门用于执行解码任务(DecodeRunnable)和业务处理任务(HandlerRunnable),其线程数量依据 CPU 核心数和系统配置动态调整。
  2. groupExecutor:负责处理网络 I/O 操作的线程池。该线程池主要用于接收新连接、管理心跳检测、处理网络 I/O 任务,确保在高并发场景下有效地管理大量客户端连接。

线程池的配置是根据可用 CPU 核心数进行优化的:

public static final int CORE_POOL_SIZE = AVAILABLE_PROCESSORS * 1;
public static final int MAX_POOL_SIZE_FOR_TIO = Math.max(CORE_POOL_SIZE * 3, 64);
public static final int MAX_POOL_SIZE_FOR_GROUP = Math.max(CORE_POOL_SIZE * 16, 256);

其中 CORE_POOL_SIZE 是系统 CPU 核心数的倍数,MAX_POOL_SIZE_FOR_TIO 和 MAX_POOL_SIZE_FOR_GROUP 分别为解码和 I/O 操作设置了最大线程池容量,以确保高并发环境下的性能表现。

四、TioServer 启动流程

tioServer 是 t-io 框架中服务器启动的核心类,它负责服务器的初始化、配置和监听客户端连接请求。以下是 TioServer 的启动流程:

  1. 配置初始化:服务器启动时,首先调用 TioConfig 和 ServerTioConfig 的初始化方法,设置缓存工厂、监听器、线程池等全局配置。这些配置确保服务器能够正确处理后续的 I/O 操作和业务逻辑。

  2. 创建异步通道组:服务器通过 AsynchronousChannelGroup 和 AsynchronousServerSocketChannel 创建异步 I/O 通道。AsynchronousServerSocketChannel 是非阻塞的,并通过设置接收缓冲区大小和复用地址选项,确保多线程环境下的高效运行。

  3. 启动监听器:服务器通过 serverSocketChannel.bind() 绑定到指定的 IP 地址和端口,并开始监听客户端的连接请求。

  4. 接收客户端连接:当有客户端连接到服务器时,AcceptCompletionHandler 被触发,负责处理新的连接请求,并为每个连接分配 ChannelContext 对象,以跟踪连接状态。

public void start(String serverIp, int serverPort) throws IOException {
    serverTioConfig.getCacheFactory().register(TioCoreConfigKeys.REQEUST_PROCESSING, null, null, null);
    this.serverNode = new Node(serverIp, serverPort);
    channelGroup = AsynchronousChannelGroup.withThreadPool(serverTioConfig.groupExecutor);
    serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
    serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
    InetSocketAddress listenAddress = new InetSocketAddress(serverIp, serverPort);
    serverSocketChannel.bind(listenAddress, 0);
    AcceptCompletionHandler acceptCompletionHandler = serverTioConfig.getAcceptCompletionHandler();
    serverSocketChannel.accept(this, acceptCompletionHandler);
    serverTioConfig.startTime = System.currentTimeMillis();
}

五、AcceptCompletionHandler:接收连接请求

AcceptCompletionHandler 是 t-io 中负责接收新客户端连接的组件。它在客户端发起连接时被异步调用,负责为新连接分配资源,并初始化对应的 ChannelContext。AcceptCompletionHandler 的主要任务包括:

  1. 接收新连接:当有客户端发起连接时,AsynchronousServerSocketChannel.accept() 异步调用 AcceptCompletionHandler 的 completed() 方法,接收该连接请求。

  2. 资源分配:为新连接创建 ChannelContext,并为其分配读写缓冲区等资源,以便后续的数据传输和处理。

  3. 安全检查:AcceptCompletionHandler 会进行一些安全性检查,例如判断客户端 IP 是否在黑名单中,或者连接数量是否超出系统上限等。

public void completed(AsynchronousSocketChannel asynchronousSocketChannel, TioServer tioServer) {
    if (tioServer.isWaitingStop()) {
        log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode());
    } else {
        AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel();
        serverSocketChannel.accept(tioServer, this);
    }

    ServerTioConfig serverTioConfig = tioServer.getServerTioConfig();
    try {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
        String clientIp = inetSocketAddress.getHostString();

        // 检查客户端 IP 是否在黑名单中
        if (IpBlacklist.isInBlacklist(serverTioConfig, clientIp)) {
            log.info("{} on the blacklist, {}", clientIp, serverTioConfig.getName());
            asynchronousSocketChannel.close();
            return;
        }

        // 创建新的 ChannelContext 来处理该连接
        ServerChannelContext channelContext = new ServerChannelContext(serverTioConfig, asynchronousSocketChannel);
        channelContext.setClosed(false);
        channelContext.stat.setTimeFirstConnected(SystemTimer.currTime);
        channelContext.setServerNode(tioServer.getServerNode());

        // 将新连接绑定到 I/O 线程池,并开始读取数据
        ReadCompletionHandler readCompletionHandler = channelContext.getReadCompletionHandler();
        ByteBuffer readByteBuffer = readCompletionHandler.getReadByteBuffer();
        asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);

    } catch (Throwable e) {


 log.error("Error handling new connection", e);
    }
}

六、ReadCompletionHandler:异步读取数据

ReadCompletionHandler 负责从客户端异步读取数据。t-io 使用异步 I/O 模型,当客户端发送数据时,服务器端的 ReadCompletionHandler 会被触发,执行读取操作。ReadCompletionHandler 的主要任务包括:

  1. 异步读取数据:从客户端读取数据,并将其存储在 ByteBuffer 中。读取操作是异步的,不会阻塞主线程。

  2. 检查读取结果:读取数据后,检查读取的字节数。如果读取的数据量大于 0,则将数据传递给解码器进行解码。如果读取失败或读取字节数为 0,则需要关闭连接。

  3. 解码数据:将读取到的数据传递给 DecodeRunnable 进行解码,转换为业务数据包。

@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
  if (result > 0) {
    TioConfig tioConfig = channelContext.tioConfig;

    // 统计接收到的字节数
    if (tioConfig.statOn) {
        tioConfig.groupStat.receivedBytes.addAndGet(result);
        tioConfig.groupStat.receivedTcps.incrementAndGet();
        channelContext.stat.receivedBytes.addAndGet(result);
        channelContext.stat.receivedTcps.incrementAndGet();
    }

    // 更新最近一次收到字节的时间
    channelContext.stat.latestTimeOfReceivedByte = SystemTimer.currTime;

    // 调用 IP 统计模块
    if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
        try {
            for (Long v : tioConfig.ipStats.durationList) {
                IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
                ipStat.getReceivedBytes().addAndGet(result);
                ipStat.getReceivedTcps().incrementAndGet();
                tioConfig.getIpStatListener().onAfterReceivedBytes(channelContext, result, ipStat);
            }
        } catch (Exception e1) {
            log.error(channelContext.toString(), e1);
        }
    }

    // 通知监听器数据接收完成
    if (tioConfig.getAioListener() != null) {
        try {
            tioConfig.getAioListener().onAfterReceivedBytes(channelContext, result);
        } catch (Exception e) {
            log.error("", e);
        }
    }

    // 将数据翻转为读取模式,并开始解码
    readByteBuffer.flip();
    if (channelContext.sslFacadeContext == null) {
        if (tioConfig.useQueueDecode) {
            channelContext.decodeRunnable.addMsg(ByteBufferUtils.copy(readByteBuffer));
            channelContext.decodeRunnable.execute();
        } else {
            channelContext.decodeRunnable.setNewReceivedByteBuffer(readByteBuffer);
            channelContext.decodeRunnable.decode();
        }
    } else {
        try {
            ByteBuffer copiedByteBuffer = ByteBufferUtils.copy(readByteBuffer);
            channelContext.sslFacadeContext.getSslFacade().decrypt(copiedByteBuffer);
        } catch (Exception e) {
            log.error(channelContext + ", " + e.toString() + readByteBuffer, e);
            Tio.close(channelContext, e, e.toString(), CloseCode.SSL_DECRYPT_ERROR);
        }
    }

    // 再次读取新的数据
    if (TioUtils.checkBeforeIO(channelContext)) {
        read();
    }

} else if (result == 0) {
    log.error("{}, 读到的数据长度为0", channelContext);
    Tio.close(channelContext, null, "读到的数据长度为0", CloseCode.READ_COUNT_IS_ZERO);
} else if (result < 0) {
    if (result == -1) {
        Tio.close(channelContext, null, "对方关闭了连接", CloseCode.CLOSED_BY_PEER);
    } else {
        Tio.close(channelContext, null, "读数据时返回" + result, CloseCode.READ_COUNT_IS_NEGATIVE);
    }
}

ReadCompletionHandler 的 completed() 方法是 t-io 框架中处理 I/O 操作的重要环节。通过 AsynchronousSocketChannel,系统能够在非阻塞的情况下读取客户端的数据,并通过 ByteBuffer 传递给后续的解码任务。数据读取完成后,系统会自动进行解码。

七、DecodeRunnable:解码数据包

解码数据包是 t-io 数据处理中的关键环节之一。DecodeRunnable 负责将从客户端读取到的原始字节流解码成 Packet,以供业务逻辑处理。它处理的流程如下:

  1. 解码器设置:DecodeRunnable 初始化时,会根据 TioConfig 的配置设置解码器(即 AioHandler)用于处理具体的解码任务。

  2. 数据合并与处理:如果当前读取到的数据不足以完成一次完整的解码,DecodeRunnable 会缓存数据,并在下次有新数据到来时合并处理,保证不丢包。

  3. 解码逻辑:解码器会根据数据的长度和格式,将 ByteBuffer 中的数据转换为 Packet 对象。Packet 是业务数据的载体,包含了请求的所有重要信息。

  4. 处理多次解码:在一次读取操作中,如果 ByteBuffer 中包含了多个 Packet,DecodeRunnable 会在解码完一个 Packet 后继续解码下一个,直到数据全部解码完成。

  5. 异常处理:如果解码过程中发生异常(例如数据包不完整或格式错误),系统会捕获异常并记录日志,同时关闭当前连接。

public void decode() {
    ByteBuffer byteBuffer = newReceivedByteBuffer;
    if (lastByteBuffer != null) {
        byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
        lastByteBuffer = null;
    }

    // 循环解码所有数据包
    label_2: while (true) {
        try {
            int initPosition = byteBuffer.position();
            int limit = byteBuffer.limit();
            int readableLength = limit - initPosition;
            Packet packet = null;

            // 根据需要解码的字节数进行判断
            if (channelContext.packetNeededLength != null) {
                if (readableLength >= channelContext.packetNeededLength) {
                    packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
                } else {
                    lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
                    return;
                }
            } else {
                try {
                    packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
                } catch (BufferUnderflowException e) {
                    // 数据不足,无法解码
                }
            }

            // 数据包解码完成
            if (packet != null) {
                channelContext.setPacketNeededLength(null);
                channelContext.stat.latestTimeOfReceivedPacket = SystemTimer.currTime;
                channelContext.stat.decodeFailCount = 0;

                // 统计解码后的数据包信息
                int packetSize = byteBuffer.position() - initPosition;
                packet.setByteCount(packetSize);

                if (tioConfig.statOn) {
                    tioConfig.groupStat.receivedPackets.incrementAndGet();
                    channelContext.stat.receivedPackets.incrementAndGet();
                }

                // 处理解码成功后的逻辑
                handler(packet, packetSize);

                // 如果还有剩余数据,继续解码
                if (byteBuffer.hasRemaining()) {
                    continue label_2;
                } else {
                    lastByteBuffer = null;
                    return;
                }
            } else {
                lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
                return;
            }
        } catch (Throwable e) {
            log.error("解码出现异常", e);
            Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
            return;
        }
    }
}

在 DecodeRunnable 中,系统会根据当前读取到的 ByteBuffer 数据进行解码。如果解码失败,系统会尝试再次读取数据以完成解码任务。成功解码的数据包 Packet 会交由 HandlerRunnable 进行进一步处理。

八、CloseRunnable:关闭连接

CloseRunnable 负责在连接关闭时释放所有资源,并确保数据传输完毕。t-io 的连接关闭操作是异步的,通过 CloseRunnable 实现。CloseRunnable 的主要功能如下:

  1. 连接关闭时机:当客户端主动关闭连接或服务器检测到异常(如超时或解码失败)时,系统会触发 CloseRunnable 来处理关闭操作。

  2. 资源释放:关闭连接时,系统会释放与该连接相关的资源,如 ByteBuffer、任务队列等,防止内存泄漏。

  3. 日志记录:每次关闭连接时,系统都会记录相关的关闭原因、连接状态等信息,以便后续分析和调试。

  4. 重连机制:对于客户端连接,如果连接因为网络问题中断,CloseRunnable 可以将该连接加入重连队列,等待下次重连。

CloseRunnable 的 runTask 方法是在 t-io 框架中处理关闭连接的时机被调用的。具体来说,当一个连接需要关闭时,无论是因为客户端主动断开连接,还是服务器端检测到错误或超时,t-io 框架都会将该连接的 ChannelContext 放入 CloseRunnable 的任务队列中,并触发 CloseRunnable 的执行。其调用过程如下:

1. 连接断开触发 CloseRunnable

当连接需要关闭时,t-io 框架会调用 Tio.close(channelContext, ...) 方法。这个方法会根据不同的关闭原因,触发 CloseRunnable 进行关闭操作。

public static void close(ChannelContext channelContext, Throwable throwable, String remark, CloseCode closeCode) {
    // 标记连接已关闭
    channelContext.setClosed(true);

    // 将当前连接的 CloseMeta 信息保存
    channelContext.closeMeta.setNeedRemove(false);
    channelContext.closeMeta.setRemark(remark);
    channelContext.closeMeta.setThrowable(throwable);

    // 将该连接加入到 CloseRunnable 的任务队列
    channelContext.tioConfig.closeRunnable.addMsg(channelContext);

    // 执行 CloseRunnable 的任务
    channelContext.tioConfig.closeRunnable.execute();
}

2. 什么时候调用 CloseRunnable.runTask

CloseRunnable.runTask 方法的调用时机是:

  • 当连接因为读写异常、解码失败、心跳超时等原因需要关闭时,t-io 会调用 Tio.close 方法。
  • Tio.close 方法会将 ChannelContext 对象添加到 CloseRunnable 的任务队列中(通过 addMsg(channelContext))。
  • CloseRunnable 中的 execute() 方法会触发任务队列的消费,也就是调用 runTask 方法,实际关闭连接。

3. CloseRunnable 的执行过程

当 execute() 方法被调用时,CloseRunnable 会从任务队列中取出需要关闭的 ChannelContext,并调用 runTask 方法对其进行处理。该处理包括:

  • 调用监听器的 onBeforeClose 方法通知连接即将关闭。
  • 清理与该连接相关的资源。
  • 关闭底层的 AsynchronousSocketChannel。
  • 从 t-io 的各种管理列表中移除该连接(如 connections、clientNodes 等)。

4. 关键逻辑示例

当连接需要关闭时,Tio.close() 会触发如下代码片段:

@Override
public void runTask() {
    ChannelContext channelContext = null;
    while ((channelContext = msgQueue.poll()) != null) {
        try {
            // 执行关闭前的逻辑
            boolean isNeedRemove = channelContext.closeMeta.isNeedRemove;
            String remark = channelContext.closeMeta.remark;
            Throwable throwable = channelContext.closeMeta.throwable;

            // 关闭前通知监听器
            if (channelContext.tioConfig.getAioListener() != null) {
                channelContext.tioConfig.getAioListener().onBeforeClose(channelContext, throwable, remark, isNeedRemove);
            }

            // 关闭连接并释放资源
            channelContext.setClosed(true);
            MaintainUtils.remove(channelContext);

        } catch (Throwable e) {
            log.error("关闭连接时出错", e);
        }
    }
}

CloseRunnable.runTask() 方法是在 t-io 框架中执行关闭连接的关键步骤之一。其调用时机由 Tio.close() 方法触发,通常是在连接断开、读写异常、超时、解码错误等情况下。CloseRunnable 通过异步的方式关闭连接并释放资源,确保连接管理的高效性和安全性。

九. Tio.close 方法是在什么时候调用的?

t-io 框架中的 Tio.close 方法是用于关闭连接的,通常在以下几种情况下被调用:

9.1 连接读写异常时

如果在读写数据时出现异常,ReadCompletionHandler 或 WriteCompletionHandler 会捕获异常,并调用 Tio.close 方法来关闭该连接。例如,在 ReadCompletionHandler 的 failed 方法中,当数据读取失败时,框架会调用 Tio.close 来关闭连接:

@Override
public void failed(Throwable exc, ByteBuffer byteBuffer) {
    Tio.close(channelContext, exc, "读数据时发生异常: " + exc.getClass().getName(), CloseCode.READ_ERROR);
}

9.2 解码异常时

如果在处理收到的数据时,DecodeRunnable 中的解码过程出现问题,通常是由于数据格式不符合预期,Tio.close 会被调用来终止该连接并清理资源。例如,在 DecodeRunnable 中,捕获到解码异常时会调用 Tio.close:

catch (Throwable e) {
    if (channelContext.logWhenDecodeError) {
        log.error("解码时发生异常", e);
    }
    Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
}

9.3 心跳超时时

t-io 提供了心跳检测机制,用于检测客户端与服务器之间的活跃性。如果长时间未收到数据或心跳信号,框架会认为连接超时,并调用 Tio.close 来关闭连接。例如,在 ServerTioConfig 的心跳检测线程中,如果连接长时间没有任何数据传输,会调用 Tio.close:

if (interval > heartbeatTimeout) {
    Tio.close(channelContext, "心跳超时", CloseCode.HEARTBEAT_TIMEOUT);
}

9.4 主动关闭连接

在业务逻辑中,服务器或客户端可能需要主动关闭连接。例如,当服务器要强制断开某个连接时,业务代码可以直接调用 Tio.close(channelContext) 方法来关闭该连接。

9.5 客户端关闭连接

当客户端主动断开连接时,服务器端可能会捕获到该断开的信号(如读取到 EOF),并调用 Tio.close 来关闭服务器端的连接。

9.6. 如果是 HTTP 请求,连接是通过心跳检测关闭的吗?

9.6.1 短连接 vs 长连接

在 HTTP 请求中,有两种主要的连接模式:

  • 短连接:传统的 HTTP/1.0 中,客户端和服务器每次请求结束后都会关闭连接。此时不需要心跳检测,连接会在请求结束后立即关闭。
  • 长连接:HTTP/1.1 引入了持久连接(keep-alive),允许多个请求在同一个连接中发送。如果该连接长时间没有活动,可能会使用心跳检测来保持连接的活跃性。

9.6.12 HTTP 的连接管理

对于长连接的 HTTP 请求,t-io 的心跳检测机制仍然可以发挥作用。具体来说,如果客户端在一个持久连接中长时间没有发送新的请求,服务器可以通过心跳检测来判断该连接是否超时,进而关闭连接。

  • 如果 HTTP 请求是通过短连接实现的,那么连接在请求完成后就会自动关闭,心跳检测不会起作用。
  • 对于使用 keep-alive 的 HTTP 长连接,如果该连接长时间未发送任何数据或请求,那么心跳机制可能会检测到该连接的空闲,并触发 Tio.close 方法关闭连接。

9.6.3 HTTP/2 的心跳机制

对于 HTTP/2 或 WebSocket 等协议,长连接更为普遍。在这种情况下,t-io 可以通过心跳机制监测连接的活跃度。如果连接处于空闲状态超过设定的时间,心跳检测将会关闭连接。

结论

  • Tio.close 方法是在网络异常(读写异常、解码异常)、心跳超时、主动断开连接等情况下被调用的。
  • 对于 HTTP 请求:
    • 短连接 在请求结束后自动关闭,不需要心跳检测。
    • 长连接(如 HTTP/1.1 keep-alive 或 HTTP/2),心跳检测可以用来关闭长时间未活动的连接。

总结

t-io 是一个高效的异步 I/O 框架,支持高并发和低延迟的网络应用开发。通过 AcceptCompletionHandler、ReadCompletionHandler、DecodeRunnable 和 CloseRunnable 等组件,t-io 实现了对网络连接的管理、数据的异步读取与解码、以及连接的安全关闭。这些组件相互配合,确保了数据在服务器和客户端之间的高效流转。

参考文献

tiocloud 文档资料

Edit this page
Last Updated:
Contributors: Tong Li
Prev
t-io 消息处理流程
Next
TioConfig