tio-core
使用 tio-core 添加独立 tcp 端口的 处理 tcp 请求
使用 tio-boot 内置 tio-core,可以可以 tcp-core 启动 tcp 服务,处理 tcp 数据, tio-boot 提供了两种处理 tcp 数据数据的方式
- 1.t-io core 需要使用单独端口
- 2.使用 tio-boot tcphander,无须独立的端口,复用 tio-boot server 的端口,同时支持 http 协议和 websocket 协议
启动 tcp-server
DemoPacket
import com.litongjava.tio.core.intf.Packet;
/**
* socket消息包
*/
@SuppressWarnings("serial")
public class DemoPacket extends Packet {
private byte[] body;
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
DemoTioServerListener
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.server.intf.ServerAioListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DemoTioServerListener implements ServerAioListener {
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
}
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
}
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
}
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
}
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
}
/**
* 连接关闭前触发本方法
*
* @param channelContext the channelcontext
* @param throwable the throwable 有可能为空
* @param remark the remark 有可能为空
* @param isRemove
* @throws Exception
*/
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
log.info("关闭后清除认证信息");
Tio.unbindToken(channelContext);
}
/**
* @param channelContext
* @param interval 已经多久没有收发消息了,单位:毫秒
* @param heartbeatTimeoutCount 心跳超时次数,第一次超时此值是1,以此类推。此值被保存在:channelContext.stat.heartbeatTimeoutCount
* @return 返回true,那么服务器则不关闭此连接;返回false,服务器将按心跳超时关闭该连接
*/
public boolean onHeartbeatTimeout(ChannelContext channelContext, Long interval, int heartbeatTimeoutCount) {
log.info("心跳超时");
Tio.unbindToken(channelContext);
return false;
}
}
DemoTioServerHandler.java
import java.nio.ByteBuffer;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.server.intf.ServerAioHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DemoTioServerHandler implements ServerAioHandler {
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
throws TioDecodeException {
log.info("buffer:{}", buffer);
// 获取由ByteBuffer支持的字节数组
byte[] bytes = new byte[readableLength];
buffer.get(bytes);
// 封装为DemoPacket
DemoPacket imPackage = new DemoPacket();
imPackage.setBody(bytes);
return imPackage;
}
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
DemoPacket helloPacket = (DemoPacket) packet;
byte[] body = helloPacket.getBody();
// ByteBuffer的总长度是消息体长度
int bodyLength = body.length;
log.info("encode:{}", bodyLength);
// 创建一个新的ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
// 设置字节序
buffer.order(tioConfig.getByteOrder());
// 消息消息体
buffer.put(body);
return buffer;
}
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
DemoPacket packingPacket = (DemoPacket) packet;
byte[] body = packingPacket.getBody();
if (body == null) {
return;
}
String string = new String(body);
log.info("received:{}", string);
// 响应数据
String sendMessage = "收到了你的消息,你的消息是:" + string;
log.info("sendMessage:{}", sendMessage);
byte[] bytes = sendMessage.getBytes();
// 响应包
DemoPacket responsePacket = new DemoPacket();
responsePacket.setBody(bytes);
// 响应消息
log.info("开始响应");
Tio.send(channelContext, responsePacket);
log.info("响应完成");
}
}
TioServerConfig
package demo.tcp.config;
import java.io.IOException;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;
import com.litongjava.tio.server.intf.ServerAioHandler;
import com.litongjava.tio.server.intf.ServerAioListener;
import demo.tcp.server.DemoTioServerHandler;
import demo.tcp.server.DemoTioServerListener;
@AConfiguration
public class TioServerConfig {
@Initialization
public void demoTioServer() {
// handler, 包括编码、解码、消息处理
ServerAioHandler serverHandler = new DemoTioServerHandler();
// 事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口
ServerAioListener serverListener = new DemoTioServerListener();
// 配置对象
ServerTioConfig tioServerConfig = new ServerTioConfig("tcp-server", serverHandler, serverListener);
// 设置心跳,-1 取消心跳
tioServerConfig.setHeartbeatTimeout(-1);
// TioServer对象
TioServer tioServer = new TioServer(tioServerConfig);
// 启动服务
try {
tioServer.start(null, 9998);
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动类
package demo.tcp;
import com.litongjava.hotswap.wrapper.tio.boot.TioApplicationWrapper;
import com.litongjava.jfinal.aop.annotation.AComponentScan;
@AComponentScan
public class Main {
public static void main(String[] args) {
long start = System.currentTimeMillis();
TioApplicationWrapper.run(Main.class, args);
long end = System.currentTimeMillis();
System.out.println((end - start) + "(ms)");
}
}
上面的代码是一个使用 Java TIO 网络框架实现的简单服务器应用的示例。让我们逐部分进行解释:
DemoPacket
类(数据包定义)
- 目的:为服务器定义一个自定义的数据包结构。在网络通信中,数据包是数据的格式化单位。
- 主要元素:
- 继承自 Tio 框架的
Packet
类。 - 包含一个
byte[] body
用于存储数据载荷。
- 继承自 Tio 框架的
DemoTioServerListener
类(服务器事件监听器)
- 目的:实现
ServerAioListener
接口,定义各种服务器事件的行为。 - 关键功能:
onAfterConnected
、onAfterDecoded
等:在特定事件(如连接或解码数据包)后触发的方法。onBeforeClose
:在关闭连接之前执行的操作,例如解绑令牌。onHeartbeatTimeout
:管理超时的连接。
DemoTioServerHandler
类(服务器处理器)
- 目的:实现
ServerAioHandler
接口,处理数据包的编码、解码和处理。 - 关键功能:
decode
:将传入的原始数据转换为DemoPacket
对象。encode
:将DemoPacket
对象转换为传输的原始数据。handler
:处理接收到的数据包并发送响应。
TioServerConfig
类(tio-boot 配置)
- 目的:使用 tio-boot 框架的注解来配置并启动 Tio 服务器。
- 主要元素:
- 用
@AConfiguration
注解标记,表示这是一个 tio-boot 配置类。 - 包含一个用
@Initialization
注解的方法demoTioServer
,tio-boot 框架启动时会执行改方法,该方法启动 tioServer - 配置心跳超时、服务器处理器和监听器。
- 在指定端口(
6789
)上启动服务器。
- 用
整体流程
- 数据包定义:自定义数据包(
DemoPacket
)来携带数据。 - 事件处理:
DemoTioServerListener
监听服务器事件,如连接、断开连接和心跳。 - 数据处理:
DemoTioServerHandler
处理数据包的编码和解码,并处理传入的消息。 - 服务器设置和启动:
TioServerConfig
配置并启动 Tio 服务器,使用定义的处理器和监听器。
这段代码演示了 TIO 服务器的基本但完整的设置,包括数据包处理、事件监听、消息处理,以及与 tio-boot 框架的集成,便于管理和配置。
启动 tcp client
Tio-Boot 同样内置了 tcp-client,你可是使用 tcpClient 向服务端发送消息
DemoClientAioHandler
package demo.tcp.client;
import java.nio.ByteBuffer;
import com.litongjava.tio.client.intf.ClientAioHandler;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.TioDecodeException;
import com.litongjava.tio.core.intf.Packet;
import demo.tcp.server.DemoPacket;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DemoClientAioHandler implements ClientAioHandler {
/**
* 解码:把接收到的ByteBuffer解码成应用可以识别的业务消息包
*/
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext var5)
throws TioDecodeException {
log.info("buffer:{}", buffer);
// 转换前准备ByteBuffer
int length = buffer.remaining();
// 获取由ByteBuffer支持的字节数组
byte[] bytes = new byte[length];
buffer.get(bytes);
// 封装为DemoPacket
DemoPacket imPackage = new DemoPacket();
imPackage.setBody(bytes);
return imPackage;
}
/**
* 编码:把业务消息包编码为可以发送的ByteBuffer
*/
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext chanelContext) {
DemoPacket helloPacket = (DemoPacket) packet;
byte[] body = helloPacket.getBody();
// ByteBuffer的总长度是消息体长度
int bodyLength = body.length;
log.info("encode:{}", bodyLength);
// 创建一个新的ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
// 设置字节序
buffer.order(tioConfig.getByteOrder());
// 消息消息体
buffer.put(body);
return buffer;
}
/**
* 处理消息
*/
@Override
public void handler(Packet packet, ChannelContext var2) throws Exception {
log.info("handler");
DemoPacket helloPacket = (DemoPacket) packet;
byte[] body = helloPacket.getBody();
if (body != null) {
String str = new String(body);
System.out.println("received::" + str);
}
}
/**
* 此方法如果返回null,框架层面则不会发出心跳,如果返回非null,框架层面会定时发送本方法返回的消息包
*/
@Override
public Packet heartbeatPacket(ChannelContext var1) {
return null;
}
}
DemoTioClient
package demo.tcp.client;
import java.io.IOException;
import com.litongjava.tio.client.ClientChannelContext;
import com.litongjava.tio.client.ClientTioConfig;
import com.litongjava.tio.client.ReconnConf;
import com.litongjava.tio.client.TioClient;
import com.litongjava.tio.client.intf.ClientAioHandler;
import com.litongjava.tio.client.intf.ClientAioListener;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.Tio;
import demo.tcp.server.DemoPacket;
public class DemoTioClient {
/**
* 启动程序
*/
public static void main(String[] args) throws Exception {
// 服务器节点
Node serverNode = new Node("127.0.0.1", 9998);
// handler,包含编解码,消息处理
ClientAioHandler clientAioHandler = new DemoClientAioHandler();
// 初始化
ClientChannelContext clientChannelContext = init(serverNode, clientAioHandler);
// 发送消息
send(clientChannelContext, "Hello,World");
}
public static ClientChannelContext init(Node serverNode, ClientAioHandler clientAioHandler)
throws IOException, Exception {
// 事件监听器,可以为null,但是建议自己实现该接口,可以参考showcase
ClientAioListener clientAioListener = null;
// 断链后自动连接,不自动连接设置为null
ReconnConf reconnConf = new ReconnConf(50000L);
// 共用上下文对象
ClientTioConfig clientTioConfig = new ClientTioConfig(clientAioHandler, clientAioListener, reconnConf);
// 发送消息客户端
TioClient tioClient;
// 客户端通道上下文,连接服务器后获得
ClientChannelContext clientChannelContext;
// 设置心跳时间
clientTioConfig.setHeartbeatTimeout(0);
// 初始化client
tioClient = new TioClient(clientTioConfig);
// 连接服务器
clientChannelContext = tioClient.connect(serverNode);
return clientChannelContext;
}
private static void send(ClientChannelContext clientChannelContext, String message) {
DemoPacket helloPacket = new DemoPacket();
helloPacket.setBody(message.getBytes());
Tio.send(clientChannelContext, helloPacket);
}
}
日志
测试成功后显示的日志如下
客户端日志
18:50:14.715 [tio-timer-reconnect-1] ERROR com.litongjava.tio.client.TioClient - closeds:0, connections:0
18:50:14.715 [tio-timer-heartbeat1] WARN com.litongjava.tio.client.TioClient - The user has cancelled the heartbeat sending function at the frame level, and asks the user to complete the heartbeat mechanism by himsel
18:50:14.992 [tio-group-2] INFO com.litongjava.tio.client.ConnectionCompletionHandler - connected to 127.0.0.1:9998
18:50:14.995 [tio-worker-2] INFO demo.tcp.client.DemoClientAioHandler - encode:11
18:50:14.997 [tio-group-3] DEBUG com.litongjava.tio.core.ChannelContext - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311 Sent
18:50:14.998 [tio-group-4] INFO demo.tcp.client.DemoClientAioHandler - buffer:java.nio.HeapByteBuffer[pos=0 lim=51 cap=20480]
18:50:14.999 [tio-group-4] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311, Unpacking to get a packet:
18:50:15.000 [tio-group-4] INFO demo.tcp.client.DemoClientAioHandler - handler
received::收到了你的消息,你的消息是:Hello,World
18:50:15.000 [tio-group-4] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:127.0.0.1:9998, client:0:0:0:0:0:0:0:0:1311,After grouping the packets, the data just ran out
服务端日志
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - buffer:java.nio.HeapByteBuffer[pos=0 lim=11 cap=20480]
18:50:14.997 [tio-group-10] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:0.0.0.0:9998, client:127.0.0.1:1311, Unpacking to get a packet:
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - received:Hello,World
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - sendMessage:收到了你的消息,你的消息是:Hello,World
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - 开始响应
18:50:14.997 [tio-group-10] INFO demo.tcp.server.DemoTioServerHandler - 响应完成
18:50:14.997 [tio-group-10] DEBUG com.litongjava.tio.core.task.DecodeRunnable - server:0.0.0.0:9998, client:127.0.0.1:1311,After grouping the packets, the data just ran out
18:50:14.998 [tio-worker-5] INFO demo.tcp.server.DemoTioServerHandler - encode:51
18:50:14.998 [tio-group-11] DEBUG com.litongjava.tio.core.ChannelContext - server:0.0.0.0:9998, client:127.0.0.1:1311 Sent
18:50:27.310 [tio-worker-6] INFO demo.tcp.server.DemoTioServerListener - 关闭后清除认证信息
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Tokens - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定Token
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Users - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定用户
18:50:27.310 [tio-worker-6] DEBUG com.litongjava.tio.core.maintain.Tokens - tcp-server, server:0.0.0.0:9998, client:127.0.0.1:1311, 并没有绑定Token
测试代码地址
https://github.com/litongjava/java-ee-tio-boot-study/tree/main/tio-boot-latest-study/tio-boot-tcp-server-demo01