使用tio-boot搭建openai 代理服务
本文档介绍如何基于 tio-boot 框架快速搭建一个 OpenAI 代理服务。该代理服务将接收来自客户端的 ChatGPT 请求(包括流式和非流式模式),并通过 OkHttp 将请求转发至 OpenAI API,同时负责将 OpenAI 返回的结果以 HTTP 响应或 SSE(Server-Sent Events)流的形式返回给客户端。
1. 概述
使用场景:有时前端或其他后端服务由于网络策略(如不能直接访问 OpenAI API 域名)、权限或安全原因,需要中转 OpenAI 请求到代理服务,由该服务统一转发和聚合响应。本示例即演示如何用
tio-boot
快速搭建这种中转代理。核心思路:
- 前端请求发送至我们本地运行的 tio-boot HTTP 服务(地址如
http://127.0.0.1:80/openai/v1/chat/completions
) - 我们根据请求体中的
stream
字段决定采用异步流式(SSE)还是同步普通 HTTP 调用 - 使用 OkHttp 发送请求到 OpenAI API(同步或异步),将 OpenAI 返回的结果(JSON 或 SSE 流式增量数据)转发回前端
- 前端请求发送至我们本地运行的 tio-boot HTTP 服务(地址如
2. 环境准备
JDK:建议使用 Java 8 或更高版本
IDE:IntelliJ IDEA、Eclipse 等(已安装 Lombok 插件)
Maven/Gradle:任意一种构建工具,下面示例基于 Maven
依赖库:
pom.xml
中至少包含以下依赖:<!-- Tio Boot admin 框架 --> <dependency> <groupId>com.litongjava</groupId> <artifactId>tio-boot-admin</artifactId> <version>1.0.3</version> </dependency> <!-- fastjson2 --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.30</version> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.26</version> <scope>provided</scope> </dependency>
环境变量:
OPENAI_API_URL
:若需自定义代理的 OpenAI API 地址(默认值https://api.openai.com/v1
)OPENAI_API_KEY
:你的 OpenAI API Key
3. 项目结构
建议的 Maven 项目目录结构如下:
llm-proxy-app/
├─ src/
│ ├─ main/
│ │ ├─ java/
│ │ │ ├─ com/litongjava/llm/proxy/
│ │ │ │ ├─ LLMProxyApp.java
│ │ │ ├─ com/litongjava/llm/proxy/config/
│ │ │ │ └─ LLMProxyConfig.java
│ │ │ ├─ com/litongjava/llm/proxy/handler/
│ │ │ │ └─ OpenAIV1ChatHandler.java
│ │ │ ├─ com/litongjava/llm/proxy/callback/
│ │ │ │ └─ OpenAIProxyCallback.java
│ │ └─ resources/
│ │ └─ app.properties
└─ pom.xml
LLMProxyApp.java
:程序入口,启动 tio-boot ServerLLMProxyConfig.java
:实现BootConfiguration
,注册路由路径与处理方法OpenAIV1ChatHandler.java
:核心业务处理类,根据stream
字段决定同步/异步调用OpenAIProxyCallback.java
:异步回调,用于将 OpenAI SSE 流数据逐行推送给前端OpenAiClient.java
:封装 OkHttp 同步/异步调用 OpenAI 的方法
4. 关键代码说明
下面对项目中的核心 Java 类进行详细说明。
4.1 OpenAIV1ChatHandler.java(请求入口)
package com.litongjava.llm.proxy.handler;
import java.io.IOException;
import java.util.Map;
import com.alibaba.fastjson2.JSONObject;
import com.litongjava.llm.proxy.callback.OpenAIProxyCallback;
import com.litongjava.model.body.RespBodyVo;
import com.litongjava.openai.client.OpenAiClient;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.utils.HttpIpUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
import com.litongjava.tio.utils.json.FastJson2Utils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
/**
* OpenAI V1 Chat Handler:接收前端请求后,分两种模式执行:
* - stream = true:异步 SSE 模式,使用 OpenAIProxyCallback 逐行推送
* - stream = false:同步普通 JSON 返回模式
*/
@Slf4j
public class OpenAIV1ChatHandler {
public HttpResponse completions(HttpRequest httpRequest) {
long start = System.currentTimeMillis();
HttpResponse httpResponse = TioRequestContext.getResponse();
String requestURI = httpRequest.getRequestURI();
String bodyString = httpRequest.getBodyString();
// 1. 校验 body 是否为空
if (StrUtil.isBlank(bodyString)) {
return httpResponse.setJson(RespBodyVo.fail("empty body"));
}
// 2. 记录客户端 IP、请求路径
String realIp = HttpIpUtils.getRealIp(httpRequest);
log.info("from:{}, requestURI: {}", realIp, requestURI);
// 3. 获取并清理请求头(去掉 host、accept-encoding,避免转发问题)
Map<String, String> headers = httpRequest.getHeaders();
headers.remove("host");
headers.remove("accept-encoding");
// 4. 判断是否是流式请求
Boolean stream = true;
JSONObject openAiRequestVo = FastJson2Utils.parseObject(bodyString);
if (openAiRequestVo != null) {
stream = openAiRequestVo.getBoolean("stream");
}
if (Boolean.TRUE.equals(stream)) {
// 4.1 流式(SSE)模式: 禁止默认处理器自动发送 body,自己用 SseEmitter 推送
httpResponse.setSend(false);
ChannelContext channelContext = httpRequest.getChannelContext();
// 创建回调对象,将会逐行推送 OpenAI SSE 数据
OpenAIProxyCallback callback =
new OpenAIProxyCallback(channelContext, httpResponse, start);
// 调用 OpenAiClient 异步请求,并传入 callback
OpenAiClient.chatCompletions(headers, bodyString, callback);
} else {
// 4.2 同步普通模式: 直接转发请求并同步等待响应
try (Response response =
OpenAiClient.chatCompletions(headers, bodyString)) {
// 将 OpenAI 返回的 JSON 字符串写回前端
String respBody = response.body().string();
httpResponse.setString(respBody, "utf-8", "application/json");
log.info("chat request: {}, response: {}", bodyString, respBody);
} catch (IOException e) {
log.error("OpenAI 同步请求失败", e);
return httpResponse.setJson(RespBodyVo.fail(e.getMessage()));
}
long end = System.currentTimeMillis();
log.info("finish llm in {} ms", (end - start));
}
return httpResponse;
}
}
要点说明:
httpRequest.getBodyString()
读取前端传过来的 JSON。headers.remove("host")/remove("accept-encoding")
:避免把本地的 Host、浏览器端的 accept-encoding 转发到 OpenAI,否则可能出现压缩/域名不匹配等问题。根据
stream
字段分两种分支:- 流式模式:
httpResponse.setSend(false)
禁止 Tio 默认把 body 写回,改为通过OpenAIProxyCallback
手动推送 SSE。 - 非流式模式:直接同步调用
OpenAiClient.chatCompletions(...)
,拿到Response
后把response.body().string()
写回即可。
- 流式模式:
4.2 OpenAIProxyCallback.java(SSE 回调处理)
package com.litongjava.llm.proxy.callback;
import java.io.IOException;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.litongjava.model.body.RespBodyVo;
import com.litongjava.tio.boot.utils.OkHttpResponseUtils;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HeaderName;
import com.litongjava.tio.http.common.HeaderValue;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.http.common.encoder.ChunkEncoder;
import com.litongjava.tio.http.common.sse.ChunkedPacket;
import com.litongjava.tio.http.server.util.SseEmitter;
import com.litongjava.tio.utils.SystemTimer;
import com.litongjava.tio.utils.json.FastJson2Utils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okhttp3.ResponseBody;
/**
* OpenAI SSE 回调:将 OpenAI 返回的每一行增量数据(delta)按 SSE 格式推送给前端。
*/
@Slf4j
public class OpenAIProxyCallback implements Callback {
private ChannelContext channelContext;
private HttpResponse httpResponse;
private long start;
public OpenAIProxyCallback(
ChannelContext channelContext,
HttpResponse httpResponse,
long start) {
this.channelContext = channelContext;
this.httpResponse = httpResponse;
this.start = start;
}
@Override
public void onFailure(Call call, IOException e) {
// 请求 OpenAI 失败,直接返回失败 JSON
e.printStackTrace();
httpResponse.setSend(true);
httpResponse.setJson(RespBodyVo.fail(e.getMessage()));
Tio.send(channelContext, httpResponse);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
// 1. 如果 OpenAI 返回非 2xx,直接透传状态码与 body
if (!response.isSuccessful()) {
httpResponse.setSend(true);
OkHttpResponseUtils.toTioHttpResponse(response, httpResponse);
httpResponse.setHasGzipped(true);
httpResponse.removeHeaders("Content-Length");
Tio.send(channelContext, httpResponse);
return;
}
// 2. 设置 SSE 响应头
httpResponse.addServerSentEventsHeader();
httpResponse.addHeader(HeaderName.Keep_Alive, HeaderValue.from("timeout=60"));
httpResponse.addHeader(HeaderName.Transfer_Encoding, HeaderValue.from("chunked"));
// 第一次要先把 HTTP 响应头发送给客户端(告知客户端这是 SSE 长链接)
if (!httpResponse.isSend()) {
Tio.send(channelContext, httpResponse);
}
// 3. 通过 try-with-resources 自动关闭 ResponseBody
try (ResponseBody responseBody = response.body()) {
if (responseBody == null) {
String errorMsg = "response body is null";
log.error(errorMsg);
byte[] errBytes = errorMsg.getBytes();
ChunkedPacket ssePacket =
new ChunkedPacket(ChunkEncoder.encodeChunk(errBytes));
Tio.send(channelContext, ssePacket);
SseEmitter.closeChunkConnection(channelContext);
return;
}
// 4. 逐行读取 OpenAI SSE 返回的内容
String line;
while ((line = responseBody.source().readUtf8Line()) != null) {
// OpenAI SSE 每一行以 "data: {...}\n" 形式返回,因此要拆分出真正的 JSON
// 然后提取 delta.content 并推送
if (line.length() < 1) {
// 空行,忽略
continue;
}
// 将整行字节原封不动地以 SSE chunk 发送给前端(前端自行解析 data 简单拆分)
byte[] chunkBytes = (line + "\n\n").getBytes();
if (line.length() > 6 && line.contains(":")) {
int idx = line.indexOf(':');
String jsonPart = line.substring(idx + 1).trim();
if (jsonPart.endsWith("}")) {
JSONObject parsed = FastJson2Utils.parseObject(jsonPart);
JSONArray choices = parsed.getJSONArray("choices");
if (!choices.isEmpty()) {
// 如果 delta.content 存在,才 push
String content =
choices.getJSONObject(0)
.getJSONObject("delta")
.getString("content");
if (content != null) {
SseEmitter.pushChunk(channelContext, chunkBytes);
}
// 这里可以解析更多字段,如 function_call、tool_calls 等
extraChoices(choices);
}
} else {
// 某些非 JSON 的行,也 push(如 [DONE] 等)
SseEmitter.pushChunk(channelContext, chunkBytes);
}
} else {
// 对于长度较小的控制行(如 data: [DONE]),也 push
SseEmitter.pushChunk(channelContext, chunkBytes);
}
}
// 5. 读取结束后关闭 SSE 连接(发送 0-length chunk)
SseEmitter.closeChunkConnection(channelContext);
log.info("elapsed: {} ms", (SystemTimer.currTime - start));
}
}
/**
* 可选:解析 choices 里的 function_call、tool_calls 等更多字段,用于后续业务处理
*/
private void extraChoices(JSONArray choices) {
for (int i = 0; i < choices.size(); i++) {
JSONObject delta = choices.getJSONObject(i).getJSONObject("delta");
// 解析 delta.content、delta.function_call 和 delta.tool_calls
// …(此处可根据业务需要自行实现)…
}
}
}
要点说明:
onResponse
里用try (ResponseBody responseBody = response.body())
- 通过 try-with-resources 确保最终关闭
responseBody
,释放底层 Socket 或归还连接池。 - 在循环退出时会自动调用
responseBody.close()
,无需手动再关闭。
- 通过 try-with-resources 确保最终关闭
SSE 头与 Chunked
httpResponse.addServerSentEventsHeader()
:相当于设置Content-Type: text/event-stream
、Cache-Control: no-cache
等头。httpResponse.addHeader(HeaderName.Transfer_Encoding, HeaderValue.from("chunked"))
:启用 HTTP Chunked 模式,将每次推送都作为一个 chunk 发送。SseEmitter.pushChunk(channelContext, chunkBytes)
:把一段字节(data: ...\n\n
)包装成ChunkedPacket
并推送。
推送增量数据
- 每次
readUtf8Line()
读到一行:如果该行以data:
开头且 JSON 完整,则解析出choices[0].delta.content
,只有当content != null
时才pushChunk
。否则遇到诸如data: [DONE]
、data: { … }
等行,也原封不动地推送,最终让前端知道流已结束。
- 每次
SSE 结束
- 循环退出后,调用
SseEmitter.closeChunkConnection(channelContext)
:发送一个空的 chunk(长度为 0),告诉客户端 Stream 结束。前端 EventSource 收到结束后会自动关闭,后端的 TCP 也随之断开。
- 循环退出后,调用
错误处理
- 如果
response.body() == null
,立即推送错误并调用SseEmitter.closeChunkConnection
关闭连接 - 如果 OpenAI 返回非 2xx,使用
OkHttpResponseUtils.toTioHttpResponse()
将状态码、响应头、消息体透传给客户端
- 如果
4.3 LLMProxyConfig.java(路由注册)
package com.litongjava.llm.proxy.config;
import com.litongjava.context.BootConfiguration;
import com.litongjava.llm.proxy.handler.OpenAIV1ChatHandler;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.http.server.router.HttpRequestRouter;
/**
* LLMProxyConfig 用于注册 HTTP 路由,将 /openai/v1/chat/completions 映射到 OpenAIV1ChatHandler.completions 方法
*/
public class LLMProxyConfig implements BootConfiguration {
@Override
public void config() {
TioBootServer server = TioBootServer.me();
HttpRequestRouter requestRouter = server.getRequestRouter();
// 注册路径:当收到 POST /openai/v1/chat/completions 时,调用 openAIV1ChatHandler.completions
OpenAIV1ChatHandler openAIV1ChatHandler = new OpenAIV1ChatHandler();
requestRouter.add(
"/openai/v1/chat/completions",
openAIV1ChatHandler::completions
);
}
}
要点说明:
- 实现
BootConfiguration
接口后,tio-boot 启动时会自动调用config()
。 - 通过
server.getRequestRouter().add(path, handlerMethod)
把 URI 路径与业务方法绑定。tio-boot 支持 Lambda 引用handler::method
。
4.4 LLMProxyApp.java(启动类)
package com.litongjava.llm.proxy;
import com.litongjava.llm.proxy.config.LLMProxyConfig;
import com.litongjava.tio.boot.TioApplication;
/**
* 程序入口:启动 tio-boot 应用
*/
public class LLMProxyApp {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 以当前类为 Spring-like 上下文启动,并加载 LLMProxyConfig 注册的路由
TioApplication.run(LLMProxyApp.class, new LLMProxyConfig(), args);
long end = System.currentTimeMillis();
System.out.println("启动耗时:" + (end - start) + " ms");
}
}
要点说明:
TioApplication.run(...)
会根据LLMProxyApp.class
与LLMProxyConfig
启动一个完整的 HTTP Server,默认端口 80。- 启动过程中会扫描到
LLMProxyConfig
,并把路由注册到TioBootServer
,使得后续客户端请求/openai/v1/chat/completions
时由OpenAIV1ChatHandler.completions
处理。
5. 启动与测试
准备 OpenAI API Key 请在
app.properties
(或系统环境变量)里配置:OPENAI_API_KEY=sk-********************** OPENAI_API_URL=https://api.openai.com/v1
或者在运行时以 JVM 参数方式传入:
java -DOPENAI_API_KEY=sk-*** -DOPENAI_API_URL=https://api.openai.com/v1 \ -jar llm-proxy-app.jar
编译打包
mvn clean package -DskipTests
启动服务
java -jar target/llm-proxy-app-1.0.0.jar
控制台输出示例:
1234ms [INFO] Server started. Listening on port 80
测试非流式请求
curl -X POST http://127.0.0.1/openai/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-4o", "stream": false, "messages": [ {"role":"system","content":"You are a helpful assistant."}, {"role":"user","content":"Hello, world!"} ] }'
代理服务会同步请求 OpenAI 并返回完整 JSON。例如:
{ "id": "chatcmpl-xxxxx", "object": "chat.completion", "choices": [ { "index": 0, "message": { "role": "assistant", "content": "Hello! How can I assist you today?" }, "finish_reason": "stop" } ], "usage": { … } }
测试流式 SSE 请求
curl -N -X POST http://127.0.0.1/openai/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-4o", "stream": true, "messages": [ {"role":"system","content":"You are a helpful assistant."}, {"role":"user","content":"Tell me a joke, please."} ] }'
-N
选项用于告诉 curl 保持连接,持续打印服务器推送的增量数据。你会看到形如:
data: {"id":"chatcmpl-xxxxx","choices":[{"delta":{"content":"Sure! Here’s a joke:"}}]} data: {"id":"chatcmpl-xxxxx","choices":[{"delta":{"content":" Why don't scientists trust atoms?"}}]} data: {"id":"chatcmpl-xxxxx","choices":[{"delta":{"content":" Because they make up everything!"}}]} data: [DONE]
6. 注意事项
ResponseBody
只读一次- 在
OpenAIV1ChatCallback
中,使用responseBody.source().readUtf8Line()
逐行读取时,不要再另外调用response.body()
或response.body().bytes()
,否则会导致流被重复消费而报错。 - 在非流式分支中,使用
response.body().string()
自动完成读取并关闭流。
- 在
禁止转发客户端的
accept-encoding
头- 我们在 Handler 里手动移除了
headers.remove("accept-encoding")
,避免 OpenAI 返回 Brotli 压缩流。若要开启 Brotli 支持,需要在 OkHttpClientPool 中添加BrotliInterceptor.INSTANCE
并在回调中做手动解码。
- 我们在 Handler 里手动移除了
并发与连接复用
- OkHttpClient 建议为全局单例复用,否则每个请求都会新建连接池和线程池,影响性能。
- 本示例假设
OpenAiClient
内部使用了共享的OkHttpClientPool.get300HttpClient()
,否则会出现资源无法复用。
SSE 超时与断开
- 我们在回调中设置了
Keep-Alive: timeout=60
,告知客户端 60 秒无数据后自动断开。SseEmitter.closeChunkConnection(channelContext)
会在结束时发送一个空 chunk 并关闭连接。
- 我们在回调中设置了
异常路径处理
- 如果在读取 SSE 过程中抛出 IOException,应在 catch 里输出日志并调用
SseEmitter.closeChunkConnection(...)
。
- 如果在读取 SSE 过程中抛出 IOException,应在 catch 里输出日志并调用
日志与调试
- 在开发环境,可在
OpenAIV1ChatHandler
的同步分支中打印response.headers()
以便调试是否返回了content-encoding: br
,若需 Brotli 解码,可在OkHttpClientPool
中添加addNetworkInterceptor(BrotliInterceptor.INSTANCE)
。
- 在开发环境,可在
7. 总结
本文档完整演示了如何基于 tio-boot
框架构建一个 OpenAI 代理服务,支持流式(SSE)和非流式两种模式。
主要组件:
OpenAIV1ChatHandler
:负责接收前端请求并根据stream
字段选择同步或异步调用。OpenAIProxyCallback
:用于异步 SSE 模式下,按行读取 OpenAI 的增量data
,并以 SSE chunk 形式推送给前端。LLMProxyConfig
:将/openai/v1/chat/completions
路径注册到 HandlerLLMProxyApp
:应用入口,启动 tio-boot HTTP Server
关键技术点:
- SSE (Server-Sent Events):HTTP/1.1 下的长连接推流方式,客户端使用
EventSource
监听服务器端增量更新。 - OkHttp 异步回调:通过
enqueue(callback)
实现非阻塞请求,回调函数在收到响应时往 TioChannelContext
写数据。 - try-with-resources 管理:通过
try (ResponseBody responseBody = response.body())
自动关闭底层资源。 - Header 清理与转发:移除
host
、accept-encoding
,避免域名/压缩协商冲突。
- SSE (Server-Sent Events):HTTP/1.1 下的长连接推流方式,客户端使用
通过本示例,你可以快速在本地或云服务器上运行一个面向前端的 ChatGPT 代理,统一管理 API Key、日志、流量限流等,并可集成到更大规模的应用架构中。
若需扩展,可考虑:
- 增加缓存层,减少重复 OpenAI 调用
- 针对不同模型、不同路由做动态转发
- 在 SSE 中解析更多字段(如 function_call、tool_calls),实现打通 Function Calling 逻辑