tio-运行原理详解

t-io 是一个高效、轻量级的异步 I/O 框架,广泛用于处理高并发网络通信场景,支持 TCP、UDP、WebSocket 和 Http 等协议。本文将详细解析 t-io 的运行机制,包括其核心组件、数据处理流程、线程池设计和连接管理机制。通过全面解析这些内容,可以帮助开发者更好地理解 t-io 的工作原理,进而在实际项目中灵活运用。

一、t-io 核心组件

t-io 框架中有几个重要的核心组件,这些组件各自负责不同的功能模块,并协同工作来保证系统的高效运行。本文详细介绍以下核心组件:

  1. TioServer:用于启动服务器,处理客户端连接请求和管理整个 I/O 通道。
  2. TioConfig 和 ServerTioConfig:全局配置类,管理 I/O 操作的处理器、监听器、线程池等核心配置。
  3. Threads:线程池管理类,提供专用线程池以应对不同任务类型(如 I/O 处理和业务逻辑处理)。
  4. AcceptCompletionHandler:负责接受新的客户端连接并为其分配资源。
  5. ReadCompletionHandler:负责从客户端异步读取数据,并将数据传递给后续的解码和处理环节。
  6. DecodeRunnable:负责将读取到的原始字节数据解码为具体的业务包(Packet),解码成功后将进入处理阶段。
  7. CloseRunnable:负责关闭连接并释放相关资源,保证连接正常关闭和数据安全传输。
  8. ChannelContext:代表一个具体的连接上下文,管理连接的状态和数据传输

二、t-io 数据流处理流程

t-io 的数据处理采用异步非阻塞 I/O 模型,整个数据流从客户端请求到服务器响应的处理大致包括以下几个阶段:

  1. 接收连接:通过 AcceptCompletionHandler 异步接收客户端的连接请求。每当有客户端发起连接时,t-io 通过 AsynchronousServerSocketChannel.accept() 异步处理新连接,并为其分配 ChannelContext 对象来维护连接状态。

  2. 读取数据:新连接建立后,ReadCompletionHandler 负责从客户端异步读取数据。该操作是非阻塞的,读取的数据会存储到 ByteBuffer 中,之后传递给解码器处理。

  3. 解码数据:读取的数据传递给 DecodeRunnable 进行解码处理。DecodeRunnable 会将 ByteBuffer 中的原始数据转换为业务数据包 Packet,并交由下游进行处理。

  4. 业务处理:解码后的数据包会通过 HandlerRunnable 进行业务逻辑处理,处理的结果最终通过 SendRunnable 发送响应数据回客户端。

  5. 响应数据发送:在处理完业务逻辑后,SendRunnable 负责将响应数据打包,并异步发送到客户端,完成整个数据流的闭环。

这种异步非阻塞模型使得 t-io 能够在高并发场景下有效地处理大量连接和请求,保证了系统的高效和稳定性。

三、t-io 线程池设计

t-io 的高效运行离不开合理的线程池设计,它通过不同的线程池来应对不同类型的任务,确保系统的负载均衡和性能优化。主要有以下两个核心线程池:

  1. tioExecutor:处理数据解码和业务逻辑的线程池。该线程池专门用于执行解码任务(DecodeRunnable)和业务处理任务(HandlerRunnable),其线程数量依据 CPU 核心数和系统配置动态调整。
  2. groupExecutor:负责处理网络 I/O 操作的线程池。该线程池主要用于接收新连接、管理心跳检测、处理网络 I/O 任务,确保在高并发场景下有效地管理大量客户端连接。

线程池的配置是根据可用 CPU 核心数进行优化的:

public static final int CORE_POOL_SIZE = AVAILABLE_PROCESSORS * 1;
public static final int MAX_POOL_SIZE_FOR_TIO = Math.max(CORE_POOL_SIZE * 3, 64);
public static final int MAX_POOL_SIZE_FOR_GROUP = Math.max(CORE_POOL_SIZE * 16, 256);

其中 CORE_POOL_SIZE 是系统 CPU 核心数的倍数,MAX_POOL_SIZE_FOR_TIOMAX_POOL_SIZE_FOR_GROUP 分别为解码和 I/O 操作设置了最大线程池容量,以确保高并发环境下的性能表现。

四、TioServer 启动流程

tioServer 是 t-io 框架中服务器启动的核心类,它负责服务器的初始化、配置和监听客户端连接请求。以下是 TioServer 的启动流程:

  1. 配置初始化:服务器启动时,首先调用 TioConfigServerTioConfig 的初始化方法,设置缓存工厂、监听器、线程池等全局配置。这些配置确保服务器能够正确处理后续的 I/O 操作和业务逻辑。

  2. 创建异步通道组:服务器通过 AsynchronousChannelGroupAsynchronousServerSocketChannel 创建异步 I/O 通道。AsynchronousServerSocketChannel 是非阻塞的,并通过设置接收缓冲区大小和复用地址选项,确保多线程环境下的高效运行。

  3. 启动监听器:服务器通过 serverSocketChannel.bind() 绑定到指定的 IP 地址和端口,并开始监听客户端的连接请求。

  4. 接收客户端连接:当有客户端连接到服务器时,AcceptCompletionHandler 被触发,负责处理新的连接请求,并为每个连接分配 ChannelContext 对象,以跟踪连接状态。

public void start(String serverIp, int serverPort) throws IOException {
    serverTioConfig.getCacheFactory().register(TioCoreConfigKeys.REQEUST_PROCESSING, null, null, null);
    this.serverNode = new Node(serverIp, serverPort);
    channelGroup = AsynchronousChannelGroup.withThreadPool(serverTioConfig.groupExecutor);
    serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
    serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
    InetSocketAddress listenAddress = new InetSocketAddress(serverIp, serverPort);
    serverSocketChannel.bind(listenAddress, 0);
    AcceptCompletionHandler acceptCompletionHandler = serverTioConfig.getAcceptCompletionHandler();
    serverSocketChannel.accept(this, acceptCompletionHandler);
    serverTioConfig.startTime = System.currentTimeMillis();
}

五、AcceptCompletionHandler:接收连接请求

AcceptCompletionHandler 是 t-io 中负责接收新客户端连接的组件。它在客户端发起连接时被异步调用,负责为新连接分配资源,并初始化对应的 ChannelContextAcceptCompletionHandler 的主要任务包括:

  1. 接收新连接:当有客户端发起连接时,AsynchronousServerSocketChannel.accept() 异步调用 AcceptCompletionHandlercompleted() 方法,接收该连接请求。

  2. 资源分配:为新连接创建 ChannelContext,并为其分配读写缓冲区等资源,以便后续的数据传输和处理。

  3. 安全检查AcceptCompletionHandler 会进行一些安全性检查,例如判断客户端 IP 是否在黑名单中,或者连接数量是否超出系统上限等。

public void completed(AsynchronousSocketChannel asynchronousSocketChannel, TioServer tioServer) {
    if (tioServer.isWaitingStop()) {
        log.info("The server will be shut down and no new requests will be accepted:{}", tioServer.getServerNode());
    } else {
        AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel();
        serverSocketChannel.accept(tioServer, this);
    }

    ServerTioConfig serverTioConfig = tioServer.getServerTioConfig();
    try {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
        String clientIp = inetSocketAddress.getHostString();

        // 检查客户端 IP 是否在黑名单中
        if (IpBlacklist.isInBlacklist(serverTioConfig, clientIp)) {
            log.info("{} on the blacklist, {}", clientIp, serverTioConfig.getName());
            asynchronousSocketChannel.close();
            return;
        }

        // 创建新的 ChannelContext 来处理该连接
        ServerChannelContext channelContext = new ServerChannelContext(serverTioConfig, asynchronousSocketChannel);
        channelContext.setClosed(false);
        channelContext.stat.setTimeFirstConnected(SystemTimer.currTime);
        channelContext.setServerNode(tioServer.getServerNode());

        // 将新连接绑定到 I/O 线程池,并开始读取数据
        ReadCompletionHandler readCompletionHandler = channelContext.getReadCompletionHandler();
        ByteBuffer readByteBuffer = readCompletionHandler.getReadByteBuffer();
        asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);

    } catch (Throwable e) {


 log.error("Error handling new connection", e);
    }
}

