支持 G722 宽带语音
一、背景
当前项目基于 tio-core 自研了一套轻量级 SIP Server,用于承接语音呼叫接入,并为后续语音机器人、ASR、TTS、Realtime LLM 对话等能力提供底层通话链路。
在前几个阶段中,系统已经逐步完成了以下能力建设:
第一阶段:最小链路打通
第一版主要完成了 SIP 和 RTP 的基础连通性验证,具备以下能力:
- 监听
SIP TCP 5060和SIP UDP 5060 - 收到
INVITE后解析 SIP/SDP - 动态分配 RTP 端口
- 返回
200 OK + SDP - 接收对端 RTP 音频
- 通过 RTP Echo 形成通话闭环
这一阶段的意义在于证明:
tio-core可以承载 SIP 信令和 RTP 数据- Java 可以完成 RTP 端口动态管理
- SIP / SDP / RTP 三层配合是正确的
第二阶段:SIP 会话与 SDP 协商增强
第二版在“能跑通”的基础上,开始补齐协议和会话层能力,完成了:
- SIP TCP 流式解码
- SIP 消息对象化解析与编码
CallSession生命周期管理- ACK 超时清理
- SDP Offer / Answer 协商
- 根据共同支持的 codec 返回合法 answer
这一阶段使系统从“可运行 demo”逐步演进为“具备基本 SIP Server 能力的语音接入骨架”。
第三阶段:媒体层从网络包回显升级到音频帧处理
第三版不再停留在“收到 UDP 包原样发回”,而是开始建立真正的媒体处理链:
- RTP header 解析
- G.711 编解码
AudioFrame统一音频帧抽象MediaProcessor媒体处理接口EchoMediaProcessor音频帧级回环- RTP 重组并回发
这一阶段最重要的意义,不是“再做一次 echo”,而是建立了后续可接入 AI 语音能力的媒体架构。
二、现阶段问题
当前系统虽然已经具备接入 Realtime 模型的能力,但在电话语音场景中仍存在一个明显问题:
当前协商结果多为 G711 窄带语音
从实际日志可以看到,对端 SDP Offer 中通常会同时提供:
G722PCMUPCMAtelephone-event
例如:
m=audio 15282 RTP/AVP 9 0 8 101
a=rtpmap:9 G722/8000
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=rtpmap:101 telephone-event/8000
a=ptime:20
但是当前本端 SDP 协商能力只支持:
PCMUPCMA
因此即使对端已经支持 G722,最终仍会协商成:
PCMU/8000- 或
PCMA/8000
这意味着 RTP 层输入输出仍然是 8k 窄带语音。
对接 Realtime 模型时存在额外重采样损耗
Realtime 模型通常要求:
- 输入:
16k PCM - 输出:
24k PCM
而当前电话链路实际协商的是 8k,因此媒体链路会变成:
当前窄带链路
- SIP 输入:8k
- 上采样到 16k,送入 Realtime 模型
- 模型输出 24k
- 再下采样到 8k
- 编码回 RTP
即:
8k -> 16k -> Realtime Model -> 24k -> 8k
这条链路虽然可用,但存在两个问题:
- 电话侧本身已经是窄带音频,信息量有限
- 需要在模型边界做额外的升采样和降采样
如果能够在 SIP 协商阶段优先使用 G722,则媒体链路可升级为宽带语音:
16k -> Realtime Model -> 24k -> 16k
这样可以显著减少损耗,并提升整体语音质量。
三、目标
本次改造的目标不是“永远强制使用 G722”,而是建立一套更合理、更可扩展的媒体处理机制,使系统能够根据当前会话协商结果动态工作。
整体目标包括四个方面。
1. SDP 协商层支持并优先选择 G722
当对端 SDP Offer 中包含 G722,且本端也支持时,优先协商 G722。 如果对端不支持 G722,则自动回退到 PCMU 或 PCMA。
2. RTP 编解码层支持 G722
在现有 PCMU / PCMA 之外,补齐 G722Codec,使 RTP 层能够:
- 按协商结果正确选择 codec
- 正确完成 RTP payload 与 PCM16 之间的转换
3. 媒体处理层不再写死 8k
当前 RealtimeMediaProcessor 内部仍然隐含了“电话侧一定是 8k”的假设。 本次改造后,媒体处理层应当按当前会话协商格式动态工作:
- 如果当前协商是
PCMU/PCMA,则按 8k 处理 - 如果当前协商是
G722,则按 16k 处理
4. Realtime 内部保持固定模型格式
桥接 Realtime 模型这一层不需要感知 SIP 侧 codec 变化,它只需要保持模型要求的固定格式:
- 模型输入:16k PCM
- 模型输出:24k PCM
真正负责做“会话采样率 ↔ 模型采样率”转换的,是 RealtimeMediaProcessor。
四、设计原则与责任边界
为了避免职责混乱,这次改造必须先把责任边界定义清楚。
1. 媒体链路定义
当前第三版媒体链路已经比较明确:
- RTP 收到包
- 解析 RTP header
- 按协商 codec 解码成 PCM
- 构造成
AudioFrame - 交给
MediaProcessor MediaProcessor返回新的AudioFrame- 按当前 codec 编码
- 重组 RTP 并发回对端
2. 核心责任划分
RTP 层职责
RTP 层负责:
- 按协商 codec 收 RTP
- 解析 RTP header
- 按 codec 解码成 PCM
- 调用
MediaProcessor - 按 codec 编码输出
- 重新封装 RTP
也就是说:
codec encode / decode 是 RTP 层职责,不是 MediaProcessor 职责。
MediaProcessor 职责
MediaProcessor 只负责处理“音频内容本身”,不负责:
- RTP 封包
- SIP 协商
- codec 选择
- G711 / G722 编码解码
对于 RealtimeMediaProcessor 而言,它的职责是:
- 接收当前会话格式的 PCM 音频
- 转成模型输入格式
- 发送给 Realtime 模型
- 接收模型输出音频
- 转回当前会话格式
- 返回
AudioFrame
因此可以总结为一句话:
RTP 层负责“按协商 codec 收发音频”,RealtimeMediaProcessor 负责“在会话采样率和模型采样率之间转换”。
3.目标状态
改造完成后,系统应具备以下行为:
当对端支持 G722 时
- SDP 优先协商 G722
- RTP 解码得到 16k PCM
RealtimeMediaProcessor直接使用 16k 作为模型输入- 模型返回 24k PCM
- 下采样到 16k
- G722 编码回 RTP
链路如下:
16k -> Realtime Model -> 24k -> 16k
当对端只支持 PCMU / PCMA 时
- SDP 回退到 G711
- RTP 解码得到 8k PCM
RealtimeMediaProcessor上采样到 16k- 模型返回 24k PCM
- 下采样到 8k
- G711 编码回 RTP
链路如下:
8k -> 16k -> Realtime Model -> 24k -> 8k
4、改造方案
本次改造采用“方案二”: 增加独立工具类,根据 CodecSpec 推导当前会话 PCM 参数。
这样做的好处是:
- 不把 codec 判断逻辑散落在各个类里
- 不把采样率推导逻辑写进
CallSession - 让 RTP 层、MediaProcessor、Realtime 都复用同一套会话格式推导逻辑
1. SDP 协商层支持并优先选择 G722
修改 SdpParser.defaultSupportedCodecs(),将本端支持 codec 列表改为:
G722PCMUPCMA
因为 chooseCodec() 本来就是按本端支持顺序优先匹配,所以只要把 G722 放在第一位,就可以在对端 offer G722 时优先协商到它。
这一层的职责只是:
- 识别对端 offer
- 选择双方都支持的 codec
- 把协商结果写入
CallSession
2. 增加会话音频格式解析工具类
新增独立工具类:
NegotiatedAudioFormatResolver
用于根据 CallSession 或 CodecSpec 推导当前会话实际 PCM 参数。
例如:
PCMU -> 8000PCMA -> 8000G722 -> 16000
这个工具类会被以下模块复用:
RtpUdpHandlerRealtimeMediaProcessorSipRealtimeSession- 后续其他
MediaProcessor
这样就避免了“到处 if codecName == ...”的散乱写法。
3. 增加 G722Codec
新增:
com.litongjava.sip.rtp.codec.G722Codec
它实现 AudioCodec 接口,职责是:
- RTP payload -> PCM16
- PCM16 -> RTP payload
需要注意的是:
- SDP / RTP 中
G722/8000的8000是 RTP clock rate 表示 - 对媒体处理层而言,G722 应按 16k PCM 工作
因此:
CodecSpec.clockRate = 8000G722Codec.sampleRate() = 16000
这一点必须区分清楚,不能混淆。
4. RTP 层支持 G722 并按会话格式工作
升级 RtpUdpHandler:
chooseCodec(session)增加 G722 支持输入
AudioFrame.sampleRate不再假设 8000,而是来自 codec在
MediaProcessor输出后,增加一个兜底步骤:- 如果返回的采样率与当前 codec 需要的采样率不一致
- 则先重采样到 codec 所需采样率
- 再执行编码
这样 RTP 层就具备了真正意义上的“按会话 codec 工作”的能力。
5. RealtimeMediaProcessor 按会话协商格式动态处理
当前 RealtimeMediaProcessor 里写死了:
- SIP 输入 8000
- 模型输入 16000
- 模型输出 24000
- SIP 输出 8000
改造后,真正保留写死的只有模型侧:
- 模型输入 16000
- 模型输出 24000
而 SIP 侧的输入输出采样率应动态来自:
session.getSelectedCodec()- 通过
NegotiatedAudioFormatResolver推导得到
这样处理流程变成:
上行
sessionSampleRate -> 16000 -> send to model
下行
24000 -> sessionSampleRate -> return AudioFrame
这样当协商结果是 G722 时,就不会再多做一次无意义的 8k 降采样。
6. SipRealtimeSession 按会话采样率缓存模型下行音频
当前模型下行音频固定按 24k 返回。 在 SipRealtimeSession 中,需要根据当前通话会话格式,将模型输出统一转换到当前会话采样率后再入队。
这样 RealtimeMediaProcessor 从缓冲里拿到的就始终是“当前会话格式的 PCM”,不需要额外假设是 8k。
7 关键类说明
本次改造涉及以下关键类。
SdpParser
负责:
- 解析对端 SDP Offer
- 识别 G722 / PCMU / PCMA / telephone-event
- 根据本端支持列表选择最终 codec
- 输出协商结果
NegotiatedAudioFormatResolver
负责:
根据
CodecSpec推导当前会话实际 PCM 参数统一封装:
- 会话采样率
- 声道数
这是本次“方案二”的核心工具类。
G722Codec
负责:
- G722 RTP payload 与 PCM16 之间的转换
- 向 RTP 层暴露统一的
AudioCodec接口
RtpUdpHandler
负责:
- RTP 解析
- codec 选择
- codec 解码 / 编码
- 调用
MediaProcessor - RTP 重组和发送
它仍然是媒体入口和出口,但不再只局限于 G711。
RealtimeMediaProcessor
负责:
- 接收当前会话格式 PCM
- 转为模型输入格式 16k
- 调用
RealtimeModelBridge - 接收模型 24k 输出
- 转回当前会话格式 PCM
SipRealtimeSession
负责:
- 管理某一路 SIP 通话与某一路 Realtime 会话的绑定关系
- 缓冲模型下行音频
- 将模型输出转换到当前会话采样率后入队
五 实战
SdpParser.java:支持并优先协商 G722NegotiatedAudioFormatResolver.java:独立工具类,统一推导会话采样率AudioResampler.java:重采样工具G722Codec.java:G722 codec 接口实现骨架RtpUdpHandler.java:支持 G722,并在输出前按当前会话 codec 兜底重采样RealtimeMediaProcessor.java:按会话协商格式动态处理,不再写死 8kSipRealtimeSession.java:按会话采样率缓存模型下行音频
其中 G722Codec 我把类结构、接口和调用点都补齐了;具体的 G722 编解码算法,需要替换成你项目实际采用的库实现。这一点我不想装成“已经完整实现”来误导你。其余类可以直接落到你当前工程里。
1. SdpParser.java
package com.litongjava.sip.sdp;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SdpParser {
private final List<CodecSpec> localSupportedCodecs;
public SdpParser() {
this.localSupportedCodecs = defaultSupportedCodecs();
}
public SdpParser(List<CodecSpec> localSupportedCodecs) {
this.localSupportedCodecs = localSupportedCodecs;
}
public SdpNegotiationResult negotiate(byte[] sdpBytes) {
if (sdpBytes == null || sdpBytes.length == 0) {
return SdpNegotiationResult.fail("missing sdp offer");
}
String sdp = new String(sdpBytes, StandardCharsets.US_ASCII);
String[] lines = sdp.split("\r\n");
String sessionConnectionIp = null;
String mediaConnectionIp = null;
int remoteAudioPort = 0;
int ptime = 20;
List<Integer> offeredPayloadTypes = new ArrayList<>();
Map<Integer, CodecSpec> offeredCodecMap = new HashMap<>();
boolean inAudioMedia = false;
boolean telephoneEventSupported = false;
int telephoneEventPt = -1;
for (String line : lines) {
if (line == null || line.isEmpty()) {
continue;
}
if (line.startsWith("c=")) {
String[] parts = line.split(" ");
if (parts.length >= 3) {
String ip = parts[2].trim();
if (inAudioMedia) {
mediaConnectionIp = ip;
} else {
sessionConnectionIp = ip;
}
}
continue;
}
if (line.startsWith("m=")) {
inAudioMedia = false;
String[] parts = line.split(" ");
if (parts.length >= 4 && parts[0].startsWith("m=audio")) {
inAudioMedia = true;
try {
remoteAudioPort = Integer.parseInt(parts[1].trim());
} catch (Exception e) {
return SdpNegotiationResult.fail("invalid remote audio port");
}
for (int i = 3; i < parts.length; i++) {
try {
offeredPayloadTypes.add(Integer.parseInt(parts[i].trim()));
} catch (Exception ignore) {
}
}
}
continue;
}
if (!inAudioMedia) {
continue;
}
if (line.startsWith("a=rtpmap:")) {
try {
int colon = line.indexOf(':');
int space = line.indexOf(' ');
if (colon < 0 || space < 0 || space <= colon) {
continue;
}
int pt = Integer.parseInt(line.substring(colon + 1, space).trim());
String[] enc = line.substring(space + 1).trim().split("/");
if (enc.length < 2) {
continue;
}
String codecName = enc[0].trim();
int clockRate = Integer.parseInt(enc[1].trim());
CodecSpec spec = new CodecSpec(pt, codecName, clockRate);
offeredCodecMap.put(pt, spec);
if ("telephone-event".equalsIgnoreCase(codecName) && clockRate == 8000) {
telephoneEventSupported = true;
telephoneEventPt = pt;
}
} catch (Exception ignore) {
}
continue;
}
if (line.startsWith("a=ptime:")) {
try {
ptime = Integer.parseInt(line.substring("a=ptime:".length()).trim());
} catch (Exception ignore) {
}
}
}
if (remoteAudioPort <= 0) {
return SdpNegotiationResult.fail("missing audio media");
}
String remoteIp = mediaConnectionIp != null ? mediaConnectionIp : sessionConnectionIp;
if (remoteIp == null || remoteIp.isEmpty()) {
return SdpNegotiationResult.fail("missing connection address");
}
CodecSpec selected = chooseCodec(offeredPayloadTypes, offeredCodecMap);
if (selected == null) {
return SdpNegotiationResult.fail("no supported audio codec");
}
SdpNegotiationResult result = SdpNegotiationResult.ok();
result.setRemoteRtpIp(remoteIp);
result.setRemoteRtpPort(remoteAudioPort);
result.setSelectedCodec(selected);
result.setTelephoneEventSupported(telephoneEventSupported);
result.setRemoteTelephoneEventPayloadType(telephoneEventPt);
result.setPtime(ptime);
return result;
}
private CodecSpec chooseCodec(List<Integer> offeredPayloadTypes, Map<Integer, CodecSpec> offeredCodecMap) {
for (CodecSpec local : localSupportedCodecs) {
for (Integer pt : offeredPayloadTypes) {
CodecSpec offered = offeredCodecMap.get(pt);
if (offered != null) {
if (local.isSameCodec(offered.getCodecName(), offered.getClockRate())) {
return new CodecSpec(pt, offered.getCodecName(), offered.getClockRate());
}
} else {
if (pt == 0 && local.isStaticPcmu()) {
return new CodecSpec(0, "PCMU", 8000);
}
if (pt == 8 && local.isStaticPcma()) {
return new CodecSpec(8, "PCMA", 8000);
}
if (pt == 9 && "G722".equalsIgnoreCase(local.getCodecName()) && local.getClockRate() == 8000) {
return new CodecSpec(9, "G722", 8000);
}
}
}
}
return null;
}
public static List<CodecSpec> defaultSupportedCodecs() {
List<CodecSpec> codecs = new ArrayList<>();
codecs.add(new CodecSpec(9, "G722", 8000));
codecs.add(new CodecSpec(0, "PCMU", 8000));
codecs.add(new CodecSpec(8, "PCMA", 8000));
return codecs;
}
}
2. NegotiatedAudioFormatResolver.java
package com.litongjava.sip.rtp.codec;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.sdp.CodecSpec;
public final class NegotiatedAudioFormatResolver {
private NegotiatedAudioFormatResolver() {
}
public static int resolveSessionPcmSampleRate(CallSession session) {
if (session == null) {
return 8000;
}
return resolveSessionPcmSampleRate(session.getSelectedCodec());
}
public static int resolveSessionPcmSampleRate(CodecSpec codecSpec) {
if (codecSpec == null || codecSpec.getCodecName() == null) {
return 8000;
}
String codecName = codecSpec.getCodecName();
if ("G722".equalsIgnoreCase(codecName)) {
return 16000;
}
if ("PCMU".equalsIgnoreCase(codecName)) {
return 8000;
}
if ("PCMA".equalsIgnoreCase(codecName)) {
return 8000;
}
return codecSpec.getClockRate() > 0 ? codecSpec.getClockRate() : 8000;
}
public static int resolveChannels(CallSession session) {
return 1;
}
}
3. AudioResampler.java
如果你项目里还没有通用重采样器,就加这个。
package com.litongjava.sip.rtp.codec;
import java.util.Arrays;
import java.util.Objects;
public final class AudioResampler {
private AudioResampler() {
}
public static short[] resample(short[] input, int srcRate, int dstRate) {
Objects.requireNonNull(input, "input");
if (input.length == 0) {
return new short[0];
}
if (srcRate <= 0 || dstRate <= 0) {
throw new IllegalArgumentException("sample rate must be > 0");
}
if (srcRate == dstRate) {
return Arrays.copyOf(input, input.length);
}
double ratio = (double) dstRate / (double) srcRate;
int outputLength = Math.max(1, (int) Math.round(input.length * ratio));
short[] output = new short[outputLength];
for (int i = 0; i < outputLength; i++) {
double srcIndex = i / ratio;
int left = (int) Math.floor(srcIndex);
int right = Math.min(left + 1, input.length - 1);
double frac = srcIndex - left;
double sample = input[left] * (1.0 - frac) + input[right] * frac;
output[i] = clampToShort(sample);
}
return output;
}
private static short clampToShort(double value) {
if (value > Short.MAX_VALUE) {
return Short.MAX_VALUE;
}
if (value < Short.MIN_VALUE) {
return Short.MIN_VALUE;
}
return (short) Math.round(value);
}
}
4. G722Codec.java
这个类把接口和调用点补齐了,但编解码算法本体你需要替换成你实际采用的库实现。
package com.litongjava.sip.rtp.codec;
public class G722Codec implements AudioCodec {
@Override
public String codecName() {
return "G722";
}
@Override
public int payloadType() {
return 9;
}
/**
* RTP/SDP 里 G722 常见写法是 G722/8000,
* 但这里返回的是媒体处理层使用的 PCM 采样率。
*/
@Override
public int sampleRate() {
return 16000;
}
@Override
public short[] decode(byte[] payload) {
throw new UnsupportedOperationException(
"G722 decode is not implemented yet. Please replace this class with your actual G722 decoder implementation.");
}
@Override
public byte[] encode(short[] pcm16) {
throw new UnsupportedOperationException(
"G722 encode is not implemented yet. Please replace this class with your actual G722 encoder implementation.");
}
}
5. RtpUdpHandler.java
package com.litongjava.sip.rtp.server;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioCodec;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.G722Codec;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmaCodec;
import com.litongjava.sip.rtp.codec.PcmuCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.EchoMediaProcessor;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.sip.rtp.packet.RtpPacket;
import com.litongjava.sip.rtp.packet.RtpPacketParser;
import com.litongjava.sip.rtp.packet.RtpPacketWriter;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RtpUdpHandler implements UdpHandler {
private final int localPort;
private final CallSessionManager sessionManager;
private final RtpPacketParser rtpPacketParser = new RtpPacketParser();
private final RtpPacketWriter rtpPacketWriter = new RtpPacketWriter();
private final MediaProcessor mediaProcessor;
private final AudioCodec pcmuCodec = new PcmuCodec();
private final AudioCodec pcmaCodec = new PcmaCodec();
private final AudioCodec g722Codec = new G722Codec();
public RtpUdpHandler(int localPort, CallSessionManager sessionManager) {
this(localPort, sessionManager, new EchoMediaProcessor());
}
public RtpUdpHandler(int localPort, CallSessionManager sessionManager, MediaProcessor mediaProcessor) {
this.localPort = localPort;
this.sessionManager = sessionManager;
this.mediaProcessor = mediaProcessor;
}
@Override
public void handler(UdpPacket udpPacket, DatagramSocket socket) {
try {
CallSession session = sessionManager.getByLocalRtpPort(localPort);
if (session == null || session.isTerminated()) {
return;
}
Node remote = udpPacket.getRemote();
byte[] data = udpPacket.getData();
if (data == null || data.length < 12) {
return;
}
RtpPacket in = rtpPacketParser.parse(data);
if (session.getRemoteRtpIp() == null || session.getRemoteRtpIp().isEmpty()) {
session.setRemoteRtpIp(remote.getIp());
}
if (session.getRemoteRtpPort() <= 0) {
session.setRemoteRtpPort(remote.getPort());
}
if (session.isTelephoneEventSupported()
&& in.getPayloadType() == session.getRemoteTelephoneEventPayloadType()) {
session.setUpdatedTime(System.currentTimeMillis());
return;
}
AudioCodec codec = chooseCodec(session);
if (codec == null) {
log.warn("No codec selected for callId={}", session.getCallId());
return;
}
short[] pcm = codec.decode(in.getPayload());
AudioFrame inputFrame = new AudioFrame(
pcm,
codec.sampleRate(),
NegotiatedAudioFormatResolver.resolveChannels(session),
in.getTimestamp());
AudioFrame outputFrame = mediaProcessor.process(inputFrame, session);
if (outputFrame == null || outputFrame.getSamples() == null || outputFrame.getSamples().length == 0) {
log.info("MediaProcessor returned no audio, callId={}", session.getCallId());
return;
}
int targetSampleRate = codec.sampleRate();
short[] outputSamples = outputFrame.getSamples();
int outputSampleRate = outputFrame.getSampleRate() > 0 ? outputFrame.getSampleRate() : targetSampleRate;
if (outputSampleRate != targetSampleRate) {
outputSamples = AudioResampler.resample(outputSamples, outputSampleRate, targetSampleRate);
}
byte[] outPayload = codec.encode(outputSamples);
RtpPacket out = new RtpPacket();
out.setVersion(2);
out.setPadding(false);
out.setExtension(false);
out.setCsrcCount(0);
out.setMarker(false);
out.setPayloadType(session.getSelectedCodec().getPayloadType());
out.setSequenceNumber(session.nextSendSequence());
out.setTimestamp(session.nextSendTimestamp(outputSamples.length));
out.setSsrc(session.getLocalSsrc());
out.setPayload(outPayload);
byte[] outBytes = rtpPacketWriter.write(out);
DatagramPacket resp = new DatagramPacket(
outBytes,
outBytes.length,
new InetSocketAddress(session.getRemoteRtpIp(), session.getRemoteRtpPort()));
socket.send(resp);
session.setUpdatedTime(System.currentTimeMillis());
} catch (Exception e) {
log.error("rtp handler error, localPort={}", localPort, e);
}
}
private AudioCodec chooseCodec(CallSession session) {
if (session.getSelectedCodec() == null || session.getSelectedCodec().getCodecName() == null) {
return null;
}
String codecName = session.getSelectedCodec().getCodecName();
if ("G722".equalsIgnoreCase(codecName)) {
return g722Codec;
}
if ("PCMU".equalsIgnoreCase(codecName)) {
return pcmuCodec;
}
if ("PCMA".equalsIgnoreCase(codecName)) {
return pcmaCodec;
}
return null;
}
}
6. RealtimeMediaProcessor.java
package com.litongjava.voice.agent.sip;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeModelBridgeFactory;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RealtimeMediaProcessor implements MediaProcessor {
private static final int MODEL_INPUT_SAMPLE_RATE = 16000;
private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;
private final String platform;
private final RealtimeSetupCallback realtimeSetupCallback;
private final SipSessionRegistry sessionRegistry;
public RealtimeMediaProcessor(RealtimeSetupCallback realtimeSetupCallback) {
this(EnvUtils.getStr("vioce.agent.platform"), realtimeSetupCallback, new SipSessionRegistry());
}
public RealtimeMediaProcessor(String platform, RealtimeSetupCallback realtimeSetupCallback,
SipSessionRegistry sessionRegistry) {
this.platform = platform;
this.realtimeSetupCallback = realtimeSetupCallback;
this.sessionRegistry = sessionRegistry;
}
@Override
public AudioFrame process(AudioFrame input, CallSession session) {
if (input == null || session == null) {
return null;
}
String callId = getCallId(session);
if (StrUtil.isBlank(callId)) {
log.warn("callId is blank, skip processing");
return null;
}
SipRealtimeSession sipSession = sessionRegistry.getOrCreate(callId, this::createSipSession);
try {
sipSession.ensureConnected(session);
short[] inputSamples = input.getSamples();
if (inputSamples == null || inputSamples.length == 0) {
return null;
}
int sessionSampleRate = NegotiatedAudioFormatResolver.resolveSessionPcmSampleRate(session);
int inputSampleRate = input.getSampleRate() > 0 ? input.getSampleRate() : sessionSampleRate;
short[] modelInputSamples = inputSamples;
if (inputSampleRate != MODEL_INPUT_SAMPLE_RATE) {
modelInputSamples = AudioResampler.resample(inputSamples, inputSampleRate, MODEL_INPUT_SAMPLE_RATE);
}
byte[] pcm16kBytes = PcmCodec.shortsToLittleEndianBytes(modelInputSamples);
sipSession.sendToModel(pcm16kBytes);
short[] outputSamples = sipSession.takeOutputFrame(inputSamples.length);
if (outputSamples == null) {
return null;
}
return new AudioFrame(
outputSamples,
sessionSampleRate,
NegotiatedAudioFormatResolver.resolveChannels(session),
input.getRtpTimestamp());
} catch (Exception e) {
log.error("process failed, callId={}", callId, e);
return null;
}
}
public void close(CallSession session) {
if (session == null) {
return;
}
closeByCallId(getCallId(session));
}
public void closeByCallId(String callId) {
if (StrUtil.isBlank(callId)) {
return;
}
sessionRegistry.remove(callId);
}
public void closeAll() {
sessionRegistry.clear();
}
private SipRealtimeSession createSipSession(String callId) {
SipRealtimeBridgeCallback callback = new SipRealtimeBridgeCallback(callId);
RealtimeModelBridge bridge = RealtimeModelBridgeFactory.createBridge(platform, callback);
SipRealtimeSession sipSession = new SipRealtimeSession(callId, bridge, callback, realtimeSetupCallback);
callback.bind(sipSession);
log.info("created realtime sip session, callId={}", callId);
return sipSession;
}
private String getCallId(CallSession session) {
try {
String callId = session.getCallId();
return callId == null ? null : callId.trim();
} catch (Exception e) {
log.warn("failed to get callId from CallSession", e);
return null;
}
}
public static int getModelInputSampleRate() {
return MODEL_INPUT_SAMPLE_RATE;
}
public static int getModelOutputSampleRate() {
return MODEL_OUTPUT_SAMPLE_RATE;
}
}
7. SipRealtimeSession.java
package com.litongjava.voice.agent.sip;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.NegotiatedAudioFormatResolver;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipRealtimeSession {
private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;
private final String callId;
private final RealtimeModelBridge bridge;
private final SipRealtimeBridgeCallback callback;
private final RealtimeSetupCallback realtimeSetupCallback;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Short> outputQueue = new ConcurrentLinkedQueue<>();
private volatile CallSession callSession;
public SipRealtimeSession(String callId, RealtimeModelBridge bridge, SipRealtimeBridgeCallback callback,
RealtimeSetupCallback realtimeSetupCallback) {
this.callId = callId;
this.bridge = bridge;
this.callback = callback;
this.realtimeSetupCallback = realtimeSetupCallback;
}
public void ensureConnected(CallSession session) {
this.callSession = session;
if (connected.compareAndSet(false, true)) {
RealtimeSetup setup = null;
if (realtimeSetupCallback != null) {
setup = realtimeSetupCallback.getRealtimeSetup(session);
}
callback.start(setup);
bridge.connect(setup).exceptionally(ex -> {
log.error("bridge connect failed, callId={}", callId, ex);
connected.set(false);
return null;
});
}
}
public void sendToModel(byte[] pcm16kBytes) {
if (pcm16kBytes == null || pcm16kBytes.length == 0) {
return;
}
bridge.sendPcm16k(pcm16kBytes).exceptionally(ex -> {
log.warn("sendPcm16k failed, callId={}", callId, ex);
return null;
});
}
public void appendModelAudio(byte[] pcmBytes) {
if (pcmBytes == null || pcmBytes.length == 0) {
return;
}
short[] pcm24k = PcmCodec.littleEndianBytesToShorts(pcmBytes);
if (pcm24k.length == 0) {
return;
}
CallSession session = this.callSession;
int sessionSampleRate = NegotiatedAudioFormatResolver.resolveSessionPcmSampleRate(session);
short[] pcmSessionRate = pcm24k;
if (MODEL_OUTPUT_SAMPLE_RATE != sessionSampleRate) {
pcmSessionRate = AudioResampler.resample(pcm24k, MODEL_OUTPUT_SAMPLE_RATE, sessionSampleRate);
}
for (short sample : pcmSessionRate) {
outputQueue.offer(sample);
}
}
public short[] takeOutputFrame(int frameSamples) {
if (frameSamples <= 0) {
return null;
}
if (outputQueue.peek() == null) {
return null;
}
short[] out = new short[frameSamples];
int i = 0;
for (; i < frameSamples; i++) {
Short value = outputQueue.poll();
if (value == null) {
break;
}
out[i] = value;
}
if (i < frameSamples) {
Arrays.fill(out, i, frameSamples, (short) 0);
}
return out;
}
public void close() {
try {
bridge.close().getNow(null);
} catch (Exception e) {
log.warn("bridge.close failed, callId={}", callId, e);
} finally {
outputQueue.clear();
connected.set(false);
}
}
public String getCallId() {
return callId;
}
}
8. RealtimeSetupFactory.java
如果你还保留这个工厂,不用改;贴一份完整可用版。
package com.litongjava.voice.agent.sip;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
public final class RealtimeSetupFactory {
private RealtimeSetupFactory() {
}
public static RealtimeSetup buildFromEnv() {
String systemPrompt = EnvUtils.getStr("VOICE_AGENT_SYSTEM_PROMPT");
String userPrompt = EnvUtils.getStr("VOICE_AGENT_USER_PROMPT");
String jobDescription = EnvUtils.getStr("VOICE_AGENT_JOB_DESCRIPTION");
String resume = EnvUtils.getStr("VOICE_AGENT_RESUME");
String questions = EnvUtils.getStr("VOICE_AGENT_QUESTIONS");
String greeting = EnvUtils.getStr("VOICE_AGENT_GREETING");
return new RealtimeSetup(
systemPrompt,
userPrompt,
jobDescription,
resume,
questions,
greeting);
}
}
9. SipRealtimeSetupCallback.java
package com.litongjava.voice.agent.sip;
import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
public class SipRealtimeSetupCallback implements RealtimeSetupCallback {
@Override
public RealtimeSetup getRealtimeSetup(CallSession session) {
return RealtimeSetupFactory.buildFromEnv();
}
}
10. 你接下来还要做的唯一关键事
这套代码里,真正还没“落地完成”的只有一件事:
把 G722Codec 里的 decode/encode 换成你实际使用的 G722 库实现。
也就是说:
- SDP 协商优先 G722:已经给你了
- 会话 PCM 采样率动态推导:已经给你了
- RTP 层按协商 codec 动态处理:已经给你了
- RealtimeMediaProcessor 按会话协商格式动态处理:已经给你了
差的只是 G722 的实际算法实现。
你现在可以先把这套代码接进去,运行后会出现这种行为:
- 对端支持 G722 时,SDP 会优先协商到 G722
- RTP 层会选择
G722Codec - 这时如果你还没替换
G722Codec,会在编解码处抛UnsupportedOperationException
六、最终效果
本次改造完成后,系统将从当前的“固定 G711 窄带电话链路”升级为“可动态协商宽带语音的 SIP + Realtime 媒体平台”。
它带来的直接收益包括:
- 对端支持 G722 时,自动升级到宽带语音
- 电话链路与 Realtime 模型之间的转换损耗明显降低
RealtimeMediaProcessor不再绑定某个固定电话采样率- RTP 层、MediaProcessor、Realtime 三层职责更加清晰
- 为后续继续扩展更多 codec、更多媒体处理器打下基础
七、总结
当前项目已经具备:
- SIP 建链能力
- SDP 协商能力
- RTP 媒体处理能力
- Realtime 模型接入能力
但在电话场景中,如果始终停留在 PCMU/PCMA 8k,就会给 Realtime 模型引入额外的音频质量损耗和重采样开销。
因此,本次升级的核心意义在于:
- 让 SIP Server 支持并优先协商 G722
- 让 RTP 层真正具备宽带语音编解码能力
- 让
RealtimeMediaProcessor按当前会话格式动态处理音频 - 让整个系统从“固定 8k 电话 AI 链路”演进为“按协商格式动态工作的实时语音接入平台”
最终可以用一句话概括本次方案:
RTP 层负责按协商 codec 收发音频,RealtimeMediaProcessor 负责在会话采样率和模型采样率之间转换。
这就是整个改造方案最重要的设计原则。
