Java多線程事務管理


今天要討論的是“Java實現多線程單條數據事務管理”,在此之前,順便回顧一下實現多線程的幾種方式


實現多線程的三種方式



一、繼承Thread類

第一種方法是繼承Thread類,重寫run()方法

public class TestThread extends Thread {
  public void run() {
   System.out.println("繼承Thread類,重寫run方法");
  }
}

使用時,new一個實例,執行start()方法

TestThread testThread1 = new TestThread(); // 新建狀態
TestThread testThread2 = new TestThread(); // 新建狀態
testThread1.start(); // 就緒狀態
testThread2.start(); // 就緒狀態

何時執行取決於cpu調度


二、實現Runnable接口

因為Java“單繼承、多實現”的特性,當我們已經繼承了一個類的時候,則無法再繼承Thread類,此時可以通過實現Runnable接口的方式,實現run()方法

public class TestThread extends FatherClass implements Runnable {
  public void run() {
   System.out.println("實現Runnable接口的方式,實現run方法");
  }
}

Thread類也是實現Runnable接口

使用時,需要首先實例化一個Thread,並傳入自己的TestThread實例

TestThread testThread = new TestThread();
Thread thread = new Thread(testThread);
thread.start();

三、實現Callable和Future接口

該方法區別於前兩種的特點是:能夠獲得線程處理的結果。因此該方式適用於需要對線程的結果進行處理的場景

class TestCallable implements Callable<Integer> {

    @Override
    public Integer call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + " " + i);
            sum += i;
        }
        return sum;
    }

}

使用時,先創建TestCallable對象,然后使用FutureTask來包裝MyCallable對象,再將FutureTask對象作為Thread對象的target創建新的線程,最后thread執行start()方法,線程進入就緒狀態

Callable<Integer> testCallable = new TestCallable();                    // 創建TestCallable對象
FutureTask<Integer> futureTask = new FutureTask<Integer>(testCallable); // 使用FutureTask來包裝MyCallable對象
Thread thread = new Thread(futureTask);                                 // FutureTask對象作為Thread對象的target創建新的線程
thread.start();

多線程單條數據事務管理


關於事務,可以看看另外一篇文章談談Java事務

我們有時會遇到這樣的場景:要對大批量的數據進行更新或插入操作,需要開啟多線程來提高效率,又希望每個線程在的處理一批數據時,能夠對其中每條數據進行處理的時,做到出錯時實現單條數據回滾,而不是所有數回滾(所有數據回滾后續討論)。先看代碼:

根據以上多線程知識,我們先定義一個業務線程類如下:

public class TestTranstionalThread extends Thread {

    private List<BalBankDictEntity> balBankDictEntities;

    public TestTranstionalThread( List<BalBankDictEntity> balBankDictEntities){
        this.balBankDictEntities = balBankDictEntities;

    }

    @Override
    public void run() {

        log.info("線程{}開始",Thread.currentThread().getName());

        for (BalBankDictEntity balBankDictEntity : balBankDictEntities) {

            try{
                collBillDao.insOneBank(balBankDictEntity);
            }catch (BusiException e){
                log.error("{}回滾",balBankDictEntity.getBankId());
            }

        }

        log.info("線程{}結束",Thread.currentThread().getName());
    }
}

insOneBank()方法如下,注意的@Transactional注解的事務隔離等級為:REQUIRES_NEW,創建一個新的事務。

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void insOneBank(BalBankDictEntity balBankDictEntity){

    balBankDictMapper.insert(balBankDictEntity);

    /* 模擬發生異常,拋出異常,實現將已插入數據回滾 */
    if (Integer.parseInt(balBankDictEntity.getBankId().substring(2)) % 100 == 0){
        throw new BusiException("test");
    }

}

開啟多線程進行業務處理,注意加上@Transactional注解

@Transactional
public void testTransactional(){

    /* 模擬測試數據 */
    List<BalBankDictEntity> balBankDictEntities = new ArrayList<>();
    for (int i = 0 ; i < 100000 ; i ++){
        BalBankDictEntity balBankDictEntity = new BalBankDictEntity();
        balBankDictEntity.setBankCode("BK" + i);
        balBankDictEntity.setBankId("ID" + i + "");
        balBankDictEntity.setBankName("N" + i + "N");
        balBankDictEntities.add(balBankDictEntity);
    }

    int totalNum = balBankDictEntities.size();
    log.info("totalNum" + totalNum);

    /* 分10個線程處理 */
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
    int dealNum = totalNum % 10 == 0 ? totalNum / 10 : totalNum / 10 + 1; // 計算每個線程處理的數量

    for (int i = 1; i <= 10 ; i++ ){
        List<BalBankDictEntity> balBankDictEntityList = splitDataList(balBankDictEntities,dealNum,10,i);  // 切割數據集實現數據隔離

        TestTranstionalThread testTranstional = new TestTranstionalThread(balBankDictEntityList);
        fixedThreadPool.execute(testTranstional);

    }

}

最終實現多個線程並發插入數據,有異常的數據的單獨回滾,不影響整體


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM