Java Netty 服務端向客戶端發送16進制數據


放假前夕,接手一個不太熟悉的任務,不過好在用的東西,比較熟,就是netty通訊。具體遇到什么問題嘞,我們來看一下。

netty服務端可以接收消息,但是不能正確的發送消息給客戶端,最開始看到的時候,沒有注意到,會是編碼問題,具體我們來看一下吧。

在寫的過程中,看到這篇文章,我才意識到,我可能被同事已有的代碼誤導了:

 這里比較郁悶的是,人家沒有加編碼,而我這邊是,加了編碼,初看 沒意識到。然后在解碼器里邊去處理了接收的消息,還想發送消息出去。

這里就不給大家看沒改之前的,直接上正確代碼。

啟動類:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

	private static class SingletionWSServer {
		static final NettyServer instance = new NettyServer();
	}

	public static NettyServer getInstance() {
		return SingletionWSServer.instance;
	}

	private EventLoopGroup mainGroup;
	private EventLoopGroup subGroup;
	private ServerBootstrap server;
	private ChannelFuture future;

	public NettyServer() {
		// 主線程組
		mainGroup = new NioEventLoopGroup();

		// 子線程組
		subGroup = new NioEventLoopGroup();
		// netty服務器的創建,ServerBootstrap是一個啟動類
		server = new ServerBootstrap();
		server.group(mainGroup, subGroup)// 設置主從線程組
				.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
				.channel(NioServerSocketChannel.class)// 設置nio雙向通道
				.childHandler(new NettyServerInitializer());// 子處理器,用於處理subGroup
	}

	/**
	 * 啟動
	 */
	public void bind(int port) {
		try {
			this.future = server.bind(port).sync();
			System.err.println("netty websocket server 啟動完畢...");
			log.info("netty websocket server 啟動完畢...");
			this.future.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
			System.err.println("netty websocket server 啟動異常..." + e.getMessage());
			log.debug("netty websocket server 啟動異常..." + e.getMessage());
		}
	}
}

NettyServerInitializer

import com.slife.netty.coder.NettyMessageDecoder;
import com.slife.netty.coder.NettyMessageEncoder;
import com.slife.netty.handler.HeartBeatHandler;
import com.slife.netty.handler.NettyServerHandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		log.info("初始化 SocketChannel");

		ChannelPipeline pipeline = ch.pipeline();

		// 自定義解碼器
		pipeline.addLast(new NettyMessageDecoder());

		// 自定義編碼器
		pipeline.addLast(new NettyMessageEncoder());

		// 自定義的空閑檢測
		pipeline.addLast(new HeartBeatHandler());
		// ========================增加心跳支持 end ========================

		/**
		 *
		 * @param maxFrameLength
		 *            幀的最大長度
		 * @param lengthFieldOffset
		 *            length字段偏移的地址
		 * @param lengthFieldLength
		 *            length字段所占的字節長
		 * @param lengthAdjustment
		 *            修改幀數據長度字段中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推后多少個字段
		 * @param initialBytesToStrip
		 *            解析時候跳過多少個長度
		 * @param failFast
		 *            為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異
		 */
		pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));

		// 自定義hanler 處理解碼消息並回復信息
		pipeline.addLast(new NettyServerHandler());
	}

}

這里需要解析一下的是這個類,LengthFieldBasedFrameDecoder,上述代碼的注解是翻譯過來的,定義的參數值,大家要依據自己的實際情況去設置。

監控:HeartBeatHandler

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

		// 判斷evt是否是IdleStateEvent(用於觸發用戶事件,包含 讀空閑/寫空閑/讀寫空閑)
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt; // 強制類型轉換

			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("進入讀空閑...");
			} else if (event.state() == IdleState.WRITER_IDLE) {
				System.out.println("進入寫空閑...");
			} else if (event.state() == IdleState.ALL_IDLE) {
				System.out.println("channel關閉前channelGroup數量為:"+ NettyServerHandler.channelGroup.size());
				System.out.println("進入讀寫空閑...");
				Channel channel = ctx.channel();
				//關閉無用的channel,以防資源浪費
				channel.close();
				System.out.println("channel關閉后channelGroup數量為:"+ NettyServerHandler.channelGroup.size());
			}
		}
	}
}

解碼器:NettyMessageDecoder

import java.util.List;

import com.netty.constant.Delimiter;
import com.netty.pojo.GpsMessage;
import com.netty.pojo.LoginMsg;
import com.utils.CrcUtils;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class NettyMessageDecoder extends ByteToMessageDecoder {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		System.out.println("開始解碼:");
		int length = in.readableBytes();
		if (length < Delimiter.MINIMUM_LENGTH)
			return;
		in.markReaderIndex(); // 我們標記一下當前的readIndex的位置

		// 解碼后消息對象
		GpsMessage gpsMessage = new GpsMessage();
		byte packetLen = in.readByte();
		int nPacketLen = packetLen & 0xff;
		gpsMessage.setPacketLen(nPacketLen);
		/**
		 * 協議
		 */
		byte agreement = in.readByte();
		gpsMessage.setAgreement(agreement);
		ByteBuf frame = null;
		if (agreement == Delimiter.LOGIN_PACKET) { // 登錄包
			LoginMsg loginMsg = new LoginMsg();
			frame = CrcUtils.decodeCodeIDFrame(ctx, in);
			String sCode = CrcUtils.bytesToHexString(frame);
			System.out.println("編號:" + sCode);
			loginMsg.setCardId(sCode);
			gpsMessage.setContent(loginMsg);
		} else if (agreement == Delimiter.STATUS_PACKET) {// 心跳包
			System.out.println(" 心跳包:");
			frame = CrcUtils.decodeCodeIDFrame(ctx, in);
			String sContent = CrcUtils.bytesToHexString(frame);
			System.out.println("心跳包內容:" + sContent);
			gpsMessage.setContent(sContent);
		}
		out.add(gpsMessage);
		System.out.println("解碼結束!");
	}

}

編碼器:NettyMessageEncoder

import com.netty.pojo.GpsMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class NettyMessageEncoder extends MessageToByteEncoder<GpsMessage> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, GpsMessage gpsMessage, ByteBuf byteBuf) throws Exception {
        // 2、寫入數據包長度
        byteBuf.writeInt(gpsMessage.getPacketLen());

        // 3、寫入請求類型
        byteBuf.writeByte(gpsMessage.getAgreement());

        // 4、寫入預留字段
        //byteBuf.writeByte(nettyMessage.getHeader().getReserved());

        // 5、寫入數據
        byteBuf.writeBytes(gpsMessage.getContent().toString().getBytes());
    }
}

處理消息的handler:

import org.springframework.util.StringUtils;

import com.netty.channel.CardChannelRel;
import com.netty.constant.Delimiter;
import com.netty.pojo.GpsMessage;
import com.netty.pojo.LoginMsg;
import com.utils.ConvertCode;
import com.utils.CrcUtils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	// 用於記錄和管理所有客戶端的channel
	public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

		System.out.println("處理消息的handler:" + msg);
		Channel currentChannel = ctx.channel();
		// 1. 獲取客戶端發送的消息
		GpsMessage gpsMessage = (GpsMessage) msg;
		if (gpsMessage != null) {
			
			// 協議
			byte agreement = gpsMessage.getAgreement();

			String cardId = "";
			if (agreement == Delimiter.LOGIN_PACKET) { // 登錄包

				LoginMsg loginMsg = (LoginMsg) gpsMessage.getContent();
				cardId = loginMsg.getCardId();
				CardChannelRel.put(cardId, currentChannel);
				String sReply = "回復";
				System.out.println(" 回復包:" + sReply);
				CardChannelRel.output();
				// 發送消息
				writeToClient(sReply, currentChannel, "登錄回復");
			} else if (agreement == Delimiter.STATUS_PACKET) {// 心跳包
				System.out.print("心跳包:");
				String receiveStr = (String) gpsMessage.getContent();
				System.out.println("心跳包內容:" + receiveStr);
				writeToClient(receiveStr, currentChannel, "心跳包回復");
			} else {
				// 發送消息
				// 從全局用戶channel關系中獲取接受方的channel
				Channel receiverChannel = CardChannelRel.get(cardId);
				if (receiverChannel != null) {
					// 當receiverChannel不為空的時候,從 ChannelGroup 去查找對應的channel是否存在
					Channel findChannel = channelGroup.find(receiverChannel.id());
					if (findChannel != null) {
						// 用戶在線
						writeToClient("其他消息", currentChannel, "其他消息回復");
					}
				}
			}
		
		}
	}

	/**
	 * 公用回寫數據到客戶端的方法
	 * 
	 * @param 需要回寫的字符串
	 * @param receiverChannel
	 * @param mark
	 *            用於打印/log的輸出 <br>
	 *            //channel.writeAndFlush(msg);//不行 <br>
	 *            //channel.writeAndFlush(receiveStr.getBytes());//不行 <br>
	 *            在netty里,進出的都是ByteBuf,樓主應確定服務端是否有對應的編碼器,將字符串轉化為ByteBuf
	 */
	public void writeToClient(final String receiveStr, Channel receiverChannel, final String mark) {
		try {
			ByteBuf byteValue = Unpooled.buffer();// netty需要用ByteBuf傳輸
			byteValue.writeBytes(ConvertCode.hexString2Bytes(receiveStr));// 對接需要16進制
			receiverChannel.writeAndFlush(byteValue).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					StringBuilder sb = new StringBuilder("");
					if (!StringUtils.isEmpty(mark)) {
						sb.append("【").append(mark).append("】");
					}
					if (future.isSuccess()) {
						System.out.println(sb.toString() + "回寫成功" + byteValue);
						log.info(sb.toString() + "回寫成功" + byteValue);
					} else {
						System.out.println(sb.toString() + "回寫失敗" + byteValue);
						log.error(sb.toString() + "回寫失敗" + byteValue);
					}
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("調用通用writeToClient()異常" + e.getMessage());
			log.error("調用通用writeToClient()異常:", e);
		}
	}

	/**
	 * 當客戶連接服務端之后(打開鏈接) 獲取客戶端的channel,並且放到ChannelGroup中去進行管理
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		channelGroup.add(ctx.channel());
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		super.handlerRemoved(ctx);
		String channelId = ctx.channel().id().asLongText();
		System.out.println("客戶端被移除,channelId為:" + channelId);
		// 當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端channel
		channelGroup.remove(ctx.channel());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		// 發生異常之后關鍵channel。隨后從ChannelGroup 中移除
		ctx.channel().close();
		channelGroup.remove(ctx.channel());
	}

}

上述類中:Delimiter為自定義的消息類型,大家可根據自己十六進制去定義響應不用的消息類型

CardChannelRel:

import java.util.HashMap;

import io.netty.channel.Channel;

/**
 * 用戶id和channel的關聯關系處理
 */
public class CardChannelRel {

	private static HashMap<String, Channel> manager = new HashMap<>();

	public static void put(String senderId, Channel channel) {
		manager.put(senderId, channel);
	}

	public static Channel get(String senderId) {
		return manager.get(senderId);
	}
	
	
	public static void output() {
		for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
			System.out.println("CredId:" + entry.getKey() + 
					",ChannelId:" + entry.getValue().id().asLongText());
		}
	}
} 

效果:

工具類

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

public class CrcUtils {
	public static String CRC_16(byte[] bytes) {
		int i, j, lsb;
		int h = 0xffff;
		for (i = 0; i < bytes.length; i++) {
			h ^= bytes[i];
			for (j = 0; j < 8; j++) {
				lsb = h & 0x0001; // 取 CRC 的移出位
				h >>= 1;
				if (lsb == 1) {
					h ^= 0x8408;
				}
			}
		}
		h ^= 0xffff;
		return Integer.toHexString(h).toUpperCase();
	}

	public static byte[] hexStringToByte(String hex) {
		int len = (hex.length() / 2);
		byte[] result = new byte[len];
		char[] achar = hex.toCharArray();
		for (int i = 0; i < len; i++) {
			int pos = i * 2;
			result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
		}
		return result;
	}

	private static byte toByte(char c) {
		byte b = (byte) "0123456789ABCDEF".indexOf(c);
		return b;
	}

	public static String bytesToHexString(ByteBuf buffer) {
		final int length = buffer.readableBytes();
		StringBuffer sb = new StringBuffer(length);
		String sTmp;

		for (int i = 0; i < length; i++) {
			byte b = buffer.readByte();
			sTmp = Integer.toHexString(0xFF & b);
			if (sTmp.length() < 2)
				sb.append(0);
			sb.append(sTmp.toUpperCase());
		}
		return sb.toString();
	}

}

參考文章:

1.https://blog.csdn.net/qq_42599616/article/details/105459117?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~sobaiduend~default-3-105459117.nonecase&utm_term=netty%E8%BF%9E%E6%8E%A5%E6%88%90%E5%8A%9F%E4%B8%8D%E8%83%BD%E5%8F%91%E9%80%81%E6%95%B0%E6%8D%AE&spm=1000.2123.3001.4430

2.https://github.com/bjmashibing/tank/commit/1121deccf76786b634389629454a0ec0af80765f

3.https://blog.csdn.net/linsongbin1/article/details/77915686?utm_source=blogxgwz2

4.https://blog.csdn.net/yqwang75457/article/details/73913572


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM