我的物聯網項目(八)簡單分布式調度


定時調度基本在任何平台或多或少的要用到,實現定時調度的功能很簡單,我做過的項目中用到更多的是spring quartz或者spring task,它們在單機上使用定時任務配置是非常簡單的,但是在集群環境中就需要面臨一個必須解決的問題:如何限定只有一台機器在執行定時任務?

其實spring quartz也可以實現此功能,它是由數據庫的數據來確定調度任務是否正在執行, 正在執行則其他服務器就不能去執行該行調度數據,所以需要數據庫的11張表來執行此種功能,總的來說成本較高,操作起來也比較復雜。另外一些開源的分布式調度平台也有一些,如當當網的elastic-job,淘寶的TBSchedule,包括阿里雲也有SchedulerX,這些分布式調度平台在一定程度上也可以滿足集群環境下的功能需求。我當初在技術選型上,只想簡單無門檻,不想有太多的學習成本在里面,尤其針對目前階段的項目,想用一種簡單的方式來實現目的就行,所以盡量基於目前的代碼和技術。

一 實現思路

主要利用Redis的(Redis用的雲集群,暫時不需要考慮單點故障或者不穩定的情況)函數setNX()來實現分布式鎖,大概流程是首先是某個集群環境的單邊服務器將某一任務標識名(簡單來說就是key)作為鍵存到redis里,並為其設個過期時間,如果這個時候另外的單邊服務器也請求過來,先是通過setNX()看看是否能將任務標識名(同一個標識名)插入到redis里,可以的話就返回true,不可以就返回false,如果返回false,說明這次的任務調度別的服務器已經在做了,不需要執行這次任務。如果返回true,說明這次任何調度是由自己來執行。

這個里面由於集群環境下的每台服務器到了時間點都會去執行一遍,當然肯定只有一台才能執行成功,這個里面需要注意兩個事情:

  1. 定時調度的策略應該上一個任務完成到下一個任務開始的時間間隔,這樣的話才能保證集群環境下其它的服務器下次搶占鎖的機率,如spring task的fixedDelay。
  2. 調度時間循環間隔設置當然以具體業務場景為准,但最好算好大概的每次業務執行的時間長短,然后根據這個時間長短來設置定時調度的循環間隔時間。比如說如果小於1s的調用,由於使用redis會有10幾毫秒的運算耗費,因此不能保證在1s以下的時間間隔比較均勻。所以盡量保證每台服務器的均勻分布來執行計划任務。

二 代碼實現

1.鎖對象

public class Lock { private String name; private String value; public Lock(String name, String value) { this.name = name; this.value = value; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } }

2.分布式鎖工具類

public class DistributedLockService { private final Logger log = LoggerFactory.getLogger(getClass()); private final static long LOCK_EXPIRE = 10;//單個業務持有鎖的時間10s,防止死鎖 private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); @Autowired private RedisTemplate<String, String> redisTemplate; /** * 嘗試獲取全局鎖 */ public boolean tryLock(Lock lock) { return getLock(lock,LOCK_EXPIRE); } /** * 操作redis獲取全局鎖 */ public boolean getLock(Lock lock,long lockExpireTime){ if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) { return false; } // SETNX成功,則成功獲取一個鎖 if (setNX(lock.getName(), lock.getValue(),lockExpireTime)) { return true; }else {// SETNX失敗,說明鎖仍然被其他對象保持 log.info(lock.getName()+" lock is exist!" + dateFormat.format(new Date()) + "###"); return false; } } /** * @Title: setNX * @Description: 設置鎖 */ private boolean setNX(final String key, final String value, final long expire) { return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() { @SuppressWarnings("unchecked") public Boolean doInRedis(RedisConnection connection) { byte[] keyBytes = ((RedisSerializer<String>) redisTemplate.getKeySerializer()).serialize(key); boolean locked = connection.setNX(keyBytes, ((RedisSerializer<String>)redisTemplate.getValueSerializer()).serialize(value)); if(locked){ connection.expire(keyBytes, expire); } return locked; } }); } /** * @Title: get * @Description: 根據key獲取value */ public Object get(final String key) { return redisTemplate.execute(new RedisCallback<Object>() { @SuppressWarnings("unchecked") public Object doInRedis(RedisConnection connection) throws DataAccessException { byte[] bs = connection.get(((RedisSerializer<String>)redisTemplate.getKeySerializer()).serialize(key)); return redisTemplate.getDefaultSerializer().deserialize(bs); } }); } /** * 釋放鎖 */ public void releaseLock(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { redisTemplate.delete(lock.getName()); } log.info(lock.getName()+" lock is unchecked!" + dateFormat.format(new Date()) + "###"); } }

 

3.定時調度實現

public class ScheduledTasks { @Autowired private DistributedLockService distributedLockService; private final static Logger log= Logger.getLogger(ScheduledTasks.class); private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); //5秒執行一次 @Scheduled(fixedDelay = 5000) public void doJob() { log.info("###sync start:"+ dateFormat.format(new Date()) + "###"); Lock lock = new Lock("xxlock" , "xxx"); if(distributedLockService.tryLock(lock)){ log.info("Gets the lock!" + dateFormat.format(new Date()) + "###"); //做具體業務...... distributedLockService.releaseLock(lock); } log.info("###sync end:"+ dateFormat.format(new Date()) + "###"); } }

 

三 繼續優化

上面將做具體業務的代碼耦合到了定時調度ScheduledTasks里面,這塊需要優化下,后面我們將具體的業務代碼單獨抽離出來做成一個rest服務,ScheduledTasks里面通過接口請求去執行業務邏輯即可。

定時調度這塊后續我們還在繼續優化,主要有如下:

1.將調度時間間隔,調度http請求接口,調度的動態開啟和關閉,查看目前的調度任務和執行的日志做成后台可視化界面,方便統一管理和運維。

2.集群環境下的服務器做到分片,負載均衡。其實現在的並沒有嚴格做到負載均衡,其實集群環境下每台服務器都在執行,只是沒有執行具體業務而已,所以后續這塊自己將用代碼實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM