概述
對Acknowledge機制進行測試。
此處的測試是針對Consumer的確認設計的;對於Producer的確認是透明的,無法提供測試。
測試實例
設計demo,測試三種確認機制。
測試機制 | 測試實例 | 結果預測 |
AUTO_ACKNOWLEDGE | 接收正常 | 消息出隊量=消息入隊量 |
接收異常 | 消息出隊量=0 | |
CLIENT_ACKNOWLEDGE | 1次確認/2條消息 - 每2條消息確認1次 | 每次確認2條信息 |
從不確認 | 消息出隊量=0 | |
DUPS_OK_ACKNOWLEDGE | 每一次接收消息后,使線程睡眠數秒;觀察消息出隊情況 | 符合批量確認、延遲確認的特點 |
demo設計
demo設計圖

測試分工
測試類 | 測試方法 |
AutoAckConsumer.java - 測試AUTO_ACKNOWLEDGE |
receiveNormal():void - 測試“接收正常” |
receiveIntentionalException():void - 測試“接收異常” |
|
ClientAckConsumer.java - 測試CLIENT_ACKNOWLEDGE |
receivePerTwice():void - 測試“1次確認/2條消息” |
receiveWithoutAck():void - 測試“從不確認” |
|
DupsOkAckConsumer.java - 測試DUPS_OK_ACKNOWLEDGE |
receive():void - 測試批量確認和延遲確認 |
測試步驟和結果
1.測試AUTO_ACKNOWLEDGE
1.1.接收正常
測試步驟 |
|
測試截圖 |
![]() |
1.2.接收異常
測試步驟 |
|
測試截圖 |
![]() |
結論整理 |
|
2.測試CLIENT_ACKNOWLEDGE
2.1.每2條消息確認1次
測試步驟 |
|
測試截圖 |
![]() |
結論整理 |
每次確認不是只對當前的Message進行確認,而是對自上次確認以來的所有Message進行確認.在這里,每次確認2條. |
2.2.從不確認
測試步驟 |
|
測試截圖 |
![]() |
3.測試DUPS_OK_ACKNOWLEDGE
測試步驟 |
|
結論整理 |
|
代碼
文件目錄結構
1 jms-producer 2 |---- src/main/resources/ 3 |---- jndi.properties 4 |---- src/main/java/ 5 |---- cn.sinobest.asj.producer.jms.acknowledge 6 |---- SimpleProducer.java # 發送 7 jms-consumer 8 |---- src/main/resources/ 9 |---- jndi.properties 10 |---- src/main/java/ 11 |---- cn.sinobest.asj.consumer.jms.acknowledge 12 |---- AutoAckConsumer.java # 測試AUTO_ACKNOWLEDGE 13 |---- ClientAckConsumer.java # 測試AUTO_ACKNOWLEDGE 14 |---- DupsOkAckConsumer.java # 測試DUPS_OK_ACKNOWLEDGE
文件內容
1.jndi.properties
jms-producer端
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory 2 3 # use the following property to configure the default connector 4 java.naming.provider.url=tcp://localhost:61616 5 6 # register some queues in JNDI using the form 7 # queue.[jndiName] = [physicalName] 8 queue.exampleQueue=example.queue 9 10 # register some topics in JNDI using the form 11 # topic.[jndiName] = [physicalName] 12 topic.exampleTopic=example.topic
jms-consumer端
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory 2 3 # use the following property to configure the default connector 4 java.naming.provider.url=tcp://localhost:61616 5 6 # register some queues in JNDI using the form 7 # queue.[jndiName] = [physicalName] 8 queue.exampleQueue=example.queue 9 10 # register some topics in JNDI using the form 11 # topic.[jndiName] = [physicalName] 12 topic.exampleTopic=example.topic
2.SimpleProducer.java

1 package cn.sinobest.asj.producer.jms.acknowledge; 2 import javax.jms.Connection; 3 import javax.jms.ConnectionFactory; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.MessageProducer; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 import javax.naming.Context; 10 import javax.naming.InitialContext; 11 import javax.naming.NamingException; 12 import org.junit.Test; 13 /** 14 * A simple demo for producer client to send message to ActiveMQ.<br> 15 * 對{@link cn.sinobest.asj.producer.jms.clientmode.SimpleProducer}的改進. 16 * 17 * @author lijinlong 18 * 19 */ 20 public class SimpleProducer { 21 /** JNDI name for ConnectionFactory */ 22 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory"; 23 /** JNDI name for Queue Destination (use for PTP Mode) */ 24 static final String QUEUE_JNDI_NAME = "exampleQueue"; 25 /** JNDI name for Topic Destination (use for Pub/Sub Mode) */ 26 static final String TOPIC_JNDI_NAME = "exampleTopic"; 27 /** 28 * 發送消息到隊列.<br> 29 * PTP Mode. 30 */ 31 @Test 32 public void sendToQueue() { 33 send(QUEUE_JNDI_NAME); 34 } 35 36 /** 37 * 發送消息到主題.<br> 38 * PTP Mode. 39 */ 40 @Test 41 public void sendToTopic() { 42 send(TOPIC_JNDI_NAME); 43 } 44 /** 45 * 發送到指定的目的地. 46 * 47 * @param destJndiName 48 * 目的地的JNDI name:{@link #QUEUE_JNDI_NAME}或 49 * {@link #TOPIC_JNDI_NAME}. 50 */ 51 private void send(String destJndiName) { 52 Context jndiContext = null; 53 ConnectionFactory connectionFactory = null; 54 Connection connection = null; 55 Session session = null; 56 Destination destination = null; 57 MessageProducer producer = null; 58 // create a JNDI API IntialContext object 59 try { 60 jndiContext = new InitialContext(); 61 } catch (NamingException e) { 62 System.out.println("Could not create JNDI Context:" 63 + e.getMessage()); 64 System.exit(1); 65 } 66 // look up ConnectionFactory and Destination 67 try { 68 connectionFactory = (ConnectionFactory) jndiContext 69 .lookup(CONNECTION_FACTORY_JNDI_NAME); 70 destination = (Destination) jndiContext.lookup(destJndiName); 71 } catch (NamingException e) { 72 System.out.println("JNDI look up failed:" + e.getMessage()); 73 System.exit(1); 74 } 75 // send Messages and finally release the resources. 76 try { 77 connection = connectionFactory.createConnection(); 78 session = connection.createSession(Boolean.FALSE, 79 Session.AUTO_ACKNOWLEDGE); 80 producer = session.createProducer(destination); 81 TextMessage message = session.createTextMessage(); 82 for (int i = 0; i < 3; i++) { 83 message.setText(String.format("This is the %dth message.", 84 i + 1)); 85 producer.send(message); 86 } 87 } catch (JMSException e) { 88 e.printStackTrace(); 89 } finally { 90 try { 91 if (session != null) 92 session.close(); 93 if (connection != null) 94 connection.close(); 95 } catch (JMSException e) { 96 e.printStackTrace(); 97 } 98 } 99 } 100 }
3.AutoAckConsumer.java

1 package cn.sinobest.asj.consumer.jms.acknowledge; 2 import javax.jms.Connection; 3 import javax.jms.ConnectionFactory; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.MessageConsumer; 8 import javax.jms.MessageListener; 9 import javax.jms.Session; 10 import javax.jms.TextMessage; 11 import javax.naming.Context; 12 import javax.naming.InitialContext; 13 import javax.naming.NamingException; 14 import org.junit.Test; 15 import cn.sinobest.asj.consumer.util.Hold; 16 /** 17 * AUTO_ACKNOWLEDGE確認模式的Consumer.<br> 18 * 基於PTP Mode,采用異步的方式接收消息,研究拋出或不拋出異常的情況下,Queue中的消息的出隊情況.<br> 19 * 20 * @author lijinlong 21 * 22 */ 23 public class AutoAckConsumer { 24 /** JNDI name for ConnectionFactory */ 25 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory"; 26 /** JNDI name for Queue Destination (use for PTP Mode) */ 27 static final String QUEUE_JNDI_NAME = "exampleQueue"; 28 /** 29 * 正常的接收.<br> 30 */ 31 @Test 32 public void receiveNormal() { 33 MessageListener listener = new MessageListener() { 34 public void onMessage(Message message) { 35 try { 36 String text = ((TextMessage) message).getText(); 37 System.out.println(text); 38 } catch (JMSException e) { 39 e.printStackTrace(); 40 } 41 } 42 }; 43 receive(listener); 44 } 45 /** 46 * 故意拋出異常的接收.<br> 47 * 結果: 48 * <ul> 49 * <li>JMS Provider重復發送消息給Consumer。重復次數達到一定的閥值,JMS 50 * Provider認為此消息無法消費,此消息將會被刪除或者遷移到"dead letter"通道中。</li> 51 * <li>在測試過程中,會重發6次(共發7次),然后移到ActiveMQ.DLQ隊列;DLQ - dead letter queue.</li> 52 * <li>重發次數可以配置 - 53 * 在brokerUrl中指定參數jms.redeliveryPolicy.maximumRedeliveries=3,則重發3次(共4次).</li> 54 * </ul> 55 */ 56 @Test 57 public void receiveIntentionalException() { 58 MessageListener listener = new MessageListener() { 59 public void onMessage(Message message) { 60 try { 61 String text = ((TextMessage) message).getText(); 62 System.out.println(text); 63 } catch (JMSException e) { 64 e.printStackTrace(); 65 } 66 boolean intentional = true; 67 if (intentional) { 68 throw new RuntimeException("故意拋出的異常。"); 69 } 70 } 71 }; 72 receive(listener); 73 } 74 75 /** 76 * 接收消息.<br> 77 * 78 * @param listener 79 * 監聽器,如果消息接收成功,將被回調. 80 */ 81 private void receive(MessageListener listener) { 82 Context jndiContext = null; 83 ConnectionFactory connectionFactory = null; 84 Connection connection = null; 85 Session session = null; 86 Destination destination = null; 87 MessageConsumer consumer = null; 88 // create a JNDI API IntialContext object 89 try { 90 jndiContext = new InitialContext(); 91 } catch (NamingException e) { 92 System.out.println("Could not create JNDI Context:" 93 + e.getMessage()); 94 System.exit(1); 95 } 96 // look up ConnectionFactory and Destination 97 try { 98 connectionFactory = (ConnectionFactory) jndiContext 99 .lookup(CONNECTION_FACTORY_JNDI_NAME); 100 destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME); 101 } catch (NamingException e) { 102 System.out.println("JNDI look up failed:" + e.getMessage()); 103 System.exit(1); 104 } 105 // receive Messages and finally release the resources. 106 try { 107 connection = connectionFactory.createConnection(); 108 connection.start(); // connection should be called in 109 // receiver-client 110 session = connection.createSession(Boolean.FALSE, 111 Session.AUTO_ACKNOWLEDGE); 112 consumer = session.createConsumer(destination); 113 // key code for asynchronous receive:set messageListener 114 consumer.setMessageListener(listener); 115 Hold.hold(); // 阻塞程序繼續執行 116 } catch (JMSException e) { 117 e.printStackTrace(); 118 } finally { 119 try { 120 if (session != null) 121 session.close(); 122 if (connection != null) 123 connection.close(); 124 } catch (JMSException e) { 125 e.printStackTrace(); 126 } 127 } 128 } 129 }
4.ClientAckConsumer.java

1 package cn.sinobest.asj.consumer.jms.acknowledge; 2 import javax.jms.Connection; 3 import javax.jms.ConnectionFactory; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.MessageConsumer; 8 import javax.jms.MessageListener; 9 import javax.jms.Session; 10 import javax.jms.TextMessage; 11 import javax.naming.Context; 12 import javax.naming.InitialContext; 13 import javax.naming.NamingException; 14 import org.junit.Test; 15 import cn.sinobest.asj.consumer.util.Hold; 16 /** 17 * CLIENT_ACKNOWLEDGE確認模式的Consumer.<br> 18 * 基於PTP Mode,采用異步的方式接收消息,研究從不確認、每2次確認的情況下,Queue中的消息的出隊情況.<br> 19 * 20 * @author lijinlong 21 * 22 */ 23 public class ClientAckConsumer { 24 /** JNDI name for ConnectionFactory */ 25 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory"; 26 /** JNDI name for Queue Destination (use for PTP Mode) */ 27 static final String QUEUE_JNDI_NAME = "exampleQueue"; 28 /** 29 * 從不確認的接收.<br> 30 * 結果: 31 * <ul> 32 * <li>只接收一次,但是消息不會出隊.</li> 33 * <li>Consumer重啟,會再次接收到消息.</li> 34 * </ul> 35 */ 36 @Test 37 public void receiveWithoutAck() { 38 MessageListener listener = new MessageListener() { 39 public void onMessage(Message message) { 40 try { 41 String text = ((TextMessage) message).getText(); 42 System.out.println(text); 43 } catch (JMSException e) { 44 e.printStackTrace(); 45 } 46 } 47 }; 48 receive(listener); 49 } 50 51 private int ack_count = 0; // 確認次數統計 52 /** 53 * 每接收兩次確認一次.<br> 54 * 結果:每次確認不是只對當前的Message進行確認,而是對自上次確認以來的所有Message進行確認.在這里,每次確認2條. 55 */ 56 @Test 57 public void receivePerTwice() { 58 MessageListener listener = new MessageListener() { 59 public void onMessage(Message message) { 60 try { 61 String text = ((TextMessage) message).getText(); 62 System.out.println(text); 63 64 ack_count ++; 65 if (ack_count % 2 == 0) 66 message.acknowledge(); 67 68 } catch (JMSException e) { 69 e.printStackTrace(); 70 } 71 } 72 }; 73 receive(listener); 74 } 75 /** 76 * 接收消息.<br> 77 * 78 * @param listener 79 * 監聽器,如果消息接收成功,將被回調. 80 */ 81 private void receive(MessageListener listener) { 82 Context jndiContext = null; 83 ConnectionFactory connectionFactory = null; 84 Connection connection = null; 85 Session session = null; 86 Destination destination = null; 87 MessageConsumer consumer = null; 88 // create a JNDI API IntialContext object 89 try { 90 jndiContext = new InitialContext(); 91 } catch (NamingException e) { 92 System.out.println("Could not create JNDI Context:" 93 + e.getMessage()); 94 System.exit(1); 95 } 96 // look up ConnectionFactory and Destination 97 try { 98 connectionFactory = (ConnectionFactory) jndiContext 99 .lookup(CONNECTION_FACTORY_JNDI_NAME); 100 destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME); 101 } catch (NamingException e) { 102 System.out.println("JNDI look up failed:" + e.getMessage()); 103 System.exit(1); 104 } 105 // receive Messages and finally release the resources. 106 try { 107 connection = connectionFactory.createConnection(); 108 connection.start(); // connection should be called in 109 // receiver-client 110 session = connection.createSession(Boolean.FALSE, 111 Session.CLIENT_ACKNOWLEDGE); 112 consumer = session.createConsumer(destination); 113 // key code for asynchronous receive:set messageListener 114 consumer.setMessageListener(listener); 115 Hold.hold(); // 阻塞程序繼續執行 116 } catch (JMSException e) { 117 e.printStackTrace(); 118 } finally { 119 try { 120 if (session != null) 121 session.close(); 122 if (connection != null) 123 connection.close(); 124 } catch (JMSException e) { 125 e.printStackTrace(); 126 } 127 } 128 } 129 }
5.DupsOkAckConsumer.java

1 package cn.sinobest.asj.consumer.jms.acknowledge; 2 import javax.jms.Connection; 3 import javax.jms.ConnectionFactory; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.MessageConsumer; 8 import javax.jms.MessageListener; 9 import javax.jms.Session; 10 import javax.jms.TextMessage; 11 import javax.naming.Context; 12 import javax.naming.InitialContext; 13 import javax.naming.NamingException; 14 import org.junit.Test; 15 import cn.sinobest.asj.consumer.util.Hold; 16 /** 17 * DUPS_OK_ACKNOWLEDGE確認模式的Consumer.<br> 18 * @author lijinlong 19 * 20 */ 21 public class DupsOkAckConsumer { 22 /** JNDI name for ConnectionFactory */ 23 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory"; 24 /** JNDI name for Topic Destination (use for Pub/Sub Mode) */ 25 static final String TOPIC_JNDI_NAME = "exampleTopic"; 26 27 /** 28 * 從主題接收消息. 29 */ 30 @Test 31 public void receive() { 32 receive(createMessageListener()); 33 } 34 35 /** 36 * 創建MessageListener實例. 37 * @return 38 */ 39 private MessageListener createMessageListener() { 40 MessageListener listener = new MessageListener() { 41 public void onMessage(Message message) { 42 try { 43 String text = ((TextMessage) message).getText(); 44 System.out.println(text); 45 } catch (JMSException e) { 46 e.printStackTrace(); 47 } 48 49 try { 50 Thread.sleep(5 * 1000); 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 } 54 } 55 }; 56 57 return listener; 58 } 59 60 /** 61 * 接收消息.<br> 62 * 63 * @param listener 64 * 監聽器,如果消息接收成功,將被回調. 65 */ 66 private void receive(MessageListener listener) { 67 Context jndiContext = null; 68 ConnectionFactory connectionFactory = null; 69 Connection connection = null; 70 Session session = null; 71 Destination destination = null; 72 MessageConsumer consumer = null; 73 // create a JNDI API IntialContext object 74 try { 75 jndiContext = new InitialContext(); 76 } catch (NamingException e) { 77 System.out.println("Could not create JNDI Context:" 78 + e.getMessage()); 79 System.exit(1); 80 } 81 // look up ConnectionFactory and Destination 82 try { 83 connectionFactory = (ConnectionFactory) jndiContext 84 .lookup(CONNECTION_FACTORY_JNDI_NAME); 85 destination = (Destination) jndiContext.lookup(TOPIC_JNDI_NAME); 86 } catch (NamingException e) { 87 System.out.println("JNDI look up failed:" + e.getMessage()); 88 System.exit(1); 89 } 90 // receive Messages and finally release the resources. 91 try { 92 connection = connectionFactory.createConnection(); 93 connection.start(); // connection should be called in 94 // receiver-client 95 session = connection.createSession(Boolean.FALSE, 96 Session.DUPS_OK_ACKNOWLEDGE); 97 consumer = session.createConsumer(destination); 98 // key code for asynchronous receive:set messageListener 99 consumer.setMessageListener(listener); 100 Hold.hold(); // 阻塞程序繼續執行 101 } catch (JMSException e) { 102 e.printStackTrace(); 103 } finally { 104 try { 105 if (session != null) 106 session.close(); 107 if (connection != null) 108 connection.close(); 109 } catch (JMSException e) { 110 e.printStackTrace(); 111 } 112 } 113 } 114 }