[多線程] Web 項目中,少有涉及到的一次多線程編程的經驗


  如今框架橫行,Spring 已經是非常成熟的容器體系,我們在日常開發 JavaWeb 的工作中,大多已經不需要考慮多線程的問題,這些問題都已經在Spring容器中實現,框架的意義就是讓程序員們可以專注於邏輯的實現。然而這種編程工作是非常無趣無味的,如果長期從事這個工作,技術不一定見長,業務知識一定很熟悉!= =但說實在的,我並不喜歡這類工作,因為這種工作大多情況下知識對代碼的簡單復制,或是簡單的一些編寫,並沒有什么真正的創造性,不會給人成就感。

  需求背景


  我們的項目,是 Mysql+ElasticSearch 做的一個數據庫和搜索引擎,項目經理提出需要做一個用於重建 ES 搜索數據的接口,這個任務很光榮的交給了我。

  在功能的編寫過程當中,我突然思考這樣一個問題,因為我們 Web 項目本身是多線程的,那如果在同一時間段,有多個請求同時發起,那同時發起 ES 的重建,對於 ES 來說,可能會產生一些莫名其妙的問題。

  所以我感到非常高興,因為這個問題,似乎不是聽起來的那么簡單。於是乎我想到了,要加入同步鎖了。

最開始的思考:


  最開始我只是很簡單的想,直接在對應的 Service 層寫一個方法,然后直接加一個

synchronized(this)

在整個方法體上。    

1 @Override
2     public synchronized int rebuiltBountyData() throws Exception {
3         ...
4     }

 

  可是問題來了:


  但是這個方法很快就聯想到了另一個問題:

  我們是希望不要多線程同時重建數據,但是如果排隊重建呢?好像也不是我們想要的結果。我希望的是當一個線程在執行重建任務的時候,另一個線程要被拒絕開始任務,而不是等待上一個任務做好后再開始。因為我們 tomcat 是采用線程池的概念,如果所有線程都執行這個方法,最后每個線程都會處於等待狀態,結果其他請求就會因為沒有空閑的線程可用,而無法正常執行。

  so,我們修改了一下思路:

  在 Service 的這個實現類中,添加一個私有類成員對象  flag = false,當線程進入時,判斷 flag 是否為 true,是,則直接拋出異常,結束線程。否,則修改 flag 的值為 true,然后開始執行線程任務,並且,我們對這個 flag 加上一個同步鎖,例如:我們在代碼中使用時,加入這樣一段

synchronized(flag)

  由於 Spring 默認是單例模式,所以這個flag 在多個線程中是共享的,這樣就不需要將這個flag 設置為 static 了,因為它在這個局部當中實現了類似 static 的作用。但是這個時候,flag 不能是基礎類型,必須是 Boolean 包裝類型。那就會產生另一個隱患:包裝類的對象僅僅是一個引用,引用是可以被更換了,比如使用了這個 flag 的 set 方法來修改值,但是同步鎖取得是引用的鎖,而不是引用對應那個實例的鎖,鎖了引用卻沒鎖實例,但我們實際上卻要根據實例的狀態來判斷,這就會造成一個隱患,可能會使得同步鎖失效。

  那使用 this 來獲得整個 Service 類的同步鎖,貌似可以解決問題(如下面這段代碼具體實現),但是如果萬一以后這個 Service 還有其他需要用到同步鎖的需求怎么辦呢?這樣就會讓兩個不想干的業務邏輯因為同步鎖的問題產生互相的影響。添加同步鎖,要盡可能的縮小同步鎖的獲取范圍,和鎖內代碼的代碼量,這樣才能減少沖突和線程獲取鎖時等待的時間,提高軟件的安全性和執行效率。

  而且,我們的需求在這個時候,又有了變化,項目經理說,有兩張表都需要做這種功能。就是說兩個業務內容,都需要進行ES 的數據重建。所以如果每次增加一個,我就要單獨寫一個類似下面這段代碼,不僅代碼的可復用性降低了,而且以后換別人來維護的時候,說不定會寫錯這些內容。

 1 @Override
 2     public int rebuiltBountyData() throws Exception {
 3         //鎖住資源防止多線程重復發起任務
 4         synchronized (this) {
 5             if (hasThread) {
 6                 throw new RebuiltBountyEsException("搜索引擎重建任務已經在執行,請勿重復發起!");
 7             } else {
 8                 hasThread = true;
 9             }
10         }
11         //獲取總數
12         int count = bountyMapper.countNum();
13 
14         int pageTotal, pageSize = 1000;
15 
16         if (count % pageSize != 0) {
17             //若不能整除,則頁數加1
18             pageTotal = count / pageSize + 1;
19         } else {
20             pageTotal = count / pageSize;
21         }
22 
23         try {
24             for (int pageNum = 1; pageNum <= pageTotal; pageNum++) {
25                 //分頁查詢數據庫的數據
26                 PageHelper.startPage(pageNum, pageSize);
27                 List<Bounty> bountyList = bountyMapper.selectForRebuiltES();
28                 //添加到 ES 引擎
29                 bountyDao.add4List(bountyList);
30             }
31         } catch (Exception e) {
32             throw e;
33         } finally {
34             hasThread = false;
35         }
36         return count;
37     }

  那我們該怎么辦好呢?


  最好的辦法,就是把這個需要“加鎖”的邏輯,單獨賦予一個對象,讓這個鎖的范圍能夠縮小到只針對這個邏輯,這個功能,而不要跟其他的功能混在一起。 然后我們需要對這個功能,進行進一步的抽象。

  我們來好好觀察上面這段代碼,上面這段代碼,算是已經實現了整個功能,從頭到尾分解一下這段代碼的功能,可以看得出如下:

  1.   單線程檢查
  2.   分頁處理
  3.   獲取數據
  4.   寫入 ES

  So,我們可以看到,其實不同業務場景下,線程檢查是一模一樣的代碼,而分頁處理中,獲取數據總條數會根據不同業務場景而不同,其他代碼也都是相同的,至於寫入 ES 的部分,如果數據結構跟從數據庫中獲取的實體對象沒有區別的話,這個也是可以看做是相同的而不需要特別的處理,但是我們公司的項目中,因為種種原因,ES 中的數據結構和實體對象是不同的(盡管數據字段都是相同,我表示我不知道怎么跟你們說這個歷史遺留的奇葩問題...)。在這里,我們要應用一個設計模式,是模板模式,將固定的流程代碼封裝起來。再將可變的部分,留給子類實現。

  1 /**
  2  * 類說明:從 JDBC 中獲取重建 ES 的數據
  3  */
  4 @Service
  5 public abstract class JdbcRebuiltEsService<E> extends BaseService{
  6 
  7 
  8     protected Logger log = LoggerFactory.getLogger(getClass());
  9 
 10     private boolean threadLock = false;//線程鎖
 11 
 12     @Value("1")
 13     private int startPage;//開始頁碼
 14 
 15     @Value("1000")
 16     private int pageSize;//頁面容量
 17 
 18 
 19     protected abstract int countTotalData() throws Exception;
 20 
 21     protected abstract Collection<E> loadDataSource(int pageSize, int pageNum) throws Exception;
 22 
 23     protected abstract void writeToElasticSearch(Collection<E> collection) throws Exception;
 24 
 25     /**
 26      * 檢查線程鎖
 27      *
 28      * @throws Exception
 29      */
 30     private void checkLock() throws Exception {
 31 
 32         //這段代碼需要保證線程安全
 33         synchronized (this) {
 34             if (threadLock) {
 35                 //如果已經有線程占用,后續線程進入則拋出異常,因為本接口只允許單線程執行
 36                 throw new RebuiltEsTaskExistException("已經有重建任務正在執行,請等待結束后再發起新任務!");
 37             } else {
 38                 //如果沒有線程占用,則新線程進入后將改成線程占用狀態
 39                 threadLock = true;
 40                 log.info("用戶[{}]發起 ES 重建任務!其他重建任務請求將被拒絕!", getUserJid());
 41             }
 42         }
 43     }
 44 
 45     /**
 46      * 數據重建
 47      *
 48      * @return
 49      * @throws Exception
 50      */
 51     public int rebuild() throws Exception {
 52 
 53         checkLock();
 54         log.info("#=== ES 重建任務開始執行");
 55 
 56         int totalNum = countTotalData();
 57         log.info("本次重建預計總記錄數{}", totalNum);
 58 
 59         int pageTotal;
 60         int pageNum = this.startPage;
 61         int pageSize = this.pageSize;
 62 
 63         //根據條目總數計算總頁數
 64         if (totalNum % pageSize != 0) {
 65             //若不能整除,則頁數加1
 66             pageTotal = totalNum / pageSize + 1;
 67         } else {
 68             pageTotal = totalNum / pageSize;
 69         }
 70 
 71         long startTime = System.currentTimeMillis();//任務開始計時
 72 
 73         try {
 74             while (pageNum <= pageTotal) {
 75                 //分頁查詢數據庫的數據並同時發送到 ES
 76                 writeToElasticSearch(loadDataSource(pageSize, pageNum));
 77                 pageNum++;
 78             }
 79         } catch (Exception e) {
 80             Double progress = (Double) (pageNum * 1.0) / (Double) (pageSize * 1.0);
 81             DecimalFormat decimalFormat = new DecimalFormat("##.00%");
 82             log.info("重建異常中斷,當前已重建進度為:{}", decimalFormat.format(progress));
 83             throw e;
 84         } finally {
 85             threadLock = false;//不論是否成功,當線程退出時,都需要將線程狀態改為非占用
 86             long endTime = System.currentTimeMillis();//任務結束計時
 87             log.info("#=== ES 重建任務執行結束,耗時:{}毫秒", endTime - startTime);
 88         }
 89 
 90         return totalNum;
 91     }
 92 
 93     public int getStartPage() {
 94         return startPage;
 95     }
 96 
 97     public void setStartPage(int startPage) {
 98         this.startPage = startPage;
 99     }
100 
101     public int getPageSize() {
102         return pageSize;
103     }
104 
105     public void setPageSize(int pageSize) {
106         this.pageSize = pageSize;
107     }
108 }

  OK,這樣就解決了。復寫三個容易跟隨應用場景不同,而改變的方法,分別是,獲取數據源,獲取數據總條目,寫入 ES。然后暴露 rebuild 方法給外部調用,在 rebuild 方法內部,實現整個運作流程,這樣也可以避免以后有人需要做新的實現的時候,修改到這部分有涉及到同步鎖的代碼,以避免安全隱患。

  實際使用的時候可以這樣用,創建一個子類繼承這個 JdbcRebuiltEsService 

 1 /**
 2  * 類說明:商品信息 ES 重建所需要實現的具體方法
 3  */
 4 @Service
 5 public class GoodsRebuiltEsServiceImpl extends JdbcRebuiltEsService<Goods> {
 6 
 7     @Autowired
 8     private GoodsMapper goodsMapper;
 9 
10     @Autowired
11     private DrawingDAO drawingDAO;
12 
13     @Override
14     public int countTotalData() throws Exception {
15         return goodsMapper.countNum();
16     }
17 
18     @Override
19     public Collection<Goods> loadDataSource(int pageSize, int pageNum) throws Exception {
20         PageHelper.startPage(pageNum, pageSize);
21         return goodsMapper.selectForRebuiltES();
22     }
23 
24     @Override
25     public void writeToElasticSearch(Collection<Goods> collection) throws Exception {
26         drawingDAO.addBatch(collection);
27     }
28 }

  這樣以后每次使用,都只需要實現一個新的子類,然后這樣調用:

 1   @Autowired
 2     @Qualifier("goodsRebuiltEsServiceImpl")
 3     private JdbcRebuiltEsService<Goods> jdbcRebuiltEsService;
 4 
 5  /**
 6      * 數據重建
 7      *
 8      * @return
 9      * @throws Exception
10      */
11     @Override
12     public int rebuiltEsGoodsData() throws Exception {
13         return jdbcRebuiltEsService.rebuiltd();
14     }

  這樣 rebuilt 方法就很安全的被調用,將程序中不希望被修改的部分,用父類寫好,只留下希望被復寫的部分,這樣就可以很好的保護比較關鍵的部位,當然了,public 方法也是可以重寫的,不過這就超出了我們“以防萬一,不小心寫錯”的初衷了,如果需要重寫,那就重寫唄。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM