生產者MessageQueueSelector實戰


 top下面默認有四個Queue, queque的數量不能大約配置,否者會報錯

 

 

假設一個top下面有三個類目,分別是手機,衣服,食品,他們發送消息都是隨機發送到一個queue里面,如果有一天,衣服的消息突然增多了,堵塞隊列了,其他兩個類目也會受到影響,造成消息發送失敗,這個的話就可以指定類目發送到哪個queue,手機指定發送到隊列0 ,衣服發送隊列1,這樣即使衣服的消息增多了也不會影響其他隊列,但這樣也會失去負載均衡

 

 

 

代碼案例:

同步發送

@Override
public MtopResult api(MtopInnerRequest request) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List<Produce> list = Produce.produceList();

//MessageQueueSelector 選擇
if (! CollectionUtils.isEmpty(list)){
for (Produce produce : list) {
Message message = new Message("box","orderMessage",produce.getId(),JSON.toJSONString(produce).getBytes());

SendResult send = payProduct.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer queusNum = Integer.valueOf(o.toString());
return list.get(queusNum);
}
}, 0);
System.out.printf("發送結果=%s, msg=%s ", send.getSendStatus(), send.toString());
}
}

 

 

top 默認是4個queue

 

 

queue不能大於配置,

 

 

 報錯

 

 

異步發送

 

//MessageQueueSelector 選擇
if (!CollectionUtils.isEmpty(list)) {

for (Produce produce : list) {
Message message = new Message("box", "orderMessage", produce.getId(), JSON.toJSONString(produce).getBytes());
payProduct.getProducer().send(message, (list1, message1, o) ->
{
Integer integer = Integer.valueOf(o.toString());
return list1.get(integer);
}
, 4, new SendCallback() {

@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("發送結果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());

}

@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
throwable.printStackTrace();
}
});
}


 

 




}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM