基於rabbitMQ 消息延時隊列方案 模擬電商超時未支付訂單處理場景


前言

傳統處理超時訂單
  • 采取定時任務輪訓數據庫訂單,並且批量處理。其弊端也是顯而易見的;對服務器、數據庫性會有很大的要求,並且當處理大量訂單起來會很力不從心,而且實時性也不是特別好
  • 當然傳統的手法還可以再優化一下,即存入訂單的時候就算出訂單的過期時間插入數據庫,設置定時任務查詢數據庫的時候就只需要查詢過期了的訂單,然后再做其他的業務操作
jdk延遲隊列 DelayQueue
  • 采取jdk自帶的延遲隊列能很好的優化傳統的處理方案,但是該方案的弊、端也是非常致命的,所有的消息數據都是存於內存之中,一旦宕機或重啟服務隊列中數據就全無了,而且也無法進行擴展。
  • rabbitMQ延時隊列方案
    rabbitmq我就不多介紹了,一台普通的rabbitmq服務器單隊列容納千萬級別的消息還是沒什么壓力的,而且rabbitmq集群擴展支持的也是非常好的,並且隊列中的消息是可以進行持久化,即使我們重啟或者宕機也能保證數據不丟失

術語 (詳情請參照官網文檔:http://www.rabbitmq.com/admin-guide.html)

存活時間(Time-To-Live 簡稱 TTL),分別有三種TTL的設置模式
  • x-message-ttl ,該屬性是在創建隊列的時候 ,在arguments的map中配置;該參數的作用是設置當前隊列中所有的消息的存活時間
  • x-expires 該屬性也是在arguments中配置;其作用是設置當前隊列在N毫秒中(不能為0,且為正整數),就刪除該隊列;“未使用”意味着隊列沒有消費者,隊列尚未重新聲明,並且至少在有效期內未調用basicGet (basicGet 是手動拉取指定隊列中的一條消息)
  • AMQP.BasicProperties配置中的exppiration 屬性,前兩者都是基於隊列的TTL,該屬性是基於單條消息的TLL用於配置每條消息在隊列中的存活時間
死信交換(Dead Letter Exchanges 簡稱 DLX)
  • ”死信交換“ 可以分開來理解 ;首先是 ”死信“,也就是死亡的信息,無效的信息;造成這樣的信息有以下幾種情況
    消息被拒絕,即消費者沒有成功確認消息被消費
    消息TTL過期
    超出隊列長度限制
    當出現這三種情況的時候,隊列中的消息就會變為“死信”

  • 再來理解”交換“ 也就是說,當出現"死信"的情況下 rabbitmq 可以對該"死信"進行交換到別的隊列上,但是交換的前提是需要為死信配置一個交換機用於死信的交換

代碼實現

配置類 RabbitmqConfiguration
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * TODO rabbitmq配置類
 */  
public class RabbitmqConfiguration { 
	private final String SERVER_HOST="127.0.0.1";//rabbitmq 服務器地址
	private final int PORT=5672;//端口號 
	private final String USER_NAME="test";//用戶名
	private final String PASSWORD="test";//密碼
	private final boolean QUEUE_SAVE =true;//隊列是否持久化
	private final String MESSAGE_SAVE = "1" ;//消息持久化  1,0 
	//rabbitmq 連接工廠
	private final ConnectionFactory RAB_FACTORY = new ConnectionFactory();
	private Connection connection;
  
	public void init() throws Exception{
		RAB_FACTORY.setHost(SERVER_HOST);
		RAB_FACTORY.setPort(PORT); 
		RAB_FACTORY.setUsername(USER_NAME);
		RAB_FACTORY.setPassword(PASSWORD);
		this.connection=RAB_FACTORY.newConnection();
	} 
	public Connection getConnection() {
		return connection;
	}
 
	public void setConnection(Connection connection) {
		this.connection = connection;
	}
 
	public boolean isQUEUE_SAVE() {
		return QUEUE_SAVE;
	}
 
	public String getMESSAGE_SAVE() {
		return MESSAGE_SAVE;
	}
		
}
功能類 OrderOverTimeQueue
import java.io.IOException; 
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * TODO 超時未支付訂單處理消息隊列
 */
public class OrderOverTimeQueue {
	
	private RabbitmqConfiguration rabConf;
	  
	//隊列名稱
	//****==================訂單延時隊列=======================*****//
	//訂單延時隊列
	public final String DELAY_QUEUE_NAME = "delay-queue-orderOverTime";
	//訂單延時消費隊列
	public final String CONSUME_QUEUE_NAME = "consume-queue-orderOverTime";
	//訂單延時隊列死信交換的交換器名稱
	public final String EXCHANGENAME = "exchange-orderOverTime";
	//訂單延時隊列死信的交換器路由key
	public final String ROUTINGKEY = "routingKey-orderOverTime";
	
	private Channel delayChannel;//延時隊列連接通道
	
	private Channel consumerChannel;//消費隊列連接通道
	
	public void init() throws Exception{
		//創建連接通道
		delayChannel=rabConf.getConnection().createChannel();  
		consumerChannel=rabConf.getConnection().createChannel();  
		
		//創建交換器
		consumerChannel.exchangeDeclare(EXCHANGENAME,"direct"); 
		
		/**創建處理延時消息的延時隊列*/
		Map <String,Object> arg = new HashMap <String,Object>();
		//配置死信交換器
	    arg.put("x-dead-letter-exchange",EXCHANGENAME); //交換器名稱
	    //死信交換路由key (交換器可以將死信交換到很多個其他的消費隊列,可以用不同的路由key 來將死信路由到不同的消費隊列去)
	    arg.put("x-dead-letter-routing-key", ROUTINGKEY); 
	    delayChannel.queueDeclare(DELAY_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, arg);  
		
		/**創建消費隊列*/
		consumerChannel.queueDeclare(CONSUME_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, null);
		//參數1:綁定的隊列名  參數2:綁定至哪個交換器  參數3:綁定路由key
		consumerChannel.queueBind(CONSUME_QUEUE_NAME, EXCHANGENAME,ROUTINGKEY); 
		//最多接受條數 0為無限制,每次消費消息數(根據實際場景設置),true=作用於整channel,false=作用於具體的消費者
		consumerChannel.basicQos(0,10, false);
		
	    //創建消費隊列的消費者
  		Consumer consumer = new DefaultConsumer(consumerChannel) {
  	    	@Override
  	    	public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) 
  	    	  throws IOException  {
  	    		String message = new String(body, "UTF-8");
  	    		try {
  	    			//業務邏輯處理
  	    			ConsumeMessage(message);  
  	    			//確認消息已經消費 	參數2(true=設置后續消息為自動確認消費  false=為手動確認)
  	    			consumerChannel.basicAck(envelope.getDeliveryTag(), false);
  	    		}catch (Exception e) { 
  	    		}  
  	    	  }
  	    	}; 
	  	    	
	  	boolean flag=false;//是否手動確認消息  true 是  false否 
	  	consumerChannel.basicConsume(CONSUME_QUEUE_NAME, flag, consumer);
	}
 
	/**
	 * 方法描述: 發送延遲訂單處理消息
	 * @param msg 消息內容 (訂單號或者json格式字符串)
	 * @param overTime 消息存活時間
	 * @throws Exception
	 */
	public void sendMessage(String msg,Long overTime) throws Exception{ 
		AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration(overTime.toString()) //設置消息存活時間(毫秒)
                .build();
		delayChannel.basicPublish("",DELAY_QUEUE_NAME, properties, msg.getBytes("UTF-8"));
	}
	
	/**
	 * 
	 * 方法描述: 
	 * 業務邏輯說明: TODO(總結性的歸納方法業務邏輯) 
	 * @param msg 消費消息(訂單號,或特定格式json字符串)
	 * @throws InterruptedException
	 */
	public void ConsumeMessage(String msg) throws InterruptedException { 
		Thread.sleep(50);//模擬業務邏輯處理
		System.out.println("處理到期消息時間=="+System.currentTimeMillis());
		System.err.println("刪除訂單 order-number  ==  "+msg);
	}
	
	
	public RabbitmqConfiguration getRabConf() {
		return rabConf;
	}
 
 
	public void setRabConf(RabbitmqConfiguration rabConf) {
		this.rabConf = rabConf;
	}
	
	public static void main(String[] args) throws Exception {
		OrderOverTimeQueue ooto=new OrderOverTimeQueue();
		RabbitmqConfiguration rf= new RabbitmqConfiguration();
		rf.init();
		ooto.setRabConf(rf);
		ooto.init();
 
		//模擬用戶產生訂單 消息生存時長為30秒
		ooto.sendMessage("20180907-order-number", 10000l);
		
		System.out.println("創建消息時間=="+System.currentTimeMillis());
		
		
		
	}
	
	
}

最終效果

20180909212141792.png

如果消息還存活的話,在延遲隊列中的“ready”和“total”中都會存在相應的消息記錄數

20180909211433631.png

寫的比較粗糙 歡迎大家發表自己的觀點 >_< !


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM