DecodeRunnable
概述
t-io
对数据的解码过程是在 DecodeRunnable
中完成的,每个 TCP 连接对应一个 DecodeRunnable
实例。除了正常的解码外,半包和粘包的处理也都在 DecodeRunnable
中完成。
源代码
以下是 DecodeRunnable
类的源代码:
package com.litongjava.tio.core.task;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.ChannelContext.CloseCode;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.AioDecodeException;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.core.stat.ChannelStat;
import com.litongjava.tio.core.stat.IpStat;
import com.litongjava.tio.core.utils.ByteBufferUtils;
import com.litongjava.tio.utils.SystemTimer;
import com.litongjava.tio.utils.hutool.CollUtil;
import com.litongjava.tio.utils.queue.FullWaitQueue;
import com.litongjava.tio.utils.queue.TioFullWaitQueue;
import com.litongjava.tio.utils.thread.pool.AbstractQueueRunnable;
/**
* 解码任务对象,一个连接对应一个本对象
*/
@SuppressWarnings("deprecation")
public class DecodeRunnable extends AbstractQueueRunnable<ByteBuffer> {
private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class);
private ChannelContext channelContext = null;
private TioConfig tioConfig = null;
// 上一次解码剩下的数据
private ByteBuffer lastByteBuffer = null;
// 新收到的数据
private ByteBuffer newReceivedByteBuffer = null;
// 上次解码进度百分比
private int lastPercentage = 0;
public DecodeRunnable(ChannelContext channelContext, Executor executor) {
super(executor);
this.channelContext = channelContext;
this.tioConfig = channelContext.tioConfig;
getMsgQueue();
}
@Override
public void runTask() {
while ((newReceivedByteBuffer = msgQueue.poll()) != null) {
decode();
}
}
public void decode() {
ByteBuffer byteBuffer = newReceivedByteBuffer;
if (lastByteBuffer != null) {
byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
lastByteBuffer = null;
}
while (true) {
try {
int initPosition = byteBuffer.position();
int limit = byteBuffer.limit();
int readableLength = limit - initPosition;
Packet packet = null;
if (channelContext.packetNeededLength != null) {
if (log.isDebugEnabled()) {
log.debug("{}, Length required for decoding:{}", channelContext, channelContext.packetNeededLength);
}
if (readableLength >= channelContext.packetNeededLength) {
packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
} else {
int percentage = (int) (((double) readableLength / channelContext.packetNeededLength) * 100);
if (percentage != lastPercentage) {
lastPercentage = percentage;
log.info("Receiving large packet: received {}% of {} bytes.", percentage, channelContext.packetNeededLength);
}
lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
return;
}
} else {
try {
packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
} catch (BufferUnderflowException e) {
// 数据不够读
}
}
if (packet == null) { // 数据不够,解不了码
if (tioConfig.useQueueDecode || (byteBuffer != newReceivedByteBuffer)) {
byteBuffer.position(initPosition);
byteBuffer.limit(limit);
lastByteBuffer = byteBuffer;
} else {
lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
}
ChannelStat channelStat = channelContext.stat;
channelStat.decodeFailCount++;
if (log.isInfoEnabled()) {
log.info("{} Failed to decode this time, has failed to decode for {} consecutive times, the length of data involved in decoding is {} bytes.", channelContext, channelStat.decodeFailCount,
readableLength);
}
// 检查慢包攻击
if (channelStat.decodeFailCount > 10) {
int per = readableLength / channelStat.decodeFailCount;
if (per < Math.min(channelContext.getReadBufferSize() / 2, 256)) {
String str = "连续解码" + channelStat.decodeFailCount + "次都不成功,并且平均每次接收到的数据为" + per + "字节,有慢攻击的嫌疑";
throw new AioDecodeException(str);
}
}
return;
} else { // 解码成功
channelContext.setPacketNeededLength(null);
channelContext.stat.latestTimeOfReceivedPacket = SystemTimer.currTime;
channelContext.stat.decodeFailCount = 0;
int packetSize = byteBuffer.position() - initPosition;
packet.setByteCount(packetSize);
if (tioConfig.statOn) {
tioConfig.groupStat.receivedPackets.incrementAndGet();
channelContext.stat.receivedPackets.incrementAndGet();
}
if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
try {
for (Long v : tioConfig.ipStats.durationList) {
IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
ipStat.getReceivedPackets().incrementAndGet();
tioConfig.getIpStatListener().onAfterDecoded(channelContext, packet, packetSize, ipStat);
}
} catch (Exception e1) {
log.error(packet.logstr(), e1);
}
}
if (tioConfig.getAioListener() != null) {
try {
tioConfig.getAioListener().onAfterDecoded(channelContext, packet, packetSize);
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
if (log.isDebugEnabled()) {
log.debug("{}, Unpacking to get a packet:{}", channelContext, packet.logstr());
}
handler(packet, packetSize);
if (byteBuffer.hasRemaining()) { // 组包后,还剩有数据
if (log.isDebugEnabled()) {
log.debug("{},After grouping packets, there is still data left:{}", channelContext, byteBuffer.remaining());
}
continue;
} else { // 组包后,数据刚好用完
lastByteBuffer = null;
if (log.isDebugEnabled()) {
log.debug("{},After grouping the packets, the data just ran out", channelContext);
}
return;
}
}
} catch (Throwable e) {
if (channelContext.logWhenDecodeError) {
log.error("Encountered an exception while decoding", e);
}
channelContext.setPacketNeededLength(null);
if (e instanceof AioDecodeException) {
List<Long> list = tioConfig.ipStats.durationList;
if (list != null && list.size() > 0) {
try {
for (Long v : list) {
IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
ipStat.getDecodeErrorCount().incrementAndGet();
tioConfig.getIpStatListener().onDecodeError(channelContext, ipStat);
}
} catch (Exception e1) {
log.error(e1.toString(), e1);
}
}
}
Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
return;
}
}
}
public void setNewReceivedByteBuffer(ByteBuffer newReceivedByteBuffer) {
this.newReceivedByteBuffer = newReceivedByteBuffer;
}
}
源代码解说
半包处理
在 TioHandler.decode()
方法中,业务层需要在数据不足以构成完整消息时返回 null
。框架在接收到 null
后,认为这是一个半包,将收到的数据暂存到 DecodeRunnable.lastByteBuffer
。当新的数据到达时,DecodeRunnable
会将 lastByteBuffer
和新数据合并,继续尝试解码,直到能构成完整的 Packet
对象。
粘包处理
在 TioHandler.decode()
方法中,业务层解码一个 Packet
对象返回后,框架会检查 ByteBuffer
中是否还有未处理的字节。如果有,则继续调用解码方法,直到返回 null
或没有剩余的字节为止。
小结
t-io
框架已经为半包和粘包提供了处理机制。业务层只需要按照业务协议进行解码即可,框架会自动处理剩余的字节数据,确保数据的完整性。