今天就來說下 這個項目中使用ActiveMQ的情況, MQ: message queue, 顧名思義就是消息隊列的意思.
一: 使用場景:
那么在babasport這個項目中, 我們可以在上架的時候使用消息隊列的模式:
我們之前在點擊一款商品上架的時候, 我們需要分成2步, 第一: 更新商品表中該商品的上架狀態. 第二: 將該商品信息保存到Solr服務器中. 那么如果我們使用了消息隊列后, 第二步就可以使用發送message來異步完成.
消息隊列可以接收消息和 發送消息
消息隊列類型:
隊列:一對一聊天 私聊 QQ
主題(訂閱模式):一對多聊天 群聊 QQ
名詞解釋:
二, 代碼原型
ActiveMQ需要部署到Linux系統下, 這里就不再做概述.
這里也是tar包, 導入到linux下直接解壓啟動即可, 前面已經有過很多博文講Linux下一些常用軟件的安裝步驟.
上架代碼原型:
項目構件圖:
未使用ActiveMQ前ProductServiceImpl.cs:
1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架狀態 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //這個地方的代碼應該在babasport-solr中寫, 現在使用ActiveMQ進行遷移. 11 //TODO 保存商品信息到Solr服務器 12 SolrInputDocument doc = new SolrInputDocument(); 13 //ID 14 doc.setField("id", id); 15 //名稱 16 Product p = productDao.selectByPrimaryKey(id); 17 doc.setField("name_ik", p.getName()); 18 //圖片URL 19 doc.setField("url", p.getImgUrls()[0]); 20 //品牌 ID 21 doc.setField("brandId", p.getBrandId()); 22 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1 23 SkuQuery skuQuery = new SkuQuery(); 24 skuQuery.createCriteria().andProductIdEqualTo(id); 25 skuQuery.setOrderByClause("price asc"); 26 skuQuery.setPageNo(1); 27 skuQuery.setPageSize(1); 28 List<Sku> skus = skuDao.selectByExample(skuQuery); 29 doc.setField("price", skus.get(0).getPrice()); 30 //...時間等 剩下的省略 31 32 try { 33 solrServer.add(doc); 34 solrServer.commit(); 35 } catch (Exception e) { 36 // TODO Auto-generated catch block 37 e.printStackTrace(); 38 } 39 40 41 42 43 //TODO 靜態化 44 } 45 }
上面的代碼 除了更改本來就該更改的商品狀態信息外, 還去見商品信息保存到了Solr服務器中了. 這里我們使用ActiveMQ進行改造:
使用ActiveMQ后的ProductServiceImpl.cs:
1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架狀態 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //發送商品ID到ActiveMQ即可. 11 jmsTemplate.send(new MessageCreator() { 12 13 @Override 14 public Message createMessage(Session session) throws JMSException { 15 16 return session.createTextMessage(String.valueOf(id)); 17 } 18 }); 19 20 //TODO 靜態化 21 } 22 }
接着就是配置消息發送方(JMS生產者) mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ--> 25 <!-- 連接工廠, 此工廠由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 連接地址 28 在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 設置用戶名及密碼 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置連接池管理工廠 --> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工廠 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 設置最大連接數 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工廠交給Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工廠 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring單例工廠 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 設置默認的目標管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 </bean> 57 </beans>
配置說明: 這里是首先構建一個MQ的連接工廠, 只要ActiveMQ啟動后 就可以這樣構建連接了. 配置登錄的用戶名和和密碼.
接着就是配置連接池, 把連接工廠交給連接池去管理. 這些都是Apache廠商提供的.
接着就是再將連接池交由Spring管理.
最后我們再來配置一個jmsTemplate模板來操作ActiveMQ, 這個類似於jdbcTemplate模板. 而且我們這個里面注入了一個默認的管道, 也就是productId, 因為我們現在是 傳遞消息一一去對應, 關於怎么對應 就是依賴於這個管道.
接下來我們就看下消息的接收方(JMS消費者)的一些東西:
消費者的目錄結構:(Solr)
Solr項目中的ActiveMQ配置文件mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ--> 25 <!-- 連接工廠, 此工廠由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 連接地址 28 在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 設置用戶名及密碼 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置連接池管理工廠 ,由Apache提供.--> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工廠 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 設置最大連接數 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工廠交給Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工廠 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring單例工廠 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 設置默認的目標管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 </bean> 57 58 <!-- 實例化一個監聽到消息后 處理此消息的類 --> 59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/> 60 61 <!-- 配置實時監聽器 --> 62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 63 <!-- 配置工廠, 需要配置spirng的工廠 --> 64 <property name="connectionFactory" ref="singleConnectionFactory"/> 65 <!-- 設置監聽的目標 --> 66 <property name="destinationName" value="pId"/> 67 <!-- 監聽后獲取消息的類, 接收監聽到的消息 --> 68 <property name="messageListener" ref="customMessageListener"></property> 69 </bean> 70 </beans>
我們來說下 和上面配置不同的地方, 我們在這里配置了一個監聽器, 因為接收到 JMS 生產者發過來的消息后我們需要有個監聽器去監聽且 將監聽到的消息拿過來處理.
接下來看看監聽器的處理方法做了些什么事情:
CustomMessageListener.java:
1 /* 2 * 接收MQ中的消息 3 */ 4 public class CustomMessageListener implements MessageListener{ 5 @Autowired 6 private SearchService searchService; 7 8 @Override 9 public void onMessage(Message message) { 10 //先將接收到的消息強轉為ActiveMQ類型的消息 11 //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage 12 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message; 13 try { 14 String id = amtm.getText(); 15 System.out.println("接收到的ID:"+id); 16 searchService.insertProductToSolr(Long.parseLong(id)); 17 } catch (JMSException e) { 18 // TODO Auto-generated catch block 19 e.printStackTrace(); 20 } 21 }
因為我們接收到的是string類型的文本, 所以這里我們直接將接收到的消息轉換為ActiveMQText類型, 然后通過getText去得到傳遞過來的id, 然后我們就可以通過這個productId去做相應的操作了.
接下來就看保存商品信息到Solr服務器的邏輯:
SearchServiceImpl.java:
1 //保存商品信息到Solr服務器中, 通過ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服務器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名稱 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName()); 10 //圖片URL 11 doc.setField("url", p.getImgUrls()[0]); 12 //品牌 ID 13 doc.setField("brandId", p.getBrandId()); 14 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1 15 SkuQuery skuQuery = new SkuQuery(); 16 skuQuery.createCriteria().andProductIdEqualTo(productId); 17 skuQuery.setOrderByClause("price asc"); 18 skuQuery.setPageNo(1); 19 skuQuery.setPageSize(1); 20 List<Sku> skus = skuDao.selectByExample(skuQuery); 21 doc.setField("price", skus.get(0).getPrice()); 22 //...時間等 剩下的省略 23 24 try { 25 solrServer.add(doc); 26 solrServer.commit(); 27 } catch (Exception e) { 28 // TODO Auto-generated catch block 29 e.printStackTrace(); 30 } 31 }
這樣就比較明朗了, ActiveMQ 隊列就是這樣來實現的.
====================接下來還會有 ActiveMQ 訂閱者模式的示例, 這里只是生產者發送消息給單個消費者, 下次還會更新生產者發送消息給多個消費者.
2016/09/04 20:32 更新
上面已經說了 消息的隊列模式, 及點對點發送消息, 那么接下來就來說下 消息的一對多模式, 也就是 發布/訂閱模式.
項目原型: 當商品上架后(babasport-product), 發送消息id給solr(babasport-solr)來將商品信息保存到solr服務器和cms(babasport-cms)來對商品詳情頁面做頁面靜態化.
===================
babasport-product:
結構圖:
babasport-product下的項目結構圖:
ProductServiceImpl.java中的上架:

1 @Autowired 2 private JmsTemplate jmsTemplate; 3 4 //上架 5 public void isShow(Long[] ids){ 6 Product product = new Product(); 7 product.setIsShow(true); 8 for (final Long id : ids) { 9 //上下架狀態 10 product.setId(id); 11 productDao.updateByPrimaryKeySelective(product); 12 13 //發送商品ID到ActiveMQ即可. 14 jmsTemplate.send(new MessageCreator() { 15 16 @Override 17 public Message createMessage(Session session) throws JMSException { 18 19 return session.createTextMessage(String.valueOf(id)); 20 } 21 }); 22 } 23 }
mq.xml:

1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ--> 25 <!-- 連接工廠, 此工廠由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 連接地址 28 在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 設置用戶名及密碼 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置連接池管理工廠 --> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工廠 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 設置最大連接數 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工廠交給Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工廠 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring單例工廠 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 設置默認的目標管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject --> 57 <property name="pubSubDomain" value="true"/> 58 </bean> 59 </beans>
這里面的最大的變化就是將消息發布模式改為了: publish subject.
============================================
babasport-solr:
mq.xml配置文件:

1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ--> 25 <!-- 連接工廠, 此工廠由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 連接地址 28 在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 設置用戶名及密碼 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置連接池管理工廠 ,由Apache提供.--> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工廠 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 設置最大連接數 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工廠交給Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工廠 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring單例工廠 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 設置默認的目標管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 </bean> 57 58 <!-- 實例化一個監聽到消息后 處理此消息的類 --> 59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/> 60 61 <!-- 配置實時監聽器 --> 62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 63 <!-- 配置工廠, 需要配置spirng的工廠 --> 64 <property name="connectionFactory" ref="singleConnectionFactory"/> 65 <!-- 設置監聽的目標 --> 66 <property name="destinationName" value="pId"/> 67 <!-- 監聽后獲取消息的類, 接收監聽到的消息 --> 68 <property name="messageListener" ref="customMessageListener"></property> 69 <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject --> 70 <property name="pubSubDomain" value="true"/> 71 </bean> 72 </beans>
SearchServiceImpl.java: 保存商品信息到Solr服務器中, 通過ActiveMQ

1 //保存商品信息到Solr服務器中, 通過ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服務器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名稱 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName()); 10 //圖片URL 11 doc.setField("url", p.getImgUrls()[0]); 12 //品牌 ID 13 doc.setField("brandId", p.getBrandId()); 14 //價格 sql查詢語句: select price from bbs_sku where product_id = ? order by price asc limit 1 15 SkuQuery skuQuery = new SkuQuery(); 16 skuQuery.createCriteria().andProductIdEqualTo(productId); 17 skuQuery.setOrderByClause("price asc"); 18 skuQuery.setPageNo(1); 19 skuQuery.setPageSize(1); 20 List<Sku> skus = skuDao.selectByExample(skuQuery); 21 doc.setField("price", skus.get(0).getPrice()); 22 //...時間等 剩下的省略 23 24 try { 25 solrServer.add(doc); 26 solrServer.commit(); 27 } catch (Exception e) { 28 // TODO Auto-generated catch block 29 e.printStackTrace(); 30 } 31 }
CustomMessageListener.java: 監聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進行處理:

1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private SearchService searchService; 4 5 @Override 6 public void onMessage(Message message) { 7 //先將接收到的消息強轉為ActiveMQ類型的消息 8 //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage 9 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message; 10 try { 11 String id = amtm.getText(); 12 System.out.println("接收到的ID:"+id); 13 searchService.insertProductToSolr(Long.parseLong(id)); 14 } catch (JMSException e) { 15 // TODO Auto-generated catch block 16 e.printStackTrace(); 17 } 18 } 19 }
===============================
babasport-cms:
mq.xml:

1 <!-- 配置Spring 來管理MQ消息隊列 , 連接ActiveMQ--> 2 <!-- 連接工廠, 此工廠由Apache提供 --> 3 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <!-- 連接地址 5 在網頁端訪問是:http://192.168.200.128:8161, 但是這里是tcp連接, 端口號是61616 6 --> 7 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 8 <!-- 設置用戶名及密碼 --> 9 <property name="userName" value="admin"></property> 10 <property name="password" value="admin"></property> 11 </bean> 12 13 <!-- 配置連接池管理工廠 ,由Apache提供.--> 14 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 15 <!-- 注入工廠 --> 16 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 17 <!-- 設置最大連接數 --> 18 <property name="maxConnections" value="5"></property> 19 </bean> 20 21 <!-- 把上面的工廠交給Spring管理 --> 22 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 23 <!-- 注入上面的工廠 --> 24 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 25 </bean> 26 27 <!-- 使用Spring提供的jmsTemplate模板來操作ActiveMQ --> 28 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 29 <!-- 注入Spring單例工廠 --> 30 <property name="connectionFactory" ref="singleConnectionFactory"></property> 31 <!-- 設置默認的目標管道 --> 32 <property name="defaultDestinationName" value="pId"/> 33 </bean> 34 35 <!-- 實例化一個監聽到消息后 處理此消息的類 --> 36 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/> 37 38 <!-- 配置實時監聽器 --> 39 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 40 <!-- 配置工廠, 需要配置spirng的工廠 --> 41 <property name="connectionFactory" ref="singleConnectionFactory"/> 42 <!-- 設置監聽的目標 --> 43 <property name="destinationName" value="pId"/> 44 <!-- 監聽后獲取消息的類, 接收監聽到的消息 --> 45 <property name="messageListener" ref="customMessageListener"></property> 46 <!-- 默認是隊列模式, 可改為主題, 發布模式 publish subject --> 47 <property name="pubSubDomain" value="true"/> 48 </bean>
CustomMessageListener.java: 監聽ActiveMQ中傳遞過來的消息, 且對傳遞過來的消息進行處理:

1 public class CustomMessageListener implements MessageListener{ 2 @Autowired 3 private StaticPageService staticPageService; 4 @Autowired 5 private CMSService cmsService; 6 7 @Override 8 public void onMessage(Message message) { 9 //先將接收到的消息強轉為ActiveMQ類型的消息 10 //因為在消息發送方那邊傳遞的是Text類型的消息對象, 所以需要轉成ActiveMQTextMessage 11 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message; 12 try { 13 String id = amtm.getText(); 14 System.out.println("CMS接收到的ID:"+id); 15 Map<String, Object> root = new HashMap<String, Object>(); 16 17 Product product = cmsService.selectProductById(Long.parseLong(id)); 18 List<Sku> skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id)); 19 //去掉重復的顏色 20 Set<Color> colors = new HashSet<Color>(); 21 for (Sku sku : skus) { 22 colors.add(sku.getColor()); 23 } 24 root.put("colors", colors); 25 root.put("product", product); 26 root.put("skus", skus); 27 28 staticPageService.index(root, id); 29 } catch (JMSException e) { 30 // TODO Auto-generated catch block 31 e.printStackTrace(); 32 } 33 } 34 }
StaticPageServiceImpl.java: 靜態化頁面的核心類:

1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{ 2 //SpringMvc 管理 conf 3 private Configuration conf; 4 public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) { 5 this.conf = freeMarkerConfig.getConfiguration(); 6 } 7 8 //靜態化頁面的方法 9 public void index(Map<String, Object> root, String id){ 10 //輸出目錄: 通過getPath方法獲取的是絕對路徑 11 String path = getPath("/html/product/" + id +".html"); 12 File f = new File(path); 13 File parentFile = f.getParentFile(); 14 if(!parentFile.exists()){ 15 parentFile.mkdirs(); 16 } 17 18 //spring中已經設置了模板路徑:<property name="templateLoaderPath" value="/WEB-INF/ftl/" /> 19 Writer out = null; 20 21 try { 22 //讀 23 Template template = conf.getTemplate("product.html"); 24 25 //設置輸出的位置 26 //寫 27 out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8"); 28 template.process(root, out); 29 } catch (Exception e) { 30 // TODO Auto-generated catch block 31 e.printStackTrace(); 32 }finally { 33 if (out != null) 34 { 35 try { 36 out.close(); 37 } catch (IOException e) { 38 // TODO Auto-generated catch block 39 e.printStackTrace(); 40 } 41 } 42 43 } 44 45 } 46 47 //獲取webapp下的html文件夾所在的位置 48 //將相對路徑轉換為絕對路徑 49 public String getPath(String path){ 50 return servletContext.getRealPath(path); 51 } 52 53 private ServletContext servletContext; 54 @Override 55 public void setServletContext(ServletContext servletContext) { 56 this.servletContext = servletContext; 57 } 58 }
Spring管理 freemarkerConfig配置類:

1 <!-- 配置freemarker 實現類 --> 2 <bean class="cn.itcast.core.service.StaticPageServiceImpl"> 3 <property name="freeMarkerConfig"> 4 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer"> 5 <!-- 設置模板所在目錄或文件夾的位置, 相對路徑 --> 6 <property name="templateLoaderPath" value="/WEB-INF/ftl/" /> 7 <!-- 設置默認編碼集 --> 8 <property name="defaultEncoding" value="UTF-8"></property> 9 </bean> 10 </property> 11 </bean>
更多關於freemarker的講解請關注我以后的博客...
關於ActiveMQ的內容就更新到這么多.