netty實現socket服務器 處理websocket請求
最近有兩個都用到netty做服務端的項目,第一個是c直接發起socket建立連接的請求,第二個是react框架的app,用websocket協議發起連接請求,netty處理稍有不同,記錄一下。
netty高性能:https://www.infoq.cn/article/netty-high-performance
netty調優:https://blog.csdn.net/C_J33/article/details/80737053
#### 先來看第一個項目:
Springboot版本是1.5.10,點進去發現默認依賴沒有netty,加入netty依賴。
maven依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
spring啟動完成后,新建線程指定端口啟動socket
private int port;
public SocketService(int port) {
this.port = port;
}
public SocketService() {
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public void startSocket() throws Exception{
// 接受socket鏈接循環器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 處理業務邏輯循環器
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bs = new ServerBootstrap();
bs.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 回車換行作為消息分隔符,消息最大長度設置1024
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new MyServerHandler());
}
})
// 請求處理線程滿時,臨時存放完成握手隊列的大小,默認50
.option(ChannelOption.SO_BACKLOG, 1024);
// 是否啟用心跳保活機制,若鏈接建立並2小時左右無數據傳輸,此機制才會被激活(tcp機制)。
//.childOption(ChannelOption.SO_KEEPALIVE, true);
// 同步等待socket鏈接結果,用戶線程waite,直到連接完成被notify,繼續執行用戶代碼
ChannelFuture future = bs.bind(port).sync();
future.channel().closeFuture().sync();
}finally {
// 優雅的釋放資源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
> netty使用很簡單,只要加入相應處理handle即可。
>
> 傳輸層為了效率,tcp協議發送數據包時有可能合並發送,對接收方來說會產生粘包問題,需要在應用層解決拆包,收發數據時協商設計分割點,一般而言有四種分割收到包的方法:
>
> 1. 發送方在發送每段數據后拼接回車換行符,接收方讀到“\r\n”則認為是一個獨立的數據包。netty默認解析實現是LineBasedFrameDecoder,加入解碼handle即可。
> 2. 其他自定義分割符號,如“#”。netty實現handle是DelimiterBasedFrameDecoder.
> 3. 無論數據大小,每次發送固定長度,如1024字節,不夠的0補位,超出的截斷。缺點是比較生硬,數據小的時候浪費帶寬資源。netty實現的handle是FixedLengthFrameHandle.
> 4. 數據分為消息頭,消息體,消息頭定義消息體長度,接收端解析出長度后只讀取指定的長度。需要自己實現decoder。
_上述DecoderHandle全部繼承ByteToMessageDecoder,是netty封裝的解析二進制數據的處理類,只要將相應handle添加到pipeline中即可,解析完成后傳輸給自定義的邏輯處理類MyServerHandler。此項目中與c端約定傳輸json字符串格式數據,每段數據手動增加換行分割符。_
#### 第二個項目(netty與websocket)
springboot版本2.0.6.RELEASE,點進去發現默認依賴<netty.version>4.1.29.Final</netty.version>。
maven依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
SocketService與上一個項目相同,只是把匿名內部類單獨創建為ChildChannelInit類,具體實現為:
public class ChildChannelInit extends ChannelInitializer<SocketChannel> {
private Logger logger = LoggerFactory.getLogger(ChildChannelInit.class);
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// logger.debug("有客戶端鏈接新建一條chennel ......");
SSLEngine sslEngine = SslUtil.generateSSLContext().createSSLEngine();
sslEngine.setUseClientMode(false); //服務器端模式
sslEngine.setNeedClientAuth(false); //不需要驗證客戶端
ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
ch.pipeline().addLast("http-codec", new HttpServerCodec());
// ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 把多個httpmessagge組裝成一個的默認實現
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("ping", new IdleStateHandler(20,0,0, TimeUnit.SECONDS));
ch.pipeline().addLast("handler", new MyNettyHandler());
}
}
上面是幾條是為了給socket加入ssl功能,SslUtil類的主要方法:
private static volatile SSLContext ssl_Context = null;
public static SSLContext generateSSLContext() {
if (null == ssl_Context){
synchronized (SslUtil.class){
if (null == ssl_Context){
try {
KeyStore ks = KeyStore.getInstance("JKS");
InputStream ksInputStream = new FileInputStream(APP_CONFIG.getKeyStorePath());
ks.load(ksInputStream, APP_CONFIG.getKeyStorePass().toCharArray());
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, APP_CONFIG.getKeyStoreKeyPass().toCharArray());
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), null, null);
ssl_Context = sslContext;
}catch (Exception e){
logger.info("load ssl context failed, error:{}",e.getLocalizedMessage());
}
}
}
}
return ssl_Context;
}
下面的Handle方法則是因為websocket協議是通過http協議握手,然后切換(升級)到socket協議,主要是用來處理http協議的編解碼添加的netty自定義實現的handle。
這里有個問題,也是本篇要記錄的初衷,在接收消息的handle中,后期測試發現,客戶端發來10000條數據,內容是json,每次解析出json中的cmd指令回復相應數據,總會少回6-7條,想到了是粘包導致的問題,但無論是加分割符的編解碼還是自定義二進制decoder,pipeline中都不會加載,也就沒有任何作用。
__后來查看netty源碼,發現在http發送握手后,netty會自動添加及調整websocket的編解碼。__
// handshake方法內的部分原碼,
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
> websocketDecoder繼承WebSocketFrameDecoder,會處理編解碼,並把二進制數據轉換成binaryFrame或Textframe,其中frame有個isFinalFragment方法可以判斷是否是一條數據的最后一段,如果不是,會通過ContinuationWebSocketFrame消息類型發送剩下的數據,自己在代碼邏輯中可以拼接出完整的數據,避免了拆包不清的問題。
>
> 這里處理的是text消息類型,binary同理,用byte數組存就可以了。
自定義處理類:
@ChannelHandler.Sharable
public class MyNettyHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(MyNettyHandler.class);
private WebSocketServerHandshaker handshaker;
private String appendStr = "";
private String currentUserId = "";
String wsFactroyUri = "";
/**客戶端鏈接建立,即為活躍*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("new connect active !! channelId:{}",ctx.channel().id().asShortText());
}
/**客戶端斷開鏈接,通道不活躍*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (currentUserId != null && NettyManager.channelGroupMap.containsKey(currentUserId)){
NettyManager.channelGroupMap.get(currentUserId).remove(ctx.channel());
logger.debug("client disconnect!! channelId:{} map user size:{} current user connCount:{}",ctx.channel().id().asShortText(),NettyManager.channelGroupMap.size(), NettyManager.channelGroupMap.get(currentUserId).size());
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("!!!!EXCEPTION:{}",cause.toString());
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.第一次握手請求消息由HTTP協議承載,所以它是一個HTTP消息,執行handleHttpRequest方法來處理WebSocket握手請求。
//2.客戶端通過socket提交請求消息給服務端,WebSocketServerHandler接收到的是已經解碼后的WebSocketFrame消息。
if (msg instanceof FullHttpRequest){
handleHttpRequest(ctx,(FullHttpRequest) msg);
}else if (msg instanceof WebSocketFrame){
handleSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)){
logger.info("Can't get client msg or ping in idle time,channel will be closed, channelId:{} ", ctx.channel().id().asLongText());
ctx.channel().close();
}else {
super.userEventTriggered(ctx, evt);
}
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException{
// 利用http協議完成握手后升級到webSocket
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory handShakerFac = new WebSocketServerHandshakerFactory( wsFactroyUri, null, false);
handshaker = handShakerFac.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// 通過它構造握手響應消息返回給客戶端
// 同時將WebSocket相關的編碼和解碼類動態添加到ChannelPipeline中,用於WebSocket消息的編解碼,
// 添加WebSocketEncoder和WebSocketDecoder之后,服務端就可以自動對WebSocket消息進行編解碼了
handshaker.handshake(ctx.channel(), req);
}
}
private void handleSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
// 判斷是否為關閉鏈接
if (frame instanceof CloseWebSocketFrame){
logger.info("get close socket command");
handshaker.close(ctx.channel(),(CloseWebSocketFrame)frame.retain());
return;
}
// 判斷是否ping消息
if (frame instanceof PingWebSocketFrame) {
logger.info("get ping socket command");
ctx.channel().write( new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本內容
if (frame instanceof TextWebSocketFrame){
String body = ((TextWebSocketFrame) frame).text();
if (!frame.isFinalFragment()){
appendStr += body;
}else {
handleMsg(ctx, body);
}
}else if (frame instanceof ContinuationWebSocketFrame){
String halfBody = ((ContinuationWebSocketFrame) frame).text();
appendStr += halfBody;
if (frame.isFinalFragment()){
handleMsg(ctx, appendStr);
appendStr = "";
}
}
}
private void handleMsg(ChannelHandlerContext ctx, String body){
JSONObject jsonObject ;
try {
jsonObject = new JSONObject(body);
}catch (Exception e){
logger.error("get json error :{}",body);
return;
}
String cmd = (String) jsonObject.get("command");
if (cmd.equals("auth")){
handleAuthLogic(ctx, jsonObject);
}else if (cmd.equals("client_ping")){
handleClientPingLogic(ctx, jsonObject);
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回應答給客戶端
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,關閉連接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private static boolean isKeepAlive(FullHttpRequest req) {
return false;
}
/** 客戶端登錄 */
private void handleAuthLogic(ChannelHandlerContext ctx, JSONObject jsonObject){
logger.debug("json:{}",jsonObject.toString());
String userId = (String)jsonObject.get("from");
Long clientTime = (Long)jsonObject.get("timestamp");
String uniqueId = (String) jsonObject.get("uniqueId");
Long currentTime = System.currentTimeMillis();
Long diff = currentTime - clientTime;
AuthRes authRes = new AuthRes();
authRes.setCommand("auth");
authRes.setFrom("sys");
authRes.setTo(userId);
authRes.setDiff_time(diff);
Service2Controller<UserProfile> s2c = NettyManager.USER_SERVICE.getUserById(userId);
UserProfile userProfile = s2c.getData();
// UserProfile userProfile = new UserProfile();
boolean shouldClose = false;
if (userProfile == null){
authRes.setResult("failed");
authRes.setResson("user_not_exist");
shouldClose = true;
}else {
authRes.setResult("ok");
authRes.setResson("success");
currentUserId = userId;
// 保存當前user的所有鏈接chennel
// NettyManager.addConnectMap(userId, ctx.channel());
if (!NettyManager.channelGroupMap.containsKey(userId)){
NettyManager.channelGroupMap.put(userId, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
}
NettyManager.channelGroupMap.get(userId).add(ctx.channel());
logger.info("connect map user size:{} connect count:{}",NettyManager.channelGroupMap.size(),NettyManager.channelGroupMap.get(userId).size());
}
authRes.setTimestamp(currentTime);
String resString = authRes.toString() + NettyManager.SEP;
ctx.channel().writeAndFlush(new TextWebSocketFrame(resString));
if (shouldClose){
ctx.close();
}
}
/** 客戶端ping */
private void handleClientPingLogic(ChannelHandlerContext ctx, JSONObject jsonObject){
Long clientTime = (Long)jsonObject.get("timestamp");
Long currentTime = System.currentTimeMillis();
long diffTime = Math.abs(currentTime - clientTime);
if (diffTime < 30 * 1000){
JsonObject object = new JsonObject();
object.addProperty("command","client_ping_receive");
String resString = object.toString() + NettyManager.SEP;
ctx.channel().writeAndFlush(new TextWebSocketFrame(resString));
logger.info("receive client ping command ,res:{}", resString);
}
}
}
代碼中還有維持存儲客戶端連接的邏輯,一並記錄,保存連接的容器結構是:
Map<String, ChannelGroup> channelGroupMap = new ConcurrentHashMap<>;
鍵為用戶ID,值為當前用戶的連接集合。在給某個用戶發送數據,在相應地方調用channelGroupMap.get("userId").writeAndFlush()方法即可。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
使用netty時,如果鏈接異常關閉會打印對應的log信息,下面是別人的博客地址,記錄一下。
終止一個連接的正常方式是發送FIN。 在發送緩沖區中 所有排隊數據都已發送之后才發送FIN,正常情況下沒有任何數據丟失。 但我們有時也有可能發送一個RST報文段而不是F IN來中途關閉一個連接。這稱為異常關閉 。 進程關閉socket的默認方式是正常關閉,如果需要異常關閉,利用 SO_LINGER選項來控制。 異常關閉一個連接對應用程序來說有兩個優點: (1)丟棄任何待發的已經無意義的 數據,並立即發送RST報文段; (2)RST的接收方利用關閉方式來 區分另一端執行的是異常關閉還是正常關閉。 值得注意的是RST報文段不會導致另一端產生任何響應,另一端根本不進行確認。收到RST的一方將終止該連接。程序行為如下: 阻塞模型下,內核無法主動通知應用層出錯,只有應用層主動調用read()或者write()這樣的IO系統調用時,內核才會利用出錯來通知應用層對端RST。 非阻塞模型下,select或者epoll會返回sockfd可讀,應用層對其進行讀取時,read()會報錯RST。 游戲測試過程中發現某些socket錯誤經常出現,以下是測試游戲服務器時通常考慮的case. 服務器端: 1. Case:客戶端程序正常運行的情況下,拔掉網線,殺掉客戶端程序 目的:模擬客戶端死機、系統突然重啟、網線松動或網絡不通等情況 結論:這種情況下服務器程序沒有檢測到任何異常,並最后等待“超時”才斷開TCP連接 2. Case:客戶端程序發送很多數據包后正常關閉Socket並exit進程(或不退出進程) 目的:模擬客戶端發送完消息后正常退出的情況 結論:這種情況下服務器程序能夠成功接收完所有消息,並最后收到“對端關閉”(Recv返回零)消息 3. Case:客戶端程序發送很多數據包后不關閉Socket直接exit進程 目的:模擬客戶端程序退出而忘記關閉Socket的情況(比如通過Windows窗口的關閉圖標退出進程,而沒有捕獲相應關閉事件做正常退出處理等) 結論:這種情況下服務器程序能夠收到部分TCP消息,然后收到“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤 4. Case:客戶端程序發送很多數據包的過程中直接Kill進程 目的:模擬客戶端程序崩潰或非正常方式結束進程(比如Linux下”kill -9″或Windows的任務管理器殺死進程)的情況 結論:這種情況下服務器程序很快收到“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤 5. Case:客戶端程序發送很多數據包后正常關閉Socket並exit進程(或不退出進程) 目的:模擬客戶端正常關閉Socket后,服務器端在檢查到TCP對端關閉前向客戶端發送消息的情況 結論:這種情況下服務器程序接收和發送部分TCP消息后,在Send消息時產生“32: Broken pipe”(Linux下)或“10053: An established connection was aborted by the software in your host machine”(Windows下)錯誤 總結: 當TCP連接的進程在忘記關閉Socket而退出、程序崩潰、或非正常方式結束進程的情況下(Windows客戶端),會導致TCP連接的對端進程產生“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤 當TCP連接的進程機器發生死機、系統突然重啟、網線松動或網絡不通等情況下,連接的對端進程可能檢測不到任何異常,並最后等待“超時”才斷開TCP連接 當TCP連接的進程正常關閉Socket時,對端進程在檢查到TCP關閉事件之前仍然向TCP發送消息,則在Send消息時會產生“32: Broken pipe”(Linux下)或“10053: An established connection was aborted by the software in your host machine”(Windows下)錯誤 客戶端 1. 服務器端已經close了Socket,客戶端再發送數據 目的:測試在TCP對端進程已經關閉Socket時,本端進程還未檢測到連接關閉的情況下繼續向對端發送消息 結論:第一包可以發送成功,但第二包發送失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine”(Windows下)或“32: Broken pipe,同時收到SIGPIPE信號”(Linux下)錯誤 2. 服務器端發送數據到TCP后close了Socket,客戶端再發送一包數據,然后接收消息 目的:測試在TCP對端進程發送數據后關閉Socket,本端進程還未檢測到連接關閉的情況下發送一包消息,並接着接收消息 結論:客戶端能夠成功發送第一包數據(這會導致服務器端發送一個RST包 <已抓包驗證>),客戶端再去Recv時,對於Windows和Linux程序有如下不同的表現: Windows客戶端程序:Recv失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine” Linux客戶端程序:能正常接收完所有消息包,最后收到正常的對端關閉消息(這一點與Window下不一樣) 3. 服務器端在TCP的接收緩沖區中還有未接收數據的情況下close了Socket,客戶端再收包 目的:測試在TCP的接收緩沖區中還有未接收數據的情況下關閉Socket時,對端進程是否正常 結論:這種情況服務器端就會向對端發送RST包,而不是正常的FIN包(已經抓包證明),這就會導致客戶端提前(RST包比正常數據包先被收到)收到“10054: An existing connection was forcibly closed by the remote host”(Windows下)或“104: Connection reset by peer”(Linux下)錯誤 總結: 當TCP連接的對端進程已經關閉了Socket的情況下,本端進程再發送數據時,第一包可以發送成功(但會導致對端發送一個RST包過來): 之后如果再繼續發送數據會失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine”(Windows下)或“32: Broken pipe,同時收到SIGPIPE信號”(Linux下)錯誤; 之后如果接收數據,則Windows下會報10053的錯誤,而Linux下則收到正常關閉消息 TCP連接的本端接收緩沖區中還有未接收數據的情況下close了Socket,則本端TCP會向對端發送RST包,而不是正常的FIN包,這就會導致對端進程提前(RST包比正常數據包先被收到)收到“10054: An existing connection was forcibly closed by the remote host”(Windows下)或“104: Connection reset by peer”(Linux下)錯誤 --------------------- 作者:九嶷山 來源:CSDN 原文:https://blog.csdn.net/larry_zeng1/article/details/78982370 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
