对接大语言模型
背景
原来的 SIP Server 已经把通话底座全部打通了,SIP 建链、SDP 协商、RTP 收发、G711 编解码这些底层能力都已经稳定存在。系统里真正留给业务扩展的点只有一个,就是 MediaProcessor。默认实现只是把收到的音频原样返回,所以表现出来就是回声。现在要做的事情,是把这个“回声处理器”替换成“实时模型处理器”,让通话中的语音不再直接回显,而是先送进 Realtime 模型,再把模型生成的语音回送给通话对端。
目标
目标也很清楚:在不改动 SIP、SDP、RTP、Codec 这些底层通信逻辑的前提下,把现有通话链路接到 Qwen Realtime 模型上。换句话说,电话侧继续只关心 8k PCM 音频帧,模型侧继续只关心它自己的实时音频接口,而中间这层新实现负责把两边接起来,包括采样率转换、异步发送、音频缓冲和回填。这样做的价值是结构非常干净,SIP 服务器继续做它擅长的实时音频传输,模型桥接层继续做它擅长的多轮实时生成,两边通过 RealtimeMediaProcessor 耦合,职责边界很清晰。
在原有链路中插入了一个“实时语音适配层”。电话进来的音频还是按原来的方式被 RTP 解码成 8k PCM 的 AudioFrame,然后交给 RealtimeMediaProcessor。它先把 8k 音频转换成 16k,因为接入的 Realtime 模型希望收到更高采样率的输入;接着通过 RealtimeModelBridge 把音频发给模型;模型返回的下行音频则先进入缓冲区,再被重采样回 8k,最后重新包装成 AudioFrame 返回给 SIP/RTP 链路。对现有系统来说,它看到的仍然只是“输入一个 AudioFrame,输出一个 AudioFrame”,所以原有框架几乎不用动。
改造思路
RealtimeMediaProcessor 是这次改造的核心类。它实现了 MediaProcessor 接口,所以可以直接替换掉原来的 EchoMediaProcessor。它的职责不是直接“理解语音”,而是做一层适配和调度。上游给它的是 SIP 通话里的 8k 音频帧,下游它连接的是 Realtime 模型桥接对象。它在一次 process() 调用里做三件事: 第一,确保当前通话已经建立对应的实时模型会话; 第二,把当前收到的 8k 音频转成 16k,并异步送给模型; 第三,从模型下行音频缓冲区取出一帧已经准备好的 8k 音频返回给对端。如果当前模型还没返回数据,它就返回 null,表示这一时刻不发音频。 这个类本质上是“RTP 帧处理接口”和“实时模型接口”之间的总控器。
SessionState 是每一路通话对应的内部状态对象。因为 SIP 通话不是单例,可能同时存在多路呼叫,所以不能把所有状态放在 RealtimeMediaProcessor 的全局变量里混着用。这个类的意义就是把“某个 callId 的模型连接、下行音频队列、连接状态”等资源组织在一起。可以把它理解成“一路电话和一路实时模型会话的一一绑定关系”。它维护了当前通话的 RealtimeModelBridge,并且负责保存模型返回的音频数据。下行音频先进入它的缓冲队列,等到下一次 process() 被调用时,再从队列里取出对应长度的一帧回给 RTP。这样就把“模型返回的异步性”和“RTP 每 20ms 拉一帧的同步节奏”平滑地衔接起来了。
SipRealtimeBridgeCallback 的作用是把 RealtimeModelBridge 的回调结果喂回 SessionState。因为桥接层本身不是直接返回一个 AudioFrame,它是通过回调的方式通知“模型有新音频了”或“有文本事件了”。在 WebSocket 场景下,这种回调会把内容发给浏览器;而在 SIP 场景下,不需要把这些内容发到网页,而是要把模型返回的二进制音频塞进当前通话的下行缓冲里。这个类正是干这个事情的。它接收桥接层返回的文本和音频,其中音频部分会被送进 SessionState 的输出队列,供 RealtimeMediaProcessor 后续取走。也就是说,这个回调类把“模型事件流”转换成了“电话通话可消费的音频流”。
RealtimeModelBridge 不是这次新发明的类,而是现有系统里已经定义好的统一桥接接口。它的意义非常大,因为它把上层业务和底层具体模型平台解耦了。RealtimeMediaProcessor 不需要知道自己对接的是 Gemini 还是 Qwen,也不需要知道底层 SDK 的具体方法叫什么,它只知道:建立连接、发送 16k PCM、结束音频输入、发送文本、关闭连接,这些动作可以通过统一接口完成。这样一来,SIP 场景和 WebSocket 场景都可以复用同一套桥接抽象,后续要切换模型平台时,也不用改媒体处理逻辑。
RealtimeModelBridgeFactory 的作用是根据配置创建实际的桥接实现。它是一个工厂角色。因为统一接口只是抽象,真正运行时还是要落到某个具体实现上,比如 QwenOmniRealtimeBridge。工厂的价值在于把“选择哪个平台”的逻辑集中起来。RealtimeMediaProcessor 调用工厂拿到桥接对象后,就不再关心底层是百炼还是其他平台。这样后续如果需要扩展更多模型平台,也只是工厂里增加分支,而不影响媒体处理主逻辑。
RealtimeSetup 的作用是承载会话启动时的初始配置。它不是处理音频流本身的,而是告诉模型“是谁、的角色是什么、开场白是什么、有哪些业务背景”。例如系统提示词、岗位描述、简历内容、问题列表、欢迎语,这类信息都应该在建会话时一起传给模型。对于 SIP 电话场景来说,这意味着每一路电话一接通,就可以给模型建立一个带业务上下文的实时会话,而不是一个什么都不知道的裸模型连接。
QwenOmniRealtimeBridge 是真正对接 DashScope Realtime SDK 的实现类。它的职责是把统一桥接接口的调用翻译成阿里云 Qwen Realtime 所要求的协议。比如上行音频要做 Base64,输入格式是 PCM16,输出音频事件里返回的是 Base64 编码的 PCM,服务端还会抛出各种生命周期事件和转写事件。这个类负责管理 WebSocket 连接、更新 session、append 音频、处理模型回调事件,并通过统一的 RealtimeBridgeCallback 向上层交付结果。对 SIP 场景来说,它就是“模型连接器”;对 WebSocket 场景来说,它也是“模型连接器”。这说明的桥接层设计已经很通用了。
AudioResampler 的作用是做采样率转换。这个类虽然小,但它其实是整个链路能跑通的关键之一。因为电话侧是 8k,模型输入通常要 16k,模型输出又往往是 24k,如果没有重采样,就没法在两边之间直接对接。它的职责就是把一段 PCM 样本从一个采样率映射到另一个采样率。现在的实现属于轻量级线性插值方案,优点是依赖少、实现简单、足够支撑语音通话场景。后面如果对音质要求更高,也可以替换成更专业的重采样器,但在架构上它仍然扮演同样的角色。
PcmCodec 的作用是做 PCM short 数组和 little-endian 字节数组之间的转换。这个类的存在是因为 SIP 侧更习惯处理 short[] 形式的采样,而桥接层和 SDK 往往更习惯传输 byte[]。它本质上是数据表示层的转换工具,让音频在不同模块之间可以顺畅流动。它不是业务核心,但没有它,音频数据就很难在 RTP 层和模型层之间交换。
从整个调用时序来看,也可以这样理解:当一通电话进来后,系统第一次调用 RealtimeMediaProcessor.process() 时,会基于当前 callId 创建一个 SessionState,再通过工厂创建实际的 RealtimeModelBridge,并用 RealtimeSetup 去连接模型。之后每次来一帧 RTP 音频,处理器都会把它上采样后送给模型。模型返回的音频不会立刻直接“同步返回”,而是先进回调,再进入当前会话的输出队列。等下一次 RTP 处理循环到来时,处理器从队列中取出足够的一帧数据,重采样回 8k 后交给 RTP 发回去。这样就形成了一个持续运转的实时双向语音回路。
所以这次改造的本质可以概括成一句话:没有改电话系统本身,而是在 MediaProcessor 这一层,把“本地回声”升级成了“外部实时模型驱动的语音处理”。背景是已有 SIP 链路已经成熟,目标是最小侵入地接入 Realtime 模型,而各个类则分别承担了通话会话管理、模型桥接、回调接收、音频转换和缓冲调度等不同职责,最终共同把电话网络和实时大模型连接到了一起。
实战代码
实现思路是:
- SIP/RTP 侧每次进来的是 8k / PCM16 / 20ms
process()中把 8k 上采样到 16k- 通过现有的
RealtimeModelBridge异步送给 Qwen Realtime - Qwen 下行音频通过
RealtimeBridgeCallback.sendBinary(...)回来 - 下行默认按 24k PCM16 处理,再重采样回 8k
process()每次从缓冲队列里取一帧 8k 音频返回给 RTP- 如果当前还没有模型音频,返回
null
代码里我把依赖都按现有工程结构组织好了,并且没有改 RealtimeModelBridge 的接口。
AudioResampler
package com.litongjava.sip.rtp.codec;
import java.util.Arrays;
import java.util.Objects;
/**
* 简单线性插值重采样
* 适合当前语音链路对接,依赖少、易落地
*/
public final class AudioResampler {
private AudioResampler() {
}
public static short[] resample(short[] input, int srcRate, int dstRate) {
Objects.requireNonNull(input, "input");
if (input.length == 0) {
return new short[0];
}
if (srcRate <= 0 || dstRate <= 0) {
throw new IllegalArgumentException("sample rate must be > 0");
}
if (srcRate == dstRate) {
return Arrays.copyOf(input, input.length);
}
double ratio = (double) dstRate / (double) srcRate;
int outputLength = Math.max(1, (int) Math.round(input.length * ratio));
short[] output = new short[outputLength];
for (int i = 0; i < outputLength; i++) {
double srcIndex = i / ratio;
int left = (int) Math.floor(srcIndex);
int right = Math.min(left + 1, input.length - 1);
double frac = srcIndex - left;
double sample = input[left] * (1.0 - frac) + input[right] * frac;
output[i] = clampToShort(sample);
}
return output;
}
private static short clampToShort(double v) {
if (v > Short.MAX_VALUE) {
return Short.MAX_VALUE;
}
if (v < Short.MIN_VALUE) {
return Short.MIN_VALUE;
}
return (short) Math.round(v);
}
}
PcmCodec
package com.litongjava.sip.rtp.codec;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* PCM 编解码工具
*/
public final class PcmCodec {
private PcmCodec() {
}
public static byte[] shortsToLittleEndianBytes(short[] samples) {
if (samples == null || samples.length == 0) {
return new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(samples.length * 2).order(ByteOrder.LITTLE_ENDIAN);
for (short s : samples) {
buffer.putShort(s);
}
return buffer.array();
}
public static short[] littleEndianBytesToShorts(byte[] bytes) {
if (bytes == null || bytes.length < 2) {
return new short[0];
}
int len = bytes.length / 2;
short[] out = new short[len];
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < len; i++) {
out[i] = buffer.getShort();
}
return out;
}
}
RealtimeSetupFactory
package com.litongjava.voice.agent.sip;
import com.litongjava.template.PromptEngine;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
public final class RealtimeSetupFactory {
private RealtimeSetupFactory() {
}
public static RealtimeSetup buildFromEnv() {
String systemPrompt = PromptEngine.renderToString("VOICE_AGENT_SYSTEM_PROMPT");
String userPrompt = PromptEngine.renderToString("VOICE_AGENT_USER_PROMPT");
String jobDescription = PromptEngine.renderToString("VOICE_AGENT_JOB_DESCRIPTION");
String resume = PromptEngine.renderToString("VOICE_AGENT_RESUME");
String questions = PromptEngine.renderToString("VOICE_AGENT_QUESTIONS");
String greeting = PromptEngine.renderToString("VOICE_AGENT_GREETING");
return new RealtimeSetup(systemPrompt, userPrompt, jobDescription, resume, questions, greeting);
}
}
RealtimeSetupCallback
package com.litongjava.voice.agent.callback;
import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
public interface RealtimeSetupCallback {
RealtimeSetup getRealtimeSetup(CallSession session);
}
SipRealtimeSetupCallback
package com.litongjava.voice.agent.sip;
import com.litongjava.sip.model.CallSession;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
public class SipRealtimeSetupCallback implements RealtimeSetupCallback {
@Override
public RealtimeSetup getRealtimeSetup(CallSession session) {
return RealtimeSetupFactory.buildFromEnv();
}
}
SipRealtimeSession
package com.litongjava.voice.agent.sip;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipRealtimeSession {
private static final int MODEL_OUTPUT_SAMPLE_RATE = 24000;
private static final int SIP_SAMPLE_RATE = 8000;
private final String callId;
private final RealtimeModelBridge bridge;
private final SipRealtimeBridgeCallback callback;
private final RealtimeSetupCallback realtimeSetupCallback;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Short> outputQueue = new ConcurrentLinkedQueue<>();
public SipRealtimeSession(String callId, RealtimeModelBridge bridge, SipRealtimeBridgeCallback callback,
RealtimeSetupCallback realtimeSetupCallback) {
this.callId = callId;
this.bridge = bridge;
this.callback = callback;
this.realtimeSetupCallback = realtimeSetupCallback;
}
public void ensureConnected(CallSession session) {
if (connected.compareAndSet(false, true)) {
RealtimeSetup setup = null;
if (realtimeSetupCallback != null) {
setup = realtimeSetupCallback.getRealtimeSetup(null);
}
callback.start(setup);
bridge.connect(setup).exceptionally(ex -> {
log.error("bridge connect failed, callId={}", callId, ex);
connected.set(false);
return null;
});
}
}
public void sendToModel(byte[] pcm16kBytes) {
if (pcm16kBytes == null || pcm16kBytes.length == 0) {
return;
}
bridge.sendPcm16k(pcm16kBytes).exceptionally(ex -> {
log.warn("sendPcm16k failed, callId={}", callId, ex);
return null;
});
}
public void appendModelAudio(byte[] pcmBytes) {
if (pcmBytes == null || pcmBytes.length == 0) {
return;
}
short[] pcm24k = PcmCodec.littleEndianBytesToShorts(pcmBytes);
if (pcm24k.length == 0) {
return;
}
short[] pcm8k = AudioResampler.resample(pcm24k, MODEL_OUTPUT_SAMPLE_RATE, SIP_SAMPLE_RATE);
for (short sample : pcm8k) {
outputQueue.offer(sample);
}
}
public short[] takeOutputFrame(int frameSamples) {
if (frameSamples <= 0) {
return null;
}
if (outputQueue.peek() == null) {
return null;
}
short[] out = new short[frameSamples];
int i = 0;
for (; i < frameSamples; i++) {
Short value = outputQueue.poll();
if (value == null) {
break;
}
out[i] = value;
}
if (i < frameSamples) {
Arrays.fill(out, i, frameSamples, (short) 0);
}
return out;
}
public void close() {
try {
bridge.close().getNow(null);
} catch (Exception e) {
log.warn("bridge.close failed, callId={}", callId, e);
} finally {
outputQueue.clear();
connected.set(false);
}
}
public String getCallId() {
return callId;
}
}
SipSessionRegistry
package com.litongjava.voice.agent.sip;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class SipSessionRegistry {
private final Map<String, SipRealtimeSession> sessions = new ConcurrentHashMap<>();
public SipRealtimeSession getOrCreate(String callId, Function<String, SipRealtimeSession> creator) {
return sessions.computeIfAbsent(callId, creator);
}
public SipRealtimeSession get(String callId) {
return sessions.get(callId);
}
public void remove(String callId) {
SipRealtimeSession session = sessions.remove(callId);
if (session != null) {
session.close();
}
}
public void clear() {
for (SipRealtimeSession session : sessions.values()) {
if (session != null) {
session.close();
}
}
sessions.clear();
}
}
SipRealtimeBridgeCallback
package com.litongjava.voice.agent.sip;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeBridgeCallback;
import com.litongjava.voice.agent.bridge.RealtimeSetup;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SipRealtimeBridgeCallback implements RealtimeBridgeCallback {
private final String callId;
private volatile SipRealtimeSession sipSession;
public SipRealtimeBridgeCallback(String callId) {
this.callId = callId;
}
public void bind(SipRealtimeSession sipSession) {
this.sipSession = sipSession;
}
@Override
public void start(RealtimeSetup setup) {
log.info("realtime callback start, callId={}", callId);
}
@Override
public void sendText(String text) {
if (StrUtil.isNotBlank(text)) {
log.debug("realtime text event, callId={}, text={}", callId, text);
}
}
@Override
public void sendBinary(byte[] bytes) {
SipRealtimeSession session = this.sipSession;
if (session == null || bytes == null || bytes.length == 0) {
return;
}
session.appendModelAudio(bytes);
}
@Override
public void close(String reason) {
log.info("realtime callback close, callId={}, reason={}", callId, reason);
}
@Override
public void session(String sessionId) {
// TODO Auto-generated method stub
}
@Override
public void turnComplete(String role, String text) {
// TODO Auto-generated method stub
}
}
RealtimeMediaProcessor
package com.litongjava.voice.agent.sip;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioResampler;
import com.litongjava.sip.rtp.codec.PcmCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.voice.agent.bridge.RealtimeModelBridge;
import com.litongjava.voice.agent.bridge.RealtimeModelBridgeFactory;
import com.litongjava.voice.agent.callback.RealtimeSetupCallback;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RealtimeMediaProcessor implements MediaProcessor {
private final String platform;
private final RealtimeSetupCallback realtimeSetupCallback;
private final SipSessionRegistry sessionRegistry;
public RealtimeMediaProcessor(RealtimeSetupCallback realtimeSetupCallback) {
this(EnvUtils.getStr("vioce.agent.platform"), realtimeSetupCallback, new SipSessionRegistry());
}
public RealtimeMediaProcessor(String platform, RealtimeSetupCallback realtimeSetupCallback,
SipSessionRegistry sessionRegistry) {
this.platform = platform;
this.realtimeSetupCallback = realtimeSetupCallback;
this.sessionRegistry = sessionRegistry;
}
@Override
public AudioFrame process(AudioFrame input, CallSession session) {
if (input == null || session == null) {
return null;
}
String callId = getCallId(session);
if (StrUtil.isBlank(callId)) {
log.warn("callId is blank, skip processing");
return null;
}
SipRealtimeSession sipSession = sessionRegistry.getOrCreate(callId, this::createSipSession);
try {
sipSession.ensureConnected(session);
short[] in8k = input.getSamples();
if (in8k == null || in8k.length == 0) {
return null;
}
short[] pcm16k = AudioResampler.resample(in8k, 8000, 16000);
byte[] pcm16kBytes = PcmCodec.shortsToLittleEndianBytes(pcm16k);
sipSession.sendToModel(pcm16kBytes);
short[] out8k = sipSession.takeOutputFrame(in8k.length);
if (out8k == null) {
return null;
}
return new AudioFrame(out8k, 8000, 1, input.getRtpTimestamp());
} catch (Exception e) {
log.error("process failed, callId={}", callId, e);
return null;
}
}
public void close(CallSession session) {
if (session == null) {
return;
}
closeByCallId(getCallId(session));
}
public void closeByCallId(String callId) {
if (StrUtil.isBlank(callId)) {
return;
}
sessionRegistry.remove(callId);
}
public void closeAll() {
sessionRegistry.clear();
}
private SipRealtimeSession createSipSession(String callId) {
SipRealtimeBridgeCallback callback = new SipRealtimeBridgeCallback(callId);
RealtimeModelBridge bridge = RealtimeModelBridgeFactory.createBridge(platform, callback);
SipRealtimeSession sipSession = new SipRealtimeSession(callId, bridge, callback, realtimeSetupCallback);
callback.bind(sipSession);
log.info("created realtime sip session, callId={}", callId);
return sipSession;
}
private String getCallId(CallSession session) {
try {
String callId = session.getCallId();
return callId == null ? null : callId.trim();
} catch (Exception e) {
log.warn("failed to get callId from CallSession", e);
return null;
}
}
}
SipServerConfig
现在在 SipServerConfig 里,把这段:
MediaProcessor echoMediaProcessor = new EchoMediaProcessor();
替换成:
MediaProcessor mediaProcessor = new RealtimeMediaProcessor(sipRealtimeSetupCallback);
再把后面的 echoMediaProcessor 改成 mediaProcessor 即可:
SipTcpServerHandler tcpHandler = new SipTcpServerHandler(
localIp, sessionManager, rtpServerManager, mediaProcessor);
SipUdpServerHandler udpHandler = new SipUdpServerHandler(
localIp, sessionManager, rtpServerManager, mediaProcessor);
