Netty框架的使用
1 TCP開發範例
發送地址---192.168.31.241
發送端口號---9223
發送數據
{
"userid":"mm910@mbk.com",
"devicetype":3,
"accounttype":0,
"username":"",
"password":"e10adc3949ba59abbe56e057f20f883e",
"meiid":1000217,
"deviceid":"864376025909275"
}
接收數據
{
"message":"登錄成功",
"sessionkey":"EF81E1BD132D40DE8F1707A521D8B5A6",
"mainsn":"C001B00010000002",
"code":0
}
2 上代碼
1 業務層代碼
public class MainActivity extends Activity { private Base1106Entity entity1106;// 登錄雲棒協議 public static final int RESPONSE_SUCCESS = 0x401; public static final int RESPONSE_FAIL = 0x402; public static final int RESPONSE_TIMEOUT = 0x403; public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; //心跳超時 public static final int NOT_LOGIN= 0x411; //用戶未登錄 public Handler mHandler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); switch (msg.what) { case RESPONSE_SUCCESS: IEntity entity = (IEntity) msg.obj; if (entity != null) { responseSuccess((IEntity) msg.obj); } else { responseFail(-1, "返回數據為空!"); } break; case RESPONSE_FAIL:// 請求失敗 if (msg != null && msg.obj != null) responseFail(-10001, (String) msg.obj); break; case RESPONSE_TIMEOUT:// 請求超時 if (msg != null && msg.obj != null) responseFail(-10000, (String) msg.obj); break; case NOT_LOGIN:// 用戶未登錄 if (msg != null && msg.obj != null) responseFail(-10002, (String) msg.obj); break; } } }; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Button login = (Button)findViewById(R.id.login); login.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { reqEntity1106(); } }); } public void reqEntity1106() { entity1106 = new Base1106Entity(); entity1106.setMeiid(1000217); entity1106.setUserid("mm910@mbk.com"); entity1106.setUsername(""); entity1106.setPassword("e10adc3949ba59abbe56e057f20f883e"); entity1106.setAccounttype( 0 ); entity1106.setDevicetype(3); entity1106.setDeviceid("864376025909275"); entity1106.setHandler(mHandler); ClientConnectFactory.getInstance().sendEntity(entity1106); } public void responseSuccess(IEntity entity) { Toast.makeText(MainActivity.this, ((Base1106Entity)entity).toString(), Toast.LENGTH_LONG).show(); } public void responseFail(int code, String msg) { Toast.makeText(MainActivity.this, msg, Toast.LENGTH_SHORT).show(); } }
/**
 * Application subclass that publishes itself through {@link #mContext} and
 * initialises {@code ClientConnectFactory} when the process starts.
 */
public class MeiApp extends Application {

    /** Application context; assigned in {@link #onCreate()}. */
    public static Context mContext;

    @Override
    public void onCreate() {
        super.onCreate();
        // Publish the context first: the factory is initialised with it.
        MeiApp.mContext = this;
        ClientConnectFactory.getInstance().init(MeiApp.mContext);
    }
}
2 業務通訊層代碼
public interface IClientConnect { public void isConnect(String netType); public void sendAgain(); public void sendMsgFail(String netType, byte[] msg); public void connectFail(String netType); // 根據實體發送數據 public void sendEntity(IEntity entity); public void sendByte(byte[] b); // 關閉 public void isClose(); // 清除當前數據 public void isClearMsg(); public void callBack(PackageHeader header, byte[] data, String desc, int type); public void callBack(IEntity entity, String desc); }
public abstract class BaseClientMgr extends Subject implements IClientConnect { protected boolean isRunning; // 當前是否正在連接 protected boolean isSending; // 是否正在發送 線程是否被占用 private int mPort; // 連接服務器的端口號 private int mCommunication; // 通訊類型 private int heartTimeOutCount = 0; // 記錄心跳超時次數 protected int function = 1200; // 關閉連接功能號 public static final int RESPONSE_SUCCESS = 0x401; public static final int RESPONSE_FAIL = 0x402; public static final int RESPONSE_TIMEOUT = 0x403; public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; // 心跳超時 public static final int NOT_LOGIN = 0x411; // 用戶未登錄 private String mConnectKey = "BasicServicesMgr"; private String mHost; // 連接服務器的IP地址 protected ArrayList<IEntity> mEntityMsg = null; // 待發送消息集合 protected Context mContext; // Context對象 protected CommunicationThreadManager mManager; // 該通訊層管理器 protected ParseByteThread mParseByteThread = null; // 數據解析線程 protected ExecutorService executor; // 線程連接池 protected BaseClientMgr(String host, int port, String key) { init(host, port, key); } // 初始化 private void init(String host, int port, String key) { this.mContext = MeiApp.mContext; isRunning = false; isSending = false; mHost = host; mPort = port; mConnectKey = key; mEntityMsg = new ArrayList<IEntity>(); executor = Executors.newFixedThreadPool(10); mParseByteThread = new ParseByteThread(this); executor.execute(mParseByteThread); } protected Handler basicHandler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); switch (msg.what) { case ClientConstants.REQUEST: // 發送請求 連接占用 if (mEntityMsg != null && mEntityMsg.size() > 0) { isSending = true; // 清除handler的消息 basicHandler.removeMessages(ClientConstants.REQUEST); basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT); basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE); // 請求類型 當為網絡請求時判斷網絡狀態 建立連接 // 檢查連接是否可用 if (isRunning) { // 直接發送消息 basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE); 
basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE); } else { // 建立連接 basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT); Message msgCreate = Message.obtain(); msgCreate.what = ClientConstants.REQUEST_CREATE_CONNECT; msgCreate.arg1 = 0; basicHandler.sendMessage(msgCreate); } } break; case ClientConstants.REQUEST_CREATE_CONNECT: // 建立連接 Log.i("mbk", "建立連接!"); isConnect("netty"); break; case ClientConstants.REQUEST_SEND_MESSAGE: // 發送消息 Log.i("mbk", "發送消息!"); if (isRunning) { if (mEntityMsg.size() > 0) { Log.i("mbk", "發送數據!"); sendData(mEntityMsg.get(0)); basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT); // 設置請求超時 basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_TIMEOUT, 3000); } else { Log.i("mbk", "數據發送完成!"); isSending = false; } } else { // 重新建立連接 basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT); basicHandler.sendEmptyMessage(ClientConstants.REQUEST_CREATE_CONNECT); } break; case ClientConstants.REQUEST_SEND_HEARTBEAT: Log.i("mbk", "發送心跳!"); mManager.sendHeart(function); heartTimeOutCount++; Log.i("lzy02", "heartTimeOutCount---------------" + heartTimeOutCount); if (heartTimeOutCount >= 3) {// 大於等於3則認為與雲棒無連接 callBack(null, null, "心跳超時!", REQUEST_HEARTBEAT_TIMEOUT); } // // 發送心跳 basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT); basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 3000); break; case ClientConstants.REQUEST_TIMEOUT:// 請求超時 Log.i("mbk", "請求超時!"); isRunning = false; callBack(null, null, "請求超時!", RESPONSE_TIMEOUT); break; } } }; public void sendHeartbeat(int function) { this.function = function; } public void sendData(IEntity entity) { sendByte(ClientSocketUtils.sendDatas(mEntityMsg.get(0))); } // 建立連接 @Override public void isConnect(String netType) { UdpEntity udpEntity = null; int type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY; if (netType.equals("netty")) { // 建立一個netty連接 type = 
CommunicationThreadManager.MBK_COMMUNICATION_NETTY; mManager = new CommunicationThreadManager(mContext, null, mConnectKey, "192.168.31.241", mPort, type, mCommunicationCallBack); Log.i("mbk", "發送地址---" + "192.168.31.241"); Log.i("mbk", "發送端口號---" + mPort); /* * if (udpEntity != null) { Log.i("lzy02", * "udpEntity---209----------udpEntity=="+udpEntity.getYunbangIp()); * mManager = new CommunicationThreadManager(mContext, null, mConnectKey, * "192.168.31.241", mPort, type, mCommunicationCallBack); * //Toast.makeText(mContext, "已通過Netty發送 ", Toast.LENGTH_SHORT).show(); * Log.i("mbk","netty發送雲棒IP號---" + udpEntity.getYunbangIp()); } else { * Log.i("lzy02", "udpEntity---211----------udpEntity == null"); * callBack(null, null, "無法連接netty!", RESPONSE_FAIL); } */ // 使用netty是時候 清理p2p P2pClearUp(); } else { } Log.i("mbk", "初始化 連接服務器!" + netType); } @Override public void sendByte(byte[] b) { try { if (mManager != null) { mManager.sendDataToServer(new SendData(b)); } else { isClose(); } } catch (InterruptedException e) { isClose(); } } // 服務端回調 private CommunicationCallBack mCommunicationCallBack = new CommunicationCallBack() { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { Log.i("mbk", "--------------------------請求異常--------------------------" + mCommunication); isRunning = false; callBack(null, null, "請求異常!", RESPONSE_FAIL); } @Override public void connected(ChannelHandlerContext ctx) { Log.i("mbk", "--------------------------連接成功--------------------------" + mCommunication); // mChx = ctx; isRunning = true; sendAgain(); } @Override public void connectFailure(Exception e) { Log.i("mbk", "--------------------------連接服務器失敗--------------------------" + mCommunication); isRunning = false; callBack(null, null, "連接服務器失敗!", RESPONSE_FAIL); } @Override public void channelRead(ChannelHandlerContext ctx, byte[] msg) { Log.i("mbk", "--------------------------服務端返回--------------------------" + mCommunication); if (mParseByteThread != null) { 
mParseByteThread.sendParseByte(msg); } } @Override public void communicationOutTime() { Log.i("mbk", "--------------------------連接超時--------------------------" + mCommunication); isRunning = false; callBack(null, null, "連接超時!", RESPONSE_TIMEOUT); } @Override public void questTimeOut() { Log.i("mbk", "--------------------------請求超時--------------------------" + mCommunication); isRunning = false; callBack(null, null, "請求超時!", RESPONSE_TIMEOUT); } }; @Override public void sendAgain() { // 連接成功 發起請求 Log.i("mbk", "連接成功,數據重新發送!"); // basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE); basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_MESSAGE, 500); } // 接收需要發送的實體 @Override public void sendEntity(IEntity entity) { if (mEntityMsg != null && entity != null) { mEntityMsg.add(entity); if (!isSending) { // 啟動一個發送 Log.i("mbk", "發起請求!REQUEST_NET"); basicHandler.sendEmptyMessage(ClientConstants.REQUEST); } } // if (mEntityMsg != null && mEntityMsg.size() == 2) { // mEntityMsg.remove(1); // } } @Override public void callBack(PackageHeader header, byte[] data, String desc, int type) { basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT); switch (type) { case RESPONSE_SUCCESS: heartTimeOutCount = 0; basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 20000); switch (header.getFunction()) { case 9998: Log.i("mbk", "服務端關閉!"); isClose(); break; case 9999: Log.i("mbk", "成功返回一個心跳!"); break; case 999: Log.i("mbk", "未知錯誤!"); callBack(null, null, "未知錯誤", RESPONSE_FAIL); break; default: responseSuccess(header, data, desc, type); break; } break; case REQUEST_HEARTBEAT_TIMEOUT:// 心跳超時3次認為與雲棒無連接 /* * Intent m2Intent = new Intent(MeiConfigs.NETWORK_PROMPT); * m2Intent.putExtra("islogin", "3003"); * MeiApp.mContext.sendBroadcast(m2Intent); */ break; case RESPONSE_FAIL: responseFail(header, data, desc, type); break; case RESPONSE_TIMEOUT: responseFail(header, data, desc, type); break; } } // 請求成功 public void 
responseSuccess(PackageHeader header, byte[] data, String desc, int type) { try { if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) { IEntity entity = mEntityMsg.get(0); if (data != null && data.length > 0) { entity.onDecode(new String(data, "utf-8")); // Log.i("mbk","雲棒返回---" + "---" + new String(data, "utf-8")); // 請求成功 Log.i("lzy02", "1--------------" + entity.getCode()); Log.i("mbk", "返回一條數據!"); Message msg = Message.obtain(); msg.obj = entity; msg.arg1 = header.getFunction(); msg.what = type; entity.getHandler().sendMessage(msg); } } } catch (Exception e) { e.printStackTrace(); isClose(); } if (mEntityMsg != null && mEntityMsg.size() > 0) { mEntityMsg.remove(0); } basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT); isSending = false; if (mEntityMsg.size() > 0) { basicHandler.sendEmptyMessage(ClientConstants.REQUEST); } } // 請求失敗 public void responseFail(PackageHeader header, byte[] data, String desc, int type) { Log.i("mbk", "請求失敗! " + desc); Message msg = Message.obtain(); msg.obj = desc; msg.arg1 = 0; msg.what = type; if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) { mEntityMsg.get(0).getHandler().sendMessage(msg); } isClose(); } // 請求本地緩存返回 @Override public void callBack(IEntity entity, String desc) { Log.i("mbk", "回一返個緩存數據! "); if ("cache".equals(desc)) { if (entity != null && entity.getHandler() != null) { Message msg = Message.obtain(); msg.obj = entity; msg.what = RESPONSE_SUCCESS; entity.getHandler().sendMessage(msg); } } } public void P2pClearUp() { if (mManager != null) { mManager.p2pCleanup(); } } @Override public void isClose() { Log.i("mbk", "關閉連接!" 
+ isRunning); if (mManager != null) { if (isRunning) { try { mManager.sendDataToServer(new SendData(ClientSocketUtils.sendExit(function))); } catch (InterruptedException e) { } } else { mManager.closeTheadManager(); mManager = null; } } if (mParseByteThread != null) mParseByteThread.closeThread(); if (mEntityMsg != null) { mEntityMsg.clear(); } P2pClearUp(); basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT); basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT); isRunning = false; isSending = false; } @Override public void sendMsgFail(String netType, byte[] msg) { } @Override public void connectFail(String netType) { } @Override public void isClearMsg() { if (mEntityMsg != null) { mEntityMsg.clear(); } } }
public class BasicServicesMgr extends BaseClientMgr { public static BasicServicesMgr instance = null; public static BasicServicesMgr getInstance() { if (instance == null) { instance = new BasicServicesMgr(); } return instance; } private BasicServicesMgr() { super( "192.168.43.1", 9223, ClientConnectorManager.BASIC_SERVICES_MGR_KEY); } //接收需要發送的實體 @Override public void sendEntity(IEntity entity) { if (entity != null) { // 請求列表每次最多保存兩個請求 if (mEntityMsg != null && mEntityMsg.size() == 2) { mEntityMsg.remove(1); } mEntityMsg.add(entity); if (!isSending) { // 啟動一個發送 isSending = true; basicHandler.sendEmptyMessage(ClientConstants.REQUEST); } } } }
public interface Observer { //更新接口 public void update(IEntity state); }
/**
 * Runnable that owns a Looper and reassembles received byte chunks into packets
 * (a fixed-length header followed by a body), reporting each result to the
 * {@link IClientConnect} passed to the constructor.
 *
 * <p>Parsing state is kept in fields between chunks, so a header or body may be
 * split across several calls to {@link #sendParseByte(byte[])}.
 */
class ParseByteThread implements Runnable {
    // Buffer in which the current header is assembled.
    private byte[] bufHeader = null;
    // Buffer in which the current body is assembled.
    private byte[] readData = null;
    // Header object, re-populated from bufHeader for every packet.
    private PackageHeader header = null;
    // Header length in bytes.
    private int headerLenth = PackageHeader.headerLenth;
    // Body length still expected; 0 means "currently reading a header".
    private int readDataLenth = 0;
    private int sLength = 0;// number of bytes already copied into the current buffer
    // Handler created in run(); null until then.
    private Handler fileParseHandler = null;
    private IClientConnect connect;
    public static final int RESPONSE_SUCCESS = 0x401;
    public static final int RESPONSE_FAIL = 0x402;
    public static final int RESPONSE_TIMEOUT = 0x403;
    /** Heartbeat timed out. */
    public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410;
    /** User is not logged in. */
    public static final int NOT_LOGIN= 0x411;

    /** Returns the parsing handler, or null if {@link #run()} has not created it yet. */
    public Handler getFileParseHandler() {
        return this.fileParseHandler;
    }

    /**
     * Posts a received chunk to the parsing handler. The chunk is dropped when the
     * handler does not exist yet.
     *
     * @param msg the received bytes
     */
    public void sendParseByte(byte[] msg) {
        if (fileParseHandler != null) {
            Message msgData = Message.obtain();
            msgData.obj = msg;
            fileParseHandler.sendMessage(msgData);
        }
    }

    /**
     * @param connect receiver of the parsed packets and of parse failures
     */
    public ParseByteThread(IClientConnect connect) {
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
        this.connect = connect;
    }

    public void setFileParseHandler(Handler fileParseHandler) {
        this.fileParseHandler = fileParseHandler;
    }

    /**
     * Resets the parsing state to its initial values. Despite the name, the code
     * here does not stop the Looper started in {@link #run()}.
     */
    public void closeThread(){
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
    }

    /** Prepares a Looper, installs the parsing handler and enters the message loop. */
    @Override
    public void run() {
        Looper.prepare();
        fileParseHandler = new Handler() {
            public void handleMessage(Message data) {
                synchronized (data) {
                    byte[] msg = (byte[]) data.obj;
                    if (msg == null) {
                        return;
                    }
                    int msgLength = msg.length;
                    int useLength = 0;// number of bytes of this chunk already consumed
                    while (msgLength - useLength > 0) {
                        // Reading a header
                        if (readDataLenth == 0) {
                            if (msgLength - useLength >= headerLenth - sLength) {
                                // A complete header is available
                                System.arraycopy(msg, useLength, bufHeader, sLength, headerLenth - sLength);
                                useLength += (headerLenth - sLength);
                                sLength = 0;
                                header.setPackageHeader(bufHeader);
                                if (header.getFunction() > 10000 || header.getFunction() < 999) {
                                    // Invalid header: leave the loop and discard the rest of the chunk
                                    connect.callBack(null, null, "包頭不符合", RESPONSE_FAIL);
                                    break;
                                }
                                if (header.getFunction() != 9999 && header.getFunction() != 9998) {
                                    // Regular packet: switch to body mode.
                                    // NOTE(review): if getInclusionLenth() is 0, readDataLenth stays 0, the
                                    // next iteration reads a header again and no callback is issued for this
                                    // packet. Confirm whether zero-length bodies can occur.
                                    readDataLenth = (int) header.getInclusionLenth();
                                    readData = null;
                                    readData = new byte[readDataLenth];
                                } else if (header.getFunction() == 9999) {
                                    // Heartbeat packet: reported immediately, no body is read
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                } else if (header.getFunction() == 9998) {
                                    // Zeroing both counters ends the loop after the callback
                                    msgLength = 0;
                                    useLength = 0;
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                }
                            } else {
                                // Partial header: keep what arrived and wait for the next chunk
                                System.arraycopy(msg, useLength, bufHeader, sLength, msgLength - useLength);
                                sLength += (msgLength - useLength);
                                break;
                            }
                        }
                        // Reading a body
                        else {
                            if (msgLength - useLength >= readDataLenth - sLength) {
                                // A complete body is available
                                System.arraycopy(msg, useLength, readData, sLength, readDataLenth - sLength);
                                useLength += (readDataLenth - sLength);
                                sLength = 0;
                                readDataLenth = 0;
                                bufHeader = null;
                                bufHeader = new byte[PackageHeader.headerLenth];
                                // Parsing succeeded: hand the packet over
                                try {
                                    connect.callBack(header, readData, "", RESPONSE_SUCCESS);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            } else {
                                // Partial body: keep what arrived and wait for the next chunk
                                System.arraycopy(msg, useLength, readData, sLength, msgLength - useLength);
                                sLength += (msgLength - useLength);
                                break;
                            }
                        }
                    }
                }
            }
        };
        Looper.loop();
    }
}
public abstract class Subject { //用來保存注冊的觀察者對象 private List<Observer> list = new ArrayList<Observer>(); private Handler subHandler = new Handler(MeiApp.mContext.getMainLooper()) { public void handleMessage(Message msg) { if (list != null && list.size() > 0) { for (int i = 0; i < list.size(); i++) { list.get(i).update((IEntity) msg.obj); } } } }; //注冊觀察者對象 public void attach(Observer observer) { if (list != null) { list.add(observer); } } //刪除觀察者對象 public void detach(Observer observer) { if (list != null && list.size() > 0 && observer != null) { list.remove(observer); } } //刪除觀察者對象 public void clear() { if (list != null && list.size() > 0) { list.clear(); } } //通知所有注冊的觀察者對象 public void nodifyObservers(final IEntity newState) { new Thread(new Runnable() { @Override public void run() { Message msg = Message.obtain(); msg.obj = newState; subHandler.sendMessage(msg); } }).start(); } }
代碼見https://github.com/huanyi0723/NettyTest