Java Netty 服務端向客戶端發送消息


說到netty通訊,回憶了下,還是18年的時候,學了了netty,至今就學習的時候寫過一個項目。最近閑生,被要求做一個netty通訊的項目,順手,總結一下,之前寫的項目。

當時是寫了一款訪微信聊天的軟件,所以用到了netty通訊,廢話不過說,我們來直接上代碼吧。

import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j	
@Component
public class WSServer {

	private static class SingletionWSServer {
		static final WSServer instance = new WSServer();
	}

	public static WSServer getInstance() {
		return SingletionWSServer.instance;
	}

	private EventLoopGroup mainGroup;
	private EventLoopGroup subGroup;
	private ServerBootstrap server;
	private ChannelFuture future;

	public WSServer() {
		// 主線程組
		mainGroup = new NioEventLoopGroup();

		// 子線程組
		subGroup = new NioEventLoopGroup();
		// netty服務器的創建,ServerBootstrap是一個啟動類
		server = new ServerBootstrap();
		server.group(mainGroup, subGroup)// 設置主從線程組
				.channel(NioServerSocketChannel.class)// 設置nio雙向通道
				.childHandler(new WSServerInitializer());// 子處理器,用於處理subGroup
	}

	/**
	 * 啟動
	 */
	public void start() {
		this.future = server.bind(9700);
		System.err.println("netty websocket server 啟動完畢...");
		log.info("netty websocket server 啟動完畢...");
	}

}

這個類用於創建netty的服務端鏈接,下面我們來配置啟動的一些配置。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WSServerInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		System.out.println("初始化 SocketChannel");
		log.info("初始化 SocketChannel");
		ChannelPipeline pipeline = ch.pipeline();

		//websocket 基於http協議,所以要http編解碼器
		pipeline.addLast(new HttpServerCodec());
		
		//對寫大數據流的支持
		pipeline.addLast(new ChunkedWriteHandler());
		
		//對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse
		//幾乎在netty中的編程,都會使用到此hanler
		pipeline.addLast(new HttpObjectAggregator(1024*64));
		
		//========================以上是用於支持http協議========================
		
		
		//========================增加心跳支持 start    ========================
		
		//針對客戶端,如果在1分鍾時沒有想服務端發送寫心跳(ALL),則主動斷開
		//如果是讀空閑或者寫空閑,不處理
		pipeline.addLast(new IdleStateHandler(8, 10, 12));
		
		//自定義的空閑檢測
		pipeline.addLast(new HeartBeatHandler());
		//========================增加心跳支持 end      ========================
		
		
		/**
		 * websocket服務器處理的協議,用於指定給客戶端連接訪問的路由: /ws
		 * 本Handler會幫助你處理一些繁重的復雜的事
		 * 會幫你處理握手動作: handshaking(close, ping, pong) ping + pong = 心跳
		 * 對於websocket來講,都是以frames進行傳輸的,不同的數據類型對應的frames也不同
		 */
		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
		
		//自定義hanler
		pipeline.addLast(new ChatHandler());
	}

}

自定義空閑檢測

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * 用於檢測channel的心跳的handler 繼承 ChannelInboundHandlerAdapter 從而不需要實現 channelRead0方法
 * 
 * @author wb0024
 *
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

		// 判斷evt是否是IdleStateEvent(用於觸發用戶事件,包含 讀空閑/寫空閑/讀寫空閑)
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt; // 強制類型轉換

			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("進入讀空閑...");
			} else if (event.state() == IdleState.WRITER_IDLE) {
				System.out.println("進入寫空閑...");
			} else if (event.state() == IdleState.ALL_IDLE) {
				System.out.println("channel關閉前users數量為:"+ChatHandler.users.size());
				System.out.println("進入讀寫空閑...");
				Channel channel = ctx.channel();
				//關閉無用的channel,以防資源浪費
				channel.close();
				System.out.println("channel關閉后users數量為:"+ChatHandler.users.size());
			}
		}
	}

}

自定義hanler

import com.imooc.enums.MsgActionEnum;
import com.imooc.utils.JsonUtils;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 處理消息的handler
 * 
 * @author wb0024 TextWebSocketFrame:在netty中,是用於為websocket專門處理文本的對象,frame是消息的載體
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

	// 用於記錄和管理所有客戶端的channel
	public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

		// 獲取客戶端傳輸過來的消息
		String content = msg.text();

		System.out.println("處理消息的handler:" + content);
		Channel currentChannel = ctx.channel();

		// 1. 獲取客戶端發送的消息
		DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);

		Integer action = dataContent.getAction();
		System.out.println("action:" + action);
		// 2. 判斷消息類型,根據不同的類型來處理不同的業務
		if (action == MsgActionEnum.CONNECT.type) {
			// 2.1 當websocket 第一次open的時候,初始化channel,把用戶的channel和userId關聯起來
			String senderId = dataContent.getChatMsg().getSenderId();
			UserChannelRel.put(senderId, currentChannel);

			// 測試
			for (Channel channel : users) {
				System.out.println(channel.id().asLongText());
			}
			UserChannelRel.output();

		} else if (action == MsgActionEnum.CHAT.type) {
			// 2.2 聊天類型的消息,把聊天記錄保存到數據庫,同時標記消息的簽收狀態[未簽收]
			ChatMsg chatMsg = dataContent.getChatMsg();
			String receiverId = chatMsg.getReceiverId();

			DataContent dataContentMsg = new DataContent();
			dataContentMsg.setChatMsg(chatMsg);

			// 發送消息
			// 從全局用戶channel關系中獲取接受方的channel
			Channel receiverChannel = UserChannelRel.get(receiverId);
			if (receiverChannel != null) {
				// 當receiverChannel不為空的時候,從 ChannelGroup 去查找對應的channel是否存在
				Channel findChannel = users.find(receiverChannel.id());
				if (findChannel != null) {
					// 用戶在線
					receiverChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(dataContentMsg)));
				}
			}
		} else if (action == MsgActionEnum.KEEPALIVE.type) {
			// 2.4 心跳類型的消息
			System.out.println("收到來自channel為[" + currentChannel + "]的心跳包");
		}

	}

	/**
	 * 當客戶連接服務端之后(打開鏈接) 獲取客戶端的channel,並且放到ChannelGroup中去進行管理
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		users.add(ctx.channel());
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

		String channelId = ctx.channel().id().asLongText();
		System.out.println("客戶端被移除,channelId為:" + channelId);

		// 當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端channel
		users.remove(ctx.channel());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		// 發生異常之后關鍵channel。隨后從ChannelGroup 中移除
		ctx.channel().close();
		users.remove(ctx.channel());
	}

}

  

定義channel管理

import java.util.HashMap;

import io.netty.channel.Channel;

/**
 * 用戶id和channel的關聯關系處理
 * 
 * @author wb0024
 *
 */
public class UserChannelRel {

	private static HashMap<String, Channel> manager = new HashMap<>();

	public static void put(String senderId, Channel channel) {
		manager.put(senderId, channel);
	}

	public static Channel get(String senderId) {
		return manager.get(senderId);
	}
	
	
	public static void output() {
		for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
			System.out.println("UserId:" + entry.getKey() + 
					",ChannelId:" + entry.getValue().id().asLongText());
		}
	}
}

其他類

import java.io.Serializable;

import lombok.Data;

@Data
public class ChatMsg implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private String senderId; // 發送者的用戶id
	private String receiverId; // 接受者的用戶id
	private String msg; // 聊天內容
	private String msgId; // 用於消息的簽收

}

  

import java.io.Serializable;

import lombok.Data;

@Data
public class DataContent implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private Integer action; // 動作類型
	private ChatMsg chatMsg; // 用戶的聊天內容對象
	private String extand; // 擴展字段

}

 好了,到了這里一個簡單的netty通訊就做好了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM