ActiveMQ消息簽收機制


 消費者客戶端成功接收一條消息的標志是:這條消息被簽收。
         消費者客戶端成功接收一條消息一般包括三個階段:
         1、消費者接收消息,也即從MessageConsumer的receive方法返回
         2、消費者處理消息
         3、消息被簽收
        其中,第三階段的簽收可以有ActiveMQ發起,也可以由消費者客戶端發起,取決於Session是否開啟事務以及簽收模式的設置。
        在帶事務的Session中,消費者客戶端事務提交之時,消息自動完成簽收。
        在不帶事務的Session中,消息何時以及如何被簽收取決於Session的簽收模式設置
       非事務Session可以設置如下幾種簽收模式:
      
1.Session.AUTO_ACKNOWLEDGE
當消息從MessageConsumer的receive方法返回或者從MessageListener接口的onMessage方法返回時,會話自動確認消息簽收
2.Session.CLIENT_ACKNOWLEDGE
需要消費者客戶端主動調用acknowledge方法簽收消息,這種模式實在Session層面進行簽收的,簽收一個已經消費的消息會自動的簽收這個Session已消費的所有消息:
   例如一個消費者在一個Session中消費了5條消息,然后確認第3條消息,所有這5條消息都會被簽收
3.Session.DUPS_OK_ACKNOWLEDGE
這種方式允許JMS不必急於確認收到的消息,允許在收到多個消息之后一次完成確認,與Auto_AcKnowledge相比,這種確認方式在某些情況下可能更有效,因為沒有確認,當系統崩潰或者網絡出現故障的時候,消息可以被重新傳遞.
這種方式會引起消息的重復,但是降低了Session的開銷,所以只有客戶端能容忍重復的消息才可使用。(如果ActiveMQ再次傳送同一消息,那么消息頭中的JMSRedelivered將被設置為true)

帶事務session的案例

  生產者

    必須在生產完數據之后手動提交session

 
package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        //啟動連接
        connection.start();
        // Session: 一個發送或接收消息的線程    false:代表不帶事務的session   AUTO_ACKNOWLEDGE:代表自動簽收
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值my-queue是Query的名字
        Queue queue = session.createQueue("my-queue");
        // MessageProducer:創建消息生產者
        MessageProducer producer = session.createProducer(queue);
        // 設置不持久化  PERSISTENT:代表持久化  NON_PERSISTENT:代表不持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 發送消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        System.out.println("發送成功!");
        session.commit();
        session.close();
        connection.close();
    }
    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     *
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
    }
}
 

  消費者

    消費完數據之后必須手動提交session

 
package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsReceiver {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程  true:表單開啟事務  AUTO_ACKNOWLEDGE:代表自動簽收
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
        Queue queue = session.createQueue("my-queue");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            //receive():獲取消息
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
                session.commit();
            } else {
                break;
            }
        }
        //回收資源
        session.close();
        connection.close();
    }
}
 

不帶事務session的案例

  1.自動簽收

    

  2.手動簽收

    生產者

 
package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        //啟動連接
        connection.start();
        // Session: 一個發送或接收消息的線程    false:代表不帶事務的session   AUTO_ACKNOWLEDGE:代表自動簽收
       /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值my-queue是Query的名字
        Queue queue = session.createQueue("my-queue");
        // MessageProducer:創建消息生產者
        MessageProducer producer = session.createProducer(queue);
        // 設置不持久化  PERSISTENT:代表持久化  NON_PERSISTENT:代表不持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 發送消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        System.out.println("發送成功!");
        session.close();
        connection.close();
    }
    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     *
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
     message.acknowledge();  //手動提交
  } 
}
 

    消費者

 
package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;

import javax.jms.*;

public class JmsReceiver {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程  true:表單開啟事務  AUTO_ACKNOWLEDGE:代表自動簽收
        /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
        Queue queue = session.createQueue("my-queue");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            //receive():獲取消息
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
                message.acknowledge();  //手動提交
            } else {
                break;
            }
        }
        //回收資源
        session.close();
        connection.close();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM