協議編解碼器是在使用Mina 的時候你最需要關注的對象,因為在網絡傳輸的數據都是二進制數據(byte),而你在程序中面向的是JAVA 對象,這就需要你實現在發送數據時將JAVA 對象編碼二進制數據,而接收數據時將二進制數據解碼為JAVA 對象(這個可不是JAVA 對象的序列化、反序列化那么簡單的事情)
Mina 中的協議編解碼器通過過濾器ProtocolCodecFilter 構造,這個過濾器的構造方法需要一個ProtocolCodecFactory,
ProtocolCodecFactory 中有如下兩個方法:
public interface ProtocolCodecFactory { ProtocolEncoder getEncoder(IoSession session) throws Exception; ProtocolDecoder getDecoder(IoSession session) throws Exception; }
ProtocolEncoder是自定義編碼器要實現的接口,ProtocolDecoder是自定義解碼器要實現的接口。
下面是示例代碼:(模擬手機信息的編解碼,消息格式;有報頭,發送人,接收人,內容長度,內容信息)
MsgObject.java: 消息實體類
1 public class MsgObject { 2 //發送者 3 private String sender; 4 //接收者 5 private String receiver; 6 //信息內容 7 private String content; 8 9 public String getSender() { 10 return sender; 11 } 12 13 public void setSender(String sender) { 14 this.sender = sender; 15 } 16 17 public String getReceiver() { 18 return receiver; 19 } 20 21 public void setReceiver(String receiver) { 22 this.receiver = receiver; 23 } 24 25 public String getContent() { 26 return content; 27 } 28 29 public void setContent(String content) { 30 this.content = content; 31 } 32 33 }
MessageEncoder.java: 消息編碼器
1 //消息編碼器 2 public class MessageEncoder extends ProtocolEncoderAdapter { 3 private Charset charset; 4 5 public MessageEncoder(Charset charset) 6 { 7 this.charset = charset; 8 } 9 10 @Override 11 public void encode(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2) 12 throws Exception { 13 MsgObject msg = (MsgObject) arg1; 14 //生成字符編碼器 15 CharsetEncoder charsetEncoder = charset.newEncoder(); 16 //得到要發送對象屬性內容,准備進行編碼 17 String status = "M sip:wap.fetion.com.cn SIP-C/2.0"; 18 String sender = msg.getSender(); 19 String receiver = msg.getReceiver(); 20 String content = msg.getContent(); 21 //開辟一個緩存空間,設置為自動調整大小 22 IoBuffer ioBuffer = IoBuffer.allocate(100); 23 ioBuffer.setAutoExpand(true); 24 //將要發送的信息放入緩存空間 25 //消息頭 26 ioBuffer.putString(status + "\n", charsetEncoder); 27 //消息發送者 28 ioBuffer.putString("S: " + sender + "\n", charsetEncoder); 29 //消息接收者 30 ioBuffer.putString("R: " + receiver + "\n", charsetEncoder); 31 //消息內容長度 32 ioBuffer.putString("L: " + content.getBytes(charset).length + "\n", charsetEncoder); 33 //消息內容 34 ioBuffer.putString(content + "\n", charsetEncoder); 35 //編碼后的信息已放入ioBuffer中,進行寫回 36 ioBuffer.flip(); 37 arg2.write(ioBuffer); 38 } 39 40 }
MessageDecoder.java: 消息解碼器
1 //消息解碼器 2 public class MessageDecoder extends CumulativeProtocolDecoder { 3 private Charset charset; 4 5 public MessageDecoder(Charset charset) { 6 this.charset = charset; 7 } 8 9 @Override 10 protected boolean doDecode(IoSession arg0, IoBuffer arg1, 11 ProtocolDecoderOutput arg2) throws Exception { 12 CharsetDecoder charDecoder = charset.newDecoder(); 13 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 14 // 接收解碼后的信息 15 String status = ""; 16 String sender = ""; 17 String receiver = ""; 18 String contentLen = ""; 19 String content = ""; 20 21 int textLineNumber = 1; 22 int columnNumber = 0; 23 // 如果緩存區還有消息 24 while (arg1.hasRemaining()) { 25 byte bt = arg1.get(); 26 buffer.put(bt); 27 //換行 28 if (bt == 10 && textLineNumber < 5) { 29 columnNumber++; 30 if (textLineNumber == 1) { 31 buffer.flip(); 32 status = buffer.getString(columnNumber, charDecoder); 33 status = status.substring(0, status.length() - 1); 34 columnNumber = 0; 35 buffer.clear(); 36 } 37 if (textLineNumber == 2) { 38 buffer.flip(); 39 sender = buffer.getString(columnNumber, charDecoder); 40 sender = sender.substring(0, sender.length() - 1); 41 columnNumber = 0; 42 buffer.clear(); 43 } 44 if (textLineNumber == 3) { 45 buffer.flip(); 46 receiver = buffer.getString(columnNumber, charDecoder); 47 receiver = receiver.substring(0, receiver.length() - 1); 48 columnNumber = 0; 49 buffer.clear(); 50 } 51 if (textLineNumber == 4) { 52 buffer.flip(); 53 contentLen = buffer.getString(columnNumber, charDecoder); 54 contentLen = contentLen.substring(0, 55 contentLen.length() - 1); 56 columnNumber = 0; 57 buffer.clear(); 58 } 59 textLineNumber++; 60 } else if (textLineNumber == 5) { 61 columnNumber++; 62 if (columnNumber == Long.parseLong(contentLen.split(": ")[1])) { 63 buffer.flip(); 64 content = buffer.getString(columnNumber, charDecoder); 65 textLineNumber++; 66 break; 67 } 68 } else { 69 columnNumber++; 70 } 71 72 } 73 MsgObject smsObject = new MsgObject(); 74 smsObject.setSender(sender.split(": ")[1]); 75 smsObject.setReceiver(receiver.split(": ")[1]); 76 smsObject.setContent(content); 77 arg2.write(smsObject); 78 return false; 79 } 80 }
關於IoBuffer的讀操作,需要了解一下原理,可參考文章:Mina框架研究(2)
MessageProtocolCodecFactory.java: 生成消息編解碼器工廠
1 //編解碼器生成工產 2 public class MessageProtocolCodecFactory implements ProtocolCodecFactory { 3 private ProtocolEncoder encoder; 4 private ProtocolDecoder decoder; 5 6 public MessageProtocolCodecFactory() 7 { 8 this(Charset.forName("UTF-8")); 9 } 10 11 public MessageProtocolCodecFactory(Charset charset) 12 { 13 encoder = new MessageEncoder(charset); 14 decoder = new MessageDecoder(charset); 15 } 16 17 @Override 18 public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { 19 return decoder; 20 } 21 22 @Override 23 public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { 24 return encoder; 25 } 26 27 }
接着就是調用這些編解碼器來進行對象的傳輸了,服務器端和客戶端的主程序編寫可參考Mina框架HelloWorld入門
溫馨提示:
上面的消息解碼器( MessageDecoder.java)中的解碼考慮的情況是消息一次性從服務器發送過來,但有時消息可能不是一次性從服務器發送過來,而是分成了幾次分批過來,這時就會重復調用解碼器的deCode()方法,這時狀態變量textLineNumber和columnNumber就會被重置,所以要把狀態變量保存起來。可能你會想到將狀態變量保存在解碼器的成員變量中,但是Mina不保證每次調用deCode()方法的都是同一個線程,所以狀態變量不是線程安全的。所以要將狀態變量保存到IoSession中,因為IoSession用了一個同步的HashMap保存對象。
在IoSession中保存狀態變量:
// 保存數據狀態對象的key值 private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
通過IoSession.setAttribute和IoSession.getAttribute的保存和得到保存數據的對象
如下:
1 private MsgContext getContext(IoSession session) { 2 MsgContext context = (MsgContext) session.getAttribute(CONTEXT); 3 if (null == context) { 4 context = new MsgContext(); 5 session.setAttribute(CONTEXT, context); 6 } 7 return context; 8 }