ActiveMQ消息發送與接收


推薦文章:ActiveMQ訊息傳送機制以及ACK機制

 

ActiveMQ發送消息

  1:創建鏈接工廠ConnectionFactory

  2:創建鏈接Connection

  3:創建session

  4:創建消息發送目的地

  5:創建生產者

  6:發送消息

消息發送類:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal JMS producer that publishes text messages to the {@code test} queue
 * of a local ActiveMQ broker through a transacted session.
 *
 * <p>Call {@link #init()} once, then {@link #sendMsg(String)} for each message,
 * and {@link #close()} when finished. A JMS {@code Session} is a single-threaded
 * context, so callers sharing one instance across threads must serialize their
 * calls themselves.
 */
public class Sender {

    private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String BROKER_URL = "tcp://127.0.0.1:61616";

    /** Running number appended to every message body. */
    private final AtomicInteger count = new AtomicInteger();

    private ConnectionFactory connectionFactory;

    private Connection connection;

    private Session session;

    private Queue queue;

    private MessageProducer producer;

    /**
     * Opens the connection and creates the transacted session, the destination
     * and the producer. If any step fails the error is printed and whatever was
     * already opened is closed again, so no half-initialized connection leaks.
     */
    public void init() {
        try {
            // Create the connection factory.
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = connectionFactory.createConnection();
            // Start the connection; not required for sending, but required for receiving.
            connection.start();
            // Create a transacted session: sends only become visible after commit().
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Destination the messages are sent to.
            queue = session.createQueue("test");
            // Producer bound to that destination.
            producer = session.createProducer(queue);
        } catch (JMSException e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Sends one text message and commits the session transaction.
     *
     * <p>If sending or committing fails, the transaction is rolled back so the
     * failed message is not silently committed together with a later one.
     *
     * @param queueName currently unused; the producer is bound to the queue
     *                  created in {@link #init()}
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void sendMsg(String queueName) {
        if (session == null || producer == null) {
            throw new IllegalStateException("Sender is not initialized; call init() first");
        }
        try {
            int num = count.getAndIncrement();
            TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:生產者發送消息!,count:"+num);
            producer.send(msg);

            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
            rollbackQuietly();
        }
    }

    /**
     * Closes the connection (which also closes its session and producer) and
     * resets this sender to the uninitialized state. Safe to call repeatedly.
     */
    public void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            producer = null;
            queue = null;
            session = null;
            connection = null;
        }
    }

    /** Rolls back the current transaction, reporting (not propagating) a failure to do so. */
    private void rollbackQuietly() {
        try {
            session.rollback();
        } catch (JMSException rollbackFailure) {
            rollbackFailure.printStackTrace();
        }
    }
}
connection.createSession方法
 /**
     * Creates a <CODE>Session</CODE> object on this connection.
     *
     * <p>For a transacted session the supplied acknowledge mode is ignored and
     * <code>Session.SESSION_TRANSACTED</code> is used. For a non-transacted
     * session the mode is validated first: <code>SESSION_TRANSACTED</code> and
     * any value outside the range up to
     * <code>ActiveMQSession.MAX_ACK_CONSTANT</code> are rejected.
     *
     * @param transacted whether the session is transacted
     * @param acknowledgeMode acknowledge mode for a non-transacted session
     * @return a newly created session
     * @throws JMSException if the acknowledge mode is not valid for a
     *                 non-transacted session, or if one of the preliminary
     *                 connection checks fails
     * @see Session#AUTO_ACKNOWLEDGE
     * @see Session#CLIENT_ACKNOWLEDGE
     * @see Session#DUPS_OK_ACKNOWLEDGE
     * @since 1.1
     */
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();

        final int effectiveAckMode;
        if (transacted) {
            // The caller's acknowledge mode is irrelevant for transacted sessions.
            effectiveAckMode = Session.SESSION_TRANSACTED;
        } else {
            if (acknowledgeMode == Session.SESSION_TRANSACTED) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            }
            final boolean outOfRange = acknowledgeMode < Session.SESSION_TRANSACTED
                    || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT;
            if (outOfRange) {
                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
            }
            // Validated above, so the mode can be passed through unchanged.
            effectiveAckMode = acknowledgeMode;
        }

        return new ActiveMQSession(this, getNextSessionId(), effectiveAckMode, isDispatchAsync(), isAlwaysSessionAsync());
    }
createSession方法里有兩個參數,第一個參數表示是否使用事務,第二個參數表示消息的確認模式。消息的確認模式共有5種(4種JMS標準模式,另加1種ActiveMQ自定義模式):
1:AUTO_ACKNOWLEDGE 自動確認
2:CLIENT_ACKNOWLEDGE 客戶端手動確認 
3:DUPS_OK_ACKNOWLEDGE 自動批量確認
0:SESSION_TRANSACTED 事務提交並確認
4:INDIVIDUAL_ACKNOWLEDGE 單條消息確認 為ActiveMQ自定義的ACK_MODE
各種確認模式詳細說明可以看文章:ActiveMQ訊息傳送機制以及ACK機制
從createSession方法中可以看出,如果session不使用事務卻使用了事務提交(SESSION_TRANSACTED)確認模式,或使用的消息確認模式不存在,將拋出異常。

ActiveMQ接收消息

  1:創建鏈接工廠ConnectionFactory

  2:創建鏈接Connection

  3:創建session

  4:創建消息接收目的地

  5:創建消費者

  6:接收消息或設置消息監聽器

消息接收類:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";
    
    private static final String PASSWORD = "admin";
    
    private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
    
    private AtomicInteger count = new AtomicInteger();
    
    private ConnectionFactory connectionFactory;
    
    private ActiveMQConnection connection;
    
    private Session session;
    
    private Queue queue;
    
    private MessageConsumer consumer;
    
    public void init() {
        try {
            //創建一個鏈接工廠
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
             //從工廠中創建一個鏈接
            connection = (ActiveMQConnection) connectionFactory.createConnection();
            
            //啟動鏈接
            connection.start();
            //創建一個事物session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //獲取消息接收的目的地,指從哪里接收消息
            queue = session.createQueue("test");
            //獲取消息接收的消費者
            consumer = session.createConsumer(queue);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public void receiver(String queueName) {
        
        try {
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}
consumer.receive方法
/**
     * Receives the next message produced for this message consumer.
     * <P>
     * This call blocks indefinitely until a message is produced or until this
     * message consumer is closed.
     * <P>
     * If this <CODE>receive</CODE> is done within a transaction, the consumer
     * retains the message until the transaction commits.
     *
     * @return the next message produced for this message consumer, or null if
     *         this message consumer is concurrently closed
     */
    public Message receive() throws JMSException {
        checkClosed(); //檢查unconsumedMessages是否關閉 ,消費者從unconsumedMessages對象中獲取消息
        checkMessageListener(); //檢查是否有其他消費者使用了監聽器,同一消息消息隊列中不能采用reveice和messageListener並存消費消息

        sendPullCommand(0); //如果prefetchSize為空且unconsumedMessages為空 向JMS提供者發送一個拉取命令來拉取消息,為下次消費做准備
        MessageDispatch md = dequeue(-1); //從unconsumedMessages取出一個消息  if (md == null) {
            return null;
        }

        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false);

        return createActiveMQMessage(md);
    }
prefetchSize屬性如果大於0,消費者每次拉取消息時都會預先拉取一定量的消息,拉取的消息數量<=prefetchSize。隊列(queue)的prefetchSize默認值為1000,這個默認值是從connection中傳過來的
/**
     * Creates a consumer for the given destination.
     *
     * <p>A {@code CustomDestination} builds its own consumer. For every other
     * destination the prefetch size is taken from the connection's prefetch
     * policy: the topic value for a {@code Topic}, the queue value otherwise.
     *
     * @param destination     destination to consume from
     * @param messageSelector selector expression passed on to the consumer
     * @param noLocal         passed on to the consumer unchanged
     * @param messageListener listener handed to the new consumer
     * @return the newly created consumer
     * @throws JMSException if the session check or consumer creation fails
     */
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        checkClosed();

        if (destination instanceof CustomDestination) {
            return ((CustomDestination) destination).createConsumer(this, messageSelector, noLocal);
        }

        final ActiveMQPrefetchPolicy policy = connection.getPrefetchPolicy();
        final int prefetchSize = (destination instanceof Topic)
                ? policy.getTopicPrefetch()
                : policy.getQueuePrefetch();
        final ActiveMQDestination target = ActiveMQMessageTransformation.transformDestination(destination);

        return new ActiveMQMessageConsumer(this, getNextConsumerId(), target, null, messageSelector,
                prefetchSize, policy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
    }

消息接收類代碼中調用session.createConsumer其實調用的就是上面的createConsumer方法,從上面代碼中可以看出connection會將自己的prefetch傳遞給消費者,connection中的ActiveMQPrefetchPolicy

對象屬性如下:

public class ActiveMQPrefetchPolicy extends Object implements Serializable {
    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
    public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
    public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);

    private int queuePrefetch;
    private int queueBrowserPrefetch;
    private int topicPrefetch;
    private int durableTopicPrefetch;
    private int optimizeDurableTopicPrefetch;
    private int inputStreamPrefetch;
    private int maximumPendingMessageLimit;

    /**
     * Creates a policy in which every prefetch size is set to its
     * {@code DEFAULT_*} constant.
     */
    public ActiveMQPrefetchPolicy() {
        // Queue-related defaults.
        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;

        // Topic-related defaults.
        this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;

        // Stream default.
        this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
    }

這里我只截了一部分代碼,可以看到隊列(queue)默認的queuePrefetch為1000,queuePrefetch的最大值不能超過MAX_PREFETCH_SIZE(32767)

當然我們也可以自己設置消費者預先拉取的消息數量,方法有兩種

一:在創建connection之后修改connection中的queuePrefetch;代碼如下:

ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(number);
connection.setPrefetchPolicy(prefetchPolicy);

二:在創建隊列(queue)的時候傳入參數,回到ActiveMQMessageConsumer的創建代碼中:

 /**
  * Creates a consumer on the given session, registers it with the session and
  * announces it to the broker.
  *
  * @param session                    owning session
  * @param consumerId                 identifier stored in the new {@code ConsumerInfo}
  * @param dest                       destination to consume from; must be non-null and have a physical name
  * @param name                       subscription name stored in the consumer info
  * @param selector                   message selector; when blank, a selector already present on the consumer info is used instead
  * @param prefetch                   prefetch size stored in the consumer info
  * @param maximumPendingMessageCount maximum pending message limit stored in the consumer info
  * @param noLocal                    stored in the consumer info
  * @param browser                    whether this consumer is a browser
  * @param dispatchAsync              stored in the consumer info
  * @param messageListener            optional listener installed before the consumer is registered
  * @throws JMSException if the destination is invalid, destination options cannot
  *                      be applied, the selector does not parse, or registering
  *                      the consumer fails
  */
 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
            String name, String selector, int prefetch,
            int maximumPendingMessageCount, boolean noLocal, boolean browser,
            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
        // Destination validation.
        if (dest == null) {
            throw new InvalidDestinationException("Don't understand null destinations");
        } else if (dest.getPhysicalName() == null) {
            throw new InvalidDestinationException("The destination object was not given a physical name.");
        } else if (dest.isTemporary()) {
            String physicalName = dest.getPhysicalName();

            if (physicalName == null) {
                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
            }

            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();

            // A temporary destination may only be used if its physical name contains this connection's ID.
            if (physicalName.indexOf(connectionID) < 0) {
                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
            }

            if (session.connection.isDeleted(dest)) {
                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
            }
            // NOTE(review): as written, the negative-prefetch check is only reached for temporary destinations.
            if (prefetch < 0) {
                throw new JMSException("Cannot have a prefetch size less than zero");
            }
        }
        // Pick the channel that buffers dispatched-but-unconsumed messages.
        if (session.connection.isMessagePrioritySupported()) {
            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
        }else {
            this.unconsumedMessages = new FifoMessageDispatchChannel();
        }

        this.session = session;
        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
        setTransformer(session.getTransformer());

        // Populate the consumer info from the constructor arguments and connection settings.
        this.info = new ConsumerInfo(consumerId);
        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
        this.info.setSubscriptionName(name);
        this.info.setPrefetchSize(prefetch);
        this.info.setCurrentPrefetchSize(prefetch);
        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
        this.info.setNoLocal(noLocal);
        this.info.setDispatchAsync(dispatchAsync);
        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
        this.info.setSelector(null);

        // Allows the options on the destination to configure the consumerInfo.
        // Only options prefixed with "consumer." are extracted; they are applied
        // after the values set above and can therefore override them.
        if (dest.getOptions() != null) {
            Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);
            // Presumably setProperties removes each entry it applied, so anything
            // still in the map could not be set — TODO confirm.
            if (options.size() > 0) {
                String msg = "There are " + options.size()
                    + " consumer options that couldn't be set on the consumer."
                    + " Check the options are spelled correctly."
                    + " Unknown parameters=[" + options + "]."
                    + " This consumer cannot be started.";
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }

        this.info.setDestination(dest);
        this.info.setBrowser(browser);
        // An explicit non-blank selector argument takes precedence over one on the consumer info.
        if (selector != null && selector.trim().length() != 0) {
            // Validate the selector
            SelectorParser.parse(selector);
            this.info.setSelector(selector);
            this.selector = selector;
        } else if (info.getSelector() != null) {
            // The info selector was set to null above, so a non-null value here
            // presumably came from the destination options.
            // Validate the selector
            SelectorParser.parse(this.info.getSelector());
            this.selector = this.info.getSelector();
        } else {
            this.selector = null;
        }

        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
        // Optimized acknowledgement requires the connection option, an auto-acknowledge session and a non-browser consumer.
        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
                                   && !info.isBrowser();
        if (this.optimizeAcknowledge) {
            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
        }

        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
        this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
        if (messageListener != null) {
            setMessageListener(messageListener);
        }
        // Register with the session, then send the consumer info; undo the registration if that fails.
        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(info);
        } catch (JMSException e) {
            this.session.removeConsumer(this);
            throw e;
        }

        // Start immediately when the connection is already started.
        if (session.connection.isStarted()) {
            start();
        }
    }

this.info.setPrefetchSize(prefetch);

由上面代碼可以看出,在創建ActiveMQMessageConsumer的過程中,程序會將connection中的queuePrefetch賦給ActiveMQMessageConsumer對象中的info對象(info為一個ConsumerInfo對象)

Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);

在創建隊列(queue)的過程中,我們可以傳一些參數來配置消費者,這些參數的前綴必須為consumer. ,當我們傳的參數與info對象中的屬性匹配時,將覆蓋info對象中的屬性值,其傳參形式如下:

queueName?consumer.param1=value1&consumer.param2=value2

所以我們如果想改變消費者預先拉取的消息數量,可以在創建隊列(queue)的時候傳入如下參數

queue = session.createQueue("test?consumer.prefetchSize=number");

 

ActiveMQ接收消息--監聽器

監聽器代碼如下:

package com.apt.study.util.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener that prints the body of every text message it receives.
 *
 * <p>Messages that are not {@link TextMessage}s are reported on standard error
 * and skipped, so an unexpected message type cannot raise a
 * {@code ClassCastException} inside the listener.
 */
public class ReceiveListener implements MessageListener {

    /**
     * Prints the text of the delivered message, prefixed with the name of the
     * delivering thread.
     *
     * @param message the delivered message; {@code null} is ignored
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            if (message != null) {
                System.err.println(Thread.currentThread().getName() + ": skipping non-text message: " + message);
            }
            return;
        }
        try {
            TextMessage msg = (TextMessage) message;
            System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

消息接收類:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

/**
 * Minimal JMS consumer that receives messages from the {@code test} queue of a
 * local ActiveMQ broker asynchronously through a {@link ReceiveListener}.
 *
 * <p>Call {@link #init()} to start receiving and {@link #close()} to stop.
 */
public class Receiver {

    private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String BROKER_URL = "tcp://127.0.0.1:61616";

    private ConnectionFactory connectionFactory;

    private ActiveMQConnection connection;

    private Session session;

    private Queue queue;

    private MessageConsumer consumer;

    /**
     * Opens the connection, creates the session and consumer, installs the
     * listener and only then starts the connection, so that delivery begins
     * once setup is complete. If any step fails the error is printed and
     * whatever was already opened is closed again.
     */
    public void init() {
        try {
            // Create the connection factory.
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            // Create a connection from the factory.
            connection = (ActiveMQConnection) connectionFactory.createConnection();

            // Create a non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            queue = session.createQueue("test");

            consumer = session.createConsumer(queue);
            // Install the message listener.
            consumer.setMessageListener(new ReceiveListener());

            // Start delivery now that the listener is in place.
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
            close();
        }
    }

    /**
     * Closes the connection (which also closes its session and consumer) and
     * resets this receiver to the uninitialized state. Safe to call repeatedly.
     */
    public void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            consumer = null;
            queue = null;
            session = null;
            connection = null;
        }
    }

}
consumer.setMessageListener方法:
/**
     * Installs or removes the listener that receives this consumer's messages.
     *
     * <p>A prefetch size of zero is rejected. Passing {@code null} clears the
     * current listener. Otherwise a running session is stopped while the
     * listener is installed and the unconsumed messages are redispatched, and
     * is started again afterwards.
     *
     * @param listener the listener to install, or {@code null} to remove it
     * @throws JMSException if the consumer check fails or the prefetch size is zero
     */
public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        if (info.getPrefetchSize() == 0) {
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }

        if (listener == null) {
            this.messageListener.set(null);
            return;
        }

        // Pause a running session for the duration of the swap.
        final boolean resumeAfterwards = session.isRunning();
        if (resumeAfterwards) {
            session.stop();
        }

        this.messageListener.set(listener);
        session.redispatch(this, unconsumedMessages);

        if (resumeAfterwards) {
            session.start();
        }
    }

從代碼中可以看出,當我們使用監聽器時,消費者prefetchSize必須大於0

 

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM