【Active入門-2】ActiveMQ學習-生產者與消費者


1個生產者,1個消費者,使用Queue

方式1:
生產者將消息發送到Queue中,退出;
然后運行消費者:
可以看到,可以接收到消息。

方式2:
先運行消費者程序:
然后運行生產者:
消費者見下圖:

1個生產者,2個消費者,使用Queue

先運行消費者1:
在運行消費者2:
接下來運行生產者:
下面是消費者消費情況:
總結:
1. 使用Queue時,生產者只要將Message發送到MQ服務器端,消費者就可以進行消費,而無需生產者程序一直運行;
2. 消息是按照先入先出的順序,一旦有消費者將Message消費,該Message就會從MQ服務器隊列中刪去;
3. 有文章說,“生產者”<-->"消費者"是一對一的關系,其實並不准確,從上面可以看出,一個生產者產生的消息,可以被
多個消費者進行消費,只不過多個消費者在消費消息時是競爭的關系,先得到的先消費,一旦消費完成,該消息就會出隊列,
就不能被其他消費者再消費了,即“一次性消費”。

特點:
1.“離線一次性消費”;
離線:指生產者在發送消息時,不需要消費者在線,生產者只需要將消息發送到MQ隊列中,消費者可以稍后上線取消息;
一次性消費:指消費者之間存在競爭關系,任何一個消費者將消息消費掉之后,其他消費者都不能再進行消費;
若是只有1個生產者和1個消費者,就是我們熟悉的“點對點”通信了;

類似場景對比:
類似送快遞,快遞員(producer)將快遞(Message)放到指定地點(destination)后,就可以離開了,
拿快遞的人(customer)在接收到通知后,到指定地點(destination)去取快遞(Message)就可以了。
當然,取快遞時可能要進行身份驗證,這就涉及到創建連接(connection)時,需要指定用戶名和密碼了。
還有就是,實際生活中,當快遞員把快遞放好之后,照理說應該通知客戶去哪里取快遞,而ActiveMq幫我們
做好了一切,通知的工作Activemq會幫我們實現,而無需我們親自編碼通知消費者,生產者只需要將Message
放到Mq中即可,通知消費者的工作,mq會幫我們處理。


消費者程序:

   
   
   
           
  1. package com.ll.activemq;
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import javax.jms.Connection;
  5. import javax.jms.ConnectionFactory;
  6. import javax.jms.Destination;
  7. import javax.jms.MapMessage;
  8. import javax.jms.MessageConsumer;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnection;
  12. import org.apache.activemq.ActiveMQConnectionFactory;
  13. public class Receiver {
  14. public static void main(String[] args) {
  15. // ConnectionFactory :連接工廠,JMS 用它創建連接
  16. ConnectionFactory connectionFactory;
  17. // Connection :JMS 客戶端到JMS Provider 的連接
  18. Connection connection = null;
  19. // Session: 一個發送或接收消息的線程
  20. Session session;
  21. // Destination :消息的目的地;消息發送給誰.
  22. Destination destination;
  23. // 消費者,消息接收者
  24. MessageConsumer consumer;
  25. connectionFactory = new ActiveMQConnectionFactory(
  26. ActiveMQConnection.DEFAULT_USER,
  27. ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
  28. try {
  29. // 構造從工廠得到連接對象
  30. connection = connectionFactory.createConnection();
  31. // 啟動
  32. connection.start();
  33. // 獲取操作連接
  34. session = connection.createSession(Boolean.FALSE,
  35. Session.AUTO_ACKNOWLEDGE);
  36. // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
  37. destination = session.createQueue("FirstQueue");
  38. consumer = session.createConsumer(destination);
  39. // consumer2 = session.createConsumer(destination);
  40. System.out.println("消費者1:消費者程序開始運行...");
  41. while (true) {
  42. // 設置接收者接收消息的時間,為了便於測試,這里誰定為100s
  43. TextMessage message = (TextMessage) consumer.receive(18000);
  44. // Object message = (Object) consumer.receive(28000);
  45. if (null != message) {
  46. System.out.println("消費者1:收到消息-->" + message.getText());
  47. } else {
  48. System.out.println("消費者1:運行結束...\n");
  49. break;
  50. }
  51. }
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. } finally {
  55. try {
  56. if (null != connection)
  57. connection.close();
  58. } catch (Throwable ignore) {
  59. }
  60. }
  61. }
  62. }

生產者程序:

    
    
    
            
  1. package com.ll.activemq;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.DeliveryMode;
  5. import javax.jms.Destination;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. public class Sender {
  12. private static final int SEND_NUMBER = 5;
  13. public static void main(String[] args) {
  14. // ConnectionFactory :連接工廠,JMS 用它創建連接
  15. ConnectionFactory connectionFactory;
  16. // Connection :JMS 客戶端到JMS Provider 的連接
  17. Connection connection = null;
  18. // Session: 一個發送或接收消息的線程
  19. Session session;
  20. // Destination :消息的目的地;消息發送給誰.
  21. Destination destination;
  22. // MessageProducer:消息發送者
  23. MessageProducer producer;
  24. // TextMessage message;
  25. // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
  26. connectionFactory = new ActiveMQConnectionFactory(
  27. ActiveMQConnection.DEFAULT_USER,
  28. ActiveMQConnection.DEFAULT_PASSWORD,
  29. "tcp://localhost:61616");
  30. try {
  31. // 構造從工廠得到連接對象
  32. connection = connectionFactory.createConnection();
  33. // 啟動
  34. connection.start();
  35. // 獲取操作連接
  36. session = connection.createSession(Boolean.TRUE,
  37. Session.AUTO_ACKNOWLEDGE);
  38. // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
  39. destination = session.createQueue("FirstQueue");
  40. // 得到消息生成者【發送者】
  41. producer = session.createProducer(destination);
  42. // 設置不持久化,此處學習,實際根據項目決定
  43. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  44. // 構造消息,此處寫死,項目就是參數,或者方法獲取
  45. sendMessage(session, producer);
  46. session.commit();
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. } finally {
  50. try {
  51. if (null != connection)
  52. connection.close();
  53. } catch (Throwable ignore) {
  54. }
  55. }
  56. }
  57. public static void sendMessage(Session session, MessageProducer producer)
  58. throws Exception {
  59. for (int i = 1; i <= SEND_NUMBER; i++) {
  60. TextMessage message = session
  61. .createTextMessage("ActiveMq 發送的消息" + i);
  62. // 發送消息到目的地方
  63. System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
  64. producer.send(message);
  65. }
  66. System.out.println("生產者程序退出...");
  67. }
  68. }






附件列表

     


    免責聲明!

    本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



     
    粵ICP備18138465號   © 2018-2025 CODEPRJ.COM