目的:為了將elasticsearch做成單獨的服務,那么我們必須解耦,也就是業務邏輯和搜索模塊是沒有關系的,並且是異步的。那么項目之間通信,使用的選擇有限,消息中間件是一個不錯的選擇。
消息中間件常用的:ActiveMQ,RabbitMQ,RocketMQ,Kafka等。由於activeMQ是使用java開發的,並且有比較完整的文檔,在很多公司都有較多的應用,最重要的,產品比較成熟。所以選擇了activeMQ作為中間件。由於這個項目是探索項目,所以只是一個單機版本的中間件,並且沒有對消息進行持久化,也沒有判斷消息的重復消費和丟失等,完全是最簡單的應用。
至於復雜的東西,以后會慢慢更新。
正文
1. windows下安裝並啟動activeMQ。
網址:這里下載最新版本的activeMQ,這里下載任意版本的activeMQ
下載完之后如下:
然后雙擊 bin/win64/activemq.bat (根據自己系統啟動)
啟動完成之后如下:訪問 localhost:8161 ,然后點擊 manager ActiveMQ broker 會要求輸入賬號密碼,賬號密碼分別為admin,admin。(賬號是在目錄下 conf-->users.properties配置文件中)
第三個頁面表示你已經等了成功了,在Queues和Topics里面可以看到接收到的發布,訂閱消息。
2. application.properities文件配置activeMQ的參數。
application.properties配置文件如下:
spring.activemq.broker-url=tcp://localhost:61616 //tcp連接的網址和端口,這里端口是61616,不是8161 spring.activemq.close-timeout=15s spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=10
3. 配置ActiveMQ的配置類

import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.jms.Topic; /** * @author: cc * @Date: 2018/8/29 11:22 * @Description: */ @Configuration public class JmsConfig { public final static String TOPIC_CREATE = "es.topic.create"; public final static String QUEUE_CREATE = "es.queue.create"; public final static String TOPIC_DELETE = "es.topic.delete"; public final static String QUEUE_DELETE = "es.queue.delete"; public final static String TOPIC_UPDATE = "es.topic.update"; public final static String QUEUE_UPDATE = "es.queue.update"; @Bean("esQueueCreate") public Queue esQueueCreate() { return new ActiveMQQueue(QUEUE_CREATE); } @Bean("esQueueDelete") public Queue esQueueDelete() { return new ActiveMQQueue(QUEUE_DELETE); } @Bean("esQueueUpdate") public Queue esQueueUpdate() { return new ActiveMQQueue(QUEUE_UPDATE); } @Bean("esTopicCreate") public Topic esTopicCreate() { return new ActiveMQTopic(TOPIC_CREATE); } @Bean("esTopicDelete") public Topic esTopicDelete() { return new ActiveMQTopic(TOPIC_DELETE); } @Bean("esTopicUpdate") public Topic esTopicUpdate() { return new ActiveMQTopic(TOPIC_UPDATE); } /** * topic模式的ListenerContainer * @param activeMQConnectionFactory * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } /** * queue模式的ListenerContainer * @param activeMQConnectionFactory * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } }
解釋:
這里分別定義了queue 和topic 的增加,刪除,修改的消息,分別用來接收調用服務的修改,添加,刪除數據的對應的操作(如果使用一個,那么就需要對數據進行判斷到底是執行的什么操作。)
后來定義了兩個異步的消息監聽器,本質是一樣的,一個監聽queue,一個監聽topic
4. 創建消息生成類,消息消費類
消息生成類:發送消息到activeMQ的類

import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; /** * @author: cc * @Date: 2018/8/29 11:04 * @Description: */ @Service public class Producer { @Resource private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, String message) { this.jmsTemplate.convertAndSend(destination,message); } }
消息消費類:只需要在需要監聽的方法上加上注解:(topic或者queue)
@JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
即可完成消息的監聽,你可以在你的方法上直接接受String類型的消息進行解析。
如下:

public class Consumer { private Logger logger = LoggerFactory.getLogger(this.getClass()); @JmsListener(destination = JmsConfig.TOPIC_CREATE,containerFactory = "jmsListenerContainerTopic") public void onTopicMessage(String msg) { logger.info("接收到topic消息:{}",msg); } @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue") public void onQueueMessage(String msg) { logger.info("接收到queue消息:{}",msg); } }
5. 完成
到這里已經可以發送消息和接受消息了,但是有很多問題的,比如消息丟失,重復消費等等問題,以后在解決。目前已經可以進行項目通信了。