ActiveMQ_點對點隊列(二)


 
一、本文章包含的內容
1、列舉了ActiveMQ中通過Queue方式發送、消費隊列的代碼(普通文本、json/xml字符串、對象數據)
2、spring+activemq方式
 
二、配置信息
1、activemq的pom.xml信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!--activemq  Begin-->
        < dependency >
            < groupId >org.springframework</ groupId >
            < artifactId >spring-jms</ artifactId >
            < version >${spring.version}</ version >
        </ dependency >
        <!-- <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-messaging</artifactId>
             <version>${spring.version}</version>
         </dependency>-->
        < dependency >
            < groupId >org.apache.activemq</ groupId >
            < artifactId >activemq-all</ artifactId >
            < version >5.14.0</ version >
        </ dependency >
        <!--activemq  End-->

2、activemq的配置文件:spring-jms.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!-- 啟用spring mvc 注解 -->
    < context:component-scan base-package = "org.soa.test.activemq" />
 
    <!-- 配置JMS連接工廠 -->
    < bean id = "connectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >
        < property name = "brokerURL" value = "failover:(tcp://192.168.146.129:61616)" />
        <!--解決接收消息拋出異常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->
        < property name = "trustAllPackages" value = "true" />
        <!-- 是否異步發送 -->
        < property name = "useAsyncSend" value = "true" />
    </ bean >
 
    <!--   Queue模式 Begin -->
 
    <!-- 定義消息隊列(Queue) -->
    < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
        <!-- 設置消息隊列的名字 -->
        < constructor-arg >
            < value >defaultQueueName</ value >
        </ constructor-arg >
    </ bean >
 
    <!-- 配置JMS模板,Spring提供的JMS工具類,它發送、接收消息。(Queue) -->
    < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >
        < property name = "connectionFactory" ref = "connectionFactory" />
        < property name = "defaultDestination" ref = "queueDestination" />
        < property name = "pubSubDomain" value = "false" />
        <!--接收超時時間-->
        <!--<property name="receiveTimeout" value="10000" />-->
    </ bean >
    <!--   Queue模式 End -->

 

三、隊列發送端及測試程序

1、發送代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package org.soa.test.activemq.queues;
 
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.util.List;
 
/**
  * Created by JamesC on 16-9-22.
  */
@Component
public class ProduceMsg {
 
     @Autowired
     private JmsTemplate jmsTemplate;
 
     /**
      * 向指定隊列發送消息
      */
     public void sendMessage(Destination destination, final String msg) {
         System.out.println( "向隊列" + destination.toString() + "發送了消息------------" + msg);
         jmsTemplate.send(destination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return session.createTextMessage(msg);
             }
         });
     }
 
     /**
      * 向默認隊列發送消息(默認隊列名稱在bean:queueDestination配置)
      */
     public void sendMessage( final String msg) {
         //queue://queue1
         String destination = jmsTemplate.getDefaultDestination().toString();
         System.out.println( "向隊列" + destination + "發送了消息------------" + msg);
         jmsTemplate.send( new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return session.createTextMessage(msg);
             }
         });
     }
 
     /**
      * 向默認隊列發送消息
      */
     public void sendMessageConvertAndSend( final Object msg) {
 
         String destination = jmsTemplate.getDefaultDestination().toString();
         System.out.println( "向隊列" + destination + "發送了消息------------" + msg);
         //使用內嵌的MessageConverter進行數據類型轉換,包括xml(JAXB)、json(Jackson)、普通文本、字節數組
         jmsTemplate.convertAndSend(destination, msg);
     }
 
     /**
      * 向指定隊列發送消息
      */
     public void sendStudentInfo(Destination destination, final StudentInfo msg) {
         System.out.println( "向隊列" + destination.toString() + "發送了消息------------" + msg);
         jmsTemplate.send(destination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return session.createObjectMessage(msg);
             }
         });
     }
}

 

2、測試程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package org.soa.test.activemq.queues;
 
import com.alibaba.fastjson.JSON;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import javax.jms.Destination;
import java.util.Date;
 
 
/**
  * Created by JamesC on 16-9-22.
  */
@RunWith (SpringJUnit4ClassRunner. class )
@ContextConfiguration ( "/spring-jms.xml" )
public class ProduceMsgTest extends AbstractJUnit4SpringContextTests {
 
     @Autowired
     protected ApplicationContext ctx;
 
     /**
      * 隊列名queue1  這里使用jms配置文件中的數據
      */
     @Autowired
     private Destination queueDestination;
 
     /**
      * 隊列消息生產者
      */
     @Autowired
     private ProduceMsg produceMessage;
 
 
     //向默認隊列發消息(文本)
     @Test
     public void produceMsg_DefaultQueue() {
         String msg = "這里是向默認隊列發送的消息" + new Date().toString();
         produceMessage.sendMessage(msg);
     }
 
     //向默認隊列發消息(Json字符串)
     @Test
     public void produceMsg_Json() {
         StudentInfo info = new StudentInfo();
         info.setId( 1 );
         info.setStdName( "李磊" );
         info.setStdNo( "001" );
         info.setEnterDate( new Date());  //隊列存放的是時間戳
 
         String alibabaJson = JSON.toJSONString(info);
         produceMessage.sendMessage(alibabaJson);
     }
 
     //向默認隊列發消息(使用convertAndSend發送對象)
     @Test
     public void produceMsg_ConvertAndSend() {
         StudentInfo info = new StudentInfo();
         info.setId( 1 );
         info.setStdName( "李磊" );
         info.setStdNo( "001" );
         info.setEnterDate( new Date());
 
         produceMessage.sendMessageConvertAndSend(info);
     }
 
     //向指定隊列發消息(文本)
     @Test
     public void produceMsg_CustomQueue() {
         for ( int i = 0 ; i < 20 ; i++) {
             ActiveMQQueue myDestination = new ActiveMQQueue( "queueCustom" );
             produceMessage.sendMessage(myDestination, "----發送消息給queueCustom" );
         }
     }
 
     //向指定隊列發消息(隊列名稱從XML讀取)
     @Test
     public void produceMsg_XmlQueue() {
         for ( int i = 0 ; i < 20 ; i++) {
             ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean( "queueDestination" );
             produceMessage.sendMessage(destinationQueue, "----send my msg to queueXml" );
         }
     }
 
     //向指定隊列發消息(發送對象)
     @Test
     public void produceMsg_StudentInfo() {
 
         StudentInfo info = new StudentInfo();
         info.setId( 1 );
         info.setStdName( "李磊" );
         info.setStdNo( "001" );
         info.setEnterDate( new Date());
 
         ActiveMQQueue destination = new ActiveMQQueue( "StudentInfo" );
         produceMessage.sendStudentInfo(destination, info);
     }
}

 

四、隊列消費端及測試程序

1、消費代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package org.soa.test.activemq.queues;
 
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
 
/**
  * Created by JamesC on 16-9-22.
  */
@Component
public class ConsumeMsg {
     @Autowired
     private JmsTemplate jmsTemplate;
 
     /**
      * 接受消息
      */
     public String receive(Destination destination) {
         TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
         String msg = "" ;
         try {
             msg = tm.getText();
             System.out.println( "從隊列" + destination.toString() + "收到了消息:\t" + msg);
 
         } catch (JMSException e) {
             e.printStackTrace();
             return "" ;
         }
         return msg;
     }
 
     /**
      * 接受消息
      */
     public StudentInfo receiveStudentInfo() {
         try {
             String destination = jmsTemplate.getDefaultDestination().toString();
             ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);
             return (StudentInfo)msg.getObject();
 
         } catch (JMSException e) {
             //檢查性異常轉換為非檢查性異常
             throw JmsUtils.convertJmsAccessException(e);
         }
     }
 
     /**
      * 接受消息
      */
     public Object receiveConvertAndReceive() {
         String destination = jmsTemplate.getDefaultDestination().toString();
         Object msg = jmsTemplate.receiveAndConvert(destination);
         return msg;
     }
}

 

2、測試程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package org.soa.test.activemq.queues;
 
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.soa.test.activemq.StudentInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
/**
  * Created by JamesC on 16-9-22.
  */
@RunWith (SpringJUnit4ClassRunner. class )
@ContextConfiguration ( "/spring-jms.xml" )
public class ConsumeMsgTest {
 
     @Autowired
     private ConsumeMsg consumeMsg;
 
     //從指定隊列接收消息(文本)
     @Test
     public void receiveMsg() {
         //沒有消息阻塞一段時間后會拋異常
         //java.lang.NullPointerException
         ActiveMQQueue destination = new ActiveMQQueue( "defaultQueueName" );
         consumeMsg.receive(destination);
     }
 
     //從指定隊列接收消息(StudentInfo對象消息)
     @Test
     public void receiveStudentInfo() {
         StudentInfo msg = consumeMsg.receiveStudentInfo();
         System.out.println(msg.getStdName());
     }
 
     //從指定隊列接收消息(Json對象)
     @Test
     public void receiveConvertAndReceive() {
 
         StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();
         System.out.println(msg.getStdName());
     }
}

 

 

 

 

 

 






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM