For update or not
起源
之所以想寫這個專題,是因為最近在做一個搶占任務的實現。假設數據庫很多個任務,在搶占發生之前任務的狀態都是FREE。現在假設同時有一堆搶占線程開始工作,搶占線程會查找數據庫中狀態為FREE的任務,並且將其狀態置為BUSY,然后開始執行對應任務。執行完成之后,再將任務狀態置為FINISH。任何任務都是不能被重復執行的,即必須保證所有任務都只能被一個線程執行。
筆者和人民群眾一樣,第一個想到的就是利用數據庫的for update
實現悲觀鎖。這樣肯定能夠保證數據的強一致性,但是這樣會大大影響效率,加重數據庫的負擔。想到之前看過的一篇文章https://www.cnblogs.com/bigben0123/p/8986507.html,文章里面有提到數據庫引擎本身對更新的記錄會行級上鎖。這個行級鎖的粒度非常細,上鎖的時間窗口也最少,只有在更新記錄的那一刻,才會對記錄上鎖。同時筆者也想到在前一家公司工作的時候,當時有幸進入到了核心支付組,負責過一段時間的賬務系統。當時使用的是mysql的InnoDB引擎。記得當時的代碼在往賬戶里面加錢的時候是沒有加任何鎖的,只有在從賬戶扣錢的時候才用for update
。所以這個問題應該有更加完美的答案......
探索之路
for update
的實現這里就不再做過多嘗試了。這里筆者直接探索在沒有for update
的時候高並發情況下是否會有問題。具體嘗試的過程如下:
造測試數據
首先建立一個任務表,為了簡單模擬,我們這里就只添加必要的字段。建表語句如下:
create table task(
ID NUMBER(10) NOT NULL,
TASK_RUN_STATUS NUMBER(4) NOT NULL
);
comment on table task is '互斥任務表';
comment on column task.ID is '主鍵ID.';
comment on column task.TASK_RUN_STATUS is '任務運行狀態(1.初始待運行 2.運行中 3.運行完成).';
alter table task add constraint TASK_PK primary key (ID) using index;
為了方便測試,這里我們加入三條任務記錄,插入任務記錄的語句如下:
insert into task(id, task_run_status) values(0, 1);
insert into task(id, task_run_status) values(1, 1);
insert into task(id, task_run_status) values(2, 1);
模擬並發搶占
public class MultiThreadUpdate {
public static void main(String[] args) throws Exception {
Class.forName("oracle.jdbc.OracleDriver");
ExecutorService executorService = Executors.newFixedThreadPool(30);
List<Future<Void>> futures = new ArrayList<Future<Void>>();
// 每個ID開20個線程去並發更新數據
for (int i=0; i<20; i++) {
for (int j=0; j<3; j++) {
final int id = j;
futures.add(executorService.submit(new Callable<Void>() {
public Void call() throws Exception {
Connection con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "czbank", "123456");
// con.setAutoCommit(false); // 不自動提交事務
PreparedStatement pstm = con.prepareStatement("update task set TASK_RUN_STATUS = ? where id = ? and TASK_RUN_STATUS = ?");
pstm.setInt(1, 2);
pstm.setInt(2, id);
pstm.setInt(3, 1);
int upRec = pstm.executeUpdate();
// 打印更新的記錄條數
System.out.println("Thread:" + Thread.currentThread().getName() + " updated(id=" + id + "):" + upRec + " records...");
// Thread.sleep(1000); // 在事務提交之前,其線程都會阻塞直到對特定記錄的更新提交
// con.commit();
con.close();
pstm.close();
return null;
}
}));
}
}
executorService.shutdown();
}
}
最終程序的輸出結果如下:
Thread:pool-1-thread-9 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-22 updated(id=0):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-14 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=1):0 records...
Thread:pool-1-thread-26 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=2):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-23 updated(id=1):0 records...
Thread:pool-1-thread-21 updated(id=2):1 records...
Thread:pool-1-thread-1 updated(id=0):1 records...
Thread:pool-1-thread-6 updated(id=2):0 records...
Thread:pool-1-thread-8 updated(id=1):1 records...
Thread:pool-1-thread-10 updated(id=0):0 records...
Thread:pool-1-thread-13 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=0):0 records...
Thread:pool-1-thread-19 updated(id=0):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=0):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=2):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=2):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-9 updated(id=0):0 records...
Thread:pool-1-thread-22 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=0):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=2):0 records...
Thread:pool-1-thread-26 updated(id=0):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-21 updated(id=1):0 records...
Thread:pool-1-thread-1 updated(id=2):0 records...
Thread:pool-1-thread-14 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=2):0 records...
Thread:pool-1-thread-13 updated(id=1):0 records...
Thread:pool-1-thread-19 updated(id=2):0 records...
Thread:pool-1-thread-6 updated(id=0):0 records...
Thread:pool-1-thread-8 updated(id=1):0 records...
Thread:pool-1-thread-10 updated(id=2):0 records...
Thread:pool-1-thread-23 updated(id=0):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=2):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=1):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=0):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...
可以看到,即使在沒有顯示使用事務的情況下,多線程並發執行也能夠保證某一條數據的更新只被執行一次。
最終任務設計
通過上面的測試例子,已經驗證了我的猜想。接下來就是如何設計搶占任務的執行步驟了。廢話不多說,直接上基本代碼:
public void runMutexTasks(MutexTaskDto runCond) throws Exception {
// STEP1: 先去查找待執行的互斥任務
runCond.setTaskRunStatus(Enums.MutexTaskRunStatus.WAIT_RUN.getKey()); // 待運行
runCond.setPhysicsFlag(Enums.TaskStatus.NORMAL.getKey()); // 正常狀態(未廢棄)
PageInfo<MutexTaskDto> runnableTasks = MutexTaskService.pagingQueryGroupByTaskId(0, 0, runCond);
if (CollectionUtils.isEmpty(runnableTasks.getRows())) {
LOGGER.debug("根據條件未找到待執行的互斥任務,跳過執行......");
return;
}
// STEP2: 分別嘗試執行
List<MutexTaskDto> runTasks = null;
Collections.shuffle(runnableTasks.getRows()); // 打亂順序
for (MutexTaskDto oneTask : runnableTasks.getRows()) {
runTasks = mutexTaskService.selectRunnableTaskByTaskId(oneTask.getTaskId());
if (CollectionUtils.isEmpty(runTasks)) {
LOGGER.info("互斥任務ID【{}】已不是待運行狀態,跳過任務執行......", oneTask.getTaskId());
continue;
}
// STEP3: 運行任務
MutexTaskDto updateCond = new MutexTaskDto();
updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_SUCCESS.getKey());
updateCond.setTaskPreStatus(Enums.MutexTaskRunStatus.RUNNING.getKey());
updateCond.setTaskId(oneTask.getTaskId());
try {
runTasks(runTasks);
} catch(Exception e) {
updateCond.setRunRemark(getErrorMsg(e));
updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_FAILED.getKey());
mutexTaskService.updateByTaskId(updateCond);
// 這里只打印失敗結果,具體失敗信息需要上層調用方法日志打印出來
LOGGER.error("互斥任務ID【{}】執行失敗!", oneTask.getTaskId());
throw e;
}
mutexTaskService.updateByTaskId(updateCond);
LOGGER.info("互斥任務ID【{}】執行成功......", oneTask.getTaskId());
Thread.sleep(1000); // 搶到了一個節點執行權限,此處暫停1s,給其他機器機會
}
}
// 其中mutexTaskService的selectRunnableTaskByTaskId方法如下:
// 不使用事務,利用數據庫引擎自身的行級鎖
public List<MutexTaskDto> selectRunnableTaskByTaskId(String taskId) {
// STEP1: 先用查詢數據(一個taskID可能對應多條記錄,對應不同的參數)
List<MutexTaskModle> mutexTaskModles = this.mutexTaskDao
.selectByTaskId(taskId);
if (CollectionUtils.isEmpty(mutexTaskModles)) {
return Collections.emptyList();
}
// STEP2: 更新數據(使用數據庫引擎自身所帶的行級鎖)
MutexTaskModle updateInfo = new MutexTaskModle();
updateInfo.setTaskRunStatus(2);
updateInfo.setTaskPreStatus(1);
updateInfo.setTaskId(taskId);
int updateCount = cleaningMutexTaskDao.updateByTaskId(updateInfo);
if (updateCount <= 0) {
LOGGER.info("找到待執行的互斥任務,但是更新任務為執行中失敗......");
return Collections.emptyList();
}
// STEP3: 前面兩項都校驗過,則確認當前任務列表是可以執行的
List<MutexTaskDto> mutexTasks = BeanConvertUtils.convertList(mutexTaskModles,
MutexTaskDto.class);
return mutexTasks;
}
關鍵點就在於第58
行的cleaningMutexTaskDao.updateByTaskId(updateInfo);
。該語句對應的SQL大致為:
update TASK set task_status = ? where task_id = ? and task_tatus = ?
其中task_id為表的主鍵,且啟用了唯一索引。
總結
這個問題剛開始筆者想到的解決方案就是使用for update
。但內心總覺得這不是最佳方案,想起以前做過的項目還有看過的文章,卻也總是不太確定。最終還是自己動手寫了個測試用例"釋懷"了內心的疑惑。最終也順利地想出了這個"完美"的實現。不得不承認:實踐是檢驗真理的唯一標准!工作到現在,越來越覺得大家覺得最好的實現不一定就是最好的,大家認為的最高效的方法不一定就是最高效的。很多事情沒有絕對,就像寫代碼一樣,沒有絕對的好代碼。
當然這不是鼓勵大家隨便寫代碼,筆者想說的是:做軟件就像做學問。不能純粹地拿別人的結論奉為聖經。遇到問題要多思考,才會有自己的沉淀。思考之后要多行動,才不會僅僅停留在思想的巨人,行動的矮子。當然,行動之后也要多多整理出來,就像筆者這樣,奉獻社會,方便你我他......(一臉無語)😂
---
參考鏈接