主要的消息管理者對象:
package com.rynk.mugua.trading.biz.service.impl; import java.util.concurrent.DelayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rynk.commons.entity.QueueMessage; import com.rynk.mugua.trading.biz.commons.RedisKeyResolver; import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler; import com.rynk.mugua.trading.biz.eth.DelayedTack; import lombok.extern.slf4j.Slf4j; /** * 延時消息管理員 * @author ZHANGYUKUNUP * */ @Component @Slf4j public class QueueManger { MessagePersistent messagePersistent; /** * 延時消息隊列 */ private DelayQueue<DelayedTack> dQueue = new DelayQueue<>(); /** * 消息任務處理線程 */ private Thread taskThread; @Autowired DistributedLockHandler lock; public QueueManger() { taskThread = new TaskThread(); taskThread.start(); } /** * 任務線程 * @author ZHANGYUKUNUP * */ class TaskThread extends Thread{ @Override public void run() { while (true) { try { DelayedTack delayedTack = dQueue.take(); QueueMessage queueMessage = delayedTack.getQueueMessage(); if( queueMessage == null ) { return ; } //簡單的加個鎖保證消息不被重復消費(需要保證解鎖前 數據被提交到數據庫,否者會出同步問題 ,也就是說不能有更加大的 事務范圍 包裹當前方法 ) if( lock.tryLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ) ) { //如果這個消息被正常消費,那么久標記消費成功,如果異常消費,那么久重試這個消息 try { if( QueueManger.this.messageDispense(delayedTack.getQueueMessage()) ) { messagePersistent.succeed( queueMessage ); }else { QueueManger.this.reTry( queueMessage ); } }catch (Exception e) { e.printStackTrace(); QueueManger.this.reTry(queueMessage); }finally { lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ); } } } catch (Exception e) { e.printStackTrace(); } } } } /** * 重試 * @param queueMessage */ protected void reTry(QueueMessage queueMessage) { messagePersistent.reTry(queueMessage); } /** * 分發消息 * @param queueMessage */ protected boolean messageDispense(QueueMessage queueMessage) { return messagePersistent.consume(queueMessage); } /** * 添加一個延時消息 * @param delayedTack */ public void put(DelayedTack delayedTack) { dQueue.put(delayedTack); } /** * 查詢未處理的延時消息數量 * @return */ public int size() { return dQueue.size(); } /** * 消息處理線程存活狀態 * @return */ public boolean isAlive() { return taskThread.isAlive(); } }
消息對象:
package com.rynk.commons.entity; import java.util.Date; import org.springframework.data.mongodb.core.mapping.Document; import com.rynk.commons.entity.em.QueueMessageType; import com.rynk.commons.entity.em.TransferRecordStatus; import com.rynk.commons.util.SnGeneratorUtil; import lombok.Data; @Data @Document(collection = "mg_queue_message") public class QueueMessage extends BaseEntity { /** * 喚醒時間 */ private Date awakenDate; /** * 處理狀態 */ private TransferRecordStatus transferRecordStatus; /** * 消息體 */ private String body; /** * 消息體類型 */ private QueueMessageType type; /** * 重試次數 */ private Integer tryTimes; /** * 最后一次核對時間 */ private Date lastCheckDate; /** * * @param body 消息體來源類型 * @param type 消息類型 * @param delayed 延時 * @return */ public static QueueMessage newInstance( String body , QueueMessageType type , long delayed ) { QueueMessage item = new QueueMessage(); item.setId( SnGeneratorUtil.getId().toString() ); item.setCreateDate( new Date() ); item.setDr(false); item.setTransferRecordStatus( TransferRecordStatus.WAIT ); item.setTryTimes(1); item.setBody(body); item.setType(type); item.setAwakenDate( new Date( System.currentTimeMillis()+delayed )); item.setLastCheckDate( item.getAwakenDate() ); return item; } }
基於redis 的 分布式鎖對象:
package com.rynk.mugua.trading.biz.commons.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 分布式鎖 * * @author ZHANGYUKUN * */ @Component public class DistributedLockHandler { private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class); /** * 最大持有鎖的時間(毫秒) */ private final static long LOCK_EXPIRE = 30 * 1000L; /** * 嘗試獲取鎖的時間間隔(毫秒) */ private final static long LOCK_TRY_INTERVAL = 30L; /** * 獲取鎖最大等待時間( 毫秒 ) */ private final static long LOCK_TRY_TIMEOUT = 20 * 1000L; @Resource// (name = "customRedisTemplate") private RedisTemplate<String, String> template; /** * 嘗試獲取 分布式鎖 * * @param lockKey * 鎖名 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLock(String lockKey) { return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 嘗試獲取 分布式鎖(不自動釋放鎖) * * @param lockKey * 鎖名 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLockNotAutoRelease(String lockKey) { return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1); } /** * 嘗試獲取 分布式鎖 * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLock(String lockKey, long timeout) { return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 嘗試獲取 分布式鎖(不自動釋放鎖) * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLockNotAutoRelease(String lockKey, long timeout) { return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1); } /** * 嘗試獲取 分布式鎖 * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @param tryInterval * 獲取鎖嘗試 時間間隔 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLock(String lockKey, long timeout, long tryInterval) { return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE); } /** * 嘗試獲取 分布式鎖(不釋放鎖) * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @param tryInterval * 獲取鎖嘗試 時間間隔 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) { return getLock(lockKey, timeout, tryInterval, -1); } /** * 嘗試獲取 分布式鎖 * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @param tryInterval * 獲取鎖嘗試 時間間隔 * @param lockExpireTime * 鎖最大持有時間 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) { return getLock(lockKey, timeout, tryInterval, lockExpireTime); } /** * 獲取分布式鎖 * * @param lockKey * 鎖名 * @param timeout * 獲取鎖最大等待時間 * @param tryInterval * 獲取鎖嘗試 時間間隔 * @param lockExpireTime * 鎖最大持有時間 * @return true 得到了鎖 ,false 獲取鎖失敗 */ public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) { try { if (StringUtils.isEmpty(lockKey)) { return false; } long startTime = System.currentTimeMillis(); do { ValueOperations<String, String> ops = template.opsForValue(); SimpleDateFormat sd = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); if (ops.setIfAbsent(lockKey, sd.format(new Date()) )) { if (lockExpireTime > 0) { template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS); } return true; } Thread.sleep(tryInterval); } while (System.currentTimeMillis() - startTime < timeout); } catch (InterruptedException e) { logger.error(e.getMessage()); return false; } return false; } /** * 釋放鎖 * * @param lockKey */ public void unLock(String lockKey) { if (!StringUtils.isEmpty(lockKey)) { template.delete(lockKey); } } }
延時任務對象: 用來分裝延時消息的
package com.rynk.mugua.trading.biz.eth; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import com.rynk.commons.entity.QueueMessage; import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel; /** * 延時任務 * @author zhangyukun * */ public class DelayedTack implements Delayed{ /** * 執行的時間 */ Long runTime; /** * 消息對象 */ QueueMessage queueMessage; public QueueMessage getQueueMessage() { return queueMessage; } public void setQueueMessage(QueueMessage queueMessage) { this.queueMessage = queueMessage; } /** * * @param delay 延時毫秒數 * @param queueMessage 消息體 */ public DelayedTack( QueueMessage queueMessage ) { if( queueMessage.getTryTimes() == 1 ) { this.runTime = queueMessage.getAwakenDate().getTime(); }else { this.runTime =System.currentTimeMillis() + ChainMessageDelayTimeLevel.getDelayTimeLevel( queueMessage.getTryTimes() )*1000 ; } this.queueMessage = queueMessage; } @Override public long getDelay(TimeUnit unit) { return unit.convert( runTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } }
持久化消息的對象: 三個 方式 按照自己的實現做就是了
package com.rynk.mugua.trading.biz.service.impl; import com.rynk.commons.entity.QueueMessage; public class MessagePersistent { /** * 消費這個消息要處理的業務邏輯 * @param queueMessage * @return */ public boolean consume(QueueMessage queueMessage) { return false; } /** * 標記這個消息已經被正常消費 * @param queueMessage */ public void succeed(QueueMessage queueMessage) { } /** * 重試消息(標記數據庫的狀態,然后把它重新放到延時隊列中) * @param queueMessage */ public void reTry(QueueMessage queueMessage) { } }
備注: 上面的 消息隊列,基於Java的 延時消息,帶有持久,保證唯一消費,但是有一個明顯的 問題 ,所有的消息都在內存,如果有1億條消息,內存肯定是要爆炸的 ,
如果要求 消息負載大,那么我們可以考慮 吧部分消息 放在 內存。
1 同上 所有消息都要持久化,保證不丟失,
2 只有最近一段時間需要消費的消息才加入到內存( 比如一分鍾 )
3 加入消息的時候 ,判斷這個消息的執行執行時間,如果是 1 分鍾以內,持久化以后,直接放入內存隊列, 1 分鍾以外,直接 持久化,不放入內存。
4 每隔 一段時間 ( 1 分鍾 ),載入一個,並且可以指定載入條數。
5 如果是 多節點,考慮分布式環境的消息消費,可以 通過 消息Id 模 節點編號,來指定分配消息( 數據分片原理, 之歌分片不能隨意改動,改動為題也不大,這是知識消費,避免 重復消費就是了,但是 如果一個節點 掛了,可能 部分消息不消費 )