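step1:協議格式 —— 報文總長度(4 位元組)+ 序列號(4 位元組)+ 命令號(4 位元組)+ 保留字(4 位元組)+ 數據段(N 位元組)+ 分隔符 0xFFFF(2 位元組);各整數欄位皆為大端序,報文總長度 = 18 + N。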
step2:根據協議定義出對應的模型

package com.superb.mina.entity;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;

/**
 * Protocol frame exchanged between client and server.
 *
 * <p>Wire layout produced by {@link #toBytes()}, in order: total length (int),
 * sequence (int), command (int), reserved (int), the optional data bytes, and a
 * two-byte {@code 0xFFFF} separator. Integers are written with
 * {@link DataOutputStream}, i.e. high byte first.
 *
 * @author sundg
 *
 * 2018-03-30 10:55:33
 */
public class BaseMsg implements Serializable {
    /** Serialization version id. */
    private static final long serialVersionUID = -4614096987747485330L;

    /** Size in bytes of a frame without data: four ints plus the two-byte separator. */
    public static final int HEAD_SIZE = 18;

    /** Fixed value written at the end of every frame. */
    private static final int FRAME_SEPARATOR = 0xFFFF;

    /** Total frame length; a message that carries no data is exactly {@link #HEAD_SIZE} bytes. */
    private Integer length = HEAD_SIZE;
    /** Sequence number. */
    private Integer sequence;
    /** Command number. */
    private Integer command;
    /** Reserved word; encoded as 0 when unset. */
    private Integer reserved;
    /** Data segment; may be {@code null} for a frame without a body. */
    private byte[] data;

    /** Separator value recorded on the message; {@link #toBytes()} always writes 0xFFFF. */
    private Integer separator = FRAME_SEPARATOR;

    public static long getSerialversionuid() {
        return serialVersionUID;
    }

    public void setLength(Integer length) {
        this.length = length;
    }

    public Integer getLength() {
        return length;
    }

    public Integer getSequence() {
        return sequence;
    }

    public void setSequence(Integer sequence) {
        this.sequence = sequence;
    }

    public Integer getCommand() {
        return command;
    }

    public void setCommand(Integer command) {
        this.command = command;
    }

    public Integer getReserved() {
        return reserved;
    }

    public void setReserved(Integer reserved) {
        this.reserved = reserved;
    }

    public byte[] getData() {
        return data;
    }

    /**
     * Stores the data segment and keeps {@code length} in sync with it.
     *
     * @param data the body bytes, or {@code null} for a frame without a body
     */
    public void setData(byte[] data) {
        this.data = data;
        this.length = HEAD_SIZE + (data == null ? 0 : data.length);
    }

    public Integer getSeparator() {
        return separator;
    }

    public void setSeparator(Integer separator) {
        this.separator = separator;
    }

    /**
     * Encodes this message into the binary frame that is sent on the wire.
     *
     * <p>The length field is computed from the actual data so the frame is always
     * self-consistent, even if {@link #setData(byte[])} was never called or
     * {@link #setLength(Integer)} was given a different value.
     *
     * @return the encoded frame, never {@code null}
     * @throws IllegalStateException if sequence or command has not been set, or if
     *         writing to the in-memory stream fails
     */
    public byte[] toBytes() {
        if (this.sequence == null || this.command == null) {
            throw new IllegalStateException("sequence and command must be set before encoding: sequence="
                    + this.sequence + ", command=" + this.command);
        }

        int dataLength = (this.data == null) ? 0 : this.data.length;
        int frameLength = HEAD_SIZE + dataLength;

        ByteArrayOutputStream bos = new ByteArrayOutputStream(frameLength);
        try {
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(frameLength);
            dos.writeInt(this.sequence);
            dos.writeInt(this.command);
            dos.writeInt(this.reserved == null ? 0 : this.reserved);
            if (this.data != null) {
                dos.write(this.data);
            }
            dos.writeShort(FRAME_SEPARATOR);
            dos.flush();
        } catch (IOException e) {
            // Surface the failure instead of returning null to the encoder.
            throw new IllegalStateException("failed to encode message", e);
        }
        return bos.toByteArray();
    }

}
step3:自定義編碼解碼器

1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.session.IoSession; 4 import org.apache.mina.filter.codec.ProtocolCodecFactory; 5 import org.apache.mina.filter.codec.ProtocolDecoder; 6 import org.apache.mina.filter.codec.ProtocolEncoder; 7 8 public class SuperbProtocolCodecFactory implements ProtocolCodecFactory { 9 private ProtocolDecoder decoder; 10 private ProtocolEncoder encoder; 11 12 public SuperbProtocolCodecFactory() { 13 this.encoder = new SuperbProtocolEncoder(); 14 this.decoder = new SuperbProtocolDecoder(); 15 } 16 17 @Override 18 public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception { 19 return this.decoder; 20 } 21 22 @Override 23 public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception { 24 return this.encoder; 25 } 26 }

1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.IoBuffer; 4 import org.apache.mina.core.session.IoSession; 5 import org.apache.mina.filter.codec.ProtocolEncoder; 6 import org.apache.mina.filter.codec.ProtocolEncoderOutput; 7 8 import com.superb.mina.entity.BaseMsg; 9 10 public class SuperbProtocolEncoder implements ProtocolEncoder { 11 12 @Override 13 public void dispose(IoSession session) throws Exception { 14 // nothing to dispose 15 } 16 17 @Override 18 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { 19 BaseMsg msg = (BaseMsg) message; 20 byte[] bys = msg.toBytes(); 21 22 IoBuffer buffer = IoBuffer.allocate(bys.length, false); 23 buffer.put(bys); 24 buffer.flip(); 25 out.write(buffer); 26 } 27 }

1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.BufferDataException; 4 import org.apache.mina.core.buffer.IoBuffer; 5 import org.apache.mina.core.session.IoSession; 6 import org.apache.mina.filter.codec.CumulativeProtocolDecoder; 7 import org.apache.mina.filter.codec.ProtocolDecoderOutput; 8 9 import com.superb.mina.entity.BaseMsg; 10 11 12 public class SuperbProtocolDecoder extends CumulativeProtocolDecoder { 13 private boolean prefixedDataAvailable(IoBuffer in) { 14 if (in.remaining() < 4) { 15 return false; 16 } 17 18 int dataLength = in.getInt(in.position()); 19 20 if (dataLength < BaseMsg.HEAD_SIZE) { 21 throw new BufferDataException("dataLength: " + dataLength); 22 } 23 24 return in.remaining() >= dataLength; 25 } 26 27 @Override 28 protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { 29 if (!prefixedDataAvailable(in)) { 30 return false; 31 } 32 33 BaseMsg msg = new BaseMsg(); 34 int length = in.getInt(); 35 msg.setLength(length); 36 msg.setSequence(in.getInt()); 37 msg.setCommand(in.getInt()); 38 msg.setReserved(in.getInt()); 39 40 int body_len = length - BaseMsg.HEAD_SIZE; 41 42 if (body_len < 0 ) { 43 throw new Exception("body length error.(" + body_len + ")"); 44 } 45 46 if (body_len > 0) { 47 byte[] body = new byte[body_len]; 48 in.get(body); 49 msg.setData(body); 50 } 51 in.getShort(); 52 msg.setSeparator(0xFFFF); 53 out.write(msg); 54 return true; 55 } 56 }
step4:編寫對應的客戶端與服務器端

1 package com.superb.mina.clientServer; 2 3 import java.net.InetSocketAddress; 4 import java.util.Date; 5 import java.util.HashMap; 6 import java.util.Map; 7 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.session.IoSession; 11 import org.apache.mina.filter.codec.ProtocolCodecFilter; 12 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 13 import org.apache.mina.transport.socket.nio.NioSocketConnector; 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 17 import com.superb.mina.clientServer.handler.ClientServerHandler; 18 import com.superb.mina.coder.MinaProtobufDecoder; 19 import com.superb.mina.coder.MinaProtobufEncoder; 20 import com.superb.mina.coder.SuperbProtocolCodecFactory; 21 import com.superb.mina.coder.SuperbProtocolDecoder; 22 import com.superb.mina.coder.SuperbProtocolEncoder; 23 import com.superb.mina.entity.BaseMsg; 24 import com.superb.mina.entity.Init; 25 26 27 public class ClientServer { 28 private static Logger logger=LoggerFactory.getLogger(ClientServer.class); 29 private static String HOST="192.168.1.115"; 30 private static int PORT=8003; 31 public static void main(String[] args) { 32 IoConnector connector=null; 33 connector=new NioSocketConnector(); 34 connector.setConnectTimeoutMillis(30000); 35 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 36 connector.setHandler(new ClientServerHandler()); 37 IoSession session=null; 38 try { 39 ConnectFuture future= connector.connect(new InetSocketAddress(HOST,PORT)); 40 future.awaitUninterruptibly(); 41 session=future.getSession(); 42 BaseMsg msg=new BaseMsg(); 43 Thread t=new Thread(); 44 int i=0; 45 while(true){ 46 i++; 47 Init init=new Init(); 48 init.setAc1(25); 49 init.setAc1FaltAlertEnable(1); 50 init.setcTime(new Date()); 51 byte[] bytes=init.getData(init); 52 msg.setSequence(i); 53 msg.setCommand(100); 54 
msg.setData(bytes); 55 session.write(msg); 56 t.sleep(8000); 57 } 58 } catch (Exception e) { 59 logger.info("客戶端連接異常....",e); 60 } 61 } 62 63 /** 64 * 嘗試與服務器建立連接,如果連接成功返回IoSession 65 * @param ip 服務器IP地址 66 * @param port 服務器端口號 67 * @return 返回連接成功之后的IoSession 68 */ 69 public static IoSession connectToServer(String ip,int port){ 70 IoConnector connector=null; 71 connector=new NioSocketConnector(); 72 connector.setConnectTimeoutMillis(30000); 73 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 74 connector.setHandler(new ClientServerHandler()); 75 IoSession session=null; 76 try { 77 ConnectFuture future= connector.connect(new InetSocketAddress(ip,port)); 78 future.awaitUninterruptibly(); 79 session=future.getSession(); 80 logger.info("客戶端連接成功"); 81 } catch (Exception e) { 82 logger.info("客戶端連接異常....",e); 83 } 84 return session; 85 } 86 87 /** 88 * 發送心跳包 89 * @param session 與服務器連接成功之后的IoSession 90 * @param baseMsg 將要發送的心跳包 91 */ 92 public static void sendHeartBeat(IoSession session, BaseMsg baseMsg) { 93 logger.info("命令號為:"+baseMsg.getCommand()); 94 session.write(baseMsg); 95 } 96 }

1 package com.superb.mina.clientServer.handler; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ObjectInputStream; 5 import java.net.InetSocketAddress; 6 7 import org.apache.log4j.Logger; 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.service.IoHandlerAdapter; 11 import org.apache.mina.core.session.IdleStatus; 12 import org.apache.mina.core.session.IoSession; 13 import org.apache.mina.filter.codec.ProtocolCodecFilter; 14 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 15 import org.apache.mina.transport.socket.nio.NioSocketConnector; 16 17 import com.superb.mina.entity.BaseMsg; 18 import com.superb.mina.helper.Helper; 19 20 21 public class ClientServerHandler extends IoHandlerAdapter{ 22 private static Logger logger=Logger.getLogger(ClientServerHandler.class); 23 @Override 24 public void sessionCreated(IoSession session) throws Exception { 25 // TODO Auto-generated method stub 26 super.sessionCreated(session); 27 logger.info("創建session"); 28 } 29 @Override 30 public void sessionIdle(IoSession session, IdleStatus status) 31 throws Exception { 32 // TODO Auto-generated method stub 33 super.sessionIdle(session, status); 34 logger.info("session進入空閑狀態"); 35 } 36 @Override 37 public void messageReceived(IoSession session, Object message) 38 throws Exception { 39 BaseMsg msg=(BaseMsg) message; 40 logger.info("客戶端接收到服務器的消息為:"+msg); 41 logger.info("報文的長度為:"+msg.getLength()); 42 logger.info("報文序列號為:"+msg.getSequence()); 43 logger.info("報文命令號為:"+msg.getCommand()); 44 logger.info("報文保留字為:"+msg.getReserved()); 45 ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 46 ObjectInputStream oi=new ObjectInputStream(bi); 47 Object obj=oi.readObject(); 48 49 if(0x80000000==msg.getCommand()){ 50 //心跳應答包 51 }else if(0x80000001==msg.getCommand()){ 52 //初始化應答包 53 54 }else if(0x80000002==msg.getCommand()){ 55 //設備動作上報應答包 56 }else 
if(0x80000003==msg.getCommand()){ 57 //報警上報應答包 58 }else if(0x00008000==msg.getCommand()){ 59 //狀態查詢請求 60 }else if(0x00008001==msg.getCommand()){ 61 //設備控制請求 62 }else if(0x00008002==msg.getCommand()){ 63 //常用配置參數 64 }else if(0x00008003==msg.getCommand()){ 65 //用戶參數設置 66 }else if(0x00008004==msg.getCommand()){ 67 //報警使能設置 68 } 69 70 } 71 72 73 @Override 74 public void messageSent(IoSession session, Object message) throws Exception { 75 // TODO Auto-generated method stub 76 int i=0; 77 super.messageSent(session, message); 78 //logger.info(message.toString()); 79 logger.info("消息發送成功"); 80 } 81 82 @Override 83 public void exceptionCaught(IoSession session, Throwable cause) 84 throws Exception { 85 logger.error("客戶端發生異常",cause); 86 } 87 @Override 88 public void sessionOpened(IoSession session) throws Exception { 89 super.sessionOpened(session); 90 logger.info("連接成功"); 91 } 92 @Override 93 public void sessionClosed(IoSession session) throws Exception { 94 logger.info("連接斷開"); 95 IoConnector connector=null; 96 connector=new NioSocketConnector(); 97 connector.setConnectTimeoutMillis(30000); 98 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 99 connector.setHandler(new ClientServerHandler()); 100 //i=0; 101 while(true) { 102 try { 103 Thread.sleep(3000); 104 // 這里是異步操作 連接后立即返回 105 ConnectFuture future = connector.connect(new InetSocketAddress( 106 Helper.getMap().get("server").toString(), (int) Helper.getMap().get("server_port"))); 107 future.awaitUninterruptibly();// 等待連接創建完成 108 session = future.getSession(); 109 if(session.isConnected()) { 110 logger.info("重新連接成功"); 111 break; 112 }else{ 113 // Toast.makeText(getBaseContext(), "請檢查網絡", Toast.LENGTH_SHORT).show(); 114 logger.error("請檢查網絡是否正常"); 115 } 116 } catch (Exception e) { 117 logger.info("正在嘗試重新連接...."); 118 } 119 } 120 } 121 }

1 package com.superb.mina.server; 2 3 import java.net.InetSocketAddress; 4 5 import org.apache.mina.core.service.IoAcceptor; 6 import org.apache.mina.core.session.IdleStatus; 7 import org.apache.mina.filter.codec.ProtocolCodecFilter; 8 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 9 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 10 import org.slf4j.Logger; 11 import org.slf4j.LoggerFactory; 12 13 import com.superb.mina.coder.MinaProtobufDecoder; 14 import com.superb.mina.coder.MinaProtobufEncoder; 15 import com.superb.mina.coder.SuperbProtocolCodecFactory; 16 import com.superb.mina.coder.SuperbProtocolDecoder; 17 import com.superb.mina.coder.SuperbProtocolEncoder; 18 import com.superb.mina.server.handler.ServerHandler; 19 20 21 public class Server { 22 private static Logger logger=LoggerFactory.getLogger(Server.class); 23 24 private static int PORT=8003; 25 26 public static void main(String[] args) { 27 IoAcceptor acceptor=null; 28 try { 29 acceptor=new NioSocketAcceptor(); 30 acceptor.getFilterChain().addLast("codec", 31 new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 32 acceptor.getSessionConfig().setReadBufferSize(2048); 33 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 34 acceptor.setHandler(new ServerHandler()); 35 acceptor.bind(new InetSocketAddress(PORT)); 36 logger.info("服務器端啟動成功... 端口號為:"+PORT); 37 } catch (Exception e) { 38 logger.error("服務器啟動異常...", e); 39 } 40 } 41 }

1 package com.superb.mina.server.handler; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ObjectInputStream; 5 import java.util.Date; 6 7 import org.apache.log4j.Logger; 8 import org.apache.mina.core.service.IoHandlerAdapter; 9 import org.apache.mina.core.session.IoSession; 10 11 import com.superb.mina.entity.BaseMsg; 12 import com.superb.mina.entity.DeviceAction; 13 import com.superb.mina.entity.DeviceAlert; 14 import com.superb.mina.entity.HeartBeat; 15 import com.superb.mina.entity.Init; 16 17 18 public class ServerHandler extends IoHandlerAdapter{ 19 private static Logger logger=Logger.getLogger(ServerHandler.class); 20 21 @Override 22 public void messageReceived(IoSession session, Object message) 23 throws Exception { 24 int i=0; 25 BaseMsg msg=(BaseMsg) message; 26 logger.info("報文的長度為:"+msg.getLength()); 27 logger.info("報文序列號為:"+msg.getSequence()); 28 logger.info("報文命令號為:"+msg.getCommand()); 29 logger.info("報文保留字為:"+msg.getReserved()); 30 //logger.info(msg.getData()); 31 ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 32 ObjectInputStream oi=new ObjectInputStream(bi); 33 Object obj=oi.readObject(); 34 if(0==msg.getCommand()){ 35 HeartBeat heartBeat=new HeartBeat(); 36 logger.info("室內溫度為:"+heartBeat.getInTemp()); 37 logger.info("風機電流為:"+heartBeat.getFanElec()); 38 logger.info("室外溫度為:"+heartBeat.getOutTemp()); 39 }else if(1==msg.getCommand()){ 40 Init init=(Init)obj; 41 logger.info("初始化時間為:"+init.getcTime()); 42 logger.info("初始化mac為:"+init.getMac()); 43 logger.info("初始化室內溫度為:"+init.getInTemp()); 44 logger.info("初始化室外溫度為:"+init.getOutTemp()); 45 logger.info("初始化風機電流為:"+init.getFanElec()); 46 logger.info("初始化主風機1為:"+init.getInFan1()); 47 } 48 49 /*ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 50 ObjectInputStream oi=new ObjectInputStream(bi); 51 Object obj=oi.readObject(); 52 53 logger.info(obj); 54 bi.close(); 55 oi.close(); 56 if(obj instanceof Init){ 57 Init init=(Init)obj; 58 
logger.info("初始化時間為:"+init.getcTime()); 59 logger.info("初始化mac為:"+init.getMac()); 60 logger.info("初始化室內溫度為:"+init.getInTemp()); 61 logger.info("初始化室外溫度為:"+init.getOutTemp()); 62 logger.info("初始化風機電流為:"+init.getFanElec()); 63 logger.info("初始化主風機1為:"+init.getInFan1()); 64 }else if(obj instanceof DeviceAction){ 65 66 }else if(obj instanceof DeviceAlert){ 67 68 }else if(obj instanceof HeartBeat){ 69 HeartBeat beat =(HeartBeat)obj; 70 logger.info("當前時間為:"+beat.getcTime()); 71 logger.info("風機電流為:"+beat.getFanElec()); 72 logger.info("室內溫度為:"+beat.getInTemp()); 73 logger.info("室外溫度為:"+beat.getOutTemp()); 74 } 75 76 77 78 msg.getData(); 79 logger.info("報文數據段為:"+msg.getData());*/ 80 session.write(msg); 81 } 82 @Override 83 public void exceptionCaught(IoSession session, Throwable cause) 84 throws Exception { 85 logger.info("服務器發生異常",cause); 86 } 87 }