Netty TCP Server
Introcution
Netty 简介
Netty 是一个基于 Java 的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端。Netty 简化了网络编程,提供了异步事件驱动的网络应用程序框架,使开发人员可以专注于业务逻辑的实现,而不用处理复杂的底层网络通信细节。
Netty 的基本概念
Channel:
- 表示一个开放的连接,可以执行读/写操作。
- 是与远程节点进行通信的通道。
EventLoop:
- 处理 I/O 操作的循环。
- 每个 Channel 在其生命周期内只注册到一个 EventLoop。
ChannelFuture:
- 用于异步通知 I/O 操作的结果。
- 例如,写操作的完成通知。
ChannelHandler:
- 处理 I/O 事件或拦截 I/O 操作的逻辑组件。
- 包括入站处理器(ChannelInboundHandler)和出站处理器(ChannelOutboundHandler)。
Pipeline:
- 持有一个 ChannelHandler 链,处理 Channel 的所有 I/O 事件和请求。
Bootstrap:
- 用于客户端或服务器启动的辅助类。
ServerBootstrap
用于服务器端,Bootstrap
用于客户端。
Netty 是一个高性能的网络库,用于创建一个服务器,该服务器在指定端口上监听传入的网络连接.
Netty 组件如何协同工作处理一个请求
当一个请求到达 Netty 服务器时,以下 Netty 组件将协同工作来处理请求:
Channel: 每个客户端连接都会分配一个 Channel。Channel 负责处理与客户端的所有 I/O 操作,例如读取请求数据和写回响应。
EventLoop: Channel 被注册到一个 EventLoop 中。EventLoop 是一个处理 I/O 操作的事件循环,它不断从 Channel 中读取数据并将事件传递给相应的处理器(ChannelHandler)。一个 EventLoop 可以管理多个 Channel,但每个 Channel 只会被分配到一个 EventLoop。
ChannelPipeline: 每个 Channel 都有一个与之关联的 Pipeline。Pipeline 是一个 ChannelHandler 的链表,负责处理 Channel 的所有 I/O 事件和操作。在 Pipeline 中,数据会依次通过每个 ChannelHandler 进行处理。
ChannelHandler: ChannelHandler 是处理 I/O 事件的具体逻辑。Netty 提供了入站处理器和出站处理器,分别处理从客户端到服务器的入站事件(如读取数据)和从服务器到客户端的出站事件(如写入数据)。例如,
MyInbloudHandler
就是一个入站处理器,用于处理从客户端发送来的消息。ChannelFuture: 由于 Netty 是异步的,I/O 操作(如连接、读写等)不会立即返回结果,而是返回一个 ChannelFuture 对象。ChannelFuture 可以通过回调机制来通知操作的结果,比如操作成功或失败。
Bootstrap:
ServerBootstrap
用于配置和启动服务器。它负责初始化 EventLoopGroup(EventLoop 的集合),配置 Channel 的选项,并绑定端口以开始监听客户端连接。
当客户端发送一个请求时,服务器的 Channel 会接收到该请求数据,EventLoop 会从 Channel 中读取数据并将其传递到 ChannelPipeline 中。数据在 Pipeline 中依次经过每个 ChannelHandler 进行处理。在处理完请求后,服务器可能会将响应数据写回到客户端,所有的 I/O 操作都通过 ChannelFuture 进行异步通知。最终,当请求处理完成或发生异常时,EventLoop 会关闭 Channel,释放资源。
TioBoot 整合 netty 创建 tcp-server
添加依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
线程池配置类
TioBoot 已经内置了 TioThreadUtils
如果你需要自定义线程池,请参考下面的配置,ThreadUtils 是需要你手动编写的工具类,内部实现可以参考 TioThreadUtils 因为需要在一个子线程中启动 netty server,所以需要添加一个线程配置类
import java.util.concurrent.ExecutorService;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.boot.server.TioBootServer;
@AConfiguration
public class ExecutorServiceConfig {
@Initialization
public void config() {
// 创建包含10个线程的线程池
ExecutorService executor = ThreadUtils.newFixedThreadPool(10);
// 项目关闭时,关闭线程池
HookCan.me().addDestroyMethod(() -> {
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
}
});
}
}
Netty 相关代码
package com.litongjava.tio.web.hello.nettyserver;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new MyInbloudHandler());
}
}
package com.litongjava.tio.boot.hello.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyInbloudHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 出现异常就关闭
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
String string = in.toString(CharsetUtil.UTF_8);
log.info("received:{}", string);
// 这里调用service服务,数据库
} finally {
ReferenceCountUtil.release(msg);
}
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerBootstrap {
private int port;
private MyChannelInitializer MyChannelInitializer;
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup worker = new NioEventLoopGroup();
public NettyServerBootstrap(int port, MyChannelInitializer MyChannelInitializer) {
this.port = port;
this.MyChannelInitializer = MyChannelInitializer;
}
public void start() {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 连接数
bootstrap.option(ChannelOption.TCP_NODELAY, true); // 不延迟,消息立即发送
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 长连接
bootstrap.childHandler(MyChannelInitializer);
ChannelFuture f = bootstrap.bind(port).sync();
if (f.isSuccess()) {
log.info("netty start successful:{}", this.port);
}
f.channel().closeFuture().sync();
} catch (Exception e) {
log.info("netty start fail:" + e.getMessage());
e.printStackTrace();
} finally {
close();
}
}
public void close() {
log.info("close netty");
if (boss != null) {
boss.shutdownGracefully();
}
if (worker != null) {
worker.shutdownGracefully();
}
}
}
因为需要等待线程池启动,所以设置了@Initialization(priority = 101)
package com.litongjava.tio.web.hello.config;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.thread.ThreadUtils;
import com.litongjava.tio.web.hello.nettyserver.MyChannelInitializer;
import com.litongjava.tio.web.hello.nettyserver.NettyServerBootstrap;
@AConfiguration
public class NettyServerConfig {
@Initialization(priority = 101)
public void nettyServerBootstrap() {
int nioPort = EnvUtils.getInt("nio.server.port", 17902);
MyChannelInitializer MyChannelInitializer = new MyChannelInitializer();
NettyServerBootstrap nettyServerBootstrap = new NettyServerBootstrap(nioPort, MyChannelInitializer);
// start netty
ThreadUtils.getFixedThreadPool().execute(() -> {
nettyServerBootstrap.start();
});
HookCan.me().addDestroyMethod(nettyServerBootstrap::close);
}
}
tio-boot 启动类
package com.litongjava.tio.web.hello;
import com.litongjava.jfinal.aop.annotation.AComponentScan;
import com.litongjava.tio.boot.TioApplication;
@AComponentScan
public class HelloApp {
public static void main(String[] args) {
long start = System.currentTimeMillis();
TioApplication.run(HelloApp.class, args);
long end = System.currentTimeMillis();
System.out.println((end - start) + "ms");
}
}
测试客户端
package com.litongjava.tio.boot.hello.nettyserver;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
public class SocketClientSender {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 17902);
OutputStream outputStream = socket.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write("$tmb00035ET3318/08/22 11:5804029.94,027.25,20.00,20.00$");
System.out.println("send message");
printWriter.flush();
socket.shutdownOutput();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
代码解释
这些代码展示了一个使用 Netty 库的基本服务器-客户端应用程序。
MyInbloudHandler
处理入站消息和异常情况。MyChannelInitializer
用于初始化通道。NettyServerBootstrap
负责配置和启动服务器。NettyServerConfig
在应用程序启动时配置服务器。SocketClientSender
演示了如何创建一个客户端来发送消息给服务器。
这些代码中的每个类和方法的用途和意义:
1. MyInbloudHandler
这个类扩展了 Netty 的 ChannelInboundHandlerAdapter
,用于处理入站的网络事件。
channelRead(ChannelHandlerContext ctx, Object msg)
: 当从客户端接收到数据时调用。它将接收到的ByteBuf
转换为字符串,并记录接收到的消息。处理完成后,释放消息资源以避免内存泄漏。exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
: 当处理过程中发生异常时调用。这个方法记录异常并关闭当前的连接。
2. MyChannelInitializer
这个类是 ChannelInitializer<SocketChannel>
的扩展,用于初始化新的连接通道。
initChannel(SocketChannel socketChannel)
: 在这个方法中,你可以添加各种处理器到这个通道的流水线(ChannelPipeline
),这里添加了MyInbloudHandler
实例。
3. NettyServerBootstrap
这个类负责配置和启动 Netty 服务器。
- 构造函数: 接收端口号和
MyChannelInitializer
实例。 start()
: 这个方法配置和启动 Netty 服务器。它设置了服务器的各种选项(如连接队列长度、TCP_NODELAY 等),并将服务器绑定到指定的端口。close()
: 用于优雅地关闭服务器,释放资源。
4. NettyServerConfig
用途
NettyServerConfig
是一个配置类,主要用于初始化和配置 Netty 服务器。这个类使用了 JFinalAop Framework 的 @AConfiguration
注解,这表明它是一个用于定义配置信息的类,JFinalAop 容器会特别处理这个类,以便在应用程序启动时应用这些配置。
方法解释
nettyServerBootstrap()
: 此方法创建并配置NettyServerBootstrap
实例。它从环境配置中读取端口号,并在新线程中启动服务器。这是一个被@Initialization
注解标记的方法,用于在服务启动时 创建NettyServerBootstrap
类的实例。初始化过程:方法首先通过
EvnUtils
获取环境配置信息,从中读取服务器端口号。这种方式表明它使用了 JFinal 的 AOP 特性来注入依赖,这里是获取应用程序的环境设置。创建
NettyServerBootstrap
实例:使用从环境配置中获取的端口号和一个新创建的MyChannelInitializer
实例,初始化NettyServerBootstrap
类的对象。启动服务器:通过
ThreadUtils
方法,在新的线程中调用nettyServerBootstrap.start()
启动 Netty 服务器。这样做可以避免阻塞正在执行的主线程,确保服务器的启动过程是异步的。返回值:方法返回创建的
NettyServerBootstrap
实例。由于这个方法被标记为@ABean
,因此这个实例会被 JFinalAop 容器管理,可以在其他部分的应用程序中被注入和使用。
@ABean(destroyMethod = "close")
: 这个注解的destroyMethod
属性指定了当 JFinalAop 容器关闭或者这个 Bean 被移除时应该调用的方法。在这个例子中,当应用程序关闭时,NettyServerBootstrap
实例的close()
方法会被自动调用,这个方法会负责清理资源,比如关闭服务器和释放线程资源。
NettyServerConfig
类是 Netty 服务器在基于 JFinalAop 框架的应用程序中的配置类。它负责创建和配置 NettyServerBootstrap
实例,确保 Netty 服务器作为一个 JFinalAop Bean 被正确初始化、运行和关闭。这种配置方式使得 Netty 服务器的设置与 JFinalAop 应用程序的生命周期紧密集成,便于管理和维护。
5. SocketClientSender
这个类包含一个简单的客户端,用于向服务器发送消息。
main(String[] args)
: 这个方法创建一个套接字连接到服务器,发送一条消息,然后关闭连接。