For update帶來的思考


For update or not

起源

​ 之所以想寫這個專題,是因為最近在做一個搶占任務的實現。假設數據庫很多個任務,在搶占發生之前任務的狀態都是FREE。現在假設同時有一堆搶占線程開始工作,搶占線程會查找數據庫中狀態為FREE的任務,並且將其狀態置為BUSY,然后開始執行對應任務。執行完成之后,再將任務狀態置為FINISH。任何任務都是不能被重復執行的,即必須保證所有任務都只能被一個線程執行。

​ 筆者和人民群眾一樣,第一個想到的就是利用數據庫的for update實現悲觀鎖。這樣肯定能夠保證數據的強一致性,但是這樣會大大影響效率,加重數據庫的負擔。想到之前看過的一篇文章https://www.cnblogs.com/bigben0123/p/8986507.html,文章里面有提到數據庫引擎本身對更新的記錄會行級上鎖。這個行級鎖的粒度非常細,上鎖的時間窗口也最少,只有在更新記錄的那一刻,才會對記錄上鎖。同時筆者也想到在前一家公司工作的時候,當時有幸進入到了核心支付組,負責過一段時間的賬務系統。當時使用的是mysql的InnoDB引擎。記得當時的代碼在往賬戶里面加錢的時候是沒有加任何鎖的,只有在從賬戶扣錢的時候才用for update。所以這個問題應該有更加完美的答案......


探索之路

for update的實現這里就不再做過多嘗試了。這里筆者直接探索在沒有for update的時候高並發情況下是否會有問題。具體嘗試的過程如下:

造測試數據

​ 首先建立一個任務表,為了簡單模擬,我們這里就只添加必要的字段。建表語句如下:

create table task(
       ID NUMBER(10) NOT NULL, 
    TASK_RUN_STATUS NUMBER(4) NOT NULL
);
comment on table task is '互斥任務表';
comment on column task.ID is '主鍵ID.';
comment on column task.TASK_RUN_STATUS is '任務運行狀態(1.初始待運行 2.運行中 3.運行完成).';
alter table task add constraint TASK_PK primary key (ID) using index;

​ 為了方便測試,這里我們加入三條任務記錄,插入任務記錄的語句如下:

insert into task(id, task_run_status) values(0, 1);
insert into task(id, task_run_status) values(1, 1);
insert into task(id, task_run_status) values(2, 1);

模擬並發搶占

public class MultiThreadUpdate {
	public static void main(String[] args) throws Exception {
		Class.forName("oracle.jdbc.OracleDriver");
		ExecutorService executorService = Executors.newFixedThreadPool(30);
		List<Future<Void>> futures = new ArrayList<Future<Void>>();
		
		// 每個ID開20個線程去並發更新數據
 		for (int i=0; i<20; i++) {
 			for (int j=0; j<3; j++) {
 				final int id = j;
 				futures.add(executorService.submit(new Callable<Void>() {
 					public Void call() throws Exception {
 						Connection con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "czbank", "123456");
 						// con.setAutoCommit(false);		// 不自動提交事務
 						PreparedStatement pstm = con.prepareStatement("update task set TASK_RUN_STATUS = ? where id = ? and TASK_RUN_STATUS = ?");
 						pstm.setInt(1, 2);
 						pstm.setInt(2, id);
 						pstm.setInt(3, 1);
 						int upRec = pstm.executeUpdate();
 						// 打印更新的記錄條數
 						System.out.println("Thread:" + Thread.currentThread().getName() + " updated(id=" + id + "):" + upRec + " records...");
 						// Thread.sleep(1000);		// 在事務提交之前,其線程都會阻塞直到對特定記錄的更新提交
 						// con.commit();
 						con.close();
 						pstm.close();
 						return null;
 					}
 				}));
 			}
		}
 		executorService.shutdown();
	}
}

​ 最終程序的輸出結果如下:

Thread:pool-1-thread-9 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-22 updated(id=0):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-14 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=1):0 records...
Thread:pool-1-thread-26 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=2):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-23 updated(id=1):0 records...
Thread:pool-1-thread-21 updated(id=2):1 records...
Thread:pool-1-thread-1 updated(id=0):1 records...
Thread:pool-1-thread-6 updated(id=2):0 records...
Thread:pool-1-thread-8 updated(id=1):1 records...
Thread:pool-1-thread-10 updated(id=0):0 records...
Thread:pool-1-thread-13 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=0):0 records...
Thread:pool-1-thread-19 updated(id=0):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=0):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=2):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=2):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-9 updated(id=0):0 records...
Thread:pool-1-thread-22 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=0):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=2):0 records...
Thread:pool-1-thread-26 updated(id=0):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-21 updated(id=1):0 records...
Thread:pool-1-thread-1 updated(id=2):0 records...
Thread:pool-1-thread-14 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=2):0 records...
Thread:pool-1-thread-13 updated(id=1):0 records...
Thread:pool-1-thread-19 updated(id=2):0 records...
Thread:pool-1-thread-6 updated(id=0):0 records...
Thread:pool-1-thread-8 updated(id=1):0 records...
Thread:pool-1-thread-10 updated(id=2):0 records...
Thread:pool-1-thread-23 updated(id=0):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=2):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=1):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=0):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...

​ 可以看到,即使在沒有顯示使用事務的情況下,多線程並發執行也能夠保證某一條數據的更新只被執行一次。


最終任務設計

​ 通過上面的測試例子,已經驗證了我的猜想。接下來就是如何設計搶占任務的執行步驟了。廢話不多說,直接上基本代碼:

public void runMutexTasks(MutexTaskDto runCond) throws Exception {
	// STEP1: 先去查找待執行的互斥任務
	runCond.setTaskRunStatus(Enums.MutexTaskRunStatus.WAIT_RUN.getKey());	// 待運行
	runCond.setPhysicsFlag(Enums.TaskStatus.NORMAL.getKey());				// 正常狀態(未廢棄)
	PageInfo<MutexTaskDto> runnableTasks = MutexTaskService.pagingQueryGroupByTaskId(0, 0, runCond);
	if (CollectionUtils.isEmpty(runnableTasks.getRows())) {
		LOGGER.debug("根據條件未找到待執行的互斥任務,跳過執行......");
		return;
	}
	
	// STEP2: 分別嘗試執行
	List<MutexTaskDto> runTasks = null;
	Collections.shuffle(runnableTasks.getRows());	// 打亂順序
	for (MutexTaskDto oneTask : runnableTasks.getRows()) {
		runTasks = mutexTaskService.selectRunnableTaskByTaskId(oneTask.getTaskId());
		if (CollectionUtils.isEmpty(runTasks)) {
			LOGGER.info("互斥任務ID【{}】已不是待運行狀態,跳過任務執行......", oneTask.getTaskId());
			continue;
		}
		
		// STEP3: 運行任務
		MutexTaskDto updateCond = new MutexTaskDto();
		updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_SUCCESS.getKey());
		updateCond.setTaskPreStatus(Enums.MutexTaskRunStatus.RUNNING.getKey());
		updateCond.setTaskId(oneTask.getTaskId());
		try {
			runTasks(runTasks);
		} catch(Exception  e) {
			updateCond.setRunRemark(getErrorMsg(e));
			updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_FAILED.getKey());
			mutexTaskService.updateByTaskId(updateCond);
			// 這里只打印失敗結果,具體失敗信息需要上層調用方法日志打印出來
			LOGGER.error("互斥任務ID【{}】執行失敗!", oneTask.getTaskId());
			throw e;
		}
		mutexTaskService.updateByTaskId(updateCond);
		LOGGER.info("互斥任務ID【{}】執行成功......", oneTask.getTaskId());
		Thread.sleep(1000); // 搶到了一個節點執行權限,此處暫停1s,給其他機器機會
	}
}

// 其中mutexTaskService的selectRunnableTaskByTaskId方法如下:
// 不使用事務,利用數據庫引擎自身的行級鎖

public List<MutexTaskDto> selectRunnableTaskByTaskId(String taskId) {
	// STEP1: 先用查詢數據(一個taskID可能對應多條記錄,對應不同的參數)
	List<MutexTaskModle> mutexTaskModles = this.mutexTaskDao
			.selectByTaskId(taskId);
	if (CollectionUtils.isEmpty(mutexTaskModles)) {
		return Collections.emptyList();
	}
	
	// STEP2: 更新數據(使用數據庫引擎自身所帶的行級鎖)
	MutexTaskModle updateInfo = new MutexTaskModle();
	updateInfo.setTaskRunStatus(2);
	updateInfo.setTaskPreStatus(1);
	updateInfo.setTaskId(taskId);
	int updateCount = cleaningMutexTaskDao.updateByTaskId(updateInfo);
	if (updateCount <= 0) {
		LOGGER.info("找到待執行的互斥任務,但是更新任務為執行中失敗......");
		return Collections.emptyList();
	}
	
	// STEP3: 前面兩項都校驗過,則確認當前任務列表是可以執行的
	List<MutexTaskDto> mutexTasks = BeanConvertUtils.convertList(mutexTaskModles,
			MutexTaskDto.class);
	return mutexTasks;
}

​ 關鍵點就在於第58行的cleaningMutexTaskDao.updateByTaskId(updateInfo);。該語句對應的SQL大致為:

update TASK set task_status = ? where task_id = ? and task_tatus = ?

​ 其中task_id為表的主鍵,且啟用了唯一索引。


總結

​ 這個問題剛開始筆者想到的解決方案就是使用for update。但內心總覺得這不是最佳方案,想起以前做過的項目還有看過的文章,卻也總是不太確定。最終還是自己動手寫了個測試用例"釋懷"了內心的疑惑。最終也順利地想出了這個"完美"的實現。不得不承認:實踐是檢驗真理的唯一標准!工作到現在,越來越覺得大家覺得最好的實現不一定就是最好的,大家認為的最高效的方法不一定就是最高效的。很多事情沒有絕對,就像寫代碼一樣,沒有絕對的好代碼。

​ 當然這不是鼓勵大家隨便寫代碼,筆者想說的是:做軟件就像做學問。不能純粹地拿別人的結論奉為聖經。遇到問題要多思考,才會有自己的沉淀。思考之后要多行動,才不會僅僅停留在思想的巨人,行動的矮子。當然,行動之后也要多多整理出來,就像筆者這樣,奉獻社會,方便你我他......(一臉無語)😂

​---

參考鏈接

https://www.cnblogs.com/bigben0123/p/8986507.html

https://www.cnblogs.com/clphp/p/6398667.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM