項目中redis改brpop阻塞模式為訂閱模式的實現(二)


更改項目需求以及項目之前阻塞模式問題的敘述已經在上一篇說過了,詳情可參考:https://www.cnblogs.com/darope/p/10276213.html文章的介紹。

關於Agent數據采集相關內容介紹可以參考華中科技大學的這篇碩士論文,說的比較詳細:http://www.docin.com/p-131767044.html 。

一,關於brpop為什么要更改,這里簡單分析一下原版本的阻塞代碼。

 1 @Override
 2     public void readyForControl(Service.ControlRequest request, StreamObserver<Service.ControlResponse> responseObserver) {
 3         byte[] uuidByte = request.getH().getId().toByteArray();
 4         JUUID juuid = new JUUID(uuidByte);
 5         String uuid = juuid.toString();
 6         logger.info("readyForControl uuid: " + uuid);
 7         // agent上線
 8         Long onlineTime = System.currentTimeMillis();
 9         redisService.set(ONLINE_PREFIX + uuid, String.valueOf(System.currentTimeMillis()));
10         onlineAgent(uuid);
11 
12         while (true) {
13             try {
14                 //暫時沒有更好的辦法處理,降低兩個while同時守護任務redis的可能性
15                 if (needBreak(uuid, onlineTime)) {
16                     break;
17                 }
18                 List<Task> tasks = taskRedisMap.brpop(uuid);
19                 if (Objects.isNull(tasks)) {
20                     continue;
21                 }
22                 for(Task task : tasks) {
23                     //agent 重啟后丟失一個任務;老的rpc通道收到任務放回機制
24                     if (needBreak(uuid, onlineTime)) {
25                         taskRedisMap.pushTask(task);
26                         continue;
27                     }
28                     logger.info("task get uuid: " + uuid + " nodeId: " + task.getNodeId());
29 
30                     Service.ControlResponse.ControlCmd controlCmd = Service.ControlResponse.ControlCmd.forNumber(task.getTaskType());
31                     Service.ControlResponse response = null;
32                     assert controlCmd != null;
33                     // 根據任務類型分配任務
34                     response = getControlResponseOption(task, controlCmd, null);
35                     logger.info("cmd: " + controlCmd + " nodeId " + task.getNodeId());
36                     if (Objects.isNull(response)) {
37                         logger.info("empty response. nodeId " + task.getNodeId());
38                         return;
39                     }
40 
41                     // 通知業務調用方
42                     readyForControlEvent(task);
43                     logger.info("readyForControlEvent");
44                     task.setTaskStatus(TaskStatusEnum.BUSY);
45                     task.setStartExecTimeout(System.currentTimeMillis());
46                     task.setReceiveEvent(true);
47                     taskRedisMap.update(task);
48                     logger.info("onNext ...");
49                     responseObserver.onNext(response);
50                     logger.info("onNext OK...");
51                 }
52             } catch (Throwable e) {
53                 logger.error("readyForControl異常, uuid={}", e, uuid);
54             }
55         }
56     }

客戶端在服務端注冊好自己傳送過來的數據后,調用readyForControl,請求服務端下發命令,有幾個agent客戶端主機,就會調用幾次。相同agent再次上線這里就會出現一個很大的問題,原來的agent沒有下線,相同的agent再次上線,這里會再次調用readyForControl。意味着相同的agent調用了兩次,而且新上線的agent后調用readyForControl。如果采用brpop的方式,意味着一開始上線的agent調用readyForControl已經拿走了消息列隊的task任務,后來的只能拿不到,空指針異常。這里采用了一個不是辦法的辦法,就是寫一個死循環,監聽agent上線動作,比對一下,如果這個agent是后來上線的,就會break掉,杜絕了異常的發生。但是這個操作會顯得很臃腫,而且效率不太好。

二,更改為訂閱模式或許會解決以上問題,原因如下:

  a.  readyForControl中,只有一個訂閱方法,簡潔很多

  b.  不需要判斷是不是相同agent上線的問題,雖然新上線的agent跟之前的agent是同一個agent,但是跟redis的發布訂閱模式不沖突,老的agent也會訂閱到消息,新的agent也會訂閱到消息。避免了一個大的用於判斷agent新舊問題的死循環。

  c.  效率更高,redis底層是c語言實現的,借助redis的機制來解決問題,往往比自己實現邏輯來解決問題,從本質上看來要可取。

三,更改的過程中遇到的坑:

很遺憾,很多坑是我想當然的以為造成的,並沒有嚴謹的考慮軟件工程的思想以及大型程序運行的理論情形。對此只會讓我以為我還有很多的東西要學,現在的出錯,只是為了記憶更深刻吧。下面由淺入深做簡單總結:

  a. 從簡單訂閱模式,到多線程訂閱模式。

  訂閱模式本身是redis自帶的方法,但是訂閱模式是恆阻塞的,一旦進入訂閱的方法,就會一直監聽發布方是否發布了消息,導致監聽阻塞,無法使調用方程序順序執行。雖然訂閱方法父類有onMessage方法可以終止訂閱,但是不滿足需要監聽agent上線的邏輯策略。對此需要增加多線程實現,把訂閱方法寫到線程空間中去。

 1     @Override
 2     public void readyForControl(Service.ControlRequest request, StreamObserver<Service.ControlResponse> responseObserver) {
 3         byte[] uuidByte = request.getH().getId().toByteArray();
 4         JUUID juuid = new JUUID(uuidByte);
 5         String uuid = juuid.toString();
 6         Long agentId = taskRedisMap.getIdByUuid(uuid);
 7         // 調用訂閱者線程
 8         SubThread subThread;
 9         subThread = new SubThread(redisService.getJedisPool(), agentId, responseObserver, taskRedisMap, applicationContext);
10         subThread.start();
11         logger.info("readyForControl uuid: " + uuid);
12 
13         // agent上線
14         redisService.set(ONLINE_PREFIX + uuid, String.valueOf(System.currentTimeMillis()));
15         onlineAgent(uuid);
16     }

更改之后的代碼采用多線程開啟訂閱方法,刪除死循環維護agent上線的問題。當多agent上線時,會為每一個agent客戶端開啟一個屬於自己的訂閱方法,由於brpop方式采用的是uuid轉化為agentId對比任務agentId的方式,以此來保證任務下發的准確性,我就把頻道更改為uuid,保證了任務下發的准確性。

  b.從專用頻道訂閱模式,到通用頻道訂閱模式。

  企業級項目必須考慮到資源的損耗和浪費情況,如果每一個上線agent客戶端均使用專用頻道,會增加redis的負荷,嚴重會讓redis睡覺。如此看來為每一個agent開一個以agent的id相關的字符串為該agent的通道的話,是絕對不可取的。在師兄的引導下,為此我折騰了一個下午,目的就是不采用專用通道,采用通用通道,即所有任務shell的發布和訂閱都在一個頻道,是誰的誰自己來領取。但是怎么領取,最后我通過把uuid傳到訂閱線程中,從onMessage中轉化為任務序列對比發布中的任務序列號,取到我需要的task然后return到調用方。看起來還不錯,我比較滿意。

  c.程序運行並不滿足預期

  如我所想,數據我是拿到了,接着我在readyForControl調用這個線程后,取到agentId對應的所有任務列表,這樣我就可以使用這個任務列表onNext到客戶端啦,像下面這樣:

 1  // 通知業務調用方
 2                 readyForControlEvent(task);
 3                 logger.info("readyForControlEvent");
 4                 task.setTaskStatus(TaskStatusEnum.BUSY);
 5                 task.setStartExecTimeout(System.currentTimeMillis());
 6                 task.setReceiveEvent(true);
 7                 //不通
 8                 taskRedisMap.update(task);
 9                 logger.info("onNext ...");
10                 responseObserver.onNext(response);
11                 logger.info("onNext OK...");

但是下面的方法是取不到我的task任務列表的所有數據的,原因是,當我進入到我的線程后,我執行訂閱方法,對比我傳入的uuid拿到屬於該agent的一個task。然后調用這個線程的方法就會順序執行了。線程仍然存在,只是再也沒人調用了,readyForControl代碼程序一旦順序執行,就回不到調用線程的那個代碼位置了。尷尬的是,理論上,我的task列表里面只會有一條task。

  d.沒法在readyForControl中拿到所有task的列表,我必須在線程里面單個處理,仔細想想,效率好像還提升了

逆行思維真的是很好的方式,他會使你在向左走不通的情況下會考慮向右走一走,最終走出這個死胡同。程序封裝的目的在於統一處理,正常的方式是我所有task存入到我的list列表中,return到調用方,在readyForControl中統一onNext到agent客戶端。線程方式這種走不通,只能把接下來所有操作task的代碼傳到線程中去,在線程中一個一個onNext到客戶端。首先要做的是把需要用到的類實例傳到線程中去,該傳進去的傳進去,該注入的注入到線程空間中去。然后每次收到訂閱消息message,我都把這個message轉化為對應agent的task最后onNext下發到客戶端。看起來還不錯,但是即將迎來一個大坑。

  e.程序沒報錯,為什么線程空間中的實例,會頻繁的報空指針?

代碼看着已經沒什么問題,邏輯上也是可行的,但測試的時候,老是空指針。查閱資料,發現Spring為了安全,禁止向線程空間中注入bean。網上的解決辦法很多,我需要注入的就是兩個操作task任務流的bean,所以就采用了最簡單的傳遞參數的方式,外層先注入我需要的bean,然后當成調用線程的方法的參數。線程方使用私有變量初始化類,不采用注入的方式,然后通過構造方法拿到傳進來的類實例。

  f.或許你認為最不應該有問題的地方出現了問題

最終代碼已經差不多可以使用了,但是偶爾會拋異常,檢查了一晚上發現是jdk中操作list的問題。至今不是很明白,也希望有讀到的大神給與評論原因。一開始的邏輯,在對比是不是我這個上線agent的task的時候,我采用一個if判斷。在任務列表tasks不可能為空的情況下,if( 上線agentId.compareToIgnoreCase(發布方發布的Task中的agentId) != 0 ) 從tasks列表中移除這個不匹配的task,采用tasks.remove(task)的方式,else下發這個任務到客戶端    ------------》  更改為if( 上線agentId.compareToIgnoreCase(發布方發布的Task中的agentId) =0 )下發任務到客戶端,else不做處理。就解決了異常問題,看似兩個邏輯是一樣的,或許是remove操作列表有什么需要注意的吧。

最終所有操作都在線程空間中處理,訂閱線程繼承的的onMessage方法中,分布對訂閱到的task單獨處理,肢解了圓來readyForControl的代碼:

 1 @Override
 2     public void onMessage(String channel, String message) {       //收到消息會調用
 3         logger.info("收到了發布者的消息,頻道為: {}, 消息為: {}", channel, message);
 4 
 5         tasks.add(message);
 6 
 7         key = TASK_PENDING_PREFIX + agentId;
 8 
 9         List<Map> taskList = tasks.stream().map(k -> Json2.fromJson(k, Map.class)).collect(Collectors.toList());
10         if (taskList.size() == 0) {
11             return;
12         }
13         // 篩選出ShellTask
14         List<ShellTask> shellTaskList = taskList.stream().filter(t -> Objects.equals(t.get("execType"), ExecScriptType.SHELL.getCode())).map(t -> Json2.fromJson(Json2.toJson(t), ShellTask.class)).collect(Collectors.toList());
15         if (shellTaskList.size() == 0) {
16             return;
17         }
18         List<Task> task_ = shellTaskList.stream().filter(t -> Objects.equals(t.getTaskStatus(), TaskStatusEnum.NOT_OPERATED)).collect(Collectors.toList());
19         logger.info("task list : {}", task_);
20 
21         //返回攜帶特定uuid訂閱者agent的task
22 
23         for (Task task : task_) {
24             String keyPub = TASK_PENDING_PREFIX + task.getAgentId();
25             logger.info("keyPub {}", keyPub);
26             if (key.compareToIgnoreCase(keyPub) == 0){
27                 logger.info("task get uuid: " + key + " nodeId: " + task.getNodeId());
28 
29                 Service.ControlResponse.ControlCmd controlCmd = Service.ControlResponse.ControlCmd.forNumber(task.getTaskType());
30                 Service.ControlResponse response = null;
31                 assert controlCmd != null;
32                 // 根據任務類型分配任務
33                 response = getControlResponseOption(task, controlCmd, null);
34                 logger.info("cmd: " + controlCmd + " nodeId " + task.getNodeId());
35                 if (Objects.isNull(response)) {
36                     logger.info("empty response. nodeId " + task.getNodeId());
37                     return;
38                 }
39 
40                 // 通知業務調用方
41                 readyForControlEvent(task);
42                 logger.info("readyForControlEvent");
43                 task.setTaskStatus(TaskStatusEnum.BUSY);
44                 task.setStartExecTimeout(System.currentTimeMillis());
45                 task.setReceiveEvent(true);
46                 //不通
47                 taskRedisMap.update(task);
48                 logger.info("onNext ...");
49                 responseObserver.onNext(response);
50                 logger.info("onNext OK...");
51             }
52         }
53     }
View Code

接下來的優化策略是,判斷agent上線時間,如果是相同agent再次上線,可以考慮讓以前的agent下線,而非繼續訂閱,雖然繼續訂閱不會影響程序正常使用,也不需要像brpop的方式來維護消息列隊中的task,但是當agent某個客戶端反復上線下線,也會造成不必要的訂閱資源浪費,所以程序還是需要判斷哪些agent需要下線處理。

因為是實習第一階段,自己還算個小白,很多思考不到的地方,踩了不少坑,特此記錄。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM