DecodeRunnable

概述

t-io 对数据的解码过程是在 DecodeRunnable 中完成的,每个 TCP 连接对应一个 DecodeRunnable 实例。除了正常的解码外,半包和粘包的处理也都在 DecodeRunnable 中完成。

源代码

以下是 DecodeRunnable 类的源代码:

package com.litongjava.tio.core.task;

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.ChannelContext.CloseCode;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.core.exception.AioDecodeException;
import com.litongjava.tio.core.intf.Packet;
import com.litongjava.tio.core.stat.ChannelStat;
import com.litongjava.tio.core.stat.IpStat;
import com.litongjava.tio.core.utils.ByteBufferUtils;
import com.litongjava.tio.utils.SystemTimer;
import com.litongjava.tio.utils.hutool.CollUtil;
import com.litongjava.tio.utils.queue.FullWaitQueue;
import com.litongjava.tio.utils.queue.TioFullWaitQueue;
import com.litongjava.tio.utils.thread.pool.AbstractQueueRunnable;

/**
 * 解码任务对象,一个连接对应一个本对象
 */
@SuppressWarnings("deprecation")
public class DecodeRunnable extends AbstractQueueRunnable<ByteBuffer> {
  private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class);
  private ChannelContext channelContext = null;
  private TioConfig tioConfig = null;

  // 上一次解码剩下的数据
  private ByteBuffer lastByteBuffer = null;

  // 新收到的数据
  private ByteBuffer newReceivedByteBuffer = null;

  // 上次解码进度百分比
  private int lastPercentage = 0;

  public DecodeRunnable(ChannelContext channelContext, Executor executor) {
    super(executor);
    this.channelContext = channelContext;
    this.tioConfig = channelContext.tioConfig;
    getMsgQueue();
  }

  @Override
  public void runTask() {
    while ((newReceivedByteBuffer = msgQueue.poll()) != null) {
      decode();
    }
  }

  public void decode() {
    ByteBuffer byteBuffer = newReceivedByteBuffer;
    if (lastByteBuffer != null) {
      byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
      lastByteBuffer = null;
    }

    while (true) {
      try {
        int initPosition = byteBuffer.position();
        int limit = byteBuffer.limit();
        int readableLength = limit - initPosition;
        Packet packet = null;

        if (channelContext.packetNeededLength != null) {
          if (log.isDebugEnabled()) {
            log.debug("{}, Length required for decoding:{}", channelContext, channelContext.packetNeededLength);
          }
          if (readableLength >= channelContext.packetNeededLength) {
            packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
          } else {
            int percentage = (int) (((double) readableLength / channelContext.packetNeededLength) * 100);
            if (percentage != lastPercentage) {
              lastPercentage = percentage;
              log.info("Receiving large packet: received {}% of {} bytes.", percentage, channelContext.packetNeededLength);
            }

            lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
            return;
          }
        } else {
          try {
            packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
          } catch (BufferUnderflowException e) {
            // 数据不够读
          }
        }

        if (packet == null) { // 数据不够,解不了码
          if (tioConfig.useQueueDecode || (byteBuffer != newReceivedByteBuffer)) {
            byteBuffer.position(initPosition);
            byteBuffer.limit(limit);
            lastByteBuffer = byteBuffer;
          } else {
            lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
          }
          ChannelStat channelStat = channelContext.stat;
          channelStat.decodeFailCount++;
          if (log.isInfoEnabled()) {
            log.info("{} Failed to decode this time, has failed to decode for {} consecutive times, the length of data involved in decoding is {} bytes.", channelContext, channelStat.decodeFailCount,
                readableLength);
          }

          // 检查慢包攻击
          if (channelStat.decodeFailCount > 10) {
            int per = readableLength / channelStat.decodeFailCount;
            if (per < Math.min(channelContext.getReadBufferSize() / 2, 256)) {
              String str = "连续解码" + channelStat.decodeFailCount + "次都不成功,并且平均每次接收到的数据为" + per + "字节,有慢攻击的嫌疑";
              throw new AioDecodeException(str);
            }
          }
          return;
        } else { // 解码成功
          channelContext.setPacketNeededLength(null);
          channelContext.stat.latestTimeOfReceivedPacket = SystemTimer.currTime;
          channelContext.stat.decodeFailCount = 0;

          int packetSize = byteBuffer.position() - initPosition;
          packet.setByteCount(packetSize);

          if (tioConfig.statOn) {
            tioConfig.groupStat.receivedPackets.incrementAndGet();
            channelContext.stat.receivedPackets.incrementAndGet();
          }

          if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
            try {
              for (Long v : tioConfig.ipStats.durationList) {
                IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
                ipStat.getReceivedPackets().incrementAndGet();
                tioConfig.getIpStatListener().onAfterDecoded(channelContext, packet, packetSize, ipStat);
              }
            } catch (Exception e1) {
              log.error(packet.logstr(), e1);
            }
          }

          if (tioConfig.getAioListener() != null) {
            try {
              tioConfig.getAioListener().onAfterDecoded(channelContext, packet, packetSize);
            } catch (Throwable e) {
              log.error(e.toString(), e);
            }
          }

          if (log.isDebugEnabled()) {
            log.debug("{}, Unpacking to get a packet:{}", channelContext, packet.logstr());
          }

          handler(packet, packetSize);

          if (byteBuffer.hasRemaining()) { // 组包后,还剩有数据
            if (log.isDebugEnabled()) {
              log.debug("{},After grouping packets, there is still data left:{}", channelContext, byteBuffer.remaining());
            }
            continue;
          } else { // 组包后,数据刚好用完
            lastByteBuffer = null;
            if (log.isDebugEnabled()) {
              log.debug("{},After grouping the packets, the data just ran out", channelContext);
            }
            return;
          }
        }
      } catch (Throwable e) {
        if (channelContext.logWhenDecodeError) {
          log.error("Encountered an exception while decoding", e);
        }

        channelContext.setPacketNeededLength(null);

        if (e instanceof AioDecodeException) {
          List<Long> list = tioConfig.ipStats.durationList;
          if (list != null && list.size() > 0) {
            try {
              for (Long v : list) {
                IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
                ipStat.getDecodeErrorCount().incrementAndGet();
                tioConfig.getIpStatListener().onDecodeError(channelContext, ipStat);
              }
            } catch (Exception e1) {
              log.error(e1.toString(), e1);
            }
          }
        }

        Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
        return;
      }
    }
  }

  public void setNewReceivedByteBuffer(ByteBuffer newReceivedByteBuffer) {
    this.newReceivedByteBuffer = newReceivedByteBuffer;
  }
}

源代码解说

半包处理

TioHandler.decode() 方法中,业务层需要在数据不足以构成完整消息时返回 null。框架在接收到 null 后,认为这是一个半包,将收到的数据暂存到 DecodeRunnable.lastByteBuffer。当新的数据到达时,DecodeRunnable 会将 lastByteBuffer 和新数据合并,继续尝试解码,直到能构成完整的 Packet 对象。

粘包处理

TioHandler.decode() 方法中,业务层解码一个 Packet 对象返回后,框架会检查 ByteBuffer 中是否还有未处理的字节。如果有,则继续调用解码方法,直到返回 null 或没有剩余的字节为止。

小结

t-io 框架已经为半包和粘包提供了处理机制。业务层只需要按照业务协议进行解码即可,框架会自动处理剩余的字节数据,确保数据的完整性。