title: Mina服務端客戶端通信
date: 2018-09-30 09:00:30
tags:
- [mina]
- [tcp]
categories:
- [編程]
permalink: zxh
前兩章節已經完整的介紹了理論部分,今天我們就利用這些理論來實現tcp協議的c/s 通信。首先我們簡單回顧下之前的介紹,
在mina中我們的客戶端和服務端簡直就是一模一樣,只是我們用不同適配器。但是他的數據處理流程是一樣的。今天我們就重點看看如何建立服務端、客戶端
並且處理兩者之間的消息通信處理
服務端
服務端和客戶端不同的就是我們創建的監聽對象不同而已,客戶端發送消息到服務端,服務端需要經歷過濾器的處理才能到達消息中心,但是在過濾器中我們就需要將消息進行解碼,然后才會到消息接收的地方處理我們的業務。正常情況下我們處理完消息需要對客戶端進行回應。回應的時候也會經歷過濾器中的編碼邏輯,進行數據編碼然后發送。信息發送到客戶端我們可以看成服務端的方向。也是需要進行編解碼的。下面看看服務端的創建代碼
//創建監聽對象
IoAcceptor acceptor = new NioSocketAcceptor();
TextLineCodecFactory textLineCodecFactory =
new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
LineDelimiter.WINDOWS.getValue());
//添加過濾器
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
textLineCodecFactory
));
//設置時間處理的handler
acceptor.setHandler(new ServerMessageHandler());
//設置讀取數據緩存區的大小
acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設置多久沒有消息就進入空閑狀態
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
//綁定端口
try {
acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT));
} catch (IOException e) {
logger.error(String.format("bind %s error",Constaint.REMOTE_PORT));
e.printStackTrace();
}
logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));
客戶端
//創建監聽對象
IoConnector connector = new NioSocketConnector();
TextLineCodecFactory textLineCodecFactory =
new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
LineDelimiter.WINDOWS.getValue());
//添加過濾器
//日志過濾器 。 sltf日志設置
connector.getFilterChain().addLast("logger",new LoggingFilter());
//在這個過濾器中提供了編解碼,這里的編碼是以信息中已\r\n結尾算是一條信息
connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
new SocketFactory()
));
//設置時間處理的handler , 提供session生命周期的監聽函數,消息接受,發送的函數
connector.setHandler(new ClientMessageHandler());
//設置讀取數據緩存區的大小
connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設置多久沒有消息就進入空閑狀態
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT));
//是異步處理,這里不會造成阻塞
future.addListener(new IoFutureListener<IoFuture>() {
@Override
public void operationComplete(IoFuture ioFuture) {
logger.info("連接准備完成");
IoSession session = ioFuture.getSession();
}
});
通信
- 其實上面服務端,客戶端兩邊創建好就應經在通信了,在上面創建的時候我們發現。創建的時候需要指定消息處理器(IoHandlerAdapter) , 這個在IoService中會排在IoFilter之后執行。在過濾器執行之后我們就會調用我們的消息處理器。
private static Logger logger = LogManager.getLogger(ServerMessageHandler.class);
public void sessionCreated(IoSession session) throws Exception {
super.sessionCreated(session);
logger.info("sessionCreated");
}
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
try {
IoBuffer buffer = IoBuffer.allocate(30);
buffer.clear();
buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
buffer.flip();
session.write(buffer);
} catch (Exception e) {
logger.error(e.toString());
}
logger.info("sessionOpened");
}
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
logger.info("sessionClosed");
}
public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
super.sessionIdle(session,idleStatus);
try {
IoBuffer buffer = IoBuffer.allocate(30);
buffer.clear();
buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
buffer.flip();
session.write(buffer);
} catch (Exception e) {
logger.error(e.toString());
}
// logger.info("sessionIdle");
}
public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
logger.info("exceptionCaught");
throwable.printStackTrace();
}
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
String info = message.toString();
Date date = new Date(System.currentTimeMillis());
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
String time = sdf.format(date);
session.write(time);
System.out.println("接收到的消息:"+info);
}
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
logger.info("messageSent");
}
- 這里消息處理器,提供了幾個時刻可以控制,比如session創建、銷毀的時候執行的地方。消息接收的地方,消息發送成功的地方。這些控制力度可以根據我們的需要進行適度的復寫。
自定義工廠編解碼
- 工廠是提供編解碼的方法。這個工廠是加載在ProtocolCodecFilter這個過濾器中的。我們也可以自定義過濾器,在自定義的過濾器中我們也可以加載我們自定義的工廠,實現編解碼。我們在編解碼的地方,就可以加入我們的業務代碼。比如解碼通過約定的協議方式讀取到內容后通過ProtocolDecoderOutput 將消息寫出去就可以在我們的IoHandlerAdapter的messageReceived方法中獲取到消息。然后業務書寫。這樣做到代碼的解耦。
public class SocketFactory implements ProtocolCodecFactory {
private MessageDecoder decoder;
private MessageEncoder encoder;
public SocketFactory() {
decoder = new MessageDecoder();
encoder = new MessageEncoder();
}
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return this.decoder;
}
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return this.encoder;
}
}
解碼器
-
上面的工廠就是提供編解碼的。和我們生活中一樣工廠提供功能,但是實際並不是工廠做的,工廠可能只代理功能,僅僅是個加工廠而已。mina通信也是如此。真正編解碼的並不是工廠執行的,本節將揭露解碼者CumulativeProtocolDecoder
-
解碼器寫好之后只需要在上面自定義工廠中創建就好了。至於自定義編碼器只需要繼承CumulativeProtocolDecoder這個類就好了。而且復寫doDecode方法就好了。這個方法的返回值是boolean類型。返回值不同代表意義不一。這里需要重點理清楚
+ true: 返回true表示你已經從CumulativeProtocolDecoder的消息中消費了信息,在編碼器中返回true之前也應該調用ProtocolDecoderOutput 的wirte將消息發布到IoHandAdaptor中進行業務處理。但是這里會出現其他情況,應為我們服務端客戶端是長連接所以在我們消息中消息是不斷發過來的,我們緩存中的消息可能是完整一條消息,也可能不夠一整條消息,也可能是一整條多了一點,
1、如果不是一條完整(半包)的那么我們返回falsed等待客戶端繼續發送
2、如果正好是一整條,那么我們接受到之后返回true的時候我們緩存中就沒有數據了,在CumulativeProtocolDecoder會停止對解碼中doDecode的調用了,這種情況不會出現意外
3、數據比一條完整信息(粘包)多,那么我們處理到一條信息后也需要返回true,但是CumulativeProtocolDecoder會將剩余的緩存繼續拼裝,剩余消息就相當於內部進行了第二次解碼。如果不過那么相當於上面第一種情況
記住三種情況 半包 、 正常 、 粘包
+ false: 返回false就是緩存中的數據不夠我們一整條消息,需要繼續等待客戶端的消息。CumulativeProtocolDecoder中的緩存機制會不斷的將客戶端發過來的數據拼接到緩存中
public class MessageDecoder extends CumulativeProtocolDecoder {
/**
* 此方法return true : 表示父類中CumulativeProtocolDecoder會不斷的調用此方法進行消息的消費
* return false: 表示消息已經消費完全了,緩存中就算有數據也不會再消費了。等待再次客戶端
* 發送消息時會觸發消息發送接口,此時會將新舊消息拼接再一起進行處理
* @param ioSession
* @param ioBuffer
* @param protocolDecoderOutput
* @return
* @throws Exception
*/
@Override
protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
IoBuffer buffer = IoBuffer.allocate(10);
while (ioBuffer.hasRemaining()) {
if (ioBuffer.remaining()<3) {
//繼續接受
return false;
}
//獲取三個字節
int oldLimit = ioBuffer.limit();
ioBuffer.limit(ioBuffer.position()+3);
String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder());
protocolDecoderOutput.write(text);
ioBuffer.limit(oldLimit);
if (ioBuffer.hasRemaining()) {
return true;
}
}
return false;
}
}
編碼器
- 編碼器相對解碼器簡單很多,編碼器就是加入我們的協議,正常情況就是我們業務代碼中消息是一個Java實體,我們需要做的是將Java實體按照協議轉換成IoBuffer進行發送。但是我們mina中發送消息是通過IoSession中write方法發送的。我們查看源碼發現在IoSession.write(Object o),發送的如果是IoBuffer那么就不經過我們的編碼器,否則會經過我們編碼器進行編碼最終將轉換后的IoBuffer發送出去。
public class MessageEncoder extends ProtocolEncoderAdapter {
@Override
public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
//TODO 根據協議編碼
//組裝好之后 ioSession.write(IoBuffer)寫出
System.out.println(o);
}
}