前言
傳統處理超時訂單
- 采取定時任務輪訓數據庫訂單,並且批量處理。其弊端也是顯而易見的;對服務器、數據庫性會有很大的要求,並且當處理大量訂單起來會很力不從心,而且實時性也不是特別好
- 當然傳統的手法還可以再優化一下,即存入訂單的時候就算出訂單的過期時間插入數據庫,設置定時任務查詢數據庫的時候就只需要查詢過期了的訂單,然后再做其他的業務操作
jdk延遲隊列 DelayQueue
- 采取jdk自帶的延遲隊列能很好的優化傳統的處理方案,但是該方案的弊、端也是非常致命的,所有的消息數據都是存於內存之中,一旦宕機或重啟服務隊列中數據就全無了,而且也無法進行擴展。
- rabbitMQ延時隊列方案
rabbitmq我就不多介紹了,一台普通的rabbitmq服務器單隊列容納千萬級別的消息還是沒什么壓力的,而且rabbitmq集群擴展支持的也是非常好的,並且隊列中的消息是可以進行持久化,即使我們重啟或者宕機也能保證數據不丟失
術語 (詳情請參照官網文檔:http://www.rabbitmq.com/admin-guide.html)
存活時間(Time-To-Live 簡稱 TTL),分別有三種TTL的設置模式
- x-message-ttl ,該屬性是在創建隊列的時候 ,在arguments的map中配置;該參數的作用是設置當前隊列中所有的消息的存活時間
- x-expires 該屬性也是在arguments中配置;其作用是設置當前隊列在N毫秒中(不能為0,且為正整數),就刪除該隊列;“未使用”意味着隊列沒有消費者,隊列尚未重新聲明,並且至少在有效期內未調用basicGet (basicGet 是手動拉取指定隊列中的一條消息)
- AMQP.BasicProperties配置中的exppiration 屬性,前兩者都是基於隊列的TTL,該屬性是基於單條消息的TLL用於配置每條消息在隊列中的存活時間
死信交換(Dead Letter Exchanges 簡稱 DLX)
-
”死信交換“ 可以分開來理解 ;首先是 ”死信“,也就是死亡的信息,無效的信息;造成這樣的信息有以下幾種情況
消息被拒絕,即消費者沒有成功確認消息被消費
消息TTL過期
超出隊列長度限制
當出現這三種情況的時候,隊列中的消息就會變為“死信” -
再來理解”交換“ 也就是說,當出現"死信"的情況下 rabbitmq 可以對該"死信"進行交換到別的隊列上,但是交換的前提是需要為死信配置一個交換機用於死信的交換
代碼實現
配置類 RabbitmqConfiguration
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* TODO rabbitmq配置類
*/
public class RabbitmqConfiguration {
private final String SERVER_HOST="127.0.0.1";//rabbitmq 服務器地址
private final int PORT=5672;//端口號
private final String USER_NAME="test";//用戶名
private final String PASSWORD="test";//密碼
private final boolean QUEUE_SAVE =true;//隊列是否持久化
private final String MESSAGE_SAVE = "1" ;//消息持久化 1,0
//rabbitmq 連接工廠
private final ConnectionFactory RAB_FACTORY = new ConnectionFactory();
private Connection connection;
public void init() throws Exception{
RAB_FACTORY.setHost(SERVER_HOST);
RAB_FACTORY.setPort(PORT);
RAB_FACTORY.setUsername(USER_NAME);
RAB_FACTORY.setPassword(PASSWORD);
this.connection=RAB_FACTORY.newConnection();
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public boolean isQUEUE_SAVE() {
return QUEUE_SAVE;
}
public String getMESSAGE_SAVE() {
return MESSAGE_SAVE;
}
}
功能類 OrderOverTimeQueue
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* TODO 超時未支付訂單處理消息隊列
*/
public class OrderOverTimeQueue {
private RabbitmqConfiguration rabConf;
//隊列名稱
//****==================訂單延時隊列=======================*****//
//訂單延時隊列
public final String DELAY_QUEUE_NAME = "delay-queue-orderOverTime";
//訂單延時消費隊列
public final String CONSUME_QUEUE_NAME = "consume-queue-orderOverTime";
//訂單延時隊列死信交換的交換器名稱
public final String EXCHANGENAME = "exchange-orderOverTime";
//訂單延時隊列死信的交換器路由key
public final String ROUTINGKEY = "routingKey-orderOverTime";
private Channel delayChannel;//延時隊列連接通道
private Channel consumerChannel;//消費隊列連接通道
public void init() throws Exception{
//創建連接通道
delayChannel=rabConf.getConnection().createChannel();
consumerChannel=rabConf.getConnection().createChannel();
//創建交換器
consumerChannel.exchangeDeclare(EXCHANGENAME,"direct");
/**創建處理延時消息的延時隊列*/
Map <String,Object> arg = new HashMap <String,Object>();
//配置死信交換器
arg.put("x-dead-letter-exchange",EXCHANGENAME); //交換器名稱
//死信交換路由key (交換器可以將死信交換到很多個其他的消費隊列,可以用不同的路由key 來將死信路由到不同的消費隊列去)
arg.put("x-dead-letter-routing-key", ROUTINGKEY);
delayChannel.queueDeclare(DELAY_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, arg);
/**創建消費隊列*/
consumerChannel.queueDeclare(CONSUME_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, null);
//參數1:綁定的隊列名 參數2:綁定至哪個交換器 參數3:綁定路由key
consumerChannel.queueBind(CONSUME_QUEUE_NAME, EXCHANGENAME,ROUTINGKEY);
//最多接受條數 0為無限制,每次消費消息數(根據實際場景設置),true=作用於整channel,false=作用於具體的消費者
consumerChannel.basicQos(0,10, false);
//創建消費隊列的消費者
Consumer consumer = new DefaultConsumer(consumerChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
//業務邏輯處理
ConsumeMessage(message);
//確認消息已經消費 參數2(true=設置后續消息為自動確認消費 false=為手動確認)
consumerChannel.basicAck(envelope.getDeliveryTag(), false);
}catch (Exception e) {
}
}
};
boolean flag=false;//是否手動確認消息 true 是 false否
consumerChannel.basicConsume(CONSUME_QUEUE_NAME, flag, consumer);
}
/**
* 方法描述: 發送延遲訂單處理消息
* @param msg 消息內容 (訂單號或者json格式字符串)
* @param overTime 消息存活時間
* @throws Exception
*/
public void sendMessage(String msg,Long overTime) throws Exception{
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration(overTime.toString()) //設置消息存活時間(毫秒)
.build();
delayChannel.basicPublish("",DELAY_QUEUE_NAME, properties, msg.getBytes("UTF-8"));
}
/**
*
* 方法描述:
* 業務邏輯說明: TODO(總結性的歸納方法業務邏輯)
* @param msg 消費消息(訂單號,或特定格式json字符串)
* @throws InterruptedException
*/
public void ConsumeMessage(String msg) throws InterruptedException {
Thread.sleep(50);//模擬業務邏輯處理
System.out.println("處理到期消息時間=="+System.currentTimeMillis());
System.err.println("刪除訂單 order-number == "+msg);
}
public RabbitmqConfiguration getRabConf() {
return rabConf;
}
public void setRabConf(RabbitmqConfiguration rabConf) {
this.rabConf = rabConf;
}
public static void main(String[] args) throws Exception {
OrderOverTimeQueue ooto=new OrderOverTimeQueue();
RabbitmqConfiguration rf= new RabbitmqConfiguration();
rf.init();
ooto.setRabConf(rf);
ooto.init();
//模擬用戶產生訂單 消息生存時長為30秒
ooto.sendMessage("20180907-order-number", 10000l);
System.out.println("創建消息時間=="+System.currentTimeMillis());
}
}