六、ReadCompletionHandler:异步读取数据

ReadCompletionHandler 负责从客户端异步读取数据。t-io 使用异步 I/O 模型,当客户端发送数据时,服务器端的 ReadCompletionHandler 会被触发,执行读取操作。ReadCompletionHandler 的主要任务包括:

  1. 异步读取数据:从客户端读取数据,并将其存储在 ByteBuffer 中。读取操作是异步的,不会阻塞主线程。

  2. 检查读取结果:读取数据后,检查读取的字节数。如果读取的数据量大于 0,则将数据传递给解码器进行解码。如果读取失败或读取字节数为 0,则需要关闭连接。

  3. 解码数据:将读取到的数据传递给 DecodeRunnable 进行解码,转换为业务数据包。

@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
  if (result > 0) {
    TioConfig tioConfig = channelContext.tioConfig;

    // 统计接收到的字节数
    if (tioConfig.statOn) {
        tioConfig.groupStat.receivedBytes.addAndGet(result);
        tioConfig.groupStat.receivedTcps.incrementAndGet();
        channelContext.stat.receivedBytes.addAndGet(result);
        channelContext.stat.receivedTcps.incrementAndGet();
    }

    // 更新最近一次收到字节的时间
    channelContext.stat.latestTimeOfReceivedByte = SystemTimer.currTime;

    // 调用 IP 统计模块
    if (CollUtil.isNotEmpty(tioConfig.ipStats.durationList)) {
        try {
            for (Long v : tioConfig.ipStats.durationList) {
                IpStat ipStat = tioConfig.ipStats.get(v, channelContext);
                ipStat.getReceivedBytes().addAndGet(result);
                ipStat.getReceivedTcps().incrementAndGet();
                tioConfig.getIpStatListener().onAfterReceivedBytes(channelContext, result, ipStat);
            }
        } catch (Exception e1) {
            log.error(channelContext.toString(), e1);
        }
    }

    // 通知监听器数据接收完成
    if (tioConfig.getAioListener() != null) {
        try {
            tioConfig.getAioListener().onAfterReceivedBytes(channelContext, result);
        } catch (Exception e) {
            log.error("", e);
        }
    }

    // 将数据翻转为读取模式,并开始解码
    readByteBuffer.flip();
    if (channelContext.sslFacadeContext == null) {
        if (tioConfig.useQueueDecode) {
            channelContext.decodeRunnable.addMsg(ByteBufferUtils.copy(readByteBuffer));
            channelContext.decodeRunnable.execute();
        } else {
            channelContext.decodeRunnable.setNewReceivedByteBuffer(readByteBuffer);
            channelContext.decodeRunnable.decode();
        }
    } else {
        try {
            ByteBuffer copiedByteBuffer = ByteBufferUtils.copy(readByteBuffer);
            channelContext.sslFacadeContext.getSslFacade().decrypt(copiedByteBuffer);
        } catch (Exception e) {
            log.error(channelContext + ", " + e.toString() + readByteBuffer, e);
            Tio.close(channelContext, e, e.toString(), CloseCode.SSL_DECRYPT_ERROR);
        }
    }

    // 再次读取新的数据
    if (TioUtils.checkBeforeIO(channelContext)) {
        read();
    }

} else if (result == 0) {
    log.error("{}, 读到的数据长度为0", channelContext);
    Tio.close(channelContext, null, "读到的数据长度为0", CloseCode.READ_COUNT_IS_ZERO);
} else if (result < 0) {
    if (result == -1) {
        Tio.close(channelContext, null, "对方关闭了连接", CloseCode.CLOSED_BY_PEER);
    } else {
        Tio.close(channelContext, null, "读数据时返回" + result, CloseCode.READ_COUNT_IS_NEGATIVE);
    }
}

ReadCompletionHandlercompleted() 方法是 t-io 框架中处理 I/O 操作的重要环节。通过 AsynchronousSocketChannel,系统能够在非阻塞的情况下读取客户端的数据,并通过 ByteBuffer 传递给后续的解码任务。数据读取完成后,系统会自动进行解码。

七、DecodeRunnable:解码数据包

解码数据包是 t-io 数据处理中的关键环节之一。DecodeRunnable 负责将从客户端读取到的原始字节流解码成 Packet,以供业务逻辑处理。它处理的流程如下:

  1. 解码器设置DecodeRunnable 初始化时,会根据 TioConfig 的配置设置解码器(即 AioHandler)用于处理具体的解码任务。

  2. 数据合并与处理:如果当前读取到的数据不足以完成一次完整的解码,DecodeRunnable 会缓存数据,并在下次有新数据到来时合并处理,保证不丢包。

  3. 解码逻辑:解码器会根据数据的长度和格式,将 ByteBuffer 中的数据转换为 Packet 对象。Packet 是业务数据的载体,包含了请求的所有重要信息。

  4. 处理多次解码:在一次读取操作中,如果 ByteBuffer 中包含了多个 PacketDecodeRunnable 会在解码完一个 Packet 后继续解码下一个,直到数据全部解码完成。

  5. 异常处理:如果解码过程中发生异常(例如数据包不完整或格式错误),系统会捕获异常并记录日志,同时关闭当前连接。

public void decode() {
    ByteBuffer byteBuffer = newReceivedByteBuffer;
    if (lastByteBuffer != null) {
        byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
        lastByteBuffer = null;
    }

    // 循环解码所有数据包
    label_2: while (true) {
        try {
            int initPosition = byteBuffer.position();
            int limit = byteBuffer.limit();
            int readableLength = limit - initPosition;
            Packet packet = null;

            // 根据需要解码的字节数进行判断
            if (channelContext.packetNeededLength != null) {
                if (readableLength >= channelContext.packetNeededLength) {
                    packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
                } else {
                    lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
                    return;
                }
            } else {
                try {
                    packet = tioConfig.getAioHandler().decode(byteBuffer, limit, initPosition, readableLength, channelContext);
                } catch (BufferUnderflowException e) {
                    // 数据不足,无法解码
                }
            }

            // 数据包解码完成
            if (packet != null) {
                channelContext.setPacketNeededLength(null);
                channelContext.stat.latestTimeOfReceivedPacket = SystemTimer.currTime;
                channelContext.stat.decodeFailCount = 0;

                // 统计解码后的数据包信息
                int packetSize = byteBuffer.position() - initPosition;
                packet.setByteCount(packetSize);

                if (tioConfig.statOn) {
                    tioConfig.groupStat.receivedPackets.incrementAndGet();
                    channelContext.stat.receivedPackets.incrementAndGet();
                }

                // 处理解码成功后的逻辑
                handler(packet, packetSize);

                // 如果还有剩余数据,继续解码
                if (byteBuffer.hasRemaining()) {
                    continue label_2;
                } else {
                    lastByteBuffer = null;
                    return;
                }
            } else {
                lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, limit);
                return;
            }
        } catch (Throwable e) {
            log.error("解码出现异常", e);
            Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
            return;
        }
    }
}

DecodeRunnable 中,系统会根据当前读取到的 ByteBuffer 数据进行解码。如果解码失败,系统会尝试再次读取数据以完成解码任务。成功解码的数据包 Packet 会交由 HandlerRunnable 进行进一步处理。

八、CloseRunnable:关闭连接

CloseRunnable 负责在连接关闭时释放所有资源,并确保数据传输完毕。t-io 的连接关闭操作是异步的,通过 CloseRunnable 实现。CloseRunnable 的主要功能如下:

  1. 连接关闭时机:当客户端主动关闭连接或服务器检测到异常(如超时或解码失败)时,系统会触发 CloseRunnable 来处理关闭操作。

  2. 资源释放:关闭连接时,系统会释放与该连接相关的资源,如 ByteBuffer、任务队列等,防止内存泄漏。

  3. 日志记录:每次关闭连接时,系统都会记录相关的关闭原因、连接状态等信息,以便后续分析和调试。

  4. 重连机制:对于客户端连接,如果连接因为网络问题中断,CloseRunnable 可以将该连接加入重连队列,等待下次重连。

CloseRunnablerunTask 方法是在 t-io 框架中处理关闭连接的时机被调用的。具体来说,当一个连接需要关闭时,无论是因为客户端主动断开连接,还是服务器端检测到错误或超时,t-io 框架都会将该连接的 ChannelContext 放入 CloseRunnable 的任务队列中,并触发 CloseRunnable 的执行。其调用过程如下:

1. 连接断开触发 CloseRunnable

当连接需要关闭时,t-io 框架会调用 Tio.close(channelContext, ...) 方法。这个方法会根据不同的关闭原因,触发 CloseRunnable 进行关闭操作。

public static void close(ChannelContext channelContext, Throwable throwable, String remark, CloseCode closeCode) {
    // 标记连接已关闭
    channelContext.setClosed(true);

    // 将当前连接的 CloseMeta 信息保存
    channelContext.closeMeta.setNeedRemove(false);
    channelContext.closeMeta.setRemark(remark);
    channelContext.closeMeta.setThrowable(throwable);

    // 将该连接加入到 CloseRunnable 的任务队列
    channelContext.tioConfig.closeRunnable.addMsg(channelContext);

    // 执行 CloseRunnable 的任务
    channelContext.tioConfig.closeRunnable.execute();
}

2. 什么时候调用 CloseRunnable.runTask

CloseRunnable.runTask 方法的调用时机是:

  • 当连接因为读写异常、解码失败、心跳超时等原因需要关闭时,t-io 会调用 Tio.close 方法。
  • Tio.close 方法会将 ChannelContext 对象添加到 CloseRunnable 的任务队列中(通过 addMsg(channelContext))。
  • CloseRunnable 中的 execute() 方法会触发任务队列的消费,也就是调用 runTask 方法,实际关闭连接。

3. CloseRunnable 的执行过程

execute() 方法被调用时,CloseRunnable 会从任务队列中取出需要关闭的 ChannelContext,并调用 runTask 方法对其进行处理。该处理包括:

  • 调用监听器的 onBeforeClose 方法通知连接即将关闭。
  • 清理与该连接相关的资源。
  • 关闭底层的 AsynchronousSocketChannel
  • 从 t-io 的各种管理列表中移除该连接(如 connectionsclientNodes 等)。

4. 关键逻辑示例

当连接需要关闭时,Tio.close() 会触发如下代码片段:

@Override
public void runTask() {
    ChannelContext channelContext = null;
    while ((channelContext = msgQueue.poll()) != null) {
        try {
            // 执行关闭前的逻辑
            boolean isNeedRemove = channelContext.closeMeta.isNeedRemove;
            String remark = channelContext.closeMeta.remark;
            Throwable throwable = channelContext.closeMeta.throwable;

            // 关闭前通知监听器
            if (channelContext.tioConfig.getAioListener() != null) {
                channelContext.tioConfig.getAioListener().onBeforeClose(channelContext, throwable, remark, isNeedRemove);
            }

            // 关闭连接并释放资源
            channelContext.setClosed(true);
            MaintainUtils.remove(channelContext);

        } catch (Throwable e) {
            log.error("关闭连接时出错", e);
        }
    }
}

CloseRunnable.runTask() 方法是在 t-io 框架中执行关闭连接的关键步骤之一。其调用时机由 Tio.close() 方法触发,通常是在连接断开、读写异常、超时、解码错误等情况下。CloseRunnable 通过异步的方式关闭连接并释放资源,确保连接管理的高效性和安全性。

九. Tio.close 方法是在什么时候调用的?

t-io 框架中的 Tio.close 方法是用于关闭连接的,通常在以下几种情况下被调用:

9.1 连接读写异常时

如果在读写数据时出现异常,ReadCompletionHandlerWriteCompletionHandler 会捕获异常,并调用 Tio.close 方法来关闭该连接。例如,在 ReadCompletionHandlerfailed 方法中,当数据读取失败时,框架会调用 Tio.close 来关闭连接:

@Override
public void failed(Throwable exc, ByteBuffer byteBuffer) {
    Tio.close(channelContext, exc, "读数据时发生异常: " + exc.getClass().getName(), CloseCode.READ_ERROR);
}

9.2 解码异常时

如果在处理收到的数据时,DecodeRunnable 中的解码过程出现问题,通常是由于数据格式不符合预期,Tio.close 会被调用来终止该连接并清理资源。例如,在 DecodeRunnable 中,捕获到解码异常时会调用 Tio.close

catch (Throwable e) {
    if (channelContext.logWhenDecodeError) {
        log.error("解码时发生异常", e);
    }
    Tio.close(channelContext, e, "解码异常:" + e.getMessage(), CloseCode.DECODE_ERROR);
}

9.3 心跳超时时

t-io 提供了心跳检测机制,用于检测客户端与服务器之间的活跃性。如果长时间未收到数据或心跳信号,框架会认为连接超时,并调用 Tio.close 来关闭连接。例如,在 ServerTioConfig 的心跳检测线程中,如果连接长时间没有任何数据传输,会调用 Tio.close

if (interval > heartbeatTimeout) {
    Tio.close(channelContext, "心跳超时", CloseCode.HEARTBEAT_TIMEOUT);
}

9.4 主动关闭连接

在业务逻辑中,服务器或客户端可能需要主动关闭连接。例如,当服务器要强制断开某个连接时,业务代码可以直接调用 Tio.close(channelContext) 方法来关闭该连接。

9.5 客户端关闭连接

当客户端主动断开连接时,服务器端可能会捕获到该断开的信号(如读取到 EOF),并调用 Tio.close 来关闭服务器端的连接。

9.6. 如果是 HTTP 请求,连接是通过心跳检测关闭的吗?

9.6.1 短连接 vs 长连接

在 HTTP 请求中,有两种主要的连接模式:

  • 短连接:传统的 HTTP/1.0 中,客户端和服务器每次请求结束后都会关闭连接。此时不需要心跳检测,连接会在请求结束后立即关闭。
  • 长连接:HTTP/1.1 引入了持久连接(keep-alive),允许多个请求在同一个连接中发送。如果该连接长时间没有活动,可能会使用心跳检测来保持连接的活跃性。

9.6.12 HTTP 的连接管理

对于长连接的 HTTP 请求,t-io 的心跳检测机制仍然可以发挥作用。具体来说,如果客户端在一个持久连接中长时间没有发送新的请求,服务器可以通过心跳检测来判断该连接是否超时,进而关闭连接。

  • 如果 HTTP 请求是通过短连接实现的,那么连接在请求完成后就会自动关闭,心跳检测不会起作用。
  • 对于使用 keep-alive 的 HTTP 长连接,如果该连接长时间未发送任何数据或请求,那么心跳机制可能会检测到该连接的空闲,并触发 Tio.close 方法关闭连接。

9.6.3 HTTP/2 的心跳机制

对于 HTTP/2 或 WebSocket 等协议,长连接更为普遍。在这种情况下,t-io 可以通过心跳机制监测连接的活跃度。如果连接处于空闲状态超过设定的时间,心跳检测将会关闭连接。

结论

  • Tio.close 方法是在网络异常(读写异常、解码异常)、心跳超时、主动断开连接等情况下被调用的。
  • 对于 HTTP 请求:
    • 短连接 在请求结束后自动关闭,不需要心跳检测。
    • 长连接(如 HTTP/1.1 keep-alive 或 HTTP/2),心跳检测可以用来关闭长时间未活动的连接。

总结

t-io 是一个高效的异步 I/O 框架,支持高并发和低延迟的网络应用开发。通过 AcceptCompletionHandlerReadCompletionHandlerDecodeRunnableCloseRunnable 等组件,t-io 实现了对网络连接的管理、数据的异步读取与解码、以及连接的安全关闭。这些组件相互配合,确保了数据在服务器和客户端之间的高效流转。