Netty WebSocket Server 二进制数据传输
简介
在之前的章节中,我们已经完成了以下两部分操作:
- 编写了一个 WebSocket 服务器并实现了聊天服务功能。
- 使用
protoc
工具生成了 Java 的协议缓冲区(Protobuf)文件。
然而,目前客户端和服务器之间传输的数据是以 JSON 格式进行的。本章将介绍如何在客户端和服务器之间通过二进制数据进行通信。
添加依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.26.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
修改 ChatHandler 代码
以下是 ChatHandler
的代码修改,以支持二进制数据的处理:
package com.litongjava.im.netty.handler;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import com.alibaba.fastjson2.JSONObject;
import com.google.protobuf.InvalidProtocolBufferException;
import com.litongjava.im.netty.protobuf.ChatPacketOuter;
import com.litongjava.im.netty.protobuf.ChatPacketOuter.ChatPacket;
import com.litongjava.im.netty.utils.MessageUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
/**
* Netty 消息处理器
*/
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
if (msg == null) {
log.info("接收到的数据为空");
return;
}
if (msg instanceof FullHttpRequest) {
log.info("收到 HTTP 请求");
} else if (msg instanceof TextWebSocketFrame) { // 处理文本数据
processText(channelHandlerContext, msg);
} else if (msg instanceof BinaryWebSocketFrame) { // 处理二进制数据
processBinary(channelHandlerContext, msg);
}
}
private void processBinary(ChannelHandlerContext channelHandlerContext, Object msg) {
BinaryWebSocketFrame binary = (BinaryWebSocketFrame) msg;
ByteBuf buf = binary.content();
byte[] array = new byte[buf.readableBytes()];
buf.readBytes(array);
ChatPacket packet;
try {
packet = ChatPacketOuter.ChatPacket.parseFrom(array);
} catch (InvalidProtocolBufferException e) {
log.error("解析二进制数据失败", e);
return;
}
int code = packet.getCode();
String content = packet.getMsg();
JSONObject msgJson = JSONObject.parseObject(content);
if (code == 101) {
// 假设用户ID在消息中以 "userId" 传递
Integer userId = msgJson.getInteger("userId");
MessageUtils.nettyUserMap.put(userId, channelHandlerContext.channel());
log.info("Netty 客户端 " + userId + " 加入成功!");
// 向客户端发送欢迎消息或确认信息
String welcomeMessage = "用户 " + userId + " 加入成功!";
ByteBuf welcomeBuf = Unpooled.copiedBuffer(welcomeMessage.getBytes(StandardCharsets.UTF_8));
channelHandlerContext.channel().writeAndFlush(new BinaryWebSocketFrame(welcomeBuf));
} else {
Integer toUserId = msgJson.getInteger("toUserId"); // 假设接收方用户ID在消息中传递
Channel toUserChannel = MessageUtils.nettyUserMap.get(toUserId);
if (toUserChannel != null) {
ByteBuf contentBuf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8));
toUserChannel.writeAndFlush(new BinaryWebSocketFrame(contentBuf));
} else {
log.info("用户 " + toUserId + " 未连接。");
}
}
}
private void processText(ChannelHandlerContext channelHandlerContext, Object msg) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
String content = textWebSocketFrame.text();
JSONObject msgJson = JSONObject.parseObject(content);
Integer code = msgJson.getInteger("code");
if (code == 101) {
// 假设用户ID在消息中以 "userId" 传递
Integer userId = msgJson.getInteger("userId");
MessageUtils.nettyUserMap.put(userId, channelHandlerContext.channel());
log.info("Netty 客户端 " + userId + " 加入成功!");
// 向客户端发送欢迎消息或确认信息
String welcomeMessage = "用户 " + userId + " 加入成功!";
channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(welcomeMessage));
} else {
Integer toUserId = msgJson.getInteger("toUserId"); // 假设接收方用户ID在消息中传递
Channel toUserChannel = MessageUtils.nettyUserMap.get(toUserId);
if (toUserChannel != null) {
toUserChannel.writeAndFlush(new TextWebSocketFrame(content));
} else {
log.info("用户 " + toUserId + " 未连接。");
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("新的连接: {}", ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 获取当前断开连接的 Channel
Channel channel = ctx.channel();
// 迭代 nettyUserMap,找到对应的用户ID并移除
Integer userIdToRemove = null;
for (Map.Entry<Integer, Channel> entry : MessageUtils.nettyUserMap.entrySet()) {
if (entry.getValue().equals(channel)) {
userIdToRemove = entry.getKey();
break;
}
}
// 移除找到的用户ID
if (userIdToRemove != null) {
MessageUtils.nettyUserMap.remove(userIdToRemove);
log.info("用户 " + userIdToRemove + " 已断开连接并移除。");
} else {
log.info("未找到对应的用户ID。");
}
}
}
测试
使用开发工具将以下 JSON 格式数据转换为二进制格式,并发送到服务器进行测试:
{
"code": 101,
"userId": 1
}
{
"code": 101,
"userId": 2
}
{
"code": 200,
"fromUserId": 1,
"toUserId": 2,
"message": "Hello, this is a message from Client 1 to Client 2."
}
{
"code": 200,
"fromUserId": 2,
"toUserId": 1,
"message": "Hello, this is a message from Client 2 to Client 1."
}
通过以上步骤,你可以将客户端和服务器之间的消息从 JSON 转换为二进制格式,进一步提高传输效率并减少数据量。这种方法特别适用于高性能应用场景,如实时通信或大规模数据交换。