activeMQ的request-response請求響應模式


一:為什么需要請求響應模式

  在消息中間中,生產者只負責生產消息,而消費者只負責消費消息,兩者並無直接的關聯。但是如果生產者想要知道消費者有沒有消費完,或者用不用重新發送的時候,這時就要用到請求響應模式。

  應用場景:

  1:主要確定mq有沒有正確的消費消息。  

  2:當某一個業務發送mq,但是需要返回結果,這時候就要用到請求響應模式。應用的場景不是很多。

二:具體的代碼操作

 第一種:activeMQ的spring代碼

  在生產者的xml配置文件中,加上一個監聽的。

<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>

 2:生產的java代碼
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message1 = session.createTextMessage(message);
//發送的時候,告訴消費者應答消息發送到那里
Destination destination = session.createTemporaryQueue();
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(getResponse);
message1.setJMSReplyTo(destination);

String uid = System.currentTimeMillis()+"";
message1.setJMSCorrelationID(uid);

return message1;
}
});
3:生產者創建一個消費
@Component
public class GetResponse implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println("GetResponse accept msg :"+((TextMessage)message).getText());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
View Code

 

4:修改消費端代碼

try {
System.out.println("QueueReceiver1 accept msg : "+((TextMessage)message).getText());
//業務工作
reploy.send(((TextMessage)message).getText(),message);  //消費完成以后發送一個消息,告訴生產者已經消費成功。
}catch (Exception e){
e.printStackTrace();
}
5:創建消費者發送消息
public void send(final String consumerMsg, Message produceMessage) throws Exception{
jmsTemplate.send(produceMessage.getJMSReplyTo(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage("ReplyTo "+consumerMsg);
return msg;
}
});
}

以上就是在spring當中使用,請求-應答模式。



第二種:在spring-boot當中使用請求-應答
1:mq的發送者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * mq發送者
 */
@Service
public class ActivePro {

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    public void sendMessage(Destination destination, String message){
        jmsTemplate.convertAndSend(destination,message);
    }

    @JmsListener(destination = "boot.reploy")
    public void receiveQueue(String text){
        System.out.println(text);
    }
}
View Code

 2:mq的消費者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * mq的消費者
 */
@Component
public class ActiveCon {

    @JmsListener(destination = "boot1.queue")
    @SendTo("boot.reploy")
    public String receiveQueue(String text){
        System.out.println(text);
        return "I am tom";
    }
}
View Code

 3:測試的

    @Test
    public void contextLoads2() {
        try {
            Destination destination =
                    new ActiveMQQueue("boot1.queue");
            produce.sendMessage(destination,"aaaaa");
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
View Code
 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM