天氣依舊很好,主要是涼快。老習慣,我在北京向各位問好。
搜索無處不在,相信各位每天都免不了與它的親密接觸,那么我想你確實有必要來了解一下它們,就上周在公司實現的一個小需求來給各位分享一下:如何在分布式環境下同步索引庫?
需求分析
公司數據庫中的數據信息每天都免不了增、刪、改操作,在執行這些簡單的更新操作時,我們不僅將變更后的數據要更新到數據庫中,同時還要馬上同步索引庫中的數據,有的時候還要同步一下緩存中的數據(本文只分享如何同步solr索引庫)。
分析方案
當我們在后台管理系統中觸發了更新操作時,不會緊跟着調用同步功能去更新索引庫和緩存這種機制去實現,因為耦合性太高了,容易影響正常的業務流程。那么,既然我們不做,做的話就要影響業務,所以我們就有必要請一位私人秘書來替我們完成同步操作了,既然請了秘書,就沒必要再去關心同步操作,而是我們只需要在更新完數據后通知這位秘書,讓它去完成同步操作,豈不更妙?好了,說了這么久,這位秘書就是英俊瀟灑不可或缺的消息隊列——MQ,為什么使用它?主要還是開源、解耦。廢話不說了,一起從簡,開始上碼。
哦,對了到這兒我就有必要說一下MQ的倆種使用模式,因為這個確實有點用,我就爬過這坑。主要分為2種:點對點(Queue)和發布\訂閱(Topic)模式。
從上圖可以看出,這倆種模式最主要的區別就是發送出去的消息可以由多少個消費者來接受,很明顯:
發布\訂閱模式:需要一個生產者發送消息到主題版塊(Topic)中,可以有多個消費者訂閱該版塊來接受消息。消費者接受消息時,必須處於運行狀態,而且只能接受運行之后的消息。
點對點模式:需要一個生產者發送消息到隊列版塊(Queue)中,只能有一個消費者從該隊列(Queue)中接受該消息。生產者發送消息時,消費者不需要處於運行狀態。
好,明確這點就夠了,我們先用起來,至於它的一些細節,你們自己去找找資料好好讀讀,因為本人也是初次使用到,后期有機會再和大家共勉。
一路走好,碼到成功
步驟一:安裝MQ(activeMQ)
狠簡單,解壓即用。如有需要請參考http://www.cnblogs.com/1315925303zxz/p/6377551.html。
步驟二:spring整合MQ
1 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 2 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <property name="brokerURL" value="tcp://192.168.136.139:61616"/> 4 </bean> 5 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 6 <bean id="connectionFactory" 7 class="org.springframework.jms.connection.SingleConnectionFactory"> 8 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 9 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 10 </bean> 11 12 <!-- 生產者 --> 13 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 14 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 15 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 16 <property name="connectionFactory" ref="connectionFactory"/> 17 </bean> 18 <!--這個是隊列目的地:(倆種配置方式) 19 一、點對點模式:需要一個生產者發送消息到隊列版塊(Queue)中,只能有一個消費者從該隊列(Queue)中接受該消息。 20 【生產者發送消息時,消費者不需要處於運行狀態】。 21 二、發布訂閱模式:需要一個生產者發送消息到主題版塊(Topic)中,可以有多個消費者訂閱該版塊來接受消息。 22 【生產者發送消息時,消費者必須處於運行狀態,而且只能接受運行之后的消息】。 23 --> 24 <!-- 點對點模式 --> 25 <!-- <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> 26 <constructor-arg> 27 <value>test-queue</value> 28 </constructor-arg> 29 </bean> --> 30 <!-- 發布訂閱模式 --> 31 <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic"> 32 <constructor-arg value="test-topic"/> 33 </bean> 34 35 36 <!-- 消費者 --> 37 <!-- 配置自定義消息監聽器 --> 38 <bean id="myMessageListener" class="cn.soa.mq.MyMessageListener"></bean> 39 <!-- 配置MessageListenerContainer --> 40 <bean id="jmsContainer" 41 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 42 <property name="connectionFactory" ref="connectionFactory"/> 43 <property name="destination" ref="testTopic"/> <!-- 這兒注意生產者使用的是那種模式並且用哪個隊列來發送消息的 --> 44 <property name="messageListener" ref="myMessageListener"/> 45 </bean>
步驟三:執行更新操作時,通知秘書去同步索引庫、緩存等
1 @Autowired 2 private ItemMapper itemMapper; 3 //消息隊列 4 @Autowired 5 private JmsTemplate jmsTemplate; 6 @Resource(name="testTopic") 7 private Destination testTopic;
8 @Override 9 public int saveItem(Item item) { 10 final String itemId = UUIDUtils.getUUID(); 11 if(StringUtil.isNullOrBlank(item.getId())){ 12 //如果商品主鍵為空,則設置一個ID 13 item.setId(itemId); 14 } 15 if(StringUtil.isNullOrBlank(String.valueOf(item.getCreateTime()))){ 16 //如果創建時間為空,則設置當前時間為創建時間 17 item.setCreateTime(DateTimeUtils.getCurrentDate()); 18 } 19 int save = itemMapper.saveItem(item); 20 if(save == 1){ 21 //如果新增商品成功,則發送商品ID到消息隊列中,目的同步索引庫、緩存等 22 jmsTemplate.send(testTopic, new MessageCreator(){ 23 @Override 24 public Message createMessage(Session session) throws JMSException { 25 // 將商品ID發送出去 26 logger.error("發送新增商品的ID到MQ消息隊列中:{}=============================="); 27 TextMessage message = session.createTextMessage(itemId); 28 return message; 29 } 30 }); 31 } 32 return save; 33 }
步驟四:使用MQ監聽器同步索引庫(監聽器需在spring配置文件中配置)
1 public class MyMessageListener implements MessageListener{ 2 3 private final static Logger logger = LoggerFactory.getLogger(MyMessageListener.class); 4 5 @Autowired 6 private SolrServer solrServer; 7 8 @Autowired 9 private ItemService itemService; 10 11 /** 12 * 根據監聽到的商品ID來同步索引庫數據。 13 */ 14 @Override 15 public void onMessage(Message message) { 16 logger.info("============開始同步索引庫================"); 17 // 根據不同業務邏輯進行相應處理 18 if(message instanceof TextMessage){ 19 try { 20 TextMessage textMessage = (TextMessage) message; 21 String ID = textMessage.getText(); //監聽到新商品ID 22 Item newItem = itemService.findItemById(ID); //根據新主鍵查詢到商品信息 23 // 將商品數據封裝到SolrInputDocument對象 24 SolrInputDocument doc = new SolrInputDocument(); 25 doc.addField("id", newItem.getId()); 26 doc.addField("product_catalog_name", "忠哥系列"); 27 doc.addField("product_price", newItem.getPrice()); 28 doc.addField("product_name", newItem.getItemName()); 29 30 // 添加到索引庫 31 solrServer.add(doc); 32 // 提交 33 solrServer.commit(); 34 } catch (Exception e) { 35 logger.error("同步索引庫失敗:{}"+e.getMessage()); 36 } 37 } 38 } 39 }
步驟五:校驗數據是否同步成功,馬上就可以在索引庫中搜到我們剛剛新增的信息
參考:
http://www.cnblogs.com/1315925303zxz/p/6254016.html 仿京東站內搜索案例
http://www.cnblogs.com/1315925303zxz/p/6372004.html solrcloud集群環境搭建
與君共勉!每天都有新技能!