Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 配置工具
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储文件到 亚马逊 S3
    • 存储文件到 腾讯 COS
    • 存储文件到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 代码生成(可选)与类型安全升级
    • JSONB、Upsert、窗口函数实战
    • 整合agroal
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • 协议自动转换 Google Gemini示例
    • 请求记录
    • 限流和错误处理
    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 增强检索(RAG)
    • 搜索+AI
    • AI 问答
    • 连接代码执行器
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 67_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 68_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 多图片管理
    • 单图片管理(只读模式)
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 使用livekit-sip进行测试
    • SIP Server 第二版实战
    • SIP Server 第三版实战
    • 性能优化
    • 基于 MediaProcessor 对接 Realtime 模型说明
    • 对接大语言模型
    • 支持 G722 宽带语音
    • G722编码和解码
  • 76_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

支持 G722 宽带语音

一、背景

当前项目基于 tio-core 自研了一套轻量级 SIP Server,用于承接语音呼叫接入,并为后续语音机器人、ASR、TTS、Realtime LLM 对话等能力提供底层通话链路。

在前几个阶段中,系统已经逐步完成了以下能力建设:

第一阶段:最小链路打通

第一版主要完成了 SIP 和 RTP 的基础连通性验证,具备以下能力:

  • 监听 SIP TCP 5060 和 SIP UDP 5060
  • 收到 INVITE 后解析 SIP/SDP
  • 动态分配 RTP 端口
  • 返回 200 OK + SDP
  • 接收对端 RTP 音频
  • 通过 RTP Echo 形成通话闭环

这一阶段的意义在于证明:

  • tio-core 可以承载 SIP 信令和 RTP 数据
  • Java 可以完成 RTP 端口动态管理
  • SIP / SDP / RTP 三层配合是正确的

第二阶段:SIP 会话与 SDP 协商增强

第二版在“能跑通”的基础上,开始补齐协议和会话层能力,完成了:

  • SIP TCP 流式解码
  • SIP 消息对象化解析与编码
  • CallSession 生命周期管理
  • ACK 超时清理
  • SDP Offer / Answer 协商
  • 根据共同支持的 codec 返回合法 answer

这一阶段使系统从“可运行 demo”逐步演进为“具备基本 SIP Server 能力的语音接入骨架”。


第三阶段:媒体层从网络包回显升级到音频帧处理

第三版不再停留在“收到 UDP 包原样发回”,而是开始建立真正的媒体处理链:

  • RTP header 解析
  • G.711 编解码
  • AudioFrame 统一音频帧抽象
  • MediaProcessor 媒体处理接口
  • EchoMediaProcessor 音频帧级回环
  • RTP 重组并回发

这一阶段最重要的意义,不是“再做一次 echo”,而是建立了后续可接入 AI 语音能力的媒体架构。


二、现阶段问题

当前系统虽然已经具备接入 Realtime 模型的能力,但在电话语音场景中仍存在一个明显问题:

当前协商结果多为 G711 窄带语音

从实际日志可以看到,对端 SDP Offer 中通常会同时提供:

  • G722
  • PCMU
  • PCMA
  • telephone-event

例如:

m=audio 15282 RTP/AVP 9 0 8 101
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=rtpmap:101 telephone-event/8000
a=ptime:20

但是当前本端 SDP 协商能力只支持:

  • PCMU
  • PCMA

因此即使对端已经支持 G722,最终仍会协商成:

  • PCMU/8000
  • 或 PCMA/8000

这意味着 RTP 层输入输出仍然是 8k 窄带语音。


对接 Realtime 模型时存在额外重采样损耗

Realtime 模型通常要求:

  • 输入:16k PCM
  • 输出:24k PCM

而当前电话链路实际协商的是 8k,因此媒体链路会变成:

当前窄带链路

  • SIP 输入:8k
  • 上采样到 16k,送入 Realtime 模型
  • 模型输出 24k
  • 再下采样到 8k
  • 编码回 RTP

即:

8k -> 16k -> Realtime Model -> 24k -> 8k

这条链路虽然可用,但存在两个问题:

  1. 电话侧本身已经是窄带音频,信息量有限
  2. 需要在模型边界做额外的升采样和降采样

如果能够在 SIP 协商阶段优先使用 G722,则媒体链路可升级为宽带语音:

16k -> Realtime Model -> 24k -> 16k

这样可以显著减少损耗,并提升整体语音质量。


三、目标

本次改造的目标不是“永远强制使用 G722”,而是建立一套更合理、更可扩展的媒体处理机制,使系统能够根据当前会话协商结果动态工作。

整体目标包括四个方面。

1. SDP 协商层支持并优先选择 G722

当对端 SDP Offer 中包含 G722,且本端也支持时,优先协商 G722。 如果对端不支持 G722,则自动回退到 PCMU 或 PCMA。


2. RTP 编解码层支持 G722

