主要的消息管理者對象:
package com.rynk.mugua.trading.biz.service.impl;
import java.util.concurrent.DelayQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.commons.RedisKeyResolver;
import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler;
import com.rynk.mugua.trading.biz.eth.DelayedTack;
import lombok.extern.slf4j.Slf4j;
/**
* 延時消息管理員
* @author ZHANGYUKUNUP
*
*/
@Component
@Slf4j
public class QueueManger {
MessagePersistent messagePersistent;
/**
* 延時消息隊列
*/
private DelayQueue<DelayedTack> dQueue = new DelayQueue<>();
/**
* 消息任務處理線程
*/
private Thread taskThread;
@Autowired
DistributedLockHandler lock;
public QueueManger() {
taskThread = new TaskThread();
taskThread.start();
}
/**
* 任務線程
* @author ZHANGYUKUNUP
*
*/
class TaskThread extends Thread{
@Override
public void run() {
while (true) {
try {
DelayedTack delayedTack = dQueue.take();
QueueMessage queueMessage = delayedTack.getQueueMessage();
if( queueMessage == null ) {
return ;
}
//簡單的加個鎖保證消息不被重復消費(需要保證解鎖前 數據被提交到數據庫,否者會出同步問題 ,也就是說不能有更加大的 事務范圍 包裹當前方法 )
if( lock.tryLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ) ) {
//如果這個消息被正常消費,那么久標記消費成功,如果異常消費,那么久重試這個消息
try {
if( QueueManger.this.messageDispense(delayedTack.getQueueMessage()) ) {
messagePersistent.succeed( queueMessage );
}else {
QueueManger.this.reTry( queueMessage );
}
}catch (Exception e) {
e.printStackTrace();
QueueManger.this.reTry(queueMessage);
}finally {
lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) );
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 重試
* @param queueMessage
*/
protected void reTry(QueueMessage queueMessage) {
messagePersistent.reTry(queueMessage);
}
/**
* 分發消息
* @param queueMessage
*/
protected boolean messageDispense(QueueMessage queueMessage) {
return messagePersistent.consume(queueMessage);
}
/**
* 添加一個延時消息
* @param delayedTack
*/
public void put(DelayedTack delayedTack) {
dQueue.put(delayedTack);
}
/**
* 查詢未處理的延時消息數量
* @return
*/
public int size() {
return dQueue.size();
}
/**
* 消息處理線程存活狀態
* @return
*/
public boolean isAlive() {
return taskThread.isAlive();
}
}
消息對象:
package com.rynk.commons.entity;
import java.util.Date;
import org.springframework.data.mongodb.core.mapping.Document;
import com.rynk.commons.entity.em.QueueMessageType;
import com.rynk.commons.entity.em.TransferRecordStatus;
import com.rynk.commons.util.SnGeneratorUtil;
import lombok.Data;
@Data
@Document(collection = "mg_queue_message")
public class QueueMessage extends BaseEntity {
/**
* 喚醒時間
*/
private Date awakenDate;
/**
* 處理狀態
*/
private TransferRecordStatus transferRecordStatus;
/**
* 消息體
*/
private String body;
/**
* 消息體類型
*/
private QueueMessageType type;
/**
* 重試次數
*/
private Integer tryTimes;
/**
* 最后一次核對時間
*/
private Date lastCheckDate;
/**
*
* @param body 消息體來源類型
* @param type 消息類型
* @param delayed 延時
* @return
*/
public static QueueMessage newInstance( String body , QueueMessageType type , long delayed ) {
QueueMessage item = new QueueMessage();
item.setId( SnGeneratorUtil.getId().toString() );
item.setCreateDate( new Date() );
item.setDr(false);
item.setTransferRecordStatus( TransferRecordStatus.WAIT );
item.setTryTimes(1);
item.setBody(body);
item.setType(type);
item.setAwakenDate( new Date( System.currentTimeMillis()+delayed ));
item.setLastCheckDate( item.getAwakenDate() );
return item;
}
}
基於redis 的 分布式鎖對象:
package com.rynk.mugua.trading.biz.commons.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 分布式鎖
*
* @author ZHANGYUKUN
*
*/
@Component
public class DistributedLockHandler {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);
/**
* 最大持有鎖的時間(毫秒)
*/
private final static long LOCK_EXPIRE = 30 * 1000L;
/**
* 嘗試獲取鎖的時間間隔(毫秒)
*/
private final static long LOCK_TRY_INTERVAL = 30L;
/**
* 獲取鎖最大等待時間( 毫秒 )
*/
private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;
@Resource// (name = "customRedisTemplate")
private RedisTemplate<String, String> template;
/**
* 嘗試獲取 分布式鎖
*
* @param lockKey
* 鎖名
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLock(String lockKey) {
return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
/**
* 嘗試獲取 分布式鎖(不自動釋放鎖)
*
* @param lockKey
* 鎖名
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLockNotAutoRelease(String lockKey) {
return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1);
}
/**
* 嘗試獲取 分布式鎖
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLock(String lockKey, long timeout) {
return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
/**
* 嘗試獲取 分布式鎖(不自動釋放鎖)
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLockNotAutoRelease(String lockKey, long timeout) {
return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1);
}
/**
* 嘗試獲取 分布式鎖
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @param tryInterval
* 獲取鎖嘗試 時間間隔
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLock(String lockKey, long timeout, long tryInterval) {
return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE);
}
/**
* 嘗試獲取 分布式鎖(不釋放鎖)
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @param tryInterval
* 獲取鎖嘗試 時間間隔
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) {
return getLock(lockKey, timeout, tryInterval, -1);
}
/**
* 嘗試獲取 分布式鎖
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @param tryInterval
* 獲取鎖嘗試 時間間隔
* @param lockExpireTime
* 鎖最大持有時間
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
return getLock(lockKey, timeout, tryInterval, lockExpireTime);
}
/**
* 獲取分布式鎖
*
* @param lockKey
* 鎖名
* @param timeout
* 獲取鎖最大等待時間
* @param tryInterval
* 獲取鎖嘗試 時間間隔
* @param lockExpireTime
* 鎖最大持有時間
* @return true 得到了鎖 ,false 獲取鎖失敗
*/
public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
try {
if (StringUtils.isEmpty(lockKey)) {
return false;
}
long startTime = System.currentTimeMillis();
do {
ValueOperations<String, String> ops = template.opsForValue();
SimpleDateFormat sd = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
if (ops.setIfAbsent(lockKey, sd.format(new Date()) )) {
if (lockExpireTime > 0) {
template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS);
}
return true;
}
Thread.sleep(tryInterval);
} while (System.currentTimeMillis() - startTime < timeout);
} catch (InterruptedException e) {
logger.error(e.getMessage());
return false;
}
return false;
}
/**
* 釋放鎖
*
* @param lockKey
*/
public void unLock(String lockKey) {
if (!StringUtils.isEmpty(lockKey)) {
template.delete(lockKey);
}
}
}
延時任務對象: 用來分裝延時消息的
package com.rynk.mugua.trading.biz.eth;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel;
/**
* 延時任務
* @author zhangyukun
*
*/
public class DelayedTack implements Delayed{
/**
* 執行的時間
*/
Long runTime;
/**
* 消息對象
*/
QueueMessage queueMessage;
public QueueMessage getQueueMessage() {
return queueMessage;
}
public void setQueueMessage(QueueMessage queueMessage) {
this.queueMessage = queueMessage;
}
/**
*
* @param delay 延時毫秒數
* @param queueMessage 消息體
*/
public DelayedTack( QueueMessage queueMessage ) {
if( queueMessage.getTryTimes() == 1 ) {
this.runTime = queueMessage.getAwakenDate().getTime();
}else {
this.runTime =System.currentTimeMillis() + ChainMessageDelayTimeLevel.getDelayTimeLevel( queueMessage.getTryTimes() )*1000 ;
}
this.queueMessage = queueMessage;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert( runTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}
}
持久化消息的對象: 三個 方式 按照自己的實現做就是了
package com.rynk.mugua.trading.biz.service.impl;
import com.rynk.commons.entity.QueueMessage;
public class MessagePersistent {
/**
* 消費這個消息要處理的業務邏輯
* @param queueMessage
* @return
*/
public boolean consume(QueueMessage queueMessage) {
return false;
}
/**
* 標記這個消息已經被正常消費
* @param queueMessage
*/
public void succeed(QueueMessage queueMessage) {
}
/**
* 重試消息(標記數據庫的狀態,然后把它重新放到延時隊列中)
* @param queueMessage
*/
public void reTry(QueueMessage queueMessage) {
}
}
備注: 上面的 消息隊列,基於Java的 延時消息,帶有持久,保證唯一消費,但是有一個明顯的 問題 ,所有的消息都在內存,如果有1億條消息,內存肯定是要爆炸的 ,
如果要求 消息負載大,那么我們可以考慮 吧部分消息 放在 內存。
1 同上 所有消息都要持久化,保證不丟失,
2 只有最近一段時間需要消費的消息才加入到內存( 比如一分鍾 )
3 加入消息的時候 ,判斷這個消息的執行執行時間,如果是 1 分鍾以內,持久化以后,直接放入內存隊列, 1 分鍾以外,直接 持久化,不放入內存。
4 每隔 一段時間 ( 1 分鍾 ),載入一個,並且可以指定載入條數。
5 如果是 多節點,考慮分布式環境的消息消費,可以 通過 消息Id 模 節點編號,來指定分配消息( 數據分片原理, 之歌分片不能隨意改動,改動為題也不大,這是知識消費,避免 重復消費就是了,但是 如果一個節點 掛了,可能 部分消息不消費 )
