分布式任務調度-定時任務重復執行解決方案


最近一期需求遇到這么個問題,需要寫一個定時任務,項目是集群部署的並且服務器資源有限沒有redis、Zookeeper等。

我們都知道,當我們服務端在部署集群模式時,會出現所有的定時任務在各自的節點處都會執行一遍,這顯然是不符合要求的,如何解決這個問題?那就是引入分布式鎖

分布式鎖三種實現方式:1、基於數據庫實現分布式鎖;2、基於緩存(Redis等)實現分布式鎖;3、基於Zookeeper實現分布式鎖

基於Redis實現分布式鎖的傳送門:redis分布式鎖

那要怎么解決呢?沒錯,就是使用第一種方式:基於數據庫實現分布式鎖

本文給出一種springboot集成shedlock的解決方案,以及對shedlock的大致實現原理的源碼解析

 

1.引入相關jar包

<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>2.2.1</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-jdbc-template</artifactId>
    <version>2.2.1</version>
</dependency>

2.數據庫新建表

CREATE TABLE `t_scheduled_lock` (
  `NAME` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '任務名稱',
  `lock_until` timestamp(3) NULL DEFAULT NULL COMMENT '到期時間',
  `locked_at` timestamp(3) NULL DEFAULT NULL COMMENT '開始時間',
  `locked_by` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分布式任務調度鎖表';

3.配置類

import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.ScheduledLockConfiguration;
import net.javacrumbs.shedlock.spring.ScheduledLockConfigurationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.sql.DataSource;
import java.time.Duration;

@Configuration
@EnableScheduling
public class ScheduledConfig {

    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        //指定表名,會自動在數據庫中記錄任務執行日志
        return new JdbcTemplateLockProvider(dataSource,"t_scheduled_lock");
    }

    @Bean
    public ScheduledLockConfiguration scheduledLockConfiguration(LockProvider lockProvider) {
        return ScheduledLockConfigurationBuilder
                .withLockProvider(lockProvider)
                .withPoolSize(10)
                .withDefaultLockAtMostFor(Duration.ofMinutes(30))
                .build();
    }

}

4.在啟動類添加注解

//默認持有鎖時間=30分鍾
@EnableSchedulerLock(defaultLockAtMostFor = "PT30M")

5.添加定時任務方法

    @RequestMapping(value ="/text", method = {RequestMethod.GET,RequestMethod.POST})
//    @Scheduled(cron = "-") //表示不執行
    @Scheduled(cron = "0 0 0 * * ?") //表示每天0點執行
    @SchedulerLock(name = "test",lockAtLeastForString = "PT30M")
    public void test() {
        // do something...
    }

@SchedulerLock注解有以下幾個屬性

name:鎖名稱,鎖名稱必須是全局唯一的;

lockAtMostFor(單位:毫秒):設置鎖的最大持有時間,為了解決如果持有鎖的節點掛了,無法釋放鎖,其他節點無法進行下一次任務,正常情況下任務執行完就會釋放鎖;

lockAtMostForString:鎖的最大持有時間的字符串表達,例如“PT30M”表示為30分鍾;

lockAtLeastFor(單位:毫秒):保留鎖的最短時間。這個屬性是鎖的持有時間。設置了多少就一定會持有多長時間,再此期間,下一次任務執行時,其他節點包括它本身是不會執行任務的;

lockAtLeastForString:保留鎖的最短時間的字符串表達,例如“PT30M”表示為30分鍾

拿我上面的代碼解釋一下:鎖test設置了lockAtLeastFor或者lockAtLeastForString屬性的值為30分鍾,就意味這30分鍾內,text()方法不會執行第二遍,30分鍾后才會執行下一次的任務調度。

源碼解析:

假如這個定時任務開發環境不執行,但是測試環境跟生產環境又執行

可以這么設置@Scheduled(cron = "-"),可以將cron的值寫在不同環境的配置文件中 

因為在ScheduledAnnotationBeanPostProcessor.java中的processScheduled()方法提到

 

接下來我們看看@SchedulerLock是如何加鎖的

1.在DefaultLockingTaskExecutor.class中executeWithLock方法寫到,先執行加鎖lock,加鎖成功則執行任務,最后unlock釋放鎖,加鎖不成功則提示It's locked

    public void executeWithLock(Task task, LockConfiguration lockConfig) throws Throwable {
        Optional<SimpleLock> lock = this.lockProvider.lock(lockConfig);
        if (lock.isPresent()) {
            try {
                logger.debug("Locked {}.", lockConfig.getName());
                task.call();
            } finally {
                ((SimpleLock)lock.get()).unlock();
                logger.debug("Unlocked {}.", lockConfig.getName());
            }
        } else {
            logger.debug("Not executing {}. It's locked.", lockConfig.getName());
        }

    }

2.再看看lock()方法是怎么寫的,StorageBasedLockProvider.class中有提到,意思是先去判斷當前的任務name是否添加過,如果沒有則執行insertRecord做新增操作,新增成功則保存當前任務的名稱,若新增失敗,則執行updateRecord更新操作,最終返回新增或更新的結果。注意:這里有可能新增或更新失敗,為什么?因為我們是集群部署的,定時任務同一時刻同時執行時,兩個不同的線程會同時去執行新增或更新,那么問題來了,為什么最終會只有一個線程成功呢?繼續看看insertRecord方法或者updateRecord方法內部怎么寫的了

public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
    boolean lockObtained = this.doLock(lockConfiguration);
    return lockObtained ? Optional.of(new StorageBasedLockProvider.StorageLock(lockConfiguration, this.storageAccessor)) : Optional.empty();
}