在现有 PCMU / PCMA 之外,补齐 G722Codec,使 RTP 层能够:

  • 按协商结果正确选择 codec
  • 正确完成 RTP payload 与 PCM16 之间的转换

3. 媒体处理层不再写死 8k

当前 RealtimeMediaProcessor 内部仍然隐含了“电话侧一定是 8k”的假设。 本次改造后,媒体处理层应当按当前会话协商格式动态工作:

  • 如果当前协商是 PCMU/PCMA,则按 8k 处理
  • 如果当前协商是 G722,则按 16k 处理

4. Realtime 内部保持固定模型格式

桥接 Realtime 模型这一层不需要感知 SIP 侧 codec 变化,它只需要保持模型要求的固定格式:

  • 模型输入:16k PCM
  • 模型输出:24k PCM

真正负责做“会话采样率 ↔ 模型采样率”转换的,是 RealtimeMediaProcessor。


四、设计原则与责任边界

为了避免职责混乱,这次改造必须先把责任边界定义清楚。

1. 媒体链路定义

当前第三版媒体链路已经比较明确:

  1. RTP 收到包
  2. 解析 RTP header
  3. 按协商 codec 解码成 PCM
  4. 构造成 AudioFrame
  5. 交给 MediaProcessor
  6. MediaProcessor 返回新的 AudioFrame
  7. 按当前 codec 编码
  8. 重组 RTP 并发回对端

2. 核心责任划分

RTP 层职责

RTP 层负责:

  • 按协商 codec 收 RTP
  • 解析 RTP header
  • 按 codec 解码成 PCM
  • 调用 MediaProcessor
  • 按 codec 编码输出
  • 重新封装 RTP

也就是说:

codec encode / decode 是 RTP 层职责,不是 MediaProcessor 职责。


MediaProcessor 职责

MediaProcessor 只负责处理“音频内容本身”,不负责:

  • RTP 封包
  • SIP 协商
  • codec 选择
  • G711 / G722 编码解码

对于 RealtimeMediaProcessor 而言,它的职责是:

  • 接收当前会话格式的 PCM 音频
  • 转成模型输入格式
  • 发送给 Realtime 模型
  • 接收模型输出音频
  • 转回当前会话格式
  • 返回 AudioFrame

因此可以总结为一句话:

RTP 层负责“按协商 codec 收发音频”,RealtimeMediaProcessor 负责“在会话采样率和模型采样率之间转换”。


3.目标状态

改造完成后,系统应具备以下行为:

当对端支持 G722 时

  • SDP 优先协商 G722
  • RTP 解码得到 16k PCM
  • RealtimeMediaProcessor 直接使用 16k 作为模型输入
  • 模型返回 24k PCM
  • 下采样到 16k
  • G722 编码回 RTP

链路如下:

16k -> Realtime Model -> 24k -> 16k

当对端只支持 PCMU / PCMA 时

  • SDP 回退到 G711
  • RTP 解码得到 8k PCM
  • RealtimeMediaProcessor 上采样到 16k
  • 模型返回 24k PCM
  • 下采样到 8k
  • G711 编码回 RTP

链路如下:

8k -> 16k -> Realtime Model -> 24k -> 8k

4、改造方案

本次改造采用“方案二”: 增加独立工具类,根据 CodecSpec 推导当前会话 PCM 参数。

这样做的好处是:

  • 不把 codec 判断逻辑散落在各个类里
  • 不把采样率推导逻辑写进 CallSession
  • 让 RTP 层、MediaProcessor、Realtime 都复用同一套会话格式推导逻辑

1. SDP 协商层支持并优先选择 G722

修改 SdpParser.defaultSupportedCodecs(),将本端支持 codec 列表改为:

  1. G722
  2. PCMU
  3. PCMA

因为 chooseCodec() 本来就是按本端支持顺序优先匹配,所以只要把 G722 放在第一位,就可以在对端 offer G722 时优先协商到它。

这一层的职责只是:

  • 识别对端 offer
  • 选择双方都支持的 codec
  • 把协商结果写入 CallSession

2. 增加会话音频格式解析工具类

新增独立工具类:

  • NegotiatedAudioFormatResolver

用于根据 CallSession 或 CodecSpec 推导当前会话实际 PCM 参数。

例如:

  • PCMU -> 8000
  • PCMA -> 8000
  • G722 -> 16000

这个工具类会被以下模块复用:

  • RtpUdpHandler
  • RealtimeMediaProcessor
  • SipRealtimeSession
  • 后续其他 MediaProcessor

这样就避免了“到处 if codecName == ...”的散乱写法。


3. 增加 G722Codec

新增:

  • com.litongjava.sip.rtp.codec.G722Codec

它实现 AudioCodec 接口,职责是:

  • RTP payload -> PCM16
  • PCM16 -> RTP payload

需要注意的是:

  • SDP / RTP 中 G722/8000 的 8000 是 RTP clock rate 表示
  • 对媒体处理层而言,G722 应按 16k PCM 工作

因此:

  • CodecSpec.clockRate = 8000
  • G722Codec.sampleRate() = 16000

这一点必须区分清楚,不能混淆。


4. RTP 层支持 G722 并按会话格式工作

升级 RtpUdpHandler:

  • chooseCodec(session) 增加 G722 支持

  • 输入 AudioFrame.sampleRate 不再假设 8000,而是来自 codec

  • 在 MediaProcessor 输出后,增加一个兜底步骤:

    • 如果返回的采样率与当前 codec 需要的采样率不一致
    • 则先重采样到 codec 所需采样率
    • 再执行编码

这样 RTP 层就具备了真正意义上的“按会话 codec 工作”的能力。


5. RealtimeMediaProcessor 按会话协商格式动态处理

当前 RealtimeMediaProcessor 里写死了:

  • SIP 输入 8000
  • 模型输入 16000
  • 模型输出 24000
  • SIP 输出 8000

改造后,真正保留写死的只有模型侧:

  • 模型输入 16000
  • 模型输出 24000

而 SIP 侧的输入输出采样率应动态来自:

  • session.getSelectedCodec()
  • 通过 NegotiatedAudioFormatResolver 推导得到

这样处理流程变成:

上行

  • sessionSampleRate -> 16000 -> send to model

下行

  • 24000 -> sessionSampleRate -> return AudioFrame

这样当协商结果是 G722 时,就不会再多做一次无意义的 8k 降采样。


6. SipRealtimeSession 按会话采样率缓存模型下行音频

当前模型下行音频固定按 24k 返回。 在 SipRealtimeSession 中,需要根据当前通话会话格式,将模型输出统一转换到当前会话采样率后再入队。

这样 RealtimeMediaProcessor 从缓冲里拿到的就始终是“当前会话格式的 PCM”,不需要额外假设是 8k。


7 关键类说明

本次改造涉及以下关键类。

SdpParser

负责:

  • 解析对端 SDP Offer
  • 识别 G722 / PCMU / PCMA / telephone-event
  • 根据本端支持列表选择最终 codec
  • 输出协商结果

NegotiatedAudioFormatResolver

负责:

  • 根据 CodecSpec 推导当前会话实际 PCM 参数

  • 统一封装:

    • 会话采样率
    • 声道数

这是本次“方案二”的核心工具类。


G722Codec

负责:

  • G722 RTP payload 与 PCM16 之间的转换
  • 向 RTP 层暴露统一的 AudioCodec 接口

RtpUdpHandler

负责:

  • RTP 解析
  • codec 选择
  • codec 解码 / 编码
  • 调用 MediaProcessor
  • RTP 重组和发送

它仍然是媒体入口和出口,但不再只局限于 G711。


RealtimeMediaProcessor

负责:

  • 接收当前会话格式 PCM
  • 转为模型输入格式 16k
  • 调用 RealtimeModelBridge
  • 接收模型 24k 输出
  • 转回当前会话格式 PCM

SipRealtimeSession

负责:

  • 管理某一路 SIP 通话与某一路 Realtime 会话的绑定关系
  • 缓冲模型下行音频
  • 将模型输出转换到当前会话采样率后入队

五 实战

  • SdpParser.java:支持并优先协商 G722
  • NegotiatedAudioFormatResolver.java:独立工具类,统一推导会话采样率
  • AudioResampler.java:重采样工具
  • G722Codec.java:G722 codec 接口实现骨架
  • RtpUdpHandler.java:支持 G722,并在输出前按当前会话 codec 兜底重采样
  • RealtimeMediaProcessor.java:按会话协商格式动态处理,不再写死 8k
  • SipRealtimeSession.java:按会话采样率缓存模型下行音频

其中 G722Codec 我把类结构、接口和调用点都补齐了;具体的 G722 编解码算法,需要替换成你项目实际采用的库实现。这一点我不想装成“已经完整实现”来误导你。其余类可以直接落到你当前工程里。


1. SdpParser.java

package com.litongjava.sip.sdp;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SdpParser {

  private final List<CodecSpec> localSupportedCodecs;

  public SdpParser() {
    this.localSupportedCodecs = defaultSupportedCodecs();
  }

  public SdpParser(List<CodecSpec> localSupportedCodecs) {
    this.localSupportedCodecs = localSupportedCodecs;
  }

  public SdpNegotiationResult negotiate(byte[] sdpBytes) {
    if (sdpBytes == null || sdpBytes.length == 0) {
      return SdpNegotiationResult.fail("missing sdp offer");
    }

    String sdp = new String(sdpBytes, StandardCharsets.US_ASCII);
    String[] lines = sdp.split("\r\n");

    String sessionConnectionIp = null;
    String mediaConnectionIp = null;
    int remoteAudioPort = 0;
    int ptime = 20;

    List<Integer> offeredPayloadTypes = new ArrayList<>();
    Map<Integer, CodecSpec> offeredCodecMap = new HashMap<>();

    boolean inAudioMedia = false;
    boolean telephoneEventSupported = false;
    int telephoneEventPt = -1;

    for (String line : lines) {
      if (line == null || line.isEmpty()) {
        continue;
      }

      if (line.startsWith("c=")) {
        String[] parts = line.split(" ");
        if (parts.length >= 3) {
          String ip = parts[2].trim();
          if (inAudioMedia) {
            mediaConnectionIp = ip;
          } else {
            sessionConnectionIp = ip;
          }
        }
        continue;
      }

      if (line.startsWith("m=")) {
        inAudioMedia = false;

        String[] parts = line.split(" ");
        if (parts.length >= 4 && parts[0].startsWith("m=audio")) {
          inAudioMedia = true;
          try {
            remoteAudioPort = Integer.parseInt(parts[1].trim());
          } catch (Exception e) {
            return SdpNegotiationResult.fail("invalid remote audio port");
          }

          for (int i = 3; i < parts.length; i++) {
            try {
              offeredPayloadTypes.add(Integer.parseInt(parts[i].trim()));
            } catch (Exception ignore) {
            }
          }
        }
        continue;
      }

      if (!inAudioMedia) {
        continue;
      }

      if (line.startsWith("a=rtpmap:")) {
        try {
          int colon = line.indexOf(':');
          int space = line.indexOf(' ');
          if (colon < 0 || space < 0 || space <= colon) {
            continue;
          }

          int pt = Integer.parseInt(line.substring(colon + 1, space).trim());
          String[] enc = line.substring(space + 1).trim().split("/");
          if (enc.length < 2) {
            continue;
          }

          String codecName = enc[0].trim();
          int clockRate = Integer.parseInt(enc[1].trim());

          CodecSpec spec = new CodecSpec(pt, codecName, clockRate);
          offeredCodecMap.put(pt, spec);

          if ("telephone-event".equalsIgnoreCase(codecName) && clockRate == 8000) {
            telephoneEventSupported = true;
            telephoneEventPt = pt;
          }
        } catch (Exception ignore) {
        }
        continue;
      }

      if (line.startsWith("a=ptime:")) {
        try {
          ptime = Integer.parseInt(line.substring("a=ptime:".length()).trim());
        } catch (Exception ignore) {
        }
      }
    }

    if (remoteAudioPort <= 0) {
      return SdpNegotiationResult.fail("missing audio media");
    }

    String remoteIp = mediaConnectionIp != null ? mediaConnectionIp : sessionConnectionIp;
    if (remoteIp == null || remoteIp.isEmpty()) {
      return SdpNegotiationResult.fail("missing connection address");
    }

    CodecSpec selected = chooseCodec(offeredPayloadTypes, offeredCodecMap);
    if (selected == null) {
      return SdpNegotiationResult.fail("no supported audio codec");
    }

    SdpNegotiationResult result = SdpNegotiationResult.ok();
    result.setRemoteRtpIp(remoteIp);
    result.setRemoteRtpPort(remoteAudioPort);
    result.setSelectedCodec(selected);
    result.setTelephoneEventSupported(telephoneEventSupported);
    result.setRemoteTelephoneEventPayloadType(telephoneEventPt);
    result.setPtime(ptime);
    return result;
  }

  private CodecSpec chooseCodec(List<Integer> offeredPayloadTypes, Map<Integer, CodecSpec> offeredCodecMap) {
    for (CodecSpec local : localSupportedCodecs) {
      for (Integer pt : offeredPayloadTypes) {
        CodecSpec offered = offeredCodecMap.get(pt);

        if (offered != null) {
          if (local.isSameCodec(offered.getCodecName(), offered.getClockRate())) {
            return new CodecSpec(pt, offered.getCodecName(), offered.getClockRate());
          }
        } else {
          if (pt == 0 && local.isStaticPcmu()) {
            return new CodecSpec(0, "PCMU", 8000);
          }
          if (pt == 8 && local.isStaticPcma()) {
            return new CodecSpec(8, "PCMA", 8000);
          }
          if (pt == 9 && "G722".equalsIgnoreCase(local.getCodecName()) && local.getClockRate() == 8000) {
            return new CodecSpec(9, "G722", 8000);
          }
        }
      }
    }
    return null;
  }

  public static List<CodecSpec> defaultSupportedCodecs() {
    List<CodecSpec> codecs = new ArrayList<>();
    codecs.add(new CodecSpec(9, "G722", 8000));
    codecs.add(new CodecSpec(0, "PCMU", 8000));
    codecs.add(new CodecSpec(8, "PCMA", 8000));
    return codecs;
  }
}

2. NegotiatedAudioFormatResolver.java

package com.litongjava.sip.rtp.codec;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.sdp.CodecSpec;

public final class NegotiatedAudioFormatResolver {

  private NegotiatedAudioFormatResolver() {
  }

  public static int resolveSessionPcmSampleRate(CallSession session) {
    if (session == null) {
      return 8000;
    }
    return resolveSessionPcmSampleRate(session.getSelectedCodec());
  }

  public static int resolveSessionPcmSampleRate(CodecSpec codecSpec) {
    if (codecSpec == null || codecSpec.getCodecName() == null) {
      return 8000;
    }

    String codecName = codecSpec.getCodecName();
    if ("G722".equalsIgnoreCase(codecName)) {
      return 16000;
    }
    if ("PCMU".equalsIgnoreCase(codecName)) {
      return 8000;
    }
    if ("PCMA".equalsIgnoreCase(codecName)) {
      return 8000;
    }

    return codecSpec.getClockRate() > 0 ? codecSpec.getClockRate() : 8000;
  }

  public static int resolveChannels(CallSession session) {
    return 1;
  }
}

3. AudioResampler.java

如果你项目里还没有通用重采样器,就加这个。

package com.litongjava.sip.rtp.codec;

import java.util.Arrays;
import java.util.Objects;

public final class AudioResampler {

  private AudioResampler() {
  }

  public static short[] resample(short[] input, int srcRate, int dstRate) {
    Objects.requireNonNull(input, "input");

    if (input.length == 0) {
      return new short[0];
    }
    if (srcRate <= 0 || dstRate <= 0) {
      throw new IllegalArgumentException("sample rate must be > 0");
    }
    if (srcRate == dstRate) {
      return Arrays.copyOf(input, input.length);
    }

    double ratio = (double) dstRate / (double) srcRate;
    int outputLength = Math.max(1, (int) Math.round(input.length * ratio));
    short[] output = new short[outputLength];

    for (int i = 0; i < outputLength; i++) {
      double srcIndex = i / ratio;
      int left = (int) Math.floor(srcIndex);
      int right = Math.min(left + 1, input.length - 1);
      double frac = srcIndex - left;

      double sample = input[left] * (1.0 - frac) + input[right] * frac;
      output[i] = clampToShort(sample);
    }
    return output;
  }

  private static short clampToShort(double value) {
    if (value > Short.MAX_VALUE) {
      return Short.MAX_VALUE;
    }
    if (value < Short.MIN_VALUE) {
      return Short.MIN_VALUE;
    }
    return (short) Math.round(value);
  }
}

4. G722Codec.java

这个类把接口和调用点补齐了,但编解码算法本体你需要替换成你实际采用的库实现。

package com.litongjava.sip.rtp.codec;

public class G722Codec implements AudioCodec {

  @Override
  public String codecName() {
    return "G722";
  }

  @Override
  public int payloadType() {
    return 9;
  }

  /**
   * RTP/SDP 里 G722 常见写法是 G722/8000,
   * 但这里返回的是媒体处理层使用的 PCM 采样率。
   */
  @Override
  public int sampleRate() {
    return 16000;
  }

  @Override
  public short[] decode(byte[] payload) {
    throw new UnsupportedOperationException(
        "G722 decode is not implemented yet. Please replace this class with your actual G722 decoder implementation.");
  }

  @Override
  public byte[] encode(short[] pcm16) {
    throw new UnsupportedOperationException(
        "G722 encode is not implemented yet. Please replace this class with your actual G722 encoder implementation.");
  }
}

5. RtpUdpHandler.java

package com.litongjava.sip.rtp.server;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioCodec;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.G722Codec;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmaCodec;
import com.litongjava.sip.rtp.codec.PcmuCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.EchoMediaProcessor;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.sip.rtp.packet.RtpPacket;
import com.litongjava.sip.rtp.packet.RtpPacketParser;
import com.litongjava.sip.rtp.packet.RtpPacketWriter;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RtpUdpHandler implements UdpHandler {

  private final int localPort;
  private final CallSessionManager sessionManager;

  private final RtpPacketParser rtpPacketParser = new RtpPacketParser();
  private final RtpPacketWriter rtpPacketWriter = new RtpPacketWriter();
  private final MediaProcessor mediaProcessor;

  private final AudioCodec pcmuCodec = new PcmuCodec();
  private final AudioCodec pcmaCodec = new PcmaCodec();
  private final AudioCodec g722Codec = new G722Codec();

  public RtpUdpHandler(int localPort, CallSessionManager sessionManager) {
    this(localPort, sessionManager, new EchoMediaProcessor());
  }

  public RtpUdpHandler(int localPort, CallSessionManager sessionManager, MediaProcessor mediaProcessor) {
    this.localPort = localPort;
    this.sessionManager = sessionManager;
    this.mediaProcessor = mediaProcessor;
  }

  @Override
  public void handler(UdpPacket udpPacket, DatagramSocket socket) {
    try {
      CallSession session = sessionManager.getByLocalRtpPort(localPort);
      if (session == null || session.isTerminated()) {
        return;
      }

      Node remote = udpPacket.getRemote();
      byte[] data = udpPacket.getData();
      if (data == null || data.length < 12) {
        return;
      }

      RtpPacket in = rtpPacketParser.parse(data);

      if (session.getRemoteRtpIp() == null || session.getRemoteRtpIp().isEmpty()) {
        session.setRemoteRtpIp(remote.getIp());
      }
      if (session.getRemoteRtpPort() <= 0) {
        session.setRemoteRtpPort(remote.getPort());
      }

      if (session.isTelephoneEventSupported()
          && in.getPayloadType() == session.getRemoteTelephoneEventPayloadType()) {
        session.setUpdatedTime(System.currentTimeMillis());
        return;
      }

      AudioCodec codec = chooseCodec(session);
      if (codec == null) {
        log.warn("No codec selected for callId={}", session.getCallId());
        return;
      }

      short[] pcm = codec.decode(in.getPayload());
      AudioFrame inputFrame = new AudioFrame(
          pcm,
          codec.sampleRate(),
          NegotiatedAudioFormatResolver.resolveChannels(session),
          in.getTimestamp());

      AudioFrame outputFrame = mediaProcessor.process(inputFrame, session);
      if (outputFrame == null || outputFrame.getSamples() == null || outputFrame.getSamples().length == 0) {
        log.info("MediaProcessor returned no audio, callId={}", session.getCallId());
        return;
      }

      int targetSampleRate = codec.sampleRate();
      short[] outputSamples = outputFrame.getSamples();
      int outputSampleRate = outputFrame.getSampleRate() > 0 ? outputFrame.getSampleRate() : targetSampleRate;

      if (outputSampleRate != targetSampleRate) {
        outputSamples = AudioResampler.resample(outputSamples, outputSampleRate, targetSampleRate);
      }

      byte[] outPayload = codec.encode(outputSamples);

      RtpPacket out = new RtpPacket();
      out.setVersion(2);
      out.setPadding(false);
      out.setExtension(false);
      out.setCsrcCount(0);
      out.setMarker(false);
      out.setPayloadType(session.getSelectedCodec().getPayloadType());
      out.setSequenceNumber(session.nextSendSequence());
      out.setTimestamp(session.nextSendTimestamp(outputSamples.length));
      out.setSsrc(session.getLocalSsrc());
      out.setPayload(outPayload);

      byte[] outBytes = rtpPacketWriter.write(out);

      DatagramPacket resp = new DatagramPacket(
          outBytes,
          outBytes.length,
          new InetSocketAddress(session.getRemoteRtpIp(), session.getRemoteRtpPort()));
      socket.send(resp);

      session.setUpdatedTime(System.currentTimeMillis());
    } catch (Exception e) {
      log.error("rtp handler error, localPort={}", localPort, e);
    }
  }

  private AudioCodec chooseCodec(CallSession session) {
    if (session.getSelectedCodec() == null || session.getSelectedCodec().getCodecName() == null) {
      return null;
    }

    String codecName = session.getSelectedCodec().getCodecName();
    if ("G722".equalsIgnoreCase(codecName)) {
      return g722Codec;
    }
    if ("PCMU".equalsIgnoreCase(codecName)) {
      return pcmuCodec;
    }
    if ("PCMA".equalsIgnoreCase(codecName)) {
      return pcmaCodec;
    }
    return null;
  }
}

6. RealtimeMediaProcessor.java

package com.litongjava.voice.agent.sip;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeModelBridgeFactory;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RealtimeMediaProcessor implements MediaProcessor {

  private static final int MODEL_INPUT_SAMPLE_RATE = 16000;
  private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;

  private final String platform;
  private final RealtimeSetupCallback realtimeSetupCallback;
  private final SipSessionRegistry sessionRegistry;

  public RealtimeMediaProcessor(RealtimeSetupCallback realtimeSetupCallback) {
    this(EnvUtils.getStr("vioce.agent.platform"), realtimeSetupCallback, new SipSessionRegistry());
  }

  public RealtimeMediaProcessor(String platform, RealtimeSetupCallback realtimeSetupCallback,
      SipSessionRegistry sessionRegistry) {
    this.platform = platform;
    this.realtimeSetupCallback = realtimeSetupCallback;
    this.sessionRegistry = sessionRegistry;
  }

  @Override
  public AudioFrame process(AudioFrame input, CallSession session) {
    if (input == null || session == null) {
      return null;
    }

    String callId = getCallId(session);
    if (StrUtil.isBlank(callId)) {
      log.warn("callId is blank, skip processing");
      return null;
    }

    SipRealtimeSession sipSession = sessionRegistry.getOrCreate(callId, this::createSipSession);

    try {
      sipSession.ensureConnected(session);

      short[] inputSamples = input.getSamples();
      if (inputSamples == null || inputSamples.length == 0) {
        return null;
      }

      int sessionSampleRate = NegotiatedAudioFormatResolver.resolveSessionPcmSampleRate(session);
      int inputSampleRate = input.getSampleRate() > 0 ? input.getSampleRate() : sessionSampleRate;

      short[] modelInputSamples = inputSamples;
      if (inputSampleRate != MODEL_INPUT_SAMPLE_RATE) {
        modelInputSamples = AudioResampler.resample(inputSamples, inputSampleRate, MODEL_INPUT_SAMPLE_RATE);
      }

      byte[] pcm16kBytes = PcmCodec.shortsToLittleEndianBytes(modelInputSamples);
      sipSession.sendToModel(pcm16kBytes);

      short[] outputSamples = sipSession.takeOutputFrame(inputSamples.length);
      if (outputSamples == null) {
        return null;
      }

      return new AudioFrame(
          outputSamples,
          sessionSampleRate,
          NegotiatedAudioFormatResolver.resolveChannels(session),
          input.getRtpTimestamp());
    } catch (Exception e) {
      log.error("process failed, callId={}", callId, e);
      return null;
    }
  }

  public void close(CallSession session) {
    if (session == null) {
      return;
    }
    closeByCallId(getCallId(session));
  }

  public void closeByCallId(String callId) {
    if (StrUtil.isBlank(callId)) {
      return;
    }
    sessionRegistry.remove(callId);
  }

  public void closeAll() {
    sessionRegistry.clear();
  }

  private SipRealtimeSession createSipSession(String callId) {
    SipRealtimeBridgeCallback callback = new SipRealtimeBridgeCallback(callId);
    RealtimeModelBridge bridge = RealtimeModelBridgeFactory.createBridge(platform, callback);
    SipRealtimeSession sipSession = new SipRealtimeSession(callId, bridge, callback, realtimeSetupCallback);
    callback.bind(sipSession);
    log.info("created realtime sip session, callId={}", callId);
    return sipSession;
  }

  private String getCallId(CallSession session) {
    try {
      String callId = session.getCallId();
      return callId == null ? null : callId.trim();
    } catch (Exception e) {
      log.warn("failed to get callId from CallSession", e);
      return null;
    }
  }

  public static int getModelInputSampleRate() {
    return MODEL_INPUT_SAMPLE_RATE;
  }

  public static int getModelOutputSampleRate() {
    return MODEL_OUTPUT_SAMPLE_RATE;
  }
}

7. SipRealtimeSession.java

package com.litongjava.voice.agent.sip;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SipRealtimeSession {

  private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;

  private final String callId;
  private final RealtimeModelBridge bridge;
  private final SipRealtimeBridgeCallback callback;
  private final RealtimeSetupCallback realtimeSetupCallback;
  private final AtomicBoolean connected = new AtomicBoolean(false);
  private final ConcurrentLinkedQueue<Short> outputQueue = new ConcurrentLinkedQueue<>();

  private volatile CallSession callSession;

  public SipRealtimeSession(String callId, RealtimeModelBridge bridge, SipRealtimeBridgeCallback callback,
      RealtimeSetupCallback realtimeSetupCallback) {
    this.callId = callId;
    this.bridge = bridge;
    this.callback = callback;
    this.realtimeSetupCallback = realtimeSetupCallback;
  }

  public void ensureConnected(CallSession session) {
    this.callSession = session;
    if (connected.compareAndSet(false, true)) {
      RealtimeSetup setup = null;
      if (realtimeSetupCallback != null) {
        setup = realtimeSetupCallback.getRealtimeSetup(session);
      }
      callback.start(setup);
      bridge.connect(setup).exceptionally(ex -> {
        log.error("bridge connect failed, callId={}", callId, ex);
        connected.set(false);
        return null;
      });
    }
  }

  public void sendToModel(byte[] pcm16kBytes) {
    if (pcm16kBytes == null || pcm16kBytes.length == 0) {
      return;
    }

    bridge.sendPcm16k(pcm16kBytes).exceptionally(ex -> {
      log.warn("sendPcm16k failed, callId={}", callId, ex);
      return null;
    });
  }

  public void appendModelAudio(byte[] pcmBytes) {
    if (pcmBytes == null || pcmBytes.length == 0) {
      return;
    }

    short[] pcm24k = PcmCodec.littleEndianBytesToShorts(pcmBytes);
    if (pcm24k.length == 0) {
      return;
    }

    CallSession session = this.callSession;
    int sessionSampleRate = NegotiatedAudioFormatResolver.resolveSessionPcmSampleRate(session);

    short[] pcmSessionRate = pcm24k;
    if (MODEL_OUTPUT_SAMPLE_RATE != sessionSampleRate) {
      pcmSessionRate = AudioResampler.resample(pcm24k, MODEL_OUTPUT_SAMPLE_RATE, sessionSampleRate);
    }

    for (short sample : pcmSessionRate) {
      outputQueue.offer(sample);
    }
  }

  public short[] takeOutputFrame(int frameSamples) {
    if (frameSamples <= 0) {
      return null;
    }

    if (outputQueue.peek() == null) {
      return null;
    }

    short[] out = new short[frameSamples];
    int i = 0;
    for (; i < frameSamples; i++) {
      Short value = outputQueue.poll();
      if (value == null) {
        break;
      }
      out[i] = value;
    }

    if (i < frameSamples) {
      Arrays.fill(out, i, frameSamples, (short) 0);
    }

    return out;
  }

  public void close() {
    try {
      bridge.close().getNow(null);
    } catch (Exception e) {
      log.warn("bridge.close failed, callId={}", callId, e);
    } finally {
      outputQueue.clear();
      connected.set(false);
    }
  }

  public String getCallId() {
    return callId;
  }
}

8. RealtimeSetupFactory.java

如果你还保留这个工厂,不用改;贴一份完整可用版。

package com.litongjava.voice.agent.sip;

import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.voice.agent.bridge.RealtimeSetup;

public final class RealtimeSetupFactory {

  private RealtimeSetupFactory() {
  }

  public static RealtimeSetup buildFromEnv() {
    String systemPrompt = EnvUtils.getStr("VOICE_AGENT_SYSTEM_PROMPT");
    String userPrompt = EnvUtils.getStr("VOICE_AGENT_USER_PROMPT");
    String jobDescription = EnvUtils.getStr("VOICE_AGENT_JOB_DESCRIPTION");
    String resume = EnvUtils.getStr("VOICE_AGENT_RESUME");
    String questions = EnvUtils.getStr("VOICE_AGENT_QUESTIONS");
    String greeting = EnvUtils.getStr("VOICE_AGENT_GREETING");

    return new RealtimeSetup(
        systemPrompt,
        userPrompt,
        jobDescription,
        resume,
        questions,
        greeting);
  }
}

9. SipRealtimeSetupCallback.java

package com.litongjava.voice.agent.sip;

import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;

public class SipRealtimeSetupCallback implements RealtimeSetupCallback {

  @Override
  public RealtimeSetup getRealtimeSetup(CallSession session) {
    return RealtimeSetupFactory.buildFromEnv();
  }
}

10. 你接下来还要做的唯一关键事

这套代码里,真正还没“落地完成”的只有一件事:

把 G722Codec 里的 decode/encode 换成你实际使用的 G722 库实现。

也就是说:

  • SDP 协商优先 G722:已经给你了
  • 会话 PCM 采样率动态推导:已经给你了
  • RTP 层按协商 codec 动态处理:已经给你了
  • RealtimeMediaProcessor 按会话协商格式动态处理:已经给你了

差的只是 G722 的实际算法实现。

你现在可以先把这套代码接进去,运行后会出现这种行为:

  • 对端支持 G722 时,SDP 会优先协商到 G722
  • RTP 层会选择 G722Codec
  • 这时如果你还没替换 G722Codec,会在编解码处抛 UnsupportedOperationException

六、最终效果

本次改造完成后,系统将从当前的“固定 G711 窄带电话链路”升级为“可动态协商宽带语音的 SIP + Realtime 媒体平台”。

它带来的直接收益包括:

  • 对端支持 G722 时,自动升级到宽带语音
  • 电话链路与 Realtime 模型之间的转换损耗明显降低
  • RealtimeMediaProcessor 不再绑定某个固定电话采样率
  • RTP 层、MediaProcessor、Realtime 三层职责更加清晰
  • 为后续继续扩展更多 codec、更多媒体处理器打下基础

七、总结

当前项目已经具备:

  • SIP 建链能力
  • SDP 协商能力
  • RTP 媒体处理能力
  • Realtime 模型接入能力

但在电话场景中,如果始终停留在 PCMU/PCMA 8k,就会给 Realtime 模型引入额外的音频质量损耗和重采样开销。

因此,本次升级的核心意义在于:

  • 让 SIP Server 支持并优先协商 G722
  • 让 RTP 层真正具备宽带语音编解码能力
  • 让 RealtimeMediaProcessor 按当前会话格式动态处理音频
  • 让整个系统从“固定 8k 电话 AI 链路”演进为“按协商格式动态工作的实时语音接入平台”

最终可以用一句话概括本次方案:

RTP 层负责按协商 codec 收发音频,RealtimeMediaProcessor 负责在会话采样率和模型采样率之间转换。

这就是整个改造方案最重要的设计原则。

Edit this page
Last Updated: 3/9/26, 3:43 PM
Contributors: litongjava
Prev
对接大语言模型
Next
G722编码和解码