一個簡單的消息隊列的實現(支持延時消息,支持持久化,保證唯一消費)


主要的消息管理者對象:

package com.rynk.mugua.trading.biz.service.impl;

import java.util.concurrent.DelayQueue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.commons.RedisKeyResolver;
import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler;
import com.rynk.mugua.trading.biz.eth.DelayedTack;

import lombok.extern.slf4j.Slf4j;
/**
 * 延時消息管理員
 * @author ZHANGYUKUNUP
 *
 */
@Component
@Slf4j
public class QueueManger {
	
	MessagePersistent messagePersistent;
	
	/**
	 * 延時消息隊列
	 */
	private DelayQueue<DelayedTack> dQueue = new DelayQueue<>();
	
	/**
	 * 消息任務處理線程
	 */
	private Thread taskThread;
	
	@Autowired
	DistributedLockHandler lock;
	
	
	public QueueManger() {
		taskThread = new TaskThread();
		taskThread.start();
	}
	
	/**
	 * 任務線程
	 * @author ZHANGYUKUNUP
	 *
	 */
	class TaskThread extends Thread{
		
		@Override
		public void run() {
			while (true) {
				try {
					DelayedTack delayedTack = dQueue.take();
					QueueMessage queueMessage = delayedTack.getQueueMessage();
					if( queueMessage == null ) {
						return ;
					}
					
					//簡單的加個鎖保證消息不被重復消費(需要保證解鎖前 數據被提交到數據庫,否者會出同步問題 ,也就是說不能有更加大的 事務范圍 包裹當前方法 )
					if(  lock.tryLock(  RedisKeyResolver.getMsgrKey( queueMessage.getId() )    ) ) {
						
						//如果這個消息被正常消費,那么久標記消費成功,如果異常消費,那么久重試這個消息
						try {
							if( QueueManger.this.messageDispense(delayedTack.getQueueMessage())  ) {
								messagePersistent.succeed( queueMessage );
							}else {
								QueueManger.this.reTry( queueMessage  );
							}
						}catch (Exception e) {
							e.printStackTrace();
							QueueManger.this.reTry(queueMessage);
						}finally {
							lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() )    );
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
	
	
	
	
	/**
	 * 重試
	 * @param queueMessage
	 */
	protected void reTry(QueueMessage queueMessage) {
		messagePersistent.reTry(queueMessage);
	}



	/**
	 * 分發消息
	 * @param queueMessage
	 */
	protected boolean messageDispense(QueueMessage queueMessage) {
		
		return messagePersistent.consume(queueMessage);
	}

	/**
	 * 添加一個延時消息
	 * @param delayedTack
	 */
	public void put(DelayedTack delayedTack) {
		dQueue.put(delayedTack);
	}
	
	/**
	 * 查詢未處理的延時消息數量
	 * @return
	 */
	public int size() {
		return dQueue.size();
	}
	
	/**
	 * 消息處理線程存活狀態
	 * @return
	 */
	public boolean isAlive() {
		return taskThread.isAlive();
	}
	

}

 

消息對象:

package com.rynk.commons.entity;

import java.util.Date;

import org.springframework.data.mongodb.core.mapping.Document;

import com.rynk.commons.entity.em.QueueMessageType;
import com.rynk.commons.entity.em.TransferRecordStatus;
import com.rynk.commons.util.SnGeneratorUtil;

import lombok.Data;

@Data
@Document(collection = "mg_queue_message")
public class QueueMessage extends BaseEntity {

	/**
	 * 喚醒時間
	 */
	private Date awakenDate;
	
	
	/**
	 * 處理狀態
	 */
	private TransferRecordStatus transferRecordStatus;
	
	
	/**
	 * 消息體
	 */
	private String  body;
	
	/**
	 * 消息體類型
	 */
	private QueueMessageType type;
	

	/**
	 * 重試次數
	 */
	private Integer tryTimes;
	
	
	/**
	 * 最后一次核對時間
	 */
	private Date lastCheckDate;
	

	
	
	/**
	 * 
	 * @param body 消息體來源類型
	 * @param type 消息類型
	 * @param delayed 延時
	 * @return
	 */
	public static  QueueMessage newInstance( String  body , QueueMessageType type , long delayed ) {
		QueueMessage item = new QueueMessage();
		item.setId( SnGeneratorUtil.getId().toString() );
		item.setCreateDate(  new Date() );
		item.setDr(false);
		
		item.setTransferRecordStatus( TransferRecordStatus.WAIT );
		item.setTryTimes(1);
		item.setBody(body);
		item.setType(type);
		item.setAwakenDate( new Date( System.currentTimeMillis()+delayed ));
		item.setLastCheckDate( item.getAwakenDate() );
		
		return item;
	}

	
}

  

 

基於redis 的 分布式鎖對象:

 

package com.rynk.mugua.trading.biz.commons.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 分布式鎖
 *
 * @author ZHANGYUKUN
 *
 */
@Component
public class DistributedLockHandler {

	private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);

	/**
	 * 最大持有鎖的時間(毫秒)
	 */
	private final static long LOCK_EXPIRE = 30 * 1000L;

	/**
	 * 嘗試獲取鎖的時間間隔(毫秒)
	 */
	private final static long LOCK_TRY_INTERVAL = 30L;

	/**
	 * 獲取鎖最大等待時間( 毫秒 )
	 */
	private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;

	@Resource// (name = "customRedisTemplate")
	private RedisTemplate<String, String> template;

	/**
	 * 嘗試獲取 分布式鎖
	 *
	 * @param lockKey
	 *            鎖名
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLock(String lockKey) {
		return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
	}

	/**
	 * 嘗試獲取 分布式鎖(不自動釋放鎖)
	 *
	 * @param lockKey
	 *            鎖名
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLockNotAutoRelease(String lockKey) {
		return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1);
	}

	/**
	 * 嘗試獲取 分布式鎖
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLock(String lockKey, long timeout) {
		return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
	}

	/**
	 * 嘗試獲取 分布式鎖(不自動釋放鎖)
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLockNotAutoRelease(String lockKey, long timeout) {
		return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1);
	}

	/**
	 * 嘗試獲取 分布式鎖
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @param tryInterval
	 *            獲取鎖嘗試 時間間隔
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLock(String lockKey, long timeout, long tryInterval) {
		return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE);
	}

	/**
	 * 嘗試獲取 分布式鎖(不釋放鎖)
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @param tryInterval
	 *            獲取鎖嘗試 時間間隔
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) {
		return getLock(lockKey, timeout, tryInterval, -1);
	}

	/**
	 * 嘗試獲取 分布式鎖
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @param tryInterval
	 *            獲取鎖嘗試 時間間隔
	 * @param lockExpireTime
	 *            鎖最大持有時間
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
		return getLock(lockKey, timeout, tryInterval, lockExpireTime);
	}

	/**
	 * 獲取分布式鎖
	 *
	 * @param lockKey
	 *            鎖名
	 * @param timeout
	 *            獲取鎖最大等待時間
	 * @param tryInterval
	 *            獲取鎖嘗試 時間間隔
	 * @param lockExpireTime
	 *            鎖最大持有時間
	 * @return true 得到了鎖 ,false 獲取鎖失敗
	 */
	public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
		try {
			if (StringUtils.isEmpty(lockKey)) {
				return false;
			}
			long startTime = System.currentTimeMillis();
			do {
				ValueOperations<String, String> ops = template.opsForValue();
				SimpleDateFormat sd = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
				if (ops.setIfAbsent(lockKey, sd.format(new Date())  )) {
					if (lockExpireTime > 0) {
						template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS);
					}
					return true;
				}
				Thread.sleep(tryInterval);
			} while (System.currentTimeMillis() - startTime < timeout);
		} catch (InterruptedException e) {
			logger.error(e.getMessage());
			return false;
		}
		return false;
	}

	/**
	 * 釋放鎖
	 *
	 * @param lockKey
	 */
	public void unLock(String lockKey) {
		if (!StringUtils.isEmpty(lockKey)) {
			template.delete(lockKey);
		}
	}

}

  

 

延時任務對象: 用來分裝延時消息的

 

package com.rynk.mugua.trading.biz.eth;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel;

/**
 * 延時任務
 * @author zhangyukun
 *
 */
public class DelayedTack implements Delayed{
	/**
	 *  執行的時間
	 */
	Long  runTime;
	
	/**
	 * 消息對象
	 */
	QueueMessage queueMessage;
	
	
	public QueueMessage getQueueMessage() {
		return queueMessage;
	}

	public void setQueueMessage(QueueMessage queueMessage) {
		this.queueMessage = queueMessage;
	}


	/**
	 * 
	 * @param delay 延時毫秒數
	 * @param queueMessage 消息體
	 */
	public DelayedTack( QueueMessage queueMessage  ) {
		
		if( queueMessage.getTryTimes() == 1  ) {
			this.runTime = queueMessage.getAwakenDate().getTime();
		}else {
			this.runTime =System.currentTimeMillis() + ChainMessageDelayTimeLevel.getDelayTimeLevel( queueMessage.getTryTimes() )*1000 ;
		}
		
		this.queueMessage = queueMessage;
	}
	
	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert( runTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
	}

	@Override
	public int compareTo(Delayed o) {
		return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
	}
	


}

  

持久化消息的對象: 三個 方式 按照自己的實現做就是了

package com.rynk.mugua.trading.biz.service.impl;

import com.rynk.commons.entity.QueueMessage;

public class MessagePersistent {
	
	/**
	 * 消費這個消息要處理的業務邏輯
	 * @param queueMessage
	 * @return
	 */
	public boolean consume(QueueMessage queueMessage) {
		return false;
	}
	
	/**
	 * 標記這個消息已經被正常消費
	 * @param queueMessage
	 */
	public void succeed(QueueMessage queueMessage) {
		
	}
	
	/**
	 * 重試消息(標記數據庫的狀態,然后把它重新放到延時隊列中)
	 * @param queueMessage
	 */
	public void reTry(QueueMessage queueMessage) {
		
		
	}

}

  

 

 

 

 

 

 

 

 

 

 

備注: 上面的 消息隊列,基於Java的 延時消息,帶有持久,保證唯一消費,但是有一個明顯的 問題 ,所有的消息都在內存,如果有1億條消息,內存肯定是要爆炸的 ,

如果要求 消息負載大,那么我們可以考慮 吧部分消息 放在 內存。

  1 同上 所有消息都要持久化,保證不丟失,

  2 只有最近一段時間需要消費的消息才加入到內存( 比如一分鍾 )

  3 加入消息的時候 ,判斷這個消息的執行執行時間,如果是 1 分鍾以內,持久化以后,直接放入內存隊列, 1 分鍾以外,直接 持久化,不放入內存。

  4 每隔 一段時間 ( 1 分鍾   ),載入一個,並且可以指定載入條數。

  5 如果是  多節點,考慮分布式環境的消息消費,可以 通過 消息Id 模  節點編號,來指定分配消息(  數據分片原理, 之歌分片不能隨意改動,改動為題也不大,這是知識消費,避免 重復消費就是了,但是 如果一個節點 掛了,可能 部分消息不消費 )

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM