ActiveMQ的介紹及使用實例.


今天就來說下 這個項目中使用ActiveMQ的情況, MQ: message queue, 顧名思義就是消息隊列的意思. 


一: 使用場景: 

消息隊列在大型電子商務類網站,如京東、淘寶、去哪兒等網站有這深入的應用,隊列的主要作用是消除高並發訪問高峰,加快網站的響應速度。在不使用消息隊列的情況下,用戶的請求數據直接寫入數據庫,在高並發的情況下,會對數據庫造成巨大的壓力,同時也使得系統響應延遲加劇。在使用隊列后,用戶的請求發給隊列后立即返回(當然不能直接給用戶提示訂單提交成功,京東上提示:您“您提交了訂單,請等待系統確認”),再由消息隊列的消費者進程從消息隊列中獲取數據,異步寫入數據庫。由於消息隊列的服務處理速度遠快於數據庫,因此用戶的響應延遲可得到有效改善。

那么在babasport這個項目中, 我們可以在上架的時候使用消息隊列的模式:
我們之前在點擊一款商品上架的時候, 我們需要分成2步, 第一: 更新商品表中該商品的上架狀態. 第二: 將該商品信息保存到Solr服務器中.  那么如果我們使用了消息隊列后, 第二步就可以使用發送message來異步完成.

消息隊列可以接收消息和 發送消息

消息隊列類型:

隊列:一對一聊天  私聊  QQ

主題(訂閱模式):一對多聊天  群聊  QQ

名詞解釋: 

 

 二, 代碼原型
ActiveMQ需要部署到Linux系統下, 這里就不再做概述.
這里也是tar包, 導入到linux下直接解壓啟動即可, 前面已經有過很多博文講Linux下一些常用軟件的安裝步驟.


上架代碼原型:
項目構件圖:

未使用ActiveMQ前ProductServiceImpl.cs:

 1 //上架
 2     public void isShow(Long[] ids){
 3         Product product = new Product();
 4         product.setIsShow(true);
 5         for (final Long id : ids) {
 6             //上下架狀態
 7             product.setId(id);
 8             productDao.updateByPrimaryKeySelective(product);
 9             
10             //這個地方的代碼應該在babasport-solr中寫, 現在使用ActiveMQ進行遷移.
11             //TODO 保存商品信息到Solr服務器
12             SolrInputDocument doc = new SolrInputDocument();
13             //ID
14             doc.setField("id", id);
15             //名稱
16             Product p = productDao.selectByPrimaryKey(id);
17             doc.setField("name_ik", p.getName());
18             //圖片URL
19             doc.setField("url", p.getImgUrls()[0]);
20             //品牌 ID
21             doc.setField("brandId", p.getBrandId());
22             //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
23             SkuQuery skuQuery = new SkuQuery();
24             skuQuery.createCriteria().andProductIdEqualTo(id);
25             skuQuery.setOrderByClause("price asc");
26             skuQuery.setPageNo(1);
27             skuQuery.setPageSize(1);
28             List<Sku> skus = skuDao.selectByExample(skuQuery);
29             doc.setField("price", skus.get(0).getPrice());
30             //...時間等 剩下的省略
31             
32             try {
33                 solrServer.add(doc);
34                 solrServer.commit();
35             } catch (Exception e) {
36                 // TODO Auto-generated catch block
37                 e.printStackTrace();
38             }
39             
40             
41             
42             
43             //TODO 靜態化
44         }
45     }

上面的代碼 除了更改本來就該更改的商品狀態信息外, 還去見商品信息保存到了Solr服務器中了. 這里我們使用ActiveMQ進行改造: 
使用ActiveMQ后的ProductServiceImpl.cs:

 1 //上架
 2     public void isShow(Long[] ids){
 3         Product product = new Product();
 4         product.setIsShow(true);
 5         for (final Long id : ids) {
 6             //上下架狀態
 7             product.setId(id);
 8             productDao.updateByPrimaryKeySelective(product);
 9             
10             //發送商品ID到ActiveMQ即可.
11             jmsTemplate.send(new MessageCreator() {
12                 
13                 @Override
14                 public Message createMessage(Session session) throws JMSException {
15                     
16                     return session.createTextMessage(String.valueOf(id));
17                 }
18             });
19             
20             //TODO 靜態化
21         }
22     }

接着就是配置消息發送方(JMS生產者) mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop" 
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
10         http://www.springframework.org/schema/mvc 
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
12         http://www.springframework.org/schema/context 
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
14         http://www.springframework.org/schema/aop 
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
16         http://www.springframework.org/schema/tx 
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo        
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22         
23         
24     <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25     <!-- 連接工廠, 此工廠由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 連接地址 
28             在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 設置用戶名及密碼 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35         
36     <!-- 配置連接池管理工廠 -->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工廠 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 設置最大連接數 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43     
44     <!-- 把上面的工廠交給Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工廠 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49     
50     <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring單例工廠 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 設置默認的目標管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56     </bean>
57 </beans>

配置說明: 這里是首先構建一個MQ的連接工廠, 只要ActiveMQ啟動后 就可以這樣構建連接了. 配置登錄的用戶名和和密碼.
接着就是配置連接池, 把連接工廠交給連接池去管理. 這些都是Apache廠商提供的. 
接着就是再將連接池交由Spring管理. 
最后我們再來配置一個jmsTemplate模板來操作ActiveMQ, 這個類似於jdbcTemplate模板. 而且我們這個里面注入了一個默認的管道, 也就是productId, 因為我們現在是 傳遞消息一一去對應, 關於怎么對應  就是依賴於這個管道.


接下來我們就看下消息的接收方(JMS消費者)的一些東西:
消費者的目錄結構:(Solr)


Solr項目中的ActiveMQ配置文件mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop" 
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
10         http://www.springframework.org/schema/mvc 
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
12         http://www.springframework.org/schema/context 
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
14         http://www.springframework.org/schema/aop 
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
16         http://www.springframework.org/schema/tx 
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo        
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22         
23         
24     <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25     <!-- 連接工廠, 此工廠由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 連接地址 
28             在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 設置用戶名及密碼 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35         
36     <!-- 配置連接池管理工廠 ,由Apache提供.-->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工廠 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 設置最大連接數 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43     
44     <!-- 把上面的工廠交給Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工廠 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49     
50     <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring單例工廠 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 設置默認的目標管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56     </bean>
57     
58     <!-- 實例化一個監聽到消息后 處理此消息的類 -->
59     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60     
61     <!-- 配置實時監聽器 -->
62     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63         <!-- 配置工廠, 需要配置spirng的工廠 -->
64         <property name="connectionFactory" ref="singleConnectionFactory"/>
65         <!-- 設置監聽的目標 -->
66         <property name="destinationName" value="pId"/>
67         <!-- 監聽后獲取消息的類, 接收監聽到的消息 -->
68         <property name="messageListener" ref="customMessageListener"></property>
69     </bean>
70 </beans>

我們來說下 和上面配置不同的地方, 我們在這里配置了一個監聽器, 因為接收到 JMS 生產者發過來的消息后我們需要有個監聽器去監聽且 將監聽到的消息拿過來處理.
接下來看看監聽器的處理方法做了些什么事情: 
CustomMessageListener.java:

 1 /*
 2  * 接收MQ中的消息
 3  */
 4 public class CustomMessageListener implements MessageListener{
 5     @Autowired
 6     private SearchService searchService;
 7     
 8     @Override
 9     public void onMessage(Message message) {
10         //先將接收到的消息強轉為ActiveMQ類型的消息
11         //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage
12         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
13         try {
14             String id = amtm.getText();
15             System.out.println("接收到的ID:"+id);
16             searchService.insertProductToSolr(Long.parseLong(id));
17         } catch (JMSException e) {
18             // TODO Auto-generated catch block
19             e.printStackTrace();
20         }
21     }

因為我們接收到的是string類型的文本, 所以這里我們直接將接收到的消息轉換為ActiveMQText類型, 然后通過getText去得到傳遞過來的id, 然后我們就可以通過這個productId去做相應的操作了. 

接下來就看保存商品信息到Solr服務器的邏輯:
SearchServiceImpl.java:

 1 //保存商品信息到Solr服務器中, 通過ActiveMQ
 2     public void insertProductToSolr(Long productId){
 3         //TODO 保存商品信息到Solr服務器
 4         SolrInputDocument doc = new SolrInputDocument();
 5         //ID
 6         doc.setField("id", productId);
 7         //名稱
 8         Product p = productDao.selectByPrimaryKey(productId);
 9         doc.setField("name_ik", p.getName());
10         //圖片URL
11         doc.setField("url", p.getImgUrls()[0]);
12         //品牌 ID
13         doc.setField("brandId", p.getBrandId());
14         //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
15         SkuQuery skuQuery = new SkuQuery();
16         skuQuery.createCriteria().andProductIdEqualTo(productId);
17         skuQuery.setOrderByClause("price asc");
18         skuQuery.setPageNo(1);
19         skuQuery.setPageSize(1);
20         List<Sku> skus = skuDao.selectByExample(skuQuery);
21         doc.setField("price", skus.get(0).getPrice());
22         //...時間等 剩下的省略
23         
24         try {
25             solrServer.add(doc);
26             solrServer.commit();
27         } catch (Exception e) {
28             // TODO Auto-generated catch block
29             e.printStackTrace();
30         }
31     }

這樣就比較明朗了, ActiveMQ 隊列就是這樣來實現的. 

====================接下來還會有 ActiveMQ 訂閱者模式的示例, 這里只是生產者發送消息給單個消費者, 下次還會更新生產者發送消息給多個消費者.

 2016/09/04 20:32 更新
上面已經說了 消息的隊列模式, 及點對點發送消息, 那么接下來就來說下 消息的一對多模式, 也就是 發布/訂閱模式.
項目原型: 當商品上架后(babasport-product), 發送消息id給solr(babasport-solr)來將商品信息保存到solr服務器和cms(babasport-cms)來對商品詳情頁面做頁面靜態化.

===================
babasport-product:
結構圖:

babasport-product下的項目結構圖:


ProductServiceImpl.java中的上架:

 1 @Autowired
 2     private JmsTemplate jmsTemplate;
 3     
 4     //上架
 5     public void isShow(Long[] ids){
 6         Product product = new Product();
 7         product.setIsShow(true);
 8         for (final Long id : ids) {
 9             //上下架狀態
10             product.setId(id);
11             productDao.updateByPrimaryKeySelective(product);
12             
13             //發送商品ID到ActiveMQ即可.
14             jmsTemplate.send(new MessageCreator() {
15                 
16                 @Override
17                 public Message createMessage(Session session) throws JMSException {
18                     
19                     return session.createTextMessage(String.valueOf(id));
20                 }
21             });
22         }
23     }
View Code

mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop" 
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
10         http://www.springframework.org/schema/mvc 
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
12         http://www.springframework.org/schema/context 
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
14         http://www.springframework.org/schema/aop 
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
16         http://www.springframework.org/schema/tx 
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo        
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22         
23         
24     <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25     <!-- 連接工廠, 此工廠由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 連接地址 
28             在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 設置用戶名及密碼 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35         
36     <!-- 配置連接池管理工廠 -->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工廠 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 設置最大連接數 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43     
44     <!-- 把上面的工廠交給Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工廠 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49     
50     <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring單例工廠 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 設置默認的目標管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56         <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject -->
57         <property name="pubSubDomain" value="true"/>
58     </bean>
59 </beans>
View Code

這里面的最大的變化就是將消息發布模式改為了: publish subject.

============================================
babasport-solr:

mq.xml配置文件:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop" 
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
10         http://www.springframework.org/schema/mvc 
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
12         http://www.springframework.org/schema/context 
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
14         http://www.springframework.org/schema/aop 
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
16         http://www.springframework.org/schema/tx 
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo        
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22         
23         
24     <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
25     <!-- 連接工廠, 此工廠由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 連接地址 
28             在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 設置用戶名及密碼 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35         
36     <!-- 配置連接池管理工廠 ,由Apache提供.-->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工廠 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 設置最大連接數 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43     
44     <!-- 把上面的工廠交給Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工廠 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49     
50     <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring單例工廠 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 設置默認的目標管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56     </bean>
57     
58     <!-- 實例化一個監聽到消息后 處理此消息的類 -->
59     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60     
61     <!-- 配置實時監聽器 -->
62     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63         <!-- 配置工廠, 需要配置spirng的工廠 -->
64         <property name="connectionFactory" ref="singleConnectionFactory"/>
65         <!-- 設置監聽的目標 -->
66         <property name="destinationName" value="pId"/>
67         <!-- 監聽后獲取消息的類, 接收監聽到的消息 -->
68         <property name="messageListener" ref="customMessageListener"></property>
69         <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject -->
70         <property name="pubSubDomain" value="true"/>
71     </bean>
72 </beans>
View Code

SearchServiceImpl.java: 保存商品信息到Solr服務器中, 通過ActiveMQ

 1 //保存商品信息到Solr服務器中, 通過ActiveMQ
 2     public void insertProductToSolr(Long productId){
 3         //TODO 保存商品信息到Solr服務器
 4         SolrInputDocument doc = new SolrInputDocument();
 5         //ID
 6         doc.setField("id", productId);
 7         //名稱
 8         Product p = productDao.selectByPrimaryKey(productId);
 9         doc.setField("name_ik", p.getName());
10         //圖片URL
11         doc.setField("url", p.getImgUrls()[0]);
12         //品牌 ID
13         doc.setField("brandId", p.getBrandId());
14         //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1
15         SkuQuery skuQuery = new SkuQuery();
16         skuQuery.createCriteria().andProductIdEqualTo(productId);
17         skuQuery.setOrderByClause("price asc");
18         skuQuery.setPageNo(1);
19         skuQuery.setPageSize(1);
20         List<Sku> skus = skuDao.selectByExample(skuQuery);
21         doc.setField("price", skus.get(0).getPrice());
22         //...時間等 剩下的省略
23         
24         try {
25             solrServer.add(doc);
26             solrServer.commit();
27         } catch (Exception e) {
28             // TODO Auto-generated catch block
29             e.printStackTrace();
30         }
31     }
View Code

CustomMessageListener.java: 監聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進行處理:

 1 public class CustomMessageListener implements MessageListener{
 2     @Autowired
 3     private SearchService searchService;
 4     
 5     @Override
 6     public void onMessage(Message message) {
 7         //先將接收到的消息強轉為ActiveMQ類型的消息
 8         //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage
 9         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
10         try {
11             String id = amtm.getText();
12             System.out.println("接收到的ID:"+id);
13             searchService.insertProductToSolr(Long.parseLong(id));
14         } catch (JMSException e) {
15             // TODO Auto-generated catch block
16             e.printStackTrace();
17         }
18     }
19 }
View Code


===============================
babasport-cms:
mq.xml:

 1 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ-->
 2     <!-- 連接工廠, 此工廠由Apache提供 -->
 3     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 4         <!-- 連接地址 
 5             在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616
 6         -->
 7         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
 8         <!-- 設置用戶名及密碼 -->
 9         <property name="userName" value="admin"></property>
10         <property name="password" value="admin"></property>
11     </bean>
12         
13     <!-- 配置連接池管理工廠 ,由Apache提供.-->
14     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
15         <!-- 注入工廠 -->
16         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
17         <!-- 設置最大連接數 -->
18         <property name="maxConnections" value="5"></property>
19     </bean>
20     
21     <!-- 把上面的工廠交給Spring管理  -->
22     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
23         <!-- 注入上面的工廠 -->
24         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
25     </bean>
26     
27     <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ -->
28     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
29         <!-- 注入Spring單例工廠 -->
30         <property name="connectionFactory" ref="singleConnectionFactory"></property>
31         <!-- 設置默認的目標管道 -->
32         <property name="defaultDestinationName" value="pId"/>
33     </bean>
34     
35     <!-- 實例化一個監聽到消息后 處理此消息的類 -->
36     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
37     
38     <!-- 配置實時監聽器 -->
39     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
40         <!-- 配置工廠, 需要配置spirng的工廠 -->
41         <property name="connectionFactory" ref="singleConnectionFactory"/>
42         <!-- 設置監聽的目標 -->
43         <property name="destinationName" value="pId"/>
44         <!-- 監聽后獲取消息的類, 接收監聽到的消息 -->
45         <property name="messageListener" ref="customMessageListener"></property>
46         <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject -->
47         <property name="pubSubDomain" value="true"/>
48     </bean>
View Code

CustomMessageListener.java: 監聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進行處理:

 1 public class CustomMessageListener implements MessageListener{
 2     @Autowired
 3     private StaticPageService staticPageService;
 4     @Autowired
 5     private CMSService cmsService;
 6     
 7     @Override
 8     public void onMessage(Message message) {
 9         //先將接收到的消息強轉為ActiveMQ類型的消息
10         //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage
11         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
12         try {
13             String id = amtm.getText();
14             System.out.println("CMS接收到的ID:"+id);
15             Map<String, Object> root = new HashMap<String, Object>();
16             
17             Product product = cmsService.selectProductById(Long.parseLong(id));
18             List<Sku> skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));
19             //去掉重復的顏色
20             Set<Color> colors = new HashSet<Color>();
21             for (Sku sku : skus) {
22                 colors.add(sku.getColor());
23             }
24             root.put("colors", colors);
25             root.put("product", product);
26             root.put("skus", skus);
27             
28             staticPageService.index(root, id);
29         } catch (JMSException e) {
30             // TODO Auto-generated catch block
31             e.printStackTrace();
32         }
33     }
34 }
View Code

StaticPageServiceImpl.java: 靜態化頁面的核心類:

 1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{
 2     //SpringMvc 管理 conf
 3     private Configuration conf;
 4     public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) {
 5         this.conf = freeMarkerConfig.getConfiguration();
 6     }
 7 
 8     //靜態化頁面的方法
 9     public void index(Map<String, Object> root, String id){
10         //輸出目錄: 通過getPath方法獲取的是絕對路徑
11         String path = getPath("/html/product/" + id +".html");
12         File f = new File(path);
13         File parentFile = f.getParentFile();
14         if(!parentFile.exists()){
15             parentFile.mkdirs();
16         }
17         
18         //spring中已經設置了模板路徑:<property name="templateLoaderPath" value="/WEB-INF/ftl/" />
19         Writer out = null;
20         
21         try {
22             //
23             Template template = conf.getTemplate("product.html");
24             
25             //設置輸出的位置
26             //
27             out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");
28             template.process(root, out);
29         } catch (Exception e) {
30             // TODO Auto-generated catch block
31             e.printStackTrace();
32         }finally {
33             if (out != null)
34             {
35                 try {
36                     out.close();
37                 } catch (IOException e) {
38                     // TODO Auto-generated catch block
39                     e.printStackTrace();
40                 }
41             }
42             
43         }
44         
45     }
46 
47     //獲取webapp下的html文件夾所在的位置
48     //將相對路徑轉換為絕對路徑
49     public String getPath(String path){
50         return servletContext.getRealPath(path);
51     }
52     
53     private ServletContext servletContext;
54     @Override
55     public void setServletContext(ServletContext servletContext) {
56         this.servletContext = servletContext;
57     }
58 }
View Code

Spring管理 freemarkerConfig配置類:

 1 <!-- 配置freemarker 實現類 -->    
 2         <bean class="cn.itcast.core.service.StaticPageServiceImpl">
 3             <property name="freeMarkerConfig">
 4                 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer">
 5                     <!-- 設置模板所在目錄或文件夾的位置, 相對路徑  -->
 6                     <property name="templateLoaderPath" value="/WEB-INF/ftl/" />
 7                     <!-- 設置默認編碼集 -->
 8                     <property name="defaultEncoding" value="UTF-8"></property>
 9                 </bean>
10             </property>
11         </bean>
View Code

更多關於freemarker的講解請關注我以后的博客...
關於ActiveMQ的內容就更新到這么多.




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM