基于 MediaProcessor 对接 Realtime 模型说明
1. 系统整体流程
SIP Server 已经实现了完整的语音通话链路:
- SIP 建立通话
- SDP 协商音频 codec(PCMU / PCMA)
- RTP 接收语音数据
- 解码为 PCM 音频
- 交给
MediaProcessor处理 - 处理后的音频重新编码为 RTP
- 回发给通话对端
整体流程如下:
SIP 呼叫
│
▼
RTP 音频包
│
▼
G711 解码
│
▼
AudioFrame (PCM16)
│
▼
MediaProcessor
│
▼
AudioFrame
│
▼
G711 编码
│
▼
RTP 回发
也就是说:
所有语音处理逻辑都在 MediaProcessor 中完成。
2. MediaProcessor 的作用
MediaProcessor 是系统中唯一需要开发者实现的媒体处理接口。
接口定义如下:
public interface MediaProcessor {
AudioFrame process(AudioFrame input, CallSession session);
}
参数说明:
input
当前收到的一帧音频数据:
- 已经从 RTP 解码
- PCM16 格式
- 采样率:8000 Hz
- 单声道
- 每帧约 20ms(160 samples)
session
当前通话会话信息,可以获取:
- callId
- codec
- ptime
- RTP 信息
通常 realtime 模型接入时不会频繁使用。
返回值
返回一个新的 AudioFrame:
- 表示要回发给对端的音频
如果返回 null:
- 当前不发送音频
3. 默认实现(Echo)
当前系统默认实现是:
public class EchoMediaProcessor implements MediaProcessor {
@Override
public AudioFrame process(AudioFrame input, CallSession session) {
return input;
}
}
作用:
收到音频 → 原样返回。
因此会听到回声。
4. 接入 Realtime 模型的基本原理
Realtime 模型接入其实非常简单:
输入音频 → 送给模型
模型生成音频 → 返回
流程如下:
AudioFrame
│
▼
转成模型音频格式
│
▼
发送给 realtime 模型
│
▼
模型返回音频
│
▼
转成 AudioFrame
│
▼
返回
通常需要做一个步骤:
采样率转换
RTP 音频:
8000 Hz
大多数 realtime 模型要求:
16000 Hz
所以流程变为:
8k PCM
│
▼
重采样
│
▼
16k PCM
│
▼
Realtime 模型
│
▼
16k PCM
│
▼
重采样
│
▼
8k PCM
5. 一个简单的实现示例
Realtime 版本的 MediaProcessor 示例:
public class RealtimeModelMediaProcessor implements MediaProcessor {
private final RealtimeModelClient modelClient;
public RealtimeModelMediaProcessor(RealtimeModelClient modelClient) {
this.modelClient = modelClient;
}
@Override
public AudioFrame process(AudioFrame input, CallSession session) {
short[] pcm8k = input.getSamples();
// 1 转换为 16k
short[] pcm16k = AudioResampler.resample8kTo16k(pcm8k);
// 2 发送给 realtime 模型
short[] modelOutput = modelClient.processAudio(pcm16k);
if (modelOutput == null) {
return null;
}
// 3 转回 8k
short[] pcmOut = AudioResampler.resample16kTo8k(modelOutput);
return new AudioFrame(pcmOut, 8000, 1, input.getRtpTimestamp());
}
}
6. 如何接入到系统
当前系统启动代码:
MediaProcessor echoMediaProcessor = new EchoMediaProcessor();
只需要替换为:
MediaProcessor mediaProcessor =
new RealtimeModelMediaProcessor(new RealtimeModelClient());
即可完成模型接入。
无需修改:
- SIP
- SDP
- RTP
- Codec
7. 开发时需要注意的几个问题
不要阻塞 RTP 线程
process() 应该尽量快速返回。
如果模型调用较慢,建议:
- 使用异步调用
- 或使用缓冲队列
采样率问题
RTP:
8k
模型:
16k
需要做重采样。
一帧音频长度
RTP 默认:
20ms
也就是:
160 samples @ 8k
音频格式
当前统一格式:
PCM16
little endian
mono
AudioFrame -> 模型 -> AudioFrame
8. 使用MediaProcessor
将EchoMediaProcessor 换成对应的方法
package com.jobright.study.voice.agent.config;
import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.rtp.media.EchoMediaProcessor;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.sip.server.handler.SipTcpServerHandler;
import com.litongjava.sip.server.handler.SipUdpServerHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.sip.server.session.CallSessionReaper;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipServerConfig {
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
public void config() {
String localIp = "192.168.3.219";
CallSessionManager sessionManager = new CallSessionManager();
RtpServerManager rtpServerManager = new RtpServerManager(localIp, sessionManager);
MediaProcessor echoMediaProcessor = new EchoMediaProcessor();
SipTcpServerHandler tcpHandler = new SipTcpServerHandler(localIp, sessionManager, rtpServerManager,
echoMediaProcessor);
ServerTioConfig serverTioConfig = new ServerTioConfig("sip-server");
serverTioConfig.setWorkderExecutor(scheduledExecutor);
serverTioConfig.setServerAioHandler(tcpHandler);
serverTioConfig.setHeartbeatTimeout(-1L);
TioServer tioServer = new TioServer(serverTioConfig);
int port = 5060;
try {
tioServer.start(null, port);
log.info("独立 TCP 服务器已成功启动,监听端口: {}", port);
} catch (IOException e) {
log.error("启动 TCP 服务器失败", e);
}
SipUdpServerHandler udpHandler = new SipUdpServerHandler(localIp, sessionManager, rtpServerManager,
echoMediaProcessor);
UdpServerConf udpServerConf = new UdpServerConf(5060, udpHandler, 5000);
try {
UdpServer udpServer = new UdpServer(udpServerConf);
udpServer.start();
log.info("UDP 服务器已成功启动,监听端口: {}", port);
} catch (SocketException e) {
log.error("启动 UDP 服务器失败", e);
}
scheduledExecutor.scheduleAtFixedRate(new CallSessionReaper(sessionManager, rtpServerManager), 5, 5,
TimeUnit.SECONDS);
}
}
9. 总结
开发 realtime 模型接入时,只需要做一件事:
实现一个新的 MediaProcessor。
系统已经帮完成:
- SIP 信令
- SDP 协商
- RTP 收发
- G711 编解码
