應用activeMQ消息中間件同步索引庫


mq是一個消息服務器;

安裝包內置了tomcat,直接登錄訪問,登錄:http://ip:8161/admin/    (相當於dubbo的moniter監控中心) admin admin
傳統串行化,並行化:

mq消息服務器集中管理消息:

1)點對點:

   異步接收消息(監聽模式):

 

2)訂閱模式:

  必須使用監聽模式(異步);


Spring整合mq:

  配置生產者:

 1     <!-- 創建acitiveMQ消息服務工廠對象,把acitiveMQ消息服務器交給spring管理 -->
 2     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 3         <constructor-arg name="brokerURL" value="tcp://192.168.74.132:61616"></constructor-arg>
 4     </bean>
 5 
 6     <!-- spring jms java消息服務提供工廠對象管理mq消息 -->
 7     <bean id="connectionFactory"
 8         class="org.springframework.jms.connection.SingleConnectionFactory">
 9         <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
10     </bean>
11 
12     <!-- spring jms 提供jms消息模版對象 發送消息 -->
13     <bean class="org.springframework.jms.core.JmsTemplate">
14         <property name="connectionFactory" ref="connectionFactory"></property>
15     </bean>
16 
17     <!-- 點對點模式空間 -->
18     <!-- <bean id="oneQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
19         <constructor-arg value="oneQueue"></constructor-arg> </bean> -->
20     <!-- 發布訂閱模式空間 -->
21     <!-- <bean id="oneTopic" class="org.apache.activemq.command.ActiveMQTopic"> 
22         <constructor-arg value="oneTopic"></constructor-arg> </bean> -->
23     <bean id="add_update_del_topic" class="org.apache.activemq.command.ActiveMQTopic">
24         <constructor-arg value="add_update_del_topic"></constructor-arg>
25     </bean>

 

   配置消費者:

 1 <!-- 創建acitiveMQ消息服務工廠對象,把acitiveMQ消息服務器交給spring管理 -->
 2     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 3         <constructor-arg name="brokerURL" value="tcp://192.168.74.132:61616"></constructor-arg>
 4     </bean>
 5 
 6     <!-- spring jms java消息服務提供工廠對象管理mq消息 -->
 7     <bean id="connectionFactory"
 8         class="org.springframework.jms.connection.SingleConnectionFactory">
 9         <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
10     </bean>
11 
12     <!-- 點對點模式空間 -->
13     <!-- <bean id="oneQueue" class="org.apache.activemq.command.ActiveMQQueue">
14         <constructor-arg value="oneQueue"></constructor-arg>
15     </bean> -->
16     <!-- 發布訂閱模式空間 -->
17     <!-- <bean id="oneTopic" class="org.apache.activemq.command.ActiveMQTopic">
18         <constructor-arg value="oneTopic"></constructor-arg>
19     </bean> -->
20     <bean id="add_update_del_topic" class="org.apache.activemq.command.ActiveMQTopic">
21         <constructor-arg value="add_update_del_topic"></constructor-arg>
22     </bean>
23 
24     <!-- 創建接受消息監聽器,此監聽器是真實接受消息的監聽器 -->
25     <bean id="myMessageListener" class="cn.e3.search.listener.IndexListener"></bean>
26 
27     <!-- spring jms 提供消息監聽容器接受消息. -->
28     <!-- 自動觸發接受消息 -->
29     <bean
30         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
31         <!-- 指定消息服務器地址 -->
32         <property name="connectionFactory" ref="connectionFactory"></property>
33         <!-- 指定接受消息服務器空間 -->
34         <property name="destination" ref="add_update_del_topic"></property>
35         <!-- 指定消息接受者 -->
36         <property name="messageListener" ref="myMessageListener"></property>
37     </bean>

 

 業務分析,面向服務分布式架構SOA:

商品發生變動后,發送消息至mq:

1 jmsTemplate.send(activeMQTopic,new MessageCreator() {
2             @Override
3             public Message createMessage(Session session) throws JMSException {
4                 return session.createTextMessage(""+itemId);
5             }
6         });

  xml中配置監聽器,在監聽器類中根據接收的id查詢變動后的商品,將商品索引文檔寫入索引庫;

  <!-- 創建接受消息監聽器,此監聽器是真實接受消息的監聽器 -->
    <bean id="myMessageListener" class="cn.e3.search.listener.IndexListener"></bean>

 1     public void onMessage(Message message) {
 2 
 3         try {
 4             // 初始化一個商品id
 5             Long itemId = null;
 7             if (message instanceof TextMessage) {
 8                 // 接受消息
 9                 TextMessage m = (TextMessage) message;
10                 // 獲取商品id
11                 itemId = Long.parseLong(m.getText());
12                 //根據商品id查詢數據庫新的數據
13                 SearchItem searchItem = searchItemMapper.findDatabaseToSolrIndexWithID(itemId);
14                 
15                 //把數據庫數據封裝到文檔對象
16                 SolrInputDocument doc = new SolrInputDocument();
17                 //封裝文檔域字段所對應值
18                 //封裝文檔域所對應數據庫查詢值
19                 doc.addField("id", searchItem.getId());
20                 
21                 //標題
22                 doc.addField("item_title", searchItem.getTitle());
23                 //買點
24                 doc.addField("item_sell_point", searchItem.getSell_point());
25                 //價格
26                 doc.addField("item_price", searchItem.getPrice());
27                 //圖片地址
28                 doc.addField("item_image", searchItem.getImage());
29                 //商品類別
30                 doc.addField("item_category_name", searchItem.getCategory_name());
31                 //商品描述
32                 doc.addField("item_desc", searchItem.getItem_desc());
33                 
34                 //使用solr服務對象把索引文檔對象寫入索引庫,實現索引庫同步
35                 solrServer.add(doc);
36                 //提交
37                 solrServer.commit();
38             }
39         } catch (Exception e) {
40             // TODO Auto-generated catch block
41             e.printStackTrace();
42         }
43     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM