后端用的是c#,所以長鏈接這塊用的是signalR。公司的前端是用flutter的,也有線程的signalR的插件。可惜會出現一些問題,決定自己封裝一個。這里就簡單介紹一下android原生封裝signalR吧
這邊實現了,心跳機制,斷線重連,消息去重發送,連接狀態等。
先封裝了hubConnection,然后在這層實現了心跳。這一塊必須得扯上后端,后端實現了一個方法,收到什么參數,馬上就把這個參數傳回來。然后就用這個方法實現心跳,發送一個消息給服務器,服務器收到這個消息。記錄下發出時間和接收時間,不小於自己設定的時間間隔,則認定網絡狀態有效。當心跳無效的時候就把連接狀態置為false,表示連接斷開。其實他提供了一個回調oncloed。當連接關閉的時候會調用這個回調。但是不能太依賴這個,所以自己寫了心跳來確保連接。下為心跳的邏輯。
while(isRunning){ long ping = System.currentTimeMillis()/1000; //發送心跳包 try{ hubConnection.send("Echo",String.valueOf(ping)); }catch (Exception e){ connectStatus = false; } //心跳延時 try { Thread.sleep(heartDelay); } catch (InterruptedException e) { e.printStackTrace(); } //最后一次接收消息時間小於發送心跳時間, //起碼在心跳時間內,沒有收到包。 if(lastRecvTime < ping){ long delay = System.currentTimeMillis()/1000 - ping; //時間差大於重連時間的時候,判定為超時,連接狀態置為false if(delay > KeepAliveTimeOutSecond){ connectStatus = false; }else { connectStatus = true; } }else { connectStatus = true; } }
這個isRunning則表明需不需要進行心跳檢測,當連接斷開的時候當然是不必要的啦。(ps,來自后端大佬的一個建議,死循環線程里要加一個try,避免他因為錯誤而中斷循環)。
然后開放了三個方法,開始連接,斷開連接,發送消息。
/** * 開放的三個方法 * */ public void send(String method,Object... message){try{ hubConnection.send(method,message); }catch (Exception e){ connectStatus = false; } } public void stopConnect(){ isRunning = false; connectStatus = false; hubConnection.stop(); } public void startConnect(){ Log.i(TAG,"start connect this message from SignalRSession"); hubConnection = HubConnectionBuilder.create(url) .build(); setOn(); hubConnection.start().blockingAwait(); heartCheck(); isRunning = true; }
發送消息就不多說了,就是包一下。這里加try是為了保證特殊原因連接丟失的情況下,調用send方法不會出錯。
斷開連接的時候把心跳循環停掉,連接狀態也是理所當然的變成false,然后是hubConnection的stop。
建立連接的話,就是把url傳入,這里的url是在這個類初始化的時候拿到的。setOn是我自己寫的建立監聽的函數,發送過來消息都會在setOn中收到,然后通過handler發出去。然后開始的時候要建立心跳連接。當然這塊可以放到初始化里。可以優化下。
public SignalRChannel(String url1, android.os.Handler handler) { this.url = url1; this.receiveHandler = handler; }
這是這個類的構造器,url用來建立連接就不多說。這個handler是為了發送消息以及更上層接收消息。
到此為止,第一層封裝完了。
接下來是第二層,實現了斷線重連,消息去重,記錄數據庫等操作。數據庫選用的框架用的是room。
這一塊操作比較多,可能會講的有點亂。到時候可以看看我的demo消化下。
public ReliableClient(String url1, Context context) { this.url = url1; this.context = context; //創建數據庫,如果存在不會重復創建 db = Room.databaseBuilder(context, AppDatabase.class, "database-name").build(); recordDb = Room.databaseBuilder(context, recordDatabase.class,"database-name1").build(); loadData(); logFile = new LogFile(context); Thread t = new Thread(runnableSend); t.start(); }
這個是構造器,第一個數據庫用來存收到的數據,第二個數據庫用來處理進度(處理到第幾個數據了) 。loadData是獲取進度,即剛剛的數據庫。logFile是我自己寫的類,用於寫日志。然后這個線程啟動的是短線重連。這里一個ReliableClient可以用單例來實現。
private void loadData() { Runnable runnable = new Runnable() { @Override public void run() { if(recordDb.recordDao().databaseCount()<1){ //數據庫沒有數據,設置為默認值 curRecvSeq = -1; authMessage = null; Log.i(TAG,"load <1 "); }else if(recordDb.recordDao().databaseCount() == 1){ //數據庫一條數據,取這條數據 recordData messageData = recordDb.recordDao().getRecord(); curRecvSeq = messageData.curRecvSeq; authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version); if(authMessage.ClientType == -1){ authMessage = null; } Log.i(TAG,"load = 1 "+curRecvSeq); }else { Log.i(TAG,"qweq: "+recordDb.recordDao().databaseCount()); //數據庫很多數據,取最后一條的數據 recordData messageData = recordDb.recordDao().getRecord(); curRecvSeq = messageData.curRecvSeq; authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version); recordDb.recordDao().deleteAll(); recordData record1 = new recordData(); record1.Token = authMessage.Token; record1.curRecvSeq = messageData.curRecvSeq; record1.Version = authMessage.version; record1.ClientType = authMessage.ClientType; record1.UserId = authMessage.UserId; recordDb.recordDao().insertAll(record1); if(authMessage.ClientType == -1){ authMessage = null; } Log.i(TAG,"load > 1 "+curRecvSeq); } } }; new Thread(runnable).start(); if(curRecvSeq != -1){ //如果有操作記錄,那么查詢數據庫,取出未處理的數據,發給flutter。 List<MessageData> messageDataList = db.userDao().getAll(); for(MessageData messageData : messageDataList){ //未操作數據壓入哈希表 hTable.put(messageData.seq,messageData); curRecvSeq ++; } } Log.i(TAG,"load msg :"+curRecvSeq); }
這個數據庫理論上只能存在一條數據,因為是記錄嘛,然后這里的邏輯是,當數據庫沒有數據時,給他一個默認值,標記為初次啟動。當一條數據的時候讀取這條數據。當出現不可抗力時,出現了多條數據,取出最后一條數據,然后刪庫不跑路。把這最后一條記錄插進數據庫。這個記錄是為了獲取登錄信息的,賬號,token等。這樣他從后台啟動起來的時候,還是處於連接狀態。
下面是斷線重連機制以及消息發送隊列機制
private Runnable runnableSend = new Runnable() { @Override public void run() { while(isRunning){ try {//刷新連接狀態 if(signalRChannel == null || !signalRChannel.isConnected()){ try{ reConnect(); }catch (Exception e){ e.printStackTrace(); continue; } } while(!sendMessageQueue.isEmpty()){ //發送消息 SendMessage sendMessage = sendMessageQueue.poll(); signalRChannel.send(sendMessage.method, sendMessage.message); } if(!logFile.fileStatus){ logFile.openLog(); } Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } } };
這里還是跑一個死循環線程,反復確認連接狀態,如果斷開連接的話,執行重連。消息發送也比較簡單,放在一個隊列里,順便發送出去。
下面是重連,比較簡單就這樣看吧。
private void reConnect() { if(signalRChannel != null){ signalRChannel.stopConnect(); } signalRChannel = new SignalRChannel(url,receiveHandler); signalRChannel.startConnect(); if(authMessage!=null){ signalRChannel.send("Auth",authMessage); } }
下面是三個開放給外部的方法
//發送消息 public void send(String method,Object... messages){ /** * queue * */ if(method.equals("Echo")){ long time = System.currentTimeMillis()/1000; signalRChannel.send(method,String.valueOf(time/1000)); }else { SendMessage sendMessage = new SendMessage(method,messages); sendMessageQueue.offer(sendMessage); } } //登錄 public void LogIn(AuthRequest authRequest){ this.authMessage = authRequest; //todo: write file // signalRChannel.send("Auth",authMessage); } //登出 public void LogOut(){ authMessage = null; if(signalRChannel != null){ signalRChannel.stopConnect(); } if(logFile.fileStatus){ logFile.closeLog(); } }
發送消息的時候給他壓進消息隊列里,等一段時間發送。當然我這里設置時間是4秒,有點不合理,這個需要自己改一下。
然后這里的登錄登出只是狀態登出了,長鏈接是一直存在的。
登出是先把authMessage清空,然后斷開重連一下,就斷開了。
登錄只是記錄下他的登錄信息,因為我們登錄走的是另外的方法。
這里大致是這樣了,其他很多代碼都是跟我們自己的業務相關,我會覺得不具有參考性,就不列出來了。
最后貼一下demo地址: https://github.com/libo1223/signalR
