Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FastJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 胖包与瘦包的打包与部署
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 文件上传
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • /zh/06_web/19.html
    • 全局异常处理器
    • 异步
    • 动态 返回 CSS 实现
    • 返回图片
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • 接口访问统计
    • 接口请求和响应数据记录
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 跨域
    • 添加 Controller
    • 常用工具类
    • HTTP Basic 认证
    • WebJars
    • JProtobuf
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板
    • 数据源配置与使用
    • ActiveRecord
    • Model
    • 生成器与 Model
    • Db 工具类
    • 批量操作
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • Java-DB 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • TQL(Table SQL)前端输入规范
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • hutool-JWT
    • FixedTokenInterceptor
    • 使用内置 TokenManager 实现登录
    • 用户系统
    • 重置密码
    • 匿名登录
    • Google 登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
    • 短信登录
    • 移动端微信登录实现指南
    • 移动端重置密码
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Enjoy 使用示例
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • tio-core
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • ssl
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • 邮箱
    • JSON
    • 读取文件
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 使用文档
    • 系统监控
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 整合 magic-script
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 使用 AWS S3 存储文件并整合到 Tio-Boot 项目中
    • 存储文件到 腾讯 COS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • tio-boot 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • Tio Boot 整合 Spring Boot Starter
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_mysql

    • 使用 Docker 运行 MySQL
    • /zh/42_mysql/02.html
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • PostgreSQL 全文检索
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • /zh/43_oceanbase/05.html
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • /zh/50_media/03.html
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
  • 55_telegram4j

    • 数据库设计
    • /zh/55_telegram4j/02.html
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 56_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • AI 问答
    • /zh/60_LLM/03.html
    • /zh/60_LLM/04.html
    • 增强检索(RAG)
    • 结构化数据检索
    • 搜索+AI
    • 集成第三方 API
    • 后置处理
    • 推荐问题生成
    • 连接代码执行器
    • 避免 GPT 混乱
    • /zh/60_LLM/13.html
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • 对接 Perplexity API
    • 意图识别与生成提示词
    • 智能问答模块设计与实现
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 通用搜索
    • /zh/61_ai_agent/15.html
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 62_translator

    • 简介
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_java-linux

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • MCP 协议
    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_manim

    • 简介
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 执行 Python (Manim) 代码
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • manimgl
    • EGL
    • /zh/66_manim/30.html
    • /zh/66_manim/31.html
    • 成本核算
    • /zh/66_manim/33.html
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 图片管理
    • /zh/70_tio-boot-admin/08.html
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 71_tio-boot

    • /zh/71_tio-boot/01.html
    • Swagger 整合到 Tio-Boot 中的指南
    • HTTP/1.1 Pipelining 性能测试报告
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

深入解析 Tio 源码:构建高性能 Java 网络应用

在现代网络应用中,处理大量并发连接和高效的数据传输是至关重要的。Tio(简称 t-io)作为一个基于 Java 的高性能网络通信框架,旨在简化这一过程。本文将基于提供的代码片段,深入解析 t-io 的源码结构和关键技术细节,帮助开发者更好地理解并应用这一框架。

目录

  1. Tio 框架概述
  2. 核心配置类:TioConfig
  3. 服务器配置:ServerTioConfig
  4. 服务器启动与监听:TioServer
  5. 连接接收处理:AcceptCompletionHandler
  6. 数据读取与解码:ReadCompletionHandler 与 DecodeTask
  7. 消息处理:HandlePacketTask
  8. 网络通信管理:Tio
  9. 异常处理与连接关闭:CloseTask
  10. 总结与技术要点

Tio 框架概述

Tio 是一个基于 Java AIO 的网络通信框架,旨在提供高性能、易用性和可扩展性。它支持 TCP、WebSocket 等协议,具备心跳机制、IP 黑名单、群组管理等功能。通过抽象化的配置和事件驱动的处理方式,Tio 简化了网络应用的开发过程。

核心配置类:TioConfig

TioConfig是 Tio 框架的核心配置类,负责管理所有与网络通信相关的配置和状态。以下是对其关键部分的详细解析。

类结构与成员变量

public abstract class TioConfig extends MapWithLockPropSupport {
    static Logger log = LoggerFactory.getLogger(TioConfig.class);

    public static final Set<ServerTioConfig> ALL_SERVER_GROUPCONTEXTS = new HashSet<>();
    public static final Set<ClientTioConfig> ALL_CLIENT_GROUPCONTEXTS = new HashSet<>();
    public static final Set<TioConfig> ALL_GROUPCONTEXTS = new HashSet<>();

    public static final int READ_BUFFER_SIZE = Integer.getInteger("tio.default.read.buffer.size", 20480);
    private final static AtomicInteger ID_ATOMIC = new AtomicInteger();

    private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    public boolean isShortConnection = false;
    public SslConfig sslConfig = null;
    public boolean debug = false;
    public GroupStat groupStat = null;
    public boolean statOn = true;
    public PacketConverter packetConverter = null;

    private CacheFactory cacheFactory;
    @SuppressWarnings("rawtypes")
    private RemovalListenerWrapper ipRemovalListenerWrapper;
    public long startTime = SystemTimer.currTime;
    public long heartbeatTimeout = 1000 * 120;
    public boolean logWhenDecodeError = false;
    private int readBufferSize = READ_BUFFER_SIZE;
    private GroupListener groupListener = null;
    private AioId tioUuid = new DefaultTAioId();

    public ClientNodes clientNodes = new ClientNodes();
    public SetWithLock<ChannelContext> connections = new SetWithLock<ChannelContext>(new HashSet<ChannelContext>());
    public Groups groups = new Groups();
    public Users users = new Users();
    public Tokens tokens = new Tokens();
    public Ids ids = new Ids();
    public BsIds bsIds = new BsIds();
    public Ips ips = new Ips();
    public IpStats ipStats = new IpStats(this, null);;
    protected String id;
    protected int maxDecodeErrorCountForIp = 10;
    protected String name = "Untitled";
    private IpStatListener ipStatListener = DefaultIpStatListener.me;
    private boolean isStopped = false;
    public IpBlacklist ipBlacklist = null;
    public MapWithLock<Integer, Packet> waitingResps = new MapWithLock<Integer, Packet>(new HashMap<Integer, Packet>());

    public boolean disgnostic = EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC);

    // 构造函数与初始化方法略
}

主要成员变量解析

  1. 连接管理

    • clientNodes:管理所有客户端节点。
    • connections:当前所有的连接集合。
    • groups:管理群组信息。
    • users、tokens、ids、bsIds、ips:分别管理用户、令牌、ID、业务 ID、IP 相关的信息。
  2. 缓存与统计

    • cacheFactory:用于创建缓存工厂,支持不同类型的缓存实现。
    • ipStats:IP 统计信息,支持多种统计时段。
    • groupStat:群组统计信息。
    • heartbeatTimeout:心跳超时时间,默认 120 秒。
  3. 配置与状态

    • byteOrder:字节序,默认大端。
    • sslConfig:SSL 配置,用于启用加密通信。
    • debug:调试模式开关。
    • statOn:统计开关。
    • packetConverter:自定义包转换器。
    • ipBlacklist:IP 黑名单管理。
  4. 生命周期管理

    • startTime:配置启动时间。
    • isStopped:停止标志。
    • id:唯一标识符,通过AtomicInteger递增生成。
    • name:配置名称。

关键方法解析

  1. 初始化方法

    init()方法负责初始化配置,注册到全局上下文集合,并设置默认的 IP 移除监听器。

    public void init() {
        if (cacheFactory == null) {
            this.cacheFactory = ConcurrentMapCacheFactory.INSTANCE;
        }
    
        if (ipRemovalListenerWrapper == null) {
            setDefaultIpRemovalListenerWrapper();
        }
    
        ALL_GROUPCONTEXTS.add(this);
        if (this instanceof ServerTioConfig) {
            ALL_SERVER_GROUPCONTEXTS.add((ServerTioConfig) this);
        } else {
            ALL_CLIENT_GROUPCONTEXTS.add((ClientTioConfig) this);
        }
    
        if (ALL_GROUPCONTEXTS.size() > 20) {
            log.warn("You have created {} TioConfig objects, you might be misusing t-io.", ALL_GROUPCONTEXTS.size());
        }
        this.id = ID_ATOMIC.incrementAndGet() + "";
    
        if (this.ipStats == null) {
            this.ipStats = new IpStats(this, null);
        }
    }
    
  2. 设置默认 IP 移除监听器

    如果未指定 IP 移除监听器,则设置默认的监听器,用于处理 IP 被移除后的逻辑。

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void setDefaultIpRemovalListenerWrapper() {
        this.ipRemovalListenerWrapper = new RemovalListenerWrapper();
        IpStatMapCacheRemovalListener ipStatMapCacheRemovalListener = new IpStatMapCacheRemovalListener(this, ipStatListener);
        ipRemovalListenerWrapper.setListener(ipStatMapCacheRemovalListener);
    }
    
  3. 抽象方法

    TioConfig是一个抽象类,需要子类实现以下两个方法:

    public abstract AioHandler getAioHandler();
    public abstract AioListener getAioListener();
    public abstract boolean isServer();
    
    • getAioHandler():获取 AIO 处理器,用于处理数据的编码和解码。
    • getAioListener():获取 AIO 监听器,用于监听连接事件。
    • isServer():判断是否为服务器配置。

服务器配置:ServerTioConfig

ServerTioConfig是TioConfig的一个具体实现,专门用于服务器端的配置管理。

类结构与成员变量

public class ServerTioConfig extends TioConfig {
    static Logger log = LoggerFactory.getLogger(ServerTioConfig.class);

    private ServerAioHandler serverAioHandler = null;
    private ServerAioListener serverAioListener = null;
    private Thread checkHeartbeatThread = null;
    private boolean needCheckHeartbeat = true;
    private boolean isShared = false;

    // 构造函数与其他方法略
}

主要成员变量解析

  1. AIO 处理器与监听器

    • serverAioHandler:服务器端 AIO 处理器,负责数据的编码和解码。
    • serverAioListener:服务器端 AIO 监听器,监听连接事件和状态变化。
  2. 心跳检测

    • checkHeartbeatThread:用于定时检查连接的心跳线程。
    • needCheckHeartbeat:是否需要进行心跳检测的标志。
  3. 共享配置

    • isShared:标志配置是否被共享,影响心跳检测的逻辑。

关键方法解析

  1. 使用 SSL

    useSsl方法用于配置 SSL,加密通信数据。

    public void useSsl(String keyStoreFile, String trustStoreFile, String keyStorePwd) throws Exception {
        if (StrUtil.isNotBlank(keyStoreFile) && StrUtil.isNotBlank(trustStoreFile)) {
            SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd);
            this.setSslConfig(sslConfig);
        }
    }
    
    public void useSsl(InputStream keyStoreInputStream, InputStream trustStoreInputStream, String passwd) throws Exception {
        SslConfig sslConfig = SslConfig.forServer(keyStoreInputStream, trustStoreInputStream, passwd);
        this.setSslConfig(sslConfig);
    }
    
  2. 初始化方法

    init()方法不仅初始化基础配置,还启动心跳检测线程。

    public void init() {
        super.init();
        this.groupStat = new ServerGroupStat();
        GlobalIpBlacklist.INSTANCE.init(this);
        Runnable check = new Runnable() {
            @Override
            public void run() {
                // 心跳检测逻辑
            }
        };
    
        checkHeartbeatThread = new Thread(check, "tio-timer-checkheartbeat-" + id + "-" + name);
        checkHeartbeatThread.setDaemon(true);
        checkHeartbeatThread.setPriority(Thread.MIN_PRIORITY);
        checkHeartbeatThread.start();
    }
    
  3. 心跳检测线程

    心跳检测线程定期扫描所有连接,判断是否超时未响应,进行必要的处理。

    Runnable check = new Runnable() {
        @Override
        public void run() {
            // 初始等待
            Thread.sleep(1000 * 10);
    
            while (needCheckHeartbeat && !isStopped()) {
                if (heartbeatTimeout <= 0) {
                    break;
                }
                try {
                    Thread.sleep(heartbeatTimeout);
                } catch (InterruptedException e1) {
                    log.error(e1.toString(), e1);
                }
                long start = SystemTimer.currTime;
                SetWithLock<ChannelContext> setWithLock = connections;
                Set<ChannelContext> set = null;
                ReadLock readLock = setWithLock.readLock();
                readLock.lock();
                try {
                    set = setWithLock.getObj();
    
                    for (ChannelContext channelContext : set) {
                        long compareTime = Math.max(channelContext.stat.latestTimeOfReceivedByte, channelContext.stat.latestTimeOfSentPacket);
                        long currtime = SystemTimer.currTime;
                        long interval = currtime - compareTime;
    
                        boolean needRemove = false;
                        if (channelContext.heartbeatTimeout != null && channelContext.heartbeatTimeout > 0) {
                            needRemove = interval > channelContext.heartbeatTimeout;
                        } else {
                            needRemove = interval > heartbeatTimeout;
                        }
    
                        if (needRemove) {
                            if (!ServerTioConfig.this.serverAioListener.onHeartbeatTimeout(channelContext, interval, channelContext.stat.heartbeatTimeoutCount.incrementAndGet())) {
                                log.info("{}, {} ms or not send and receive message", channelContext, interval);
                                channelContext.setCloseCode(CloseCode.HEARTBEAT_TIMEOUT);
                                Tio.remove(channelContext, interval + " ms not send and receive message");
                            }
                        }
                    }
                } catch (Throwable e) {
                    log.error("", e);
                } finally {
                    try {
                        readLock.unlock();
                        if (debug) {
                            diagnostic(start, set, start1, count);
                        }
                    } catch (Throwable e) {
                        log.error("", e);
                    }
                }
            }
        }
    };
    

技术细节解析

  1. 心跳机制

    心跳机制通过定期检查连接的最近接收和发送时间,判断连接是否活跃。如果超过超时时间未有数据交互,则触发超时处理,包括调用监听器的onHeartbeatTimeout方法,并可能关闭连接。

  2. 线程与并发

    • 使用ReentrantReadWriteLock管理连接集合的并发访问,确保高效的读写操作。
    • 心跳检测运行在一个独立的守护线程中,确保不会阻塞主线程。
  3. 统计与日志

    • 统计信息包括已接受连接数、当前连接数、发送和接收的数据量等。
    • 在调试模式下,定期输出详细的诊断信息,帮助开发者监控系统状态。

服务器启动与监听:TioServer

TioServer类负责启动服务器,绑定端口,监听连接,并处理接收到的连接请求。

类结构与成员变量

public class TioServer {
    private static Logger log = LoggerFactory.getLogger(TioServer.class);
    private ServerTioConfig serverTioConfig;
    private AsynchronousServerSocketChannel serverSocketChannel;
    private Node serverNode;
    private boolean isWaitingStop = false;
    private boolean checkLastVersion = true;
    private ExecutorService groupExecutor;
    private AsynchronousChannelGroup channelGroup;

    // 构造函数与其他方法略
}

主要成员变量解析

  1. 配置与节点

    • serverTioConfig:服务器配置对象。
    • serverNode:服务器节点信息,包括 IP 和端口。
  2. 网络通信

    • serverSocketChannel:异步服务器套接字通道,负责接受客户端连接。
    • channelGroup:异步通道组,管理一组异步通道。
  3. 状态管理

    • isWaitingStop:标志服务器是否正在等待停止。
    • checkLastVersion:版本检查开关。
  4. 线程与执行器

    • groupExecutor:用于管理异步通道的线程池。

关键方法解析

  1. 启动服务器

    start()方法负责初始化服务器配置,绑定端口,启动监听,并接受连接。

    public void start(String serverIp, int serverPort) throws IOException {
        serverTioConfig.init();
        serverTioConfig.getCacheFactory().register(TioCoreConfigKeys.REQEUST_PROCESSING, null, null, null);
    
        this.serverNode = new Node(serverIp, serverPort);
        if (EnvUtils.getBoolean("tio.core.hotswap.reload", false)) {
            groupExecutor = Threads.getGroupExecutor();
            channelGroup = AsynchronousChannelGroup.withThreadPool(groupExecutor);
            serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        } else {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
        }
    
        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
    
        InetSocketAddress listenAddress = null;
    
        if (StrUtil.isBlank(serverIp)) {
            listenAddress = new InetSocketAddress(serverPort);
        } else {
            listenAddress = new InetSocketAddress(serverIp, serverPort);
        }
    
        serverSocketChannel.bind(listenAddress, 0);
    
        AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler();
        serverSocketChannel.accept(this, acceptCompletionHandler);
        serverTioConfig.startTime = System.currentTimeMillis();
        Threads.getTioExecutor();
    }
    

    步骤解析:

    • 初始化配置:调用serverTioConfig.init(),确保所有配置和状态被正确初始化。
    • 绑定端口:根据提供的 IP 和端口,绑定服务器套接字通道。
    • 启动监听:创建AcceptCompletionHandler实例,开始接受客户端连接。
    • 心跳线程:初始化和启动心跳检测线程,确保连接的活跃性。
  2. 停止服务器

    stop()方法负责优雅地关闭服务器,释放资源,并关闭所有连接。

    public boolean stop() {
        isWaitingStop = true;
    
        if (channelGroup != null) {
            try {
                channelGroup.shutdownNow();
            } catch (Exception e) {
                log.error("Faild to execute channelGroup.shutdownNow()", e);
            }
        }
    
        if (groupExecutor != null) {
            try {
                groupExecutor.shutdownNow();
            } catch (Exception e) {
                log.error("Failed to close groupExecutor", e);
            }
    
        }
    
        if (serverSocketChannel != null) {
            try {
                serverSocketChannel.close();
            } catch (Exception e) {
                log.error("Failed to close serverSocketChannel", e);
            }
    
        }
    
        serverTioConfig.setStopped(true);
        boolean ret = Threads.close();
        log.info(this.serverNode + " stopped");
        return ret;
    }
    

    步骤解析:

    • 标记停止:设置isWaitingStop为true,通知其他组件服务器即将停止。
    • 关闭异步通道组:尝试关闭channelGroup,终止所有关联的异步操作。
    • 关闭执行器:关闭线程池groupExecutor,释放线程资源。
    • 关闭服务器套接字通道:关闭serverSocketChannel,停止接受新连接。
    • 更新配置状态:设置serverTioConfig的停止标志,并关闭线程池。
  3. 版本检查

    checkLastVersion用于控制是否进行版本检查,但在提供的代码中仅记录了日志,未实际实现版本检查逻辑。

    public void setCheckLastVersion(boolean checkLastVersion) {
        log.debug("community edition is no longer supported");
    }
    

连接接收处理:AcceptCompletionHandler

AcceptCompletionHandler类实现了CompletionHandler接口,负责处理新接收到的客户端连接。

类结构与成员变量

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, TioServer> {
    private static Logger log = LoggerFactory.getLogger(AcceptCompletionHandler.class);

    public AcceptCompletionHandler() {}

    @Override
    public void completed(AsynchronousSocketChannel clientSocketChannel, TioServer tioServer) {
        // 连接完成后的处理逻辑
    }

    @Override
    public void failed(Throwable exc, TioServer tioServer) {
        // 连接失败后的处理逻辑
    }
}

主要方法解析

  1. 连接成功处理

    completed()方法在成功接收到一个客户端连接时被调用。

    @Override
    public void completed(AsynchronousSocketChannel clientSocketChannel, TioServer tioServer) {
        AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel();
    
        if (tioServer.isWaitingStop()) {
            log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode());
        } else {
            serverSocketChannel.accept(tioServer, this);
        }
        if (serverSocketChannel == null) {
            log.info("receive serverSocketChannel is null skip");
            return;
        }
    
        if (!serverSocketChannel.isOpen()) {
            log.info("receive serverSocketChannel is not open skip");
            return;
        }
    
        String clientIp = null;
        int port = 0;
        InetSocketAddress inetSocketAddress;
        try {
            inetSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
            clientIp = inetSocketAddress.getHostString();
            port = inetSocketAddress.getPort();
            if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) {
                log.info("new connection:{},{}", clientIp, port);
            }
        } catch (IOException e1) {
            log.error("Failed to get client ip and port", e1);
        }
    
        ServerTioConfig serverTioConfig = tioServer.getServerTioConfig();
    
        try {
            if (IpBlacklist.isInBlacklist(serverTioConfig, clientIp)) {
                log.info("{} on the blacklist, {}", clientIp, serverTioConfig.getName());
                clientSocketChannel.close();
                return;
            }
    
            if (serverTioConfig.statOn) {
                ((ServerGroupStat) serverTioConfig.groupStat).accepted.incrementAndGet();
            }
    
            clientSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            clientSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
            clientSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 64 * 1024);
            clientSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    
            ServerChannelContext channelContext = new ServerChannelContext(serverTioConfig, clientSocketChannel, clientIp, port);
            channelContext.setClosed(false);
            channelContext.stat.setTimeFirstConnected(SystemTimer.currTime);
            channelContext.setServerNode(tioServer.getServerNode());
    
            boolean isConnected = true;
            boolean isReconnect = false;
            if (serverTioConfig.getServerAioListener() != null) {
                if (!SslUtils.isSsl(channelContext.tioConfig)) {
                    try {
                        serverTioConfig.getServerAioListener().onAfterConnected(channelContext, isConnected, isReconnect);
                    } catch (Throwable e) {
                        log.error("ServerAioListener onAfterConnected:", e);
                    }
                }
            }
    
            if (CollUtil.isNotEmpty(serverTioConfig.ipStats.durationList)) {
                try {
                    for (Long v : serverTioConfig.ipStats.durationList) {
                        IpStat ipStat = (IpStat) serverTioConfig.ipStats.get(v, channelContext);
                        ipStat.getRequestCount().incrementAndGet();
                        serverTioConfig.getIpStatListener().onAfterConnected(channelContext, isConnected, isReconnect, ipStat);
                    }
                } catch (Exception e) {
                    log.error("IpStatListener onAfterConnected:", e);
                }
            }
    
            if (!tioServer.isWaitingStop()) {
                ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(channelContext);
                ByteBuffer readByteBuffer = ByteBufferPool.BUFFER_POOL.acquire(serverTioConfig.getByteOrder());
                readByteBuffer.position(0);
                readByteBuffer.limit(readByteBuffer.capacity());
                clientSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);
            }
        } catch (Throwable e) {
            log.error("Failed to read data from :{},{}", clientIp, port);
            e.printStackTrace();
        }
    }
    

    步骤解析:

    • 重复接受连接:在每次成功接受连接后,立即调用serverSocketChannel.accept()继续接受下一个连接。
    • 获取客户端 IP 和端口:通过clientSocketChannel.getRemoteAddress()获取连接的客户端信息。
    • IP 黑名单检查:如果客户端 IP 在黑名单中,则拒绝连接。
    • 配置 Socket 选项:设置SO_REUSEADDR、接收和发送缓冲区大小、SO_KEEPALIVE等选项,优化连接性能。
    • 创建 ChannelContext:为新连接创建ChannelContext对象,管理连接的状态和统计信息。
    • 触发连接事件:调用serverAioListener.onAfterConnected()方法,通知监听器连接已建立。
    • 心跳统计:更新 IP 统计信息。
    • 开始读取数据:为新连接创建ReadCompletionHandler,并开始异步读取数据。
  2. 连接失败处理

    failed()方法在接受连接失败时被调用。

    @Override
    public void failed(Throwable exc, TioServer tioServer) {
        if (tioServer.isWaitingStop()) {
            log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode());
        } else {
            AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel();
            serverSocketChannel.accept(tioServer, this);
            log.error("[" + tioServer.getServerNode() + "] listening exception", exc);
        }
    }
    

    步骤解析:

    • 停止标志检查:如果服务器正在等待停止,则不再接受新连接。
    • 继续接受连接:如果未停止,继续调用accept()方法,保持服务器的可用性。
    • 日志记录:记录连接失败的异常信息,便于调试和监控。

技术细节解析

  1. 异步 NIO

    使用 Java AIO 的AsynchronousServerSocketChannel和AsynchronousSocketChannel实现非阻塞的连接接受和数据传输,提升系统的并发处理能力。

  2. IP 黑名单

    在接收到新连接时,首先检查客户端 IP 是否在黑名单中。如果是,则拒绝连接,增强系统的安全性。

  3. Socket 选项优化

    • SO_REUSEADDR:允许重新使用本地地址,避免在服务器重启后因端口占用而无法绑定。
    • SO_RCVBUF和SO_SNDBUF:设置接收和发送缓冲区大小,优化数据传输性能。
    • SO_KEEPALIVE:启用 TCP 保活,检测连接的有效性。
  4. 连接上下文管理

    为每个连接创建独立的ChannelContext,负责管理连接的状态、统计信息和配置,确保高效的连接管理。

数据读取与解码:ReadCompletionHandler 与 DecodeTask

数据读取和解码是网络通信的关键环节,涉及数据的接收、解码、处理等多个步骤。Tio 通过ReadCompletionHandler和DecodeTask实现这一过程。

ReadCompletionHandler

ReadCompletionHandler类实现了CompletionHandler接口,负责处理异步读取操作完成后的回调。

类结构与成员变量

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private static Logger log = LoggerFactory.getLogger(ReadCompletionHandler.class);
    private ChannelContext channelContext = null;

    public ReadCompletionHandler(ChannelContext channelContext) {
        this.channelContext = channelContext;
    }

    @Override
    public void completed(Integer result, ByteBuffer byteBuffer) {
        // 处理读取完成后的逻辑
    }

    @Override
    public void failed(Throwable exc, ByteBuffer byteBuffer) {
        // 处理读取失败后的逻辑
    }
}

关键方法解析

  1. 读取完成处理

    completed()方法在数据读取完成时被调用,负责处理接收到的数据。

    @Override
    public void completed(Integer result, ByteBuffer byteBuffer) {
        if (result > 0) {
            TioConfig tioConfig = channelContext.tioConfig;
            if (tioConfig.statOn) {
                tioConfig.groupStat.receivedBytes.addAndGet(result);
                tioConfig.groupStat.receivedTcps.incrementAndGet();
                channelContext.stat.receivedBytes.addAndGet(result);
                channelContext.stat.receivedTcps.incrementAndGet();
            }
    
            channelContext.stat.latestTimeOfReceivedByte = SystemTimer.currTime;
    
            if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
                try {
                    for (Long v : tioConfig.ipStats.durationList) {
                        IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
                        ipStat.getReceivedBytes().addAndGet(result);
                        ipStat.getReceivedTcps().incrementAndGet();
                        tioConfig.getIpStatListener().onAfterReceivedBytes(channelContext, result, ipStat);
                    }
                } catch (Exception e1) {
                    log.error(channelContext.toString(), e1);
                }
            }
    
            if (tioConfig.getAioListener() != null) {
                try {
                    tioConfig.getAioListener().onAfterReceivedBytes(channelContext, result);
                } catch (Exception e) {
                    log.error(channelContext.toString(), e);
                }
            }
    
            byteBuffer.flip();
            if (channelContext.sslFacadeContext == null) {
                new DecodeTask().decode(channelContext, byteBuffer);
            } else {
                ByteBuffer copiedByteBuffer = null;
                try {
                    copiedByteBuffer = ByteBufferUtils.copy(byteBuffer);
                    log.debug("{},Decrypt SSL data:{}", channelContext, copiedByteBuffer);
                    channelContext.sslFacadeContext.getSslFacade().decrypt(copiedByteBuffer);
                } catch (Exception e) {
                    log.error(channelContext + ", " + e.toString() + copiedByteBuffer, e);
                    Tio.close(channelContext, e, e.toString(), CloseCode.SSL_DECRYPT_ERROR);
                }
            }
    
            if (TioUtils.checkBeforeIO(channelContext)) {
                read(byteBuffer);
            } else {
                ByteBufferPool.BUFFER_POOL.release(byteBuffer);
            }
    
        } else if (result == 0) {
            String message = "The length of the read data is 0";
            log.error("close {}, because {}", channelContext, message);
            Tio.close(channelContext, null, message, CloseCode.READ_COUNT_IS_ZERO);
            ByteBufferPool.BUFFER_POOL.release(byteBuffer);
            return;
        } else if (result < 0) {
            if (result == -1) {
                String message = "The connection closed by peer";
                if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) {
                    log.info("close {}, because {}", channelContext.getClientIpAndPort(), message);
                }
                Tio.close(channelContext, null, message, CloseCode.CLOSED_BY_PEER);
                ByteBufferPool.BUFFER_POOL.release(byteBuffer);
                return;
            } else {
                String message = "The length of the read data is less than -1";
                Tio.close(channelContext, null, "读数据时返回" + result, CloseCode.READ_COUNT_IS_NEGATIVE);
                log.error("close {}, because {}", channelContext, message);
                ByteBufferPool.BUFFER_POOL.release(byteBuffer);
                return;
            }
        }
    }
    

    步骤解析:

    • 数据有效性检查:根据result判断读取的数据长度,处理不同情况。

      • result > 0:成功读取到数据,进行统计和解码处理。
      • result == 0:读取到的数据长度为 0,关闭连接。
      • result < 0:读取错误,可能是对端关闭连接,进行相应处理。
    • 统计信息更新:根据读取的数据长度,更新全局和连接级别的统计信息。

    • SSL 解密:如果连接启用了 SSL,则对接收到的数据进行解密处理。

    • 数据解码:通过DecodeTask进行数据的解码,转换为业务包。

    • 继续读取数据:调用read()方法,继续异步读取数据,保持连接的持续活跃。

  2. 读取失败处理

    failed()方法在数据读取失败时被调用,负责关闭连接。

    @Override
    public void failed(Throwable exc, ByteBuffer byteBuffer) {
        Tio.close(channelContext, exc, "Failed to read data: " + exc.getClass().getName(), CloseCode.READ_ERROR);
    }
    

    步骤解析:

    • 关闭连接:由于读取失败,调用Tio.close()方法关闭连接,释放资源。
    • 日志记录:记录失败的异常信息,便于问题排查。

技术细节解析

  1. 异步读取

    使用 Java AIO 的AsynchronousSocketChannel进行非阻塞的数据读取,提升系统的并发处理能力。

  2. 心跳机制

    通过更新latestTimeOfReceivedByte和latestTimeOfSentPacket,监控连接的活跃性,结合心跳检测线程,确保连接的有效性。

  3. SSL 支持

    • 解密处理:在数据读取后,若启用了 SSL,则对接收到的数据进行解密,确保数据的安全性。
    • 错误处理:如果解密过程中出现异常,则关闭连接,防止潜在的安全风险。
  4. 缓冲区管理

    • 使用ByteBufferPool管理缓冲区,减少内存分配和回收的开销,提升性能。
    • 通过ByteBuffer.flip()准备数据进行读取和解码。

消息处理:HandlePacketTask

HandlePacketTask类负责将解码后的业务包交给业务逻辑处理器,进行具体的业务处理。

类结构与成员变量

public class HandlePacketTask {
    private AtomicLong synFailCount = new AtomicLong();

    public void handler(ChannelContext channelContext, Packet packet) {
        // 处理业务包的逻辑
    }
}

关键方法解析

  1. 业务包处理

    handler()方法负责将业务包传递给 AIO 处理器,并更新统计信息。

    public void handler(ChannelContext channelContext, Packet packet) {
        TioConfig tioConfig = channelContext.tioConfig;
    
        if (packet.isKeepConnection() && !channelContext.isBind) {
            tioConfig.ips.bind(channelContext);
            tioConfig.ids.bind(channelContext);
            channelContext.groups = new SetWithLock<String>(new HashSet<>());
            channelContext.isBind = true;
        }
        long start = SystemTimer.currTime;
        try {
            Integer synSeq = packet.getSynSeq();
            if (synSeq != null && synSeq > 0) {
                MapWithLock<Integer, Packet> syns = tioConfig.getWaitingResps();
                Packet initPacket = syns.remove(synSeq);
                if (initPacket != null) {
                    synchronized (initPacket) {
                        syns.put(synSeq, packet);
                        initPacket.notify();
                    }
                } else {
                    log.error("[{}] Failed to synchronize message, synSeq is {}, but there is no corresponding key value in the synchronization collection", synFailCount.incrementAndGet(), synSeq);
                }
            } else {
                if (EnvUtils.getBoolean(TioCoreConfigKeys.TIO_CORE_DIAGNOSTIC, false)) {
                    Long id = packet.getId();
                    String requestInfo = channelContext.getClientIpAndPort() + "_" + id;
                    log.info("handle:{}", requestInfo);
                }
                tioConfig.getAioHandler().handler(packet, channelContext);
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            long end = SystemTimer.currTime;
            long iv = end - start;
            if (tioConfig.statOn) {
                channelContext.stat.handledPackets.incrementAndGet();
                channelContext.stat.handledBytes.addAndGet(packet.getByteCount());
                channelContext.stat.handledPacketCosts.addAndGet(iv);
    
                tioConfig.groupStat.handledPackets.incrementAndGet();
                tioConfig.groupStat.handledBytes.addAndGet(packet.getByteCount());
                tioConfig.groupStat.handledPacketCosts.addAndGet(iv);
            }
    
            if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
                try {
                    for (Long v : tioConfig.ipStats.durationList) {
                        IpStat ipStat = (IpStat) tioConfig.ipStats.get(v, channelContext);
                        ipStat.getHandledPackets().incrementAndGet();
                        ipStat.getHandledBytes().addAndGet(packet.getByteCount());
                        ipStat.getHandledPacketCosts().addAndGet(iv);
                        tioConfig.getIpStatListener().onAfterHandled(channelContext, packet, ipStat, iv);
                    }
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
    
            if (tioConfig.getAioListener() != null) {
                try {
                    tioConfig.getAioListener().onAfterHandled(channelContext, packet, iv);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    步骤解析:

    • 连接绑定:如果业务包要求保持连接(isKeepConnection),且连接尚未绑定,则将连接绑定到 IP、ID 等。

    • 同步请求处理:

      • 如果业务包带有同步序列号(synSeq),则从等待响应的集合中查找对应的请求包,并通知等待的线程。
      • 如果未找到对应的请求包,则记录同步失败计数,并记录错误日志。
    • 异步请求处理:

      • 如果业务包没有同步序列号,则直接调用 AIO 处理器的handler()方法,进行业务处理。
    • 统计信息更新:记录处理包的数量、字节数和处理时间。

    • 回调监听器:通知 AIO 监听器业务包已处理完毕。

技术细节解析

  1. 连接绑定

    对于需要保持连接的业务包,通过绑定 IP 和 ID,确保连接的状态与业务逻辑的一致性。

  2. 同步与异步请求处理

    • 同步请求:通过序列号匹配请求与响应,实现同步通信的效果。使用CountDownLatch等待响应。
    • 异步请求:直接将业务包交给 AIO 处理器,无需等待,适用于高并发场景。
  3. 统计与日志

    • 通过AtomicLong和SetWithLock管理统计信息,确保高效的并发处理。
    • 在调试模式下,记录详细的处理日志,便于问题排查。
  4. 异常处理

    • 捕获并记录业务处理过程中的异常,确保系统的稳定性。
    • 对于同步请求未能找到对应的请求包,记录同步失败计数,预防潜在的同步问题。

网络通信管理:Tio

Tio类作为 Tio 框架的核心 API,提供了丰富的方法用于管理连接、发送消息、关闭连接等操作。以下将详细解析其关键功能和技术实现。

类结构与成员变量

public class Tio {
    public static class IpBlacklist {
        // IP黑名单相关方法
    }

    private static Logger log = LoggerFactory.getLogger(Tio.class);

    // 其他内部类和方法略

    private Tio() {}
}

主要功能解析

  1. IP 黑名单管理

    IpBlacklist内部类提供了对 IP 黑名单的管理功能,包括添加、删除、检查等。

    public static class IpBlacklist {
        public static boolean add(TioConfig tioConfig, String ip) {
            return tioConfig.ipBlacklist.add(ip);
        }
    
        public static boolean add(String ip) {
            return GlobalIpBlacklist.INSTANCE.global.add(ip);
        }
    
        public static void clear(TioConfig tioConfig) {
            tioConfig.ipBlacklist.clear();
        }
    
        public static void clear() {
            GlobalIpBlacklist.INSTANCE.global.clear();
        }
    
        public static Collection<String> getAll(TioConfig tioConfig) {
            return tioConfig.ipBlacklist.getAll();
        }
    
        public static Collection<String> getAll() {
            return GlobalIpBlacklist.INSTANCE.global.getAll();
        }
    
        public static boolean isInBlacklist(TioConfig tioConfig, String ip) {
            if (tioConfig.ipBlacklist != null) {
                return tioConfig.ipBlacklist.isInBlacklist(ip) || GlobalIpBlacklist.INSTANCE.global.isInBlacklist(ip);
            } else {
                return GlobalIpBlacklist.INSTANCE.global.isInBlacklist(ip);
            }
        }
    
        public static void remove(TioConfig tioConfig, String ip) {
            tioConfig.ipBlacklist.remove(ip);
        }
    
        public static void remove(String ip) {
            GlobalIpBlacklist.INSTANCE.global.remove(ip);
        }
    }
    

    技术细节解析:

    • 全局与局部黑名单:支持针对特定TioConfig的局部黑名单和全局黑名单,增强灵活性。
    • 并发访问:通过SetWithLock和其他并发集合管理黑名单,确保线程安全。
  2. 发送消息

    Tio类提供了多种发送消息的方法,包括发送到单个连接、群组、指定 IP 等。

    public static Boolean send(ChannelContext channelContext, Packet packet) {
        return send(channelContext, packet, null, null);
    }
    
    private static Boolean send(ChannelContext channelContext, Packet packet, CountDownLatch countDownLatch, PacketSendMode packetSendMode) {
        // 发送逻辑
    }
    
    public static Boolean bSend(ChannelContext channelContext, Packet packet) {
        if (channelContext == null) {
            return false;
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        return send(channelContext, packet, countDownLatch, PacketSendMode.SINGLE_BLOCK);
    }
    
    // 其他发送方法略
    

    关键方法解析:

    • send():核心发送方法,负责将消息包传递给SendPacketTask进行发送,并处理同步发送的逻辑。

    • bSend():阻塞发送方法,适用于需要同步等待发送结果的场景。通过CountDownLatch实现同步等待。

    • 群组与指定 IP 发送:通过遍历目标群组或 IP 对应的连接集合,批量发送消息,提高系统的广播能力。

  3. 关闭连接

    Tio提供了多种关闭连接的方法,包括关闭单个连接、群组、指定 IP 等。

    public static void close(ChannelContext channelContext, String remark) {
        close(channelContext, null, remark);
    }
    
    public static void close(ChannelContext channelContext, Throwable throwable, String remark) {
        close(channelContext, throwable, remark, false);
    }
    
    // 其他关闭方法略
    

    关键方法解析:

    • close():核心关闭方法,负责关闭连接并执行必要的清理操作。

    • remove():与close()方法类似,但不执行重连等维护操作,适用于需要立即移除连接的场景。

  4. 群组管理

    Tio提供了方法用于绑定和解绑群组,管理连接的群组关系。

    public static void bindGroup(ChannelContext channelContext, String group) {
        channelContext.tioConfig.groups.bind(group, channelContext);
    }
    
    public static void unbindGroup(String group, ChannelContext channelContext) {
        channelContext.tioConfig.groups.unbind(group, channelContext);
    }
    
    // 其他群组管理方法略
    

    技术细节解析:

    • 高效的群组管理:通过SetWithLock和并发集合管理群组内的连接,确保高效的群组操作。

    • 灵活的群组操作:支持动态的群组绑定与解绑,适应复杂的业务需求。

技术细节解析

  1. 并发与锁机制

    • 使用ReentrantLock和Condition管理异步操作的同步,确保多线程环境下的操作安全。
    • SetWithLock和MapWithLock封装了线程安全的集合操作,简化了并发编程。
  2. 心跳与连接检测

    • 通过心跳检测机制,定期检查连接的活跃性,及时关闭失效连接,提升系统的稳定性。
    • 使用AtomicLong和原子变量管理统计信息,确保高效的并发更新。
  3. SSL 支持

    • 通过SslConfig和SslFacadeContext实现 SSL 加密,确保数据传输的安全性。
    • 在发送和接收数据时,自动进行加密与解密,简化开发者的操作。
  4. 缓冲区管理

    • 使用ByteBufferPool管理缓冲区,减少内存分配与回收的开销,提升系统性能。
    • 通过ByteBufferUtils提供的工具方法,简化缓冲区操作。
  5. 错误处理与日志

    • 通过CompletionHandler接口的failed()方法统一处理读取与写入过程中的异常,确保系统的健壮性。
    • 在不同的错误场景下,记录详细的日志信息,便于开发者排查问题。

异常处理与连接关闭:CloseTask

CloseTask类负责管理连接的关闭过程,包括释放资源、更新状态、处理重连等。

类结构与成员变量

public class CloseTask {
    public static void close(ChannelContext channelContext) {
        // 关闭连接的逻辑
    }
}

关键方法解析

  1. 关闭连接

    close()方法负责关闭连接,并根据配置决定是否进行重连或清理操作。

    public static void close(ChannelContext channelContext) {
        boolean isNeedRemove = channelContext.closeMeta.isNeedRemove;
        String remark = channelContext.closeMeta.remark;
        Throwable throwable = channelContext.closeMeta.throwable;
        channelContext.stat.timeClosed = SystemTimer.currTime;
        try {
            if (channelContext.tioConfig.getAioListener() != null) {
                try {
                    channelContext.tioConfig.getAioListener().onBeforeClose(channelContext, throwable, remark, isNeedRemove);
                } catch (Throwable e) {
                    log.error(e.toString(), e);
                }
            }
    
            try {
                if (channelContext.isClosed && !isNeedRemove) {
                    return;
                }
    
                if (channelContext.isRemoved) {
                    return;
                }
    
                try {
                    if (isNeedRemove) {
                        MaintainUtils.remove(channelContext);
                    } else {
                        ClientTioConfig clientTioConfig = (ClientTioConfig) channelContext.tioConfig;
                        clientTioConfig.closeds.add(channelContext);
                        clientTioConfig.connecteds.remove(channelContext);
                        MaintainUtils.close(channelContext);
                    }
    
                    channelContext.setRemoved(isNeedRemove);
                    if (channelContext.tioConfig.statOn) {
                        channelContext.tioConfig.groupStat.closed.incrementAndGet();
                    }
                    channelContext.stat.timeClosed = SystemTimer.currTime;
                    channelContext.setClosed(true);
                } catch (Throwable e) {
                    log.error(e.toString(), e);
                } finally {
                    if (!isNeedRemove && channelContext.isClosed && !channelContext.isServer()) {
                        ClientChannelContext clientChannelContext = (ClientChannelContext) channelContext;
                        ReconnConf.put(clientChannelContext);
                    }
                }
            } catch (Throwable e) {
                log.error(throwable.toString(), e);
            }
        } finally {
            channelContext.isWaitingClose = false;
        }
    }
    

    步骤解析:

    • 状态检查:判断连接是否已经关闭或被移除,避免重复操作。

    • 调用监听器:如果配置了 AIO 监听器,调用onBeforeClose()方法,通知监听器连接即将关闭。

    • 资源释放:

      • 移除连接:如果需要移除,则从群组、用户等管理集合中移除连接。
      • 更新统计:更新全局和连接级别的关闭统计信息。
      • 设置状态:标记连接为已关闭和已移除。
    • 重连处理:对于客户端配置,如果连接被关闭且不需要移除,则将连接加入重连队列,尝试重新连接。

    • 异常处理:捕获并记录关闭过程中的异常,确保系统稳定性。

技术细节解析

  1. 资源管理

    在关闭连接时,确保所有相关资源(如缓冲区、引用等)被正确释放,防止内存泄漏和资源浪费。

  2. 状态同步

    通过设置标志位(如isClosed、isRemoved)确保连接状态的一致性,避免并发环境下的状态混乱。

  3. 重连机制

    对于客户端连接,在连接被关闭后,根据重连配置决定是否尝试重新连接,增强系统的容错性和可用性。

  4. 监听器回调

    提供钩子机制,通过 AIO 监听器的回调方法,让开发者在连接关闭前执行自定义逻辑,如资源清理、日志记录等。

  5. 多线程支持

    在多线程环境下,通过使用ReentrantLock和Condition,确保关闭操作的线程安全性。

示例运行与日志分析

基于用户提供的日志,以下是一个请求的处理流程解析。

测试请求

curl http://localhost:8000/ok

处理显示的日志

2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.s.AcceptCompletionHandler.completed:68 - new connection:127.0.0.1,42972
2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.c.t.DecodeTask.decode:41 - decode:127.0.0.1:42972
2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.c.t.HandlePacketTask.handler:62 - handle:127.0.0.1:42972_281494
2024-12-06 18:23:40.135 [Thread-1] INFO  c.l.t.c.ReadCompletionHandler.completed:104 - close 127.0.0.1:42972, because The connection closed by peer
2024-12-06 18:23:40.135 [Thread-1512] INFO  c.l.t.c.WriteCompletionHandler.completed:57 - write 172 to 127.0.0.1:42972
2024-12-06 18:23:40.135 [Thread-1] INFO  c.l.t.c.Tio.close:455 - close 127.0.0.1:42972,remark:The connection closed by peer

日志解析

  1. 连接建立

    2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.s.AcceptCompletionHandler.completed:68 - new connection:127.0.0.1,42972
    
    • 事件:新连接建立。
    • 信息:客户端 IP 为127.0.0.1,端口为42972。
    • 处理类:AcceptCompletionHandler的completed()方法。
  2. 数据解码

    2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.c.t.DecodeTask.decode:41 - decode:127.0.0.1:42972
    
    • 事件:接收到的数据开始解码。
    • 信息:来自127.0.0.1:42972的数据正在解码。
    • 处理类:DecodeTask的decode()方法。
  3. 业务包处理

    2024-12-06 18:23:40.134 [Thread-2] INFO  c.l.t.c.t.HandlePacketTask.handler:62 - handle:127.0.0.1:42972_281494
    
    • 事件:业务包开始处理。
    • 信息:处理来自127.0.0.1:42972的业务包,包 ID 为281494。
    • 处理类:HandlePacketTask的handler()方法。
  4. 读取完成,连接关闭

    2024-12-06 18:23:40.135 [Thread-1] INFO  c.l.t.c.ReadCompletionHandler.completed:104 - close 127.0.0.1:42972, because The connection closed by peer
    
    • 事件:连接读取完成,且对端关闭连接。
    • 信息:因为对端关闭连接,127.0.0.1:42972的连接被关闭。
    • 处理类:ReadCompletionHandler的completed()方法。
  5. 写入完成

    2024-12-06 18:23:40.135 [Thread-1512] INFO  c.l.t.c.WriteCompletionHandler.completed:57 - write 172 to 127.0.0.1:42972
    
    • 事件:数据写入完成。
    • 信息:写入172字节到127.0.0.1:42972。
    • 处理类:WriteCompletionHandler的completed()方法。
  6. 连接关闭处理

    2024-12-06 18:23:40.135 [Thread-1] INFO  c.l.t.c.Tio.close:455 - close 127.0.0.1:42972,remark:The connection closed by peer
    
    • 事件:连接正式关闭。
    • 信息:关闭127.0.0.1:42972的连接,备注信息为The connection closed by peer。
    • 处理类:Tio类的close()方法。

技术细节解析

  1. 连接生命周期

    • 建立:通过AcceptCompletionHandler接受新连接,创建ChannelContext,并开始异步读取数据。
    • 数据处理:读取到数据后,通过ReadCompletionHandler进行解码,并交由HandlePacketTask处理业务逻辑。
    • 连接关闭:对端关闭连接后,ReadCompletionHandler触发关闭流程,调用Tio.close()进行连接的清理和资源释放。
  2. 异步操作与回调

    • 非阻塞通信:使用 Java AIO 的异步通道,实现高效的非阻塞网络通信。
    • 回调机制:通过实现CompletionHandler接口,处理异步操作完成后的回调,实现事件驱动的通信模型。
  3. 错误处理

    • 读取异常:在读取过程中,如果出现异常,调用Tio.close()关闭连接,确保系统的稳定性。
    • 写入异常:在写入过程中,如果出现异常,同样调用Tio.close()关闭连接。
  4. 统计与监控

    • 连接统计:统计已接收连接数、当前连接数、发送和接收的字节数等,提供系统的运行状态监控。
    • 心跳检测:通过心跳机制,检测连接的活跃性,及时关闭失效连接。

总结与技术要点

本文基于提供的代码片段,详细解析了 Tio 框架的核心配置、服务器启动与监听、连接接收处理、数据读取与解码、消息处理、网络通信管理、异常处理与连接关闭等关键组件和功能。通过深入理解这些组件的结构和交互方式,开发者可以更好地应用 Tio 框架,构建高性能、稳定的网络应用。

关键技术要点

  1. 高效的异步通信:基于 Java AIO 的异步通道,实现高并发下的高效网络通信。
  2. 灵活的配置管理:通过TioConfig和ServerTioConfig,提供丰富的配置选项,满足不同场景需求。
  3. 心跳与连接检测:通过心跳机制和统计信息,确保连接的活跃性和系统的稳定性。
  4. IP 黑名单与安全:支持 IP 黑名单功能,增强系统的安全性,防止恶意连接。
  5. 消息编码与解码:通过 AIO 处理器和解码任务,灵活处理不同协议和业务逻辑的数据包。
  6. 群组与用户管理:提供群组、用户、令牌等多种连接管理方式,适应复杂的业务需求。
  7. 异常处理与日志记录:统一的异常处理机制和详细的日志记录,帮助开发者快速定位和解决问题。
  8. 资源管理与性能优化:通过缓冲区池和并发集合,优化资源管理和系统性能,减少内存开销。

通过深入理解 Tio 框架的这些关键技术要点,开发者可以充分发挥其优势,构建高性能、稳定、安全的网络应用。

参考文献

tiocloud 文档资料

Edit this page
Last Updated:
Contributors: Tong Li
Prev
拉黑 IP