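Just before the holiday I took over a task I wasn't very familiar with. Luckily it was built on something I know fairly well: Netty. Let's look at the problem I ran into.
The Netty server could receive messages but could not send messages back to the client correctly. When I first saw this, it didn't occur to me that it was an encoding problem. Let's go through it.
While writing this up I came across the following article, and only then realized that I had probably been misled by my colleague's existing code: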

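What made this frustrating is that their code had no encoder in the pipeline, while mine did, and I didn't notice the difference at first glance. On top of that, I was processing the received message inside the decoder and trying to send the reply from there as well.
I won't show the code as it was before the fix; here is the corrected version directly.
Bootstrap class: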
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

    // Lazily initialized singleton (initialization-on-demand holder idiom)
    private static class SingletionWSServer {
        static final NettyServer instance = new NettyServer();
    }

    public static NettyServer getInstance() {
        return SingletionWSServer.instance;
    }

    private EventLoopGroup mainGroup;
    private EventLoopGroup subGroup;
    private ServerBootstrap server;
    private ChannelFuture future;

    public NettyServer() {
        // Main (boss) thread group: accepts incoming connections
        mainGroup = new NioEventLoopGroup();
        // Sub (worker) thread group: handles I/O on accepted connections
        subGroup = new NioEventLoopGroup();
        // Create the Netty server; ServerBootstrap is the bootstrap helper class
        server = new ServerBootstrap();
        server.group(mainGroup, subGroup) // set the main and sub thread groups
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .channel(NioServerSocketChannel.class) // use the NIO server channel
                .childHandler(new NettyServerInitializer()); // initializer for channels handled by subGroup
    }

    /**
     * Start the server: bind the port, then block until the server channel is closed.
     */
    public void bind(int port) {
        try {
            this.future = server.bind(port).sync();
            System.err.println("netty websocket server started...");
            log.info("netty websocket server started...");
            this.future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and log at error level with the stack trace
            Thread.currentThread().interrupt();
            System.err.println("netty websocket server failed to start..." + e.getMessage());
            log.error("netty websocket server failed to start...", e);
        } finally {
            // Release the event loop threads once the server channel has closed
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }
}
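NettyServerInitializer:
import com.slife.netty.coder.NettyMessageDecoder;
import com.slife.netty.coder.NettyMessageEncoder;
import com.slife.netty.handler.HeartBeatHandler;
import com.slife.netty.handler.NettyServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        log.info("Initializing SocketChannel");
        ChannelPipeline pipeline = ch.pipeline();
        // Custom decoder (inbound: bytes -> GpsMessage)
        pipeline.addLast(new NettyMessageDecoder());
        // Custom encoder (outbound: GpsMessage -> bytes)
        pipeline.addLast(new NettyMessageEncoder());
        // Custom idle detection. IdleStateEvent is normally fired by Netty's
        // IdleStateHandler; none is added to this pipeline, so add one before
        // this handler if the idle branches are expected to run.
        pipeline.addLast(new HeartBeatHandler());
        /*
         * LengthFieldBasedFrameDecoder parameters:
         *
         * maxFrameLength
         *     maximum length of a frame
         * lengthFieldOffset
         *     offset of the length field
         * lengthFieldLength
         *     size of the length field in bytes
         * lengthAdjustment
         *     compensation added to the value of the length field; it may be negative,
         *     for example when the length value also counts the header
         * initialBytesToStrip
         *     number of leading bytes to strip from the decoded frame
         * failFast
         *     (only on the constructor overloads that accept it) if true, a
         *     TooLongFrameException is thrown as soon as the frame is known to exceed
         *     maxFrameLength; if false, it is thrown after the whole frame has been read
         *
         * Note: placed after NettyMessageDecoder, this decoder only receives
         * already-decoded GpsMessage objects and passes them through unchanged. To split
         * the byte stream into frames it would have to come before NettyMessageDecoder.
         */
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));
        // Custom handler: processes the decoded message and sends the reply
        pipeline.addLast(new NettyServerHandler());
    }
}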
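The class that needs some explanation here is LengthFieldBasedFrameDecoder. The parameter descriptions in the comment above are translated from its documentation; set the actual values according to your own protocol.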
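To make the interaction of these parameters more concrete, here is a minimal sketch of the frame-length arithmetic as described in the LengthFieldBasedFrameDecoder documentation, written against a plain java.nio.ByteBuffer so it runs without Netty. The class and method names (FrameLengthSketch, frameLength) are made up for illustration and are not Netty API; the sketch assumes a big-endian length field of 1, 2, or 4 bytes and ignores maxFrameLength and initialBytesToStrip.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Netty: models only the length arithmetic.
final class FrameLengthSketch {

    /**
     * Returns the total frame length in bytes (header + length field + body),
     * or -1 if the buffer does not yet contain the complete length field.
     */
    static long frameLength(ByteBuffer in, int lengthFieldOffset,
                            int lengthFieldLength, int lengthAdjustment) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        if (in.remaining() < lengthFieldEndOffset) {
            return -1; // need more bytes before the length can be read
        }
        int at = in.position() + lengthFieldOffset;
        long fieldValue;
        switch (lengthFieldLength) {
            case 1: fieldValue = in.get(at) & 0xFF; break;          // unsigned byte
            case 2: fieldValue = in.getShort(at) & 0xFFFF; break;   // unsigned short
            case 4: fieldValue = in.getInt(at) & 0xFFFFFFFFL; break; // unsigned int
            default:
                throw new IllegalArgumentException(
                        "unsupported length field size: " + lengthFieldLength);
        }
        // The value in the length field, corrected by lengthAdjustment,
        // plus everything up to and including the length field itself.
        return fieldValue + lengthAdjustment + lengthFieldEndOffset;
    }

    public static void main(String[] args) {
        // Same offsets as the initializer above: offset 4, length 4, adjustment 2.
        ByteBuffer buf = ByteBuffer.allocate(20);
        buf.putInt(4, 10); // length field value 10 stored at offset 4
        System.out.println(frameLength(buf, 4, 4, 2)); // 10 + 2 + 8
    }
}
```

With the values used in the initializer (offset 4, length 4, adjustment 2), a length field of 10 therefore describes a 20-byte frame. Whether that matches a given device protocol has to be checked against the protocol specification.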
Idle monitoring: HeartBeatHandler
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Check whether evt is an IdleStateEvent (a user event covering reader idle / writer idle / all idle)
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt; // cast to the concrete type
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("Reader idle...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("Writer idle...");
            } else if (event.state() == IdleState.ALL_IDLE) {
                System.out.println("channelGroup size before closing the channel: " + NettyServerHandler.channelGroup.size());
                System.out.println("Reader and writer idle...");
                Channel channel = ctx.channel();
                // Close the unused channel to avoid wasting resources
                channel.close();
                // close() is asynchronous, so this size may not reflect the removal yet
                System.out.println("channelGroup size after closing the channel: " + NettyServerHandler.channelGroup.size());
            }
        }
    }
}
Decoder: NettyMessageDecoder
import java.util.List;
import com.netty.constant.Delimiter;
import com.netty.pojo.GpsMessage;
import com.netty.pojo.LoginMsg;
import com.utils.CrcUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class NettyMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("Decoding started:");
        int length = in.readableBytes();
        if (length < Delimiter.MINIMUM_LENGTH) {
            return; // not enough bytes yet; wait for more data
        }
        in.markReaderIndex(); // mark the current readerIndex
        // The decoded message object
        GpsMessage gpsMessage = new GpsMessage();
        byte packetLen = in.readByte();
        int nPacketLen = packetLen & 0xff; // treat the length byte as unsigned
        gpsMessage.setPacketLen(nPacketLen);
        // Protocol number
        byte agreement = in.readByte();
        gpsMessage.setAgreement(agreement);
        ByteBuf frame = null;
        if (agreement == Delimiter.LOGIN_PACKET) { // login packet
            LoginMsg loginMsg = new LoginMsg();
            frame = CrcUtils.decodeCodeIDFrame(ctx, in);
            String sCode = CrcUtils.bytesToHexString(frame);
            System.out.println("Card id: " + sCode);
            loginMsg.setCardId(sCode);
            gpsMessage.setContent(loginMsg);
        } else if (agreement == Delimiter.STATUS_PACKET) { // heartbeat packet
            System.out.println("Heartbeat packet:");
            frame = CrcUtils.decodeCodeIDFrame(ctx, in);
            String sContent = CrcUtils.bytesToHexString(frame);
            System.out.println("Heartbeat packet content: " + sContent);
            gpsMessage.setContent(sContent);
        }
        // Only decode here; replies are sent from NettyServerHandler
        out.add(gpsMessage);
        System.out.println("Decoding finished!");
    }
}
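The decoder reads a length byte, masks it with 0xff, and marks the reader index so it can back out if the packet is incomplete. The following is a minimal sketch of that pattern on a plain java.nio.ByteBuffer. The layout is an assumption made only for this example (one unsigned length byte that counts the protocol byte plus the payload), and PacketDecodeSketch, Packet, and tryDecode are hypothetical names; the real device protocol may differ.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical sketch: assumed layout is [length:1][protocol:1][payload:length-1].
final class PacketDecodeSketch {

    static final class Packet {
        final int length;
        final byte protocol;
        final byte[] payload;

        Packet(int length, byte protocol, byte[] payload) {
            this.length = length;
            this.protocol = protocol;
            this.payload = payload;
        }
    }

    /**
     * Decodes one packet if it is completely available. When bytes are missing,
     * the buffer position is restored so the same bytes can be retried later.
     */
    static Optional<Packet> tryDecode(ByteBuffer in) {
        if (in.remaining() < 2) {
            return Optional.empty(); // not even length + protocol yet
        }
        in.mark();                    // remember where this packet starts
        int length = in.get() & 0xFF; // a Java byte is signed; mask to get 0..255
        if (length < 1) {
            throw new IllegalArgumentException("length must cover the protocol byte");
        }
        if (in.remaining() < length) {
            in.reset();               // incomplete packet: rewind to the mark
            return Optional.empty();
        }
        byte protocol = in.get();
        byte[] payload = new byte[length - 1];
        in.get(payload);
        return Optional.of(new Packet(length, protocol, payload));
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {3, 0x01, 0x0A, 0x0B});
        tryDecode(buf).ifPresent(p ->
                System.out.println("protocol=" + p.protocol + ", payload bytes=" + p.payload.length));
    }
}
```

In a Netty ByteToMessageDecoder the same idea is expressed with markReaderIndex() and resetReaderIndex() on the ByteBuf: return without adding anything to the output list until a whole packet has arrived.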
Encoder: NettyMessageEncoder
import com.netty.pojo.GpsMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

// Only handles outbound GpsMessage objects; any other outbound type (for example a ByteBuf) passes through untouched
public class NettyMessageEncoder extends MessageToByteEncoder<GpsMessage> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, GpsMessage gpsMessage, ByteBuf byteBuf) throws Exception {
        // Write the packet length (4 bytes here, whereas NettyMessageDecoder reads a 1-byte length; adjust to your protocol)
        byteBuf.writeInt(gpsMessage.getPacketLen());
        // Write the request type
        byteBuf.writeByte(gpsMessage.getAgreement());
        // Write the reserved field
        //byteBuf.writeByte(nettyMessage.getHeader().getReserved());
        // Write the data (getBytes() uses the platform default charset)
        byteBuf.writeBytes(gpsMessage.getContent().toString().getBytes());
    }
}
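An encoder is easiest to reason about when it mirrors the decoder byte for byte. As a small sketch of that idea, the code below encodes a packet with plain JDK classes under an assumed layout: one unsigned length byte that counts the protocol byte plus the payload, followed by the protocol byte and the payload. The layout, the UTF-8 charset, and the names PacketEncodeSketch and encode are assumptions for illustration only, not the actual device protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: assumed layout is [length:1][protocol:1][payload:length-1].
final class PacketEncodeSketch {

    static byte[] encode(byte protocol, String content) {
        // Name the charset explicitly so the bytes do not depend on the platform default
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        int length = 1 + payload.length; // protocol byte + payload
        if (length > 0xFF) {
            throw new IllegalArgumentException("payload too long for a 1-byte length field");
        }
        ByteBuffer out = ByteBuffer.allocate(1 + length);
        out.put((byte) length); // derive the length from the payload instead of trusting a stored value
        out.put(protocol);
        out.put(payload);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] packet = encode((byte) 0x01, "AB");
        System.out.println(packet.length + " bytes, length field = " + (packet[0] & 0xFF));
    }
}
```

Computing the length from the payload at encode time avoids sending a stale length value, which is a common source of framing errors on the receiving side.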
Message handler:
import org.springframework.util.StringUtils;
import com.netty.channel.CardChannelRel;
import com.netty.constant.Delimiter;
import com.netty.pojo.GpsMessage;
import com.netty.pojo.LoginMsg;
import com.utils.ConvertCode;
import com.utils.CrcUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // Records and manages the channels of all connected clients
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Message handler received: " + msg);
        Channel currentChannel = ctx.channel();
        // 1. Get the message sent by the client
        GpsMessage gpsMessage = (GpsMessage) msg;
        if (gpsMessage != null) {
            // Protocol number
            byte agreement = gpsMessage.getAgreement();
            String cardId = "";
            if (agreement == Delimiter.LOGIN_PACKET) { // login packet
                LoginMsg loginMsg = (LoginMsg) gpsMessage.getContent();
                cardId = loginMsg.getCardId();
                CardChannelRel.put(cardId, currentChannel);
                // Placeholder text; writeToClient expects a hex string
                String sReply = "reply";
                System.out.println("Reply packet: " + sReply);
                CardChannelRel.output();
                // Send the reply
                writeToClient(sReply, currentChannel, "login reply");
            } else if (agreement == Delimiter.STATUS_PACKET) { // heartbeat packet
                System.out.print("Heartbeat packet: ");
                String receiveStr = (String) gpsMessage.getContent();
                System.out.println("Heartbeat packet content: " + receiveStr);
                writeToClient(receiveStr, currentChannel, "heartbeat reply");
            } else {
                // Send a message
                // Look up the receiver's channel in the global card-to-channel mapping
                // (cardId is still empty in this branch, so the lookup only succeeds if "" was registered)
                Channel receiverChannel = CardChannelRel.get(cardId);
                if (receiverChannel != null) {
                    // If receiverChannel is not null, check whether that channel still exists in the ChannelGroup
                    Channel findChannel = channelGroup.find(receiverChannel.id());
                    if (findChannel != null) {
                        // The user is online
                        writeToClient("other message", currentChannel, "other message reply");
                    }
                }
            }
        }
    }

    /**
     * Shared method for writing data back to the client.
     *
     * @param receiveStr
     *            the string to write back (a hex string)
     * @param receiverChannel
     *            the channel to write to
     * @param mark
     *            label used in the printed/logged output <br>
     *            //channel.writeAndFlush(msg);// does not work <br>
     *            //channel.writeAndFlush(receiveStr.getBytes());// does not work <br>
     *            In Netty, what goes in and out of the channel is ByteBuf, so make sure the server
     *            has a matching encoder that turns the string into a ByteBuf.
     */
    public void writeToClient(final String receiveStr, Channel receiverChannel, final String mark) {
        try {
            final ByteBuf byteValue = Unpooled.buffer(); // Netty transmits ByteBuf
            byteValue.writeBytes(ConvertCode.hexString2Bytes(receiveStr)); // the device side expects hex-encoded bytes
            receiverChannel.writeAndFlush(byteValue).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    StringBuilder sb = new StringBuilder("");
                    if (!StringUtils.isEmpty(mark)) {
                        sb.append("[").append(mark).append("]");
                    }
                    if (future.isSuccess()) {
                        System.out.println(sb.toString() + " write succeeded " + byteValue);
                        log.info(sb.toString() + " write succeeded " + byteValue);
                    } else {
                        System.out.println(sb.toString() + " write failed " + byteValue);
                        log.error(sb.toString() + " write failed " + byteValue, future.cause());
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Exception in shared writeToClient(): " + e.getMessage());
            log.error("Exception in shared writeToClient():", e);
        }
    }

    /**
     * After a client connects to the server (the connection is opened), take the client's channel
     * and put it into the ChannelGroup for management.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelGroup.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        String channelId = ctx.channel().id().asLongText();
        System.out.println("Client removed, channelId: " + channelId);
        // ChannelGroup also drops closed channels automatically, so this explicit remove is only a safeguard
        channelGroup.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // Close the channel after an exception, then remove it from the ChannelGroup
        ctx.channel().close();
        channelGroup.remove(ctx.channel());
    }
}
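In the classes above, Delimiter holds the custom message-type constants; define the hexadecimal values for your own protocol so that the different message types can be told apart.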
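writeToClient relies on ConvertCode.hexString2Bytes, which is not listed in this post. As a rough sketch of what such a conversion could look like, the helper below turns a hex string into bytes and rejects anything that is not valid hex. HexSketch and hexToBytes are hypothetical names and this is not the actual ConvertCode implementation.

```java
// Hypothetical stand-in for a hex-string-to-bytes helper; not the original ConvertCode class.
final class HexSketch {

    static byte[] hexToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            // Character.digit returns -1 for characters that are not digits in radix 16
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("not a hex digit near index " + (2 * i));
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] bytes = hexToBytes("0aFF"); // upper and lower case are both accepted
        System.out.println(bytes.length);
    }
}
```

Failing fast on non-hex input makes it obvious when a plain-text reply is passed to a method that expects hex, instead of silently sending garbage to the device.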
CardChannelRel:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel;

/**
 * Manages the mapping between a user (card) id and its channel.
 */
public class CardChannelRel {
    // Accessed from several event loop threads, so use a thread-safe map
    private static final Map<String, Channel> manager = new ConcurrentHashMap<>();

    public static void put(String senderId, Channel channel) {
        manager.put(senderId, channel);
    }

    public static Channel get(String senderId) {
        return manager.get(senderId);
    }

    public static void output() {
        for (Map.Entry<String, Channel> entry : manager.entrySet()) {
            System.out.println("CardId:" + entry.getKey() +
                    ",ChannelId:" + entry.getValue().id().asLongText());
        }
    }
}
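One thing the mapping above never does is drop an entry when its connection goes away, so stale channels can accumulate. The sketch below shows one possible shape for a registry that supports safe removal. It is generic over the connection type so that it runs without Netty; ConnectionRegistrySketch and its method names are hypothetical, and in the real handler the connection type would be Channel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an id-to-connection registry with conditional removal.
final class ConnectionRegistrySketch<C> {

    private final Map<String, C> byCardId = new ConcurrentHashMap<>();

    void register(String cardId, C connection) {
        byCardId.put(cardId, connection); // a re-login replaces the older connection
    }

    C lookup(String cardId) {
        return byCardId.get(cardId);
    }

    /**
     * Removes the mapping only if it still points at the given connection, so a
     * late close of an old connection cannot evict a newer one for the same card.
     */
    boolean unregister(String cardId, C connection) {
        return byCardId.remove(cardId, connection);
    }

    int size() {
        return byCardId.size();
    }

    public static void main(String[] args) {
        ConnectionRegistrySketch<String> registry = new ConnectionRegistrySketch<>();
        registry.register("card-1", "old-connection");
        registry.register("card-1", "new-connection");
        System.out.println(registry.unregister("card-1", "old-connection")); // stale close is ignored
        System.out.println(registry.lookup("card-1"));
    }
}
```

A natural place to call unregister would be a handler's channelInactive or handlerRemoved callback, though that wiring is left out here.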
Result:

Utility class:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

public class CrcUtils {

    public static String CRC_16(byte[] bytes) {
        int i, j, lsb;
        int h = 0xffff;
        for (i = 0; i < bytes.length; i++) {
            // Mask to 0..255: a negative byte would otherwise sign-extend and corrupt the upper bits
            h ^= (bytes[i] & 0xff);
            for (j = 0; j < 8; j++) {
                lsb = h & 0x0001; // the bit about to be shifted out of the CRC
                h >>= 1;
                if (lsb == 1) {
                    h ^= 0x8408;
                }
            }
        }
        h ^= 0xffff;
        return Integer.toHexString(h).toUpperCase();
    }

    public static byte[] hexStringToByte(String hex) {
        int len = (hex.length() / 2);
        byte[] result = new byte[len];
        char[] achar = hex.toCharArray();
        for (int i = 0; i < len; i++) {
            int pos = i * 2;
            result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
        }
        return result;
    }

    private static byte toByte(char c) {
        // Accept lower-case hex digits as well
        byte b = (byte) "0123456789ABCDEF".indexOf(Character.toUpperCase(c));
        return b;
    }

    public static String bytesToHexString(ByteBuf buffer) {
        final int length = buffer.readableBytes();
        // Two hex characters per byte
        StringBuilder sb = new StringBuilder(length * 2);
        String sTmp;
        for (int i = 0; i < length; i++) {
            byte b = buffer.readByte();
            sTmp = Integer.toHexString(0xFF & b);
            if (sTmp.length() < 2)
                sb.append(0);
            sb.append(sTmp.toUpperCase());
        }
        return sb.toString();
    }
}
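A CRC routine is easy to get subtly wrong, so it helps to check it against a known value. The loop in CRC_16 (initial value 0xFFFF, reflected polynomial 0x8408, final XOR 0xFFFF) uses the parameters commonly catalogued as CRC-16/X-25, whose published check value for the ASCII string "123456789" is 0x906E. The sketch below is a standalone version that can be used for such a check; Crc16Sketch, crc16, and crc16Hex are hypothetical names, and whether the device protocol really uses this variant is an assumption to confirm against its specification.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone CRC-16 (X-25 parameters) for cross-checking.
final class Crc16Sketch {

    static int crc16(byte[] data) {
        int crc = 0xFFFF;                  // initial value
        for (byte b : data) {
            crc ^= (b & 0xFF);             // mask so negative bytes do not sign-extend
            for (int bit = 0; bit < 8; bit++) {
                boolean lsbSet = (crc & 1) != 0;
                crc >>>= 1;
                if (lsbSet) {
                    crc ^= 0x8408;         // reflected form of polynomial 0x1021
                }
            }
        }
        return crc ^ 0xFFFF;               // final XOR; result stays within 16 bits
    }

    static String crc16Hex(byte[] data) {
        return String.format("%04X", crc16(data)); // always four hex digits
    }

    public static void main(String[] args) {
        byte[] check = "123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.println(crc16Hex(check)); // compare against the published check value 906E
    }
}
```

Formatting with %04X keeps leading zeros, which matters when the CRC is appended to a packet as a fixed-width hex field.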
References:
2.https://github.com/bjmashibing/tank/commit/1121deccf76786b634389629454a0ec0af80765f
3.https://blog.csdn.net/linsongbin1/article/details/77915686?utm_source=blogxgwz2
4.https://blog.csdn.net/yqwang75457/article/details/73913572
