ActiveMQ發送消息
1:創建鏈接工廠ConnectionFactory
2:創建鏈接Connection
3:啟動鏈接並創建session
4:創建消息發送目的地
5:創建生產者
6:發送消息
消息發送類:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private Connection connection; private Session session; private Queue queue; private MessageProducer producer; public void init() { try { //創建一個鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //從工廠中創建一個鏈接 connection = connectionFactory.createConnection(); //啟動鏈接,不啟動不影響消息的發送,但影響消息的接收 connection.start(); //創建一個事物session session = connection.createSession(true, Session.SESSION_TRANSACTED); //獲取消息發送的目的地,指消息發往那個地方 queue = session.createQueue("test"); //獲取消息發送的生產者 producer = session.createProducer(queue); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void sendMsg(String queueName) { try { int num = count.getAndIncrement(); TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:生產者發送消息!,count:"+num); producer.send(msg); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
connection.createSession方法
/**
 * Creates a new {@code Session} on this connection.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode how received messages are acknowledged; ignored when
 *                the session is transacted. For a non-transacted session
 *                the value must be greater than
 *                {@code Session.SESSION_TRANSACTED} and no greater than
 *                {@code ActiveMQSession.MAX_ACK_CONSTANT}.
 * @return the newly created session
 * @throws JMSException if a non-transacted session is requested with
 *                 {@code Session.SESSION_TRANSACTED} or with a value
 *                 outside the accepted range, or if one of the
 *                 connection checks performed first fails
 * @see Session#AUTO_ACKNOWLEDGE
 * @see Session#CLIENT_ACKNOWLEDGE
 * @see Session#DUPS_OK_ACKNOWLEDGE
 * @since 1.1
 */
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    ensureConnectionInfoSent();

    final int sessionAckMode;
    if (transacted) {
        // A transacted session always uses SESSION_TRANSACTED, whatever was requested.
        sessionAckMode = Session.SESSION_TRANSACTED;
    } else if (acknowledgeMode == Session.SESSION_TRANSACTED) {
        throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
    } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
        throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), "
                + "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
    } else {
        sessionAckMode = acknowledgeMode;
    }

    return new ActiveMQSession(this, getNextSessionId(), sessionAckMode, isDispatchAsync(), isAlwaysSessionAsync());
}
createSession方法里有兩個參數,第一個參數表示是否使用事務,第二個參數表示消息的確認模式。消息的確認模式共有5種:
1:AUTO_ACKNOWLEDGE 自動確認
2:CLIENT_ACKNOWLEDGE 客戶端手動確認
3:DUPS_OK_ACKNOWLEDGE 自動批量確認
0:SESSION_TRANSACTED 事務提交並確認
4:INDIVIDUAL_ACKNOWLEDGE 單條消息確認 為ActiveMQ自定義的ACK_MODE
各種確認模式詳細說明可以看文章:ActiveMQ訊息傳送機制以及ACK機制
從createSession方法中可以看出如果session不使用事務但是卻使用了消息提交(SESSION_TRANSACTED)確認模式,或使用的消息確認模式不存在,將拋出異常。
ActiveMQ接收消息
1:創建鏈接工廠ConnectionFactory
2:創建鏈接Connection
3:啟動鏈接並創建session
4:創建消息接收目的地
5:創建消費者
6:接收消息或設置消息監聽器
消息接收類:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; public class Receiver { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private ActiveMQConnection connection; private Session session; private Queue queue; private MessageConsumer consumer; public void init() { try { //創建一個鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //從工廠中創建一個鏈接 connection = (ActiveMQConnection) connectionFactory.createConnection(); //啟動鏈接 connection.start(); //創建一個事物session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //獲取消息接收的目的地,指從哪里接收消息 queue = session.createQueue("test"); //獲取消息接收的消費者 consumer = session.createConsumer(queue); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void receiver(String queueName) { try { TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
consumer.receive方法
/** * Receives the next message produced for this message consumer. * <P> * This call blocks indefinitely until a message is produced or until this * message consumer is closed. * <P> * If this <CODE>receive</CODE> is done within a transaction, the consumer * retains the message until the transaction commits. * * @return the next message produced for this message consumer, or null if * this message consumer is concurrently closed */ public Message receive() throws JMSException { checkClosed(); //檢查unconsumedMessages是否關閉 ,消費者從unconsumedMessages對象中獲取消息 checkMessageListener(); //檢查是否有其他消費者使用了監聽器,同一消息消息隊列中不能采用reveice和messageListener並存消費消息 sendPullCommand(0); //如果prefetchSize為空且unconsumedMessages為空 向JMS提供者發送一個拉取命令來拉取消息,為下次消費做准備 MessageDispatch md = dequeue(-1); //從unconsumedMessages取出一個消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
prefetchSize屬性如果大於0,消費者每次拉取消息時都會預先拉取一定量的消息,拉取的消息數量<=prefetchSize,prefetchSize默認值為1000,這個默認值是從connection中傳過來的
/**
 * Creates a consumer for the given destination.
 * <p>
 * A {@code CustomDestination} creates its own consumer. For every other
 * destination the prefetch size is taken from the connection's prefetch
 * policy: the topic prefetch for a {@code Topic}, the queue prefetch otherwise.
 *
 * @param destination     the destination to consume from
 * @param messageSelector selector expression handed to the consumer
 * @param noLocal         no-local flag handed to the consumer
 * @param messageListener listener handed to the consumer
 * @return the newly created consumer
 * @throws JMSException if the closed-check or the consumer creation fails
 */
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal,
        MessageListener messageListener) throws JMSException {
    checkClosed();

    if (destination instanceof CustomDestination) {
        return ((CustomDestination) destination).createConsumer(this, messageSelector, noLocal);
    }

    final ActiveMQPrefetchPolicy policy = connection.getPrefetchPolicy();
    final int prefetchSize = (destination instanceof Topic)
            ? policy.getTopicPrefetch()
            : policy.getQueuePrefetch();

    final ActiveMQDestination target = ActiveMQMessageTransformation.transformDestination(destination);
    return new ActiveMQMessageConsumer(
            this,
            getNextConsumerId(),
            target,
            null,
            messageSelector,
            prefetchSize,
            policy.getMaximumPendingMessageLimit(),
            noLocal,
            false,
            isAsyncDispatch(),
            messageListener);
}
消息接收類代碼中調用session.createConsumer其實調用的就是上面的createConsumer方法,從上面代碼中可以看出connection會將自己的prefetch傳遞給消費者,connection中的ActiveMQPrefetchPolicy
對象屬性如下:
/*
 * Excerpt of ActiveMQPrefetchPolicy (the listing is truncated; the rest of the class is not shown).
 * It declares the default prefetch constants for each subscription type and a no-argument
 * constructor that copies those defaults into the matching fields. The queue default is 1000 and
 * the topic default equals MAX_PREFETCH_SIZE (Short.MAX_VALUE). maximumPendingMessageLimit is not
 * assigned in the constructor shown here.
 */
public class ActiveMQPrefetchPolicy extends Object implements Serializable { public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE; public static final int DEFAULT_QUEUE_PREFETCH = 1000; public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000; public static final int DEFAULT_INPUT_STREAM_PREFETCH=100; public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE; private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class); private int queuePrefetch; private int queueBrowserPrefetch; private int topicPrefetch; private int durableTopicPrefetch; private int optimizeDurableTopicPrefetch; private int inputStreamPrefetch; private int maximumPendingMessageLimit; /** * Initialize default prefetch policies */ public ActiveMQPrefetchPolicy() { this.queuePrefetch = DEFAULT_QUEUE_PREFETCH; this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH; this.topicPrefetch = DEFAULT_TOPIC_PREFETCH; this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH; this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH; this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH; }
這里我只截了一部分代碼,可以看到隊列(queue)默認的queuePrefetch為1000,queuePrefetch的最大值不能超過MAX_PREFETCH_SIZE(32767)
當然我們也可以自己設置消費者預先拉取的消息數量,方法有兩種
一:在創建connection之后修改connection中的queuePrefetch;代碼如下:
// Read the connection's prefetch policy, change the queue prefetch, then hand the policy back.
// "number" is a placeholder for the desired prefetch count.
ActiveMQPrefetchPolicy policy = connection.getPrefetchPolicy();
policy.setQueuePrefetch(number);
connection.setPrefetchPolicy(policy);
二:在創建隊列(queue)的時候傳入參數,回到ActiveMQMessageConsumer的創建代碼中:
/**
 * Creates a consumer on the given session and registers it with the broker.
 * <p>
 * Steps, in the order they appear below:
 * <ol>
 * <li>Validate the destination: it must be non-null and have a physical name. A temporary
 *     destination must additionally carry this connection's id in its physical name and must
 *     not have been deleted. NOTE(review): in this listing the negative-prefetch check sits
 *     inside the temporary-destination branch, so it only runs for temporary destinations.</li>
 * <li>Pick the local buffer ({@code unconsumedMessages}): a priority channel when the connection
 *     supports message priority, a FIFO channel otherwise.</li>
 * <li>Fill the {@code ConsumerInfo} from the constructor arguments and connection settings,
 *     including {@code prefetch} as both the prefetch size and the current prefetch size.</li>
 * <li>Apply destination options carrying the {@code consumer.} prefix to the
 *     {@code ConsumerInfo}. This happens after the argument-based values are set. If any
 *     extracted option is still left in the map afterwards, a {@code ConfigurationException}
 *     is thrown (presumably applied options are removed from the map; confirm against
 *     {@code IntrospectionSupport}).</li>
 * <li>Resolve the selector: a non-blank {@code selector} argument wins; otherwise a selector
 *     already present on the {@code ConsumerInfo} is used. Either one is validated with
 *     {@code SelectorParser.parse}.</li>
 * <li>Set up statistics, the optimize-acknowledge settings and the redelivery flags, all read
 *     from the session and its connection.</li>
 * <li>Install the listener if one was given, add this consumer to the session and send the
 *     {@code ConsumerInfo} synchronously. On a {@code JMSException} from those last two calls
 *     the consumer is removed from the session and the exception is rethrown.</li>
 * <li>Start the consumer if the connection is already started.</li>
 * </ol>
 *
 * @param session the owning session
 * @param consumerId id for the new consumer
 * @param dest destination to consume from
 * @param name stored as the subscription name
 * @param selector message selector, may be null or blank
 * @param prefetch prefetch size stored on the {@code ConsumerInfo}
 * @param maximumPendingMessageCount stored as the maximum pending message limit
 * @param noLocal stored as the no-local flag
 * @param browser stored as the browser flag
 * @param dispatchAsync stored as the async-dispatch flag
 * @param messageListener listener to install, may be null
 * @throws JMSException on an invalid destination, unknown consumer options, or a failure to
 *                 register the consumer
 */
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException { if (dest == null) { throw new InvalidDestinationException("Don't understand null destinations"); } else if (dest.getPhysicalName() == null) { throw new InvalidDestinationException("The destination object was not given a physical name."); } else if (dest.isTemporary()) { String physicalName = dest.getPhysicalName(); if (physicalName == null) { throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); } String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); if (physicalName.indexOf(connectionID) < 0) { throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); } if (session.connection.isDeleted(dest)) { throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); } if (prefetch < 0) { throw new JMSException("Cannot have a prefetch size less than zero"); } } if (session.connection.isMessagePrioritySupported()) { this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); }else { this.unconsumedMessages = new FifoMessageDispatchChannel(); } this.session = session; this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); setTransformer(session.getTransformer()); this.info = new ConsumerInfo(consumerId); this.info.setExclusive(this.session.connection.isExclusiveConsumer()); this.info.setSubscriptionName(name); this.info.setPrefetchSize(prefetch); this.info.setCurrentPrefetchSize(prefetch); this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); this.info.setNoLocal(noLocal); this.info.setDispatchAsync(dispatchAsync); 
this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); this.info.setSelector(null); // Allows the options on the destination to configure the consumerInfo if (dest.getOptions() != null) { Map<String, Object> options = IntrospectionSupport.extractProperties( new HashMap<String, Object>(dest.getOptions()), "consumer."); IntrospectionSupport.setProperties(this.info, options); if (options.size() > 0) { String msg = "There are " + options.size() + " consumer options that couldn't be set on the consumer." + " Check the options are spelled correctly." + " Unknown parameters=[" + options + "]." + " This consumer cannot be started."; LOG.warn(msg); throw new ConfigurationException(msg); } } this.info.setDestination(dest); this.info.setBrowser(browser); if (selector != null && selector.trim().length() != 0) { // Validate the selector SelectorParser.parse(selector); this.info.setSelector(selector); this.selector = selector; } else if (info.getSelector() != null) { // Validate the selector SelectorParser.parse(this.info.getSelector()); this.selector = this.info.getSelector(); } else { this.selector = null; } this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !info.isBrowser(); if (this.optimizeAcknowledge) { this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval()); } this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery; if (messageListener != null) { setMessageListener(messageListener); } try { this.session.addConsumer(this); 
this.session.syncSendPacket(info); } catch (JMSException e) { this.session.removeConsumer(this); throw e; } if (session.connection.isStarted()) { start(); } }
// Stores the constructor's prefetch argument on the consumer's ConsumerInfo.
this.info.setPrefetchSize(prefetch);
由上面代碼可以看出,在創建ActiveMQMessageConsumer的過程中,程序會將connection中的queuePrefetch賦給ActiveMQMessageConsumer對象中的info對象(info為一個ConsumerInfo對象)
// Extracts the destination options that carry the "consumer." prefix from a copy of the
// destination's option map and applies them to the ConsumerInfo.
Map<String, Object> options = IntrospectionSupport.extractProperties( new HashMap<String, Object>(dest.getOptions()), "consumer."); IntrospectionSupport.setProperties(this.info, options);
在創建隊列(queue)的過程中,我們可以傳一些參數來配置消費者,這些參數的前綴必須為consumer. ,當我們傳的參數與info對象中的屬性匹配時,將覆蓋info對象中的屬性值,其傳參形式如下:
queueName?param1=value1&param2=value2
所以我們如果想改變消費者預先拉取的消息數量,可以在創建隊列的時候傳入如下參數
// The option after '?' uses the "consumer." prefix so that it is applied to the ConsumerInfo;
// replace "number" with the desired integer prefetch size.
queue = session.createQueue("test?consumer.prefetchSize=number");
ActiveMq接收消息--監聽器
監聽器代碼如下:
package com.apt.study.util.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ReceiveListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage msg = (TextMessage) message; if(msg!=null) { System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消息接收類:
package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; public class Receiver { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKEN_URL = "tcp://127.0.0.1:61616"; private AtomicInteger count = new AtomicInteger(); private ConnectionFactory connectionFactory; private ActiveMQConnection connection; private Session session; private Queue queue; private MessageConsumer consumer; public void init() { try { //創建一個鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //從工廠中創建一個鏈接 connection = (ActiveMQConnection) connectionFactory.createConnection(); //啟動鏈接 connection.start(); //創建一個事物session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test"); consumer = session.createConsumer(queue); //設置消息監聽器 consumer.setMessageListener(new ReceiveListener()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
consumer.setMessageListener方法:
/**
 * Installs or clears the asynchronous listener of this consumer.
 * <p>
 * A prefetch size of zero is rejected before anything else, even when
 * {@code listener} is null. When a non-null listener is installed on a running
 * session, the session is stopped, the already buffered messages are
 * redispatched, and the session is started again.
 *
 * @param listener the listener to install, or null to remove the current one
 * @throws JMSException if the consumer's prefetch size is zero, or if one of
 *                 the calls made here fails
 */
public void setMessageListener(MessageListener listener) throws JMSException {
    checkClosed();
    if (info.getPrefetchSize() == 0) {
        throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
    }

    if (listener == null) {
        this.messageListener.set(null);
        return;
    }

    final boolean restartNeeded = session.isRunning();
    if (restartNeeded) {
        session.stop();
    }

    this.messageListener.set(listener);
    session.redispatch(this, unconsumedMessages);

    if (restartNeeded) {
        session.start();
    }
}
從代碼中可以看出,當我們使用監聽器時,消費者prefetchSize必須大於0