mq是一個消息服務器;
安裝包內置了tomcat,直接登錄訪問,登錄:http://ip:8161/admin/ (相當於dubbo的moniter監控中心) admin admin
傳統串行化,並行化:
mq消息服務器集中管理消息:
1)點對點:
異步接收消息(監聽模式):
2)訂閱模式:
必須使用監聽模式(異步);
Spring整合mq:
配置生產者:
1 <!-- 創建acitiveMQ消息服務工廠對象,把acitiveMQ消息服務器交給spring管理 --> 2 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <constructor-arg name="brokerURL" value="tcp://192.168.74.132:61616"></constructor-arg> 4 </bean> 5 6 <!-- spring jms java消息服務提供工廠對象管理mq消息 --> 7 <bean id="connectionFactory" 8 class="org.springframework.jms.connection.SingleConnectionFactory"> 9 <property name="targetConnectionFactory" ref="targetConnectionFactory"></property> 10 </bean> 11 12 <!-- spring jms 提供jms消息模版對象 發送消息 --> 13 <bean class="org.springframework.jms.core.JmsTemplate"> 14 <property name="connectionFactory" ref="connectionFactory"></property> 15 </bean> 16 17 <!-- 點對點模式空間 --> 18 <!-- <bean id="oneQueue" class="org.apache.activemq.command.ActiveMQQueue"> 19 <constructor-arg value="oneQueue"></constructor-arg> </bean> --> 20 <!-- 發布訂閱模式空間 --> 21 <!-- <bean id="oneTopic" class="org.apache.activemq.command.ActiveMQTopic"> 22 <constructor-arg value="oneTopic"></constructor-arg> </bean> --> 23 <bean id="add_update_del_topic" class="org.apache.activemq.command.ActiveMQTopic"> 24 <constructor-arg value="add_update_del_topic"></constructor-arg> 25 </bean>
配置消費者:
1 <!-- 創建acitiveMQ消息服務工廠對象,把acitiveMQ消息服務器交給spring管理 --> 2 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <constructor-arg name="brokerURL" value="tcp://192.168.74.132:61616"></constructor-arg> 4 </bean> 5 6 <!-- spring jms java消息服務提供工廠對象管理mq消息 --> 7 <bean id="connectionFactory" 8 class="org.springframework.jms.connection.SingleConnectionFactory"> 9 <property name="targetConnectionFactory" ref="targetConnectionFactory"></property> 10 </bean> 11 12 <!-- 點對點模式空間 --> 13 <!-- <bean id="oneQueue" class="org.apache.activemq.command.ActiveMQQueue"> 14 <constructor-arg value="oneQueue"></constructor-arg> 15 </bean> --> 16 <!-- 發布訂閱模式空間 --> 17 <!-- <bean id="oneTopic" class="org.apache.activemq.command.ActiveMQTopic"> 18 <constructor-arg value="oneTopic"></constructor-arg> 19 </bean> --> 20 <bean id="add_update_del_topic" class="org.apache.activemq.command.ActiveMQTopic"> 21 <constructor-arg value="add_update_del_topic"></constructor-arg> 22 </bean> 23 24 <!-- 創建接受消息監聽器,此監聽器是真實接受消息的監聽器 --> 25 <bean id="myMessageListener" class="cn.e3.search.listener.IndexListener"></bean> 26 27 <!-- spring jms 提供消息監聽容器接受消息. --> 28 <!-- 自動觸發接受消息 --> 29 <bean 30 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 31 <!-- 指定消息服務器地址 --> 32 <property name="connectionFactory" ref="connectionFactory"></property> 33 <!-- 指定接受消息服務器空間 --> 34 <property name="destination" ref="add_update_del_topic"></property> 35 <!-- 指定消息接受者 --> 36 <property name="messageListener" ref="myMessageListener"></property> 37 </bean>
業務分析,面向服務分布式架構SOA:
商品發生變動后,發送消息至mq:
1 jmsTemplate.send(activeMQTopic,new MessageCreator() { 2 @Override 3 public Message createMessage(Session session) throws JMSException { 4 return session.createTextMessage(""+itemId); 5 } 6 });
xml中配置監聽器,在監聽器類中根據接收的id查詢變動后的商品,將商品索引文檔寫入索引庫;
<!-- 創建接受消息監聽器,此監聽器是真實接受消息的監聽器 -->
<bean id="myMessageListener" class="cn.e3.search.listener.IndexListener"></bean>
1 public void onMessage(Message message) { 2 3 try { 4 // 初始化一個商品id 5 Long itemId = null; 7 if (message instanceof TextMessage) { 8 // 接受消息 9 TextMessage m = (TextMessage) message; 10 // 獲取商品id 11 itemId = Long.parseLong(m.getText()); 12 //根據商品id查詢數據庫新的數據 13 SearchItem searchItem = searchItemMapper.findDatabaseToSolrIndexWithID(itemId); 14 15 //把數據庫數據封裝到文檔對象 16 SolrInputDocument doc = new SolrInputDocument(); 17 //封裝文檔域字段所對應值 18 //封裝文檔域所對應數據庫查詢值 19 doc.addField("id", searchItem.getId()); 20 21 //標題 22 doc.addField("item_title", searchItem.getTitle()); 23 //買點 24 doc.addField("item_sell_point", searchItem.getSell_point()); 25 //價格 26 doc.addField("item_price", searchItem.getPrice()); 27 //圖片地址 28 doc.addField("item_image", searchItem.getImage()); 29 //商品類別 30 doc.addField("item_category_name", searchItem.getCategory_name()); 31 //商品描述 32 doc.addField("item_desc", searchItem.getItem_desc()); 33 34 //使用solr服務對象把索引文檔對象寫入索引庫,實現索引庫同步 35 solrServer.add(doc); 36 //提交 37 solrServer.commit(); 38 } 39 } catch (Exception e) { 40 // TODO Auto-generated catch block 41 e.printStackTrace(); 42 } 43 }