protected boolean doLock(LockConfiguration lockConfiguration) {
    String name = lockConfiguration.getName();
    if (!this.lockRecordRegistry.lockRecordRecentlyCreated(name)) {
        if (this.storageAccessor.insertRecord(lockConfiguration)) {
            this.lockRecordRegistry.addLockRecord(name);
            return true;
        }

        this.lockRecordRegistry.addLockRecord(name);
    }

    return this.storageAccessor.updateRecord(lockConfiguration);
}

3.JdbcTemplateStorageAccessor.class

//insertRecord方法其實就是對數據庫插入一條鎖的記錄,包括鎖名稱,鎖到期時間,鎖開始時間以及加鎖的來源,然后將插入的結果返回
public
boolean insertRecord(LockConfiguration lockConfiguration) { String sql = "INSERT INTO " + this.tableName + "(name, lock_until, locked_at, locked_by) VALUES(?, ?, ?, ?)"; return (Boolean)this.transactionTemplate.execute((status) -> { try { int insertedRows = this.jdbcTemplate.update(sql, (preparedStatement) -> { preparedStatement.setString(1, lockConfiguration.getName()); this.setTimestamp(preparedStatement, 2, lockConfiguration.getLockAtMostUntil()); this.setTimestamp(preparedStatement, 3, Instant.now()); preparedStatement.setString(4, this.getHostname()); }); return insertedRows > 0; } catch (DataIntegrityViolationException var5) { return false; } }); }
//updateRecord方法就是更新數據庫的記錄,注意where條件,它會更新(當前鎖名稱+鎖的到期時間<=當前時間)的記錄,並且返回更新結果
//舉個例子:有一條name=test的鎖記錄,lock_until鎖到期時間是中午12點半,當中午12點時定時任務test執行,執行update更新語句事數據庫會返回影響條數0條
//因為test這個鎖還沒到釋放的時間,所以updateRecord方法返回值是true
public boolean updateRecord(LockConfiguration lockConfiguration) { String sql = "UPDATE " + this.tableName + " SET lock_until = ?, locked_at = ?, locked_by = ? WHERE name = ? AND lock_until <= ?"; return (Boolean)this.transactionTemplate.execute((status) -> { int updatedRows = this.jdbcTemplate.update(sql, (statement) -> { Instant now = Instant.now(); this.setTimestamp(statement, 1, lockConfiguration.getLockAtMostUntil());//鎖的到期時間 this.setTimestamp(statement, 2, now); statement.setString(3, this.getHostname()); statement.setString(4, lockConfiguration.getName()); this.setTimestamp(statement, 5, now); }); return updatedRows > 0; }); }

其實說到這里,還是沒能解答我們上面提到的問題:為什么最終會只有一個線程成功呢?看一下transactionTemplate.execute的源碼你就懂了

@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

    if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
        return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
    }
    else {
        TransactionStatus status = this.transactionManager.getTransaction(this);
        T result;
        try {
            result = action.doInTransaction(status);
        }
        catch (RuntimeException | Error ex) {
            // Transactional code threw application exception -> rollback
            rollbackOnException(status, ex);
            throw ex;
        }
        catch (Throwable ex) {
            // Transactional code threw unexpected exception -> rollback
            rollbackOnException(status, ex);
            throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
        }
        this.transactionManager.commit(status);
        return result;
    }
}

可以看出,當A線程先執行新增或者更新數據時,會為這條數據添加事務並且為數據添加排他鎖,其他B、C、D等線程會一直等待A線程的事務處理完。當A處理完時,其他線程才能繼續對這條數據進行加鎖處理,發現加鎖不成功(因為已經有其他線程比它先行一步處理了),所以就會提示It's locked(上文有提到)

排他鎖的定義:若事務T對數據對象A加上X鎖,則只允許T讀取和修改A,其他任何事務都不能再對A加任何類型的鎖,直到T釋放A上的鎖。這就保證了其他事務在T釋放A上的鎖之前不能再讀取和修改A

線程加鎖完成、任務執行完成之后,下一步就是釋放鎖,看看它是怎么釋放鎖的

public void unlock(final LockConfiguration lockConfiguration) {
     //更新這條數據鎖的到期時間
final String sql = "UPDATE " + this.tableName + " SET lock_until = ? WHERE name = ?"; this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { JdbcTemplateStorageAccessor.this.jdbcTemplate.update(sql, (statement) -> { JdbcTemplateStorageAccessor.this.setTimestamp(statement, 1, lockConfiguration.getUnlockTime());//獲取解鎖時間 statement.setString(2, lockConfiguration.getName()); }); } }); }

//獲取解鎖時間
public
Instant getUnlockTime() {
Instant now = Instant.now();//獲取當前時間
return this.lockAtLeastUntil.isAfter(now) ? this.lockAtLeastUntil : now;
}
 
        

lockAtLeastUntil的值就是上面提到的lockAtLeastFor或者lockAtLeastForString屬性經過換算后得到的時間

釋放鎖的邏輯就是:會用當前時間跟保留鎖的最短時間進行比較,如果當前時間小於lockAtLeastUntil,則繼續用lockAtLeastUntil更新數據(相當於沒更新),如果當前時間大於lockAtLeastUntil,則更新為當前時間(也就是從現在開始,這條數據的鎖已經到期了)。

舉個例子:數據庫有一條中午12點加鎖的記錄A,並且lockAtLeastFor或者lockAtLeastForString設置了30分鍾,就意味着A的釋放鎖時間=12點30分,也就是lockAtLeastUntil=12點30分 ,在當前時間12點20分線程執行到這里想要釋放鎖,就會用12點20分跟12點30分進行比較發現結果是小於,則繼續用lockAtLeastUntil更新數據(相當於沒更新)

 

以上就是本人對shedlock實現分布式鎖理解的整個過程,若有理解不對還望指出,我們共同學習!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM