ElasticSearch(九):springboot項目集成消息中間件activeMQ


目的:為了將elasticsearch做成單獨的服務,那么我們必須解耦,也就是業務邏輯和搜索模塊是沒有關系的,並且是異步的。那么項目之間通信,使用的選擇有限,消息中間件是一個不錯的選擇。

消息中間件常用的:ActiveMQ,RabbitMQ,RocketMQ,Kafka等。由於activeMQ是使用java開發的,並且有比較完整的文檔,在很多公司都有較多的應用,最重要的,產品比較成熟。所以選擇了activeMQ作為中間件。由於這個項目是探索項目,所以只是一個單機版本的中間件,並且沒有對消息進行持久化,也沒有判斷消息的重復消費和丟失等,完全是最簡單的應用。

至於復雜的東西,以后會慢慢更新。

正文

1. windows下安裝並啟動activeMQ。

網址:這里下載最新版本的activeMQ這里下載任意版本的activeMQ

下載完之后如下:

然后雙擊 bin/win64/activemq.bat  (根據自己系統啟動)

啟動完成之后如下:訪問 localhost:8161 ,然后點擊 manager ActiveMQ broker  會要求輸入賬號密碼,賬號密碼分別為admin,admin。(賬號是在目錄下 conf-->users.properties配置文件中)

第三個頁面表示你已經等了成功了,在Queues和Topics里面可以看到接收到的發布,訂閱消息。

2. application.properities文件配置activeMQ的參數。

application.properties配置文件如下:

spring.activemq.broker-url=tcp://localhost:61616    //tcp連接的網址和端口,這里端口是61616,不是8161
spring.activemq.close-timeout=15s
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10

3. 配置ActiveMQ的配置類

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @author: cc
 * @Date: 2018/8/29 11:22
 * @Description:
 */
@Configuration
public class JmsConfig {

    public final static String TOPIC_CREATE = "es.topic.create";
    public final static String QUEUE_CREATE = "es.queue.create";
    public final static String TOPIC_DELETE = "es.topic.delete";
    public final static String QUEUE_DELETE = "es.queue.delete";
    public final static String TOPIC_UPDATE = "es.topic.update";
    public final static String QUEUE_UPDATE = "es.queue.update";

    @Bean("esQueueCreate")
    public Queue esQueueCreate() {
        return new ActiveMQQueue(QUEUE_CREATE);
    }
    @Bean("esQueueDelete")
    public Queue esQueueDelete() {
        return new ActiveMQQueue(QUEUE_DELETE);
    }
    @Bean("esQueueUpdate")
    public Queue esQueueUpdate() {
        return new ActiveMQQueue(QUEUE_UPDATE);
    }

    @Bean("esTopicCreate")
    public Topic esTopicCreate() {
        return new ActiveMQTopic(TOPIC_CREATE);
    }
    @Bean("esTopicDelete")
    public Topic esTopicDelete() {
        return new ActiveMQTopic(TOPIC_DELETE);
    }
    @Bean("esTopicUpdate")
    public Topic esTopicUpdate() {
        return new ActiveMQTopic(TOPIC_UPDATE);
    }
    /**
     * topic模式的ListenerContainer
     * @param activeMQConnectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    /**
     * queue模式的ListenerContainer
     * @param activeMQConnectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
}
View Code

解釋:

這里分別定義了queue 和topic 的增加,刪除,修改的消息,分別用來接收調用服務的修改,添加,刪除數據的對應的操作(如果使用一個,那么就需要對數據進行判斷到底是執行的什么操作。)

后來定義了兩個異步的消息監聽器,本質是一樣的,一個監聽queue,一個監聽topic

4. 創建消息生成類,消息消費類

消息生成類:發送消息到activeMQ的類

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;

/**
 * @author: cc
 * @Date: 2018/8/29 11:04
 * @Description:
 */

@Service
public class Producer {
    @Resource
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination, String message) {
        this.jmsTemplate.convertAndSend(destination,message);
    }

}
View Code

消息消費類:只需要在需要監聽的方法上加上注解:(topic或者queue)

@JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")

即可完成消息的監聽,你可以在你的方法上直接接受String類型的消息進行解析。

如下:

public class Consumer {
    private  Logger logger = LoggerFactory.getLogger(this.getClass());

    @JmsListener(destination = JmsConfig.TOPIC_CREATE,containerFactory = "jmsListenerContainerTopic")
    public void onTopicMessage(String msg) {
        logger.info("接收到topic消息:{}",msg);
    }
    
    @JmsListener(destination = JmsConfig.QUEUE_CREATE,containerFactory = "jmsListenerContainerQueue")
    public void onQueueMessage(String msg) {
        logger.info("接收到queue消息:{}",msg);
    }
}
View Code

5. 完成

到這里已經可以發送消息和接受消息了,但是有很多問題的,比如消息丟失,重復消費等等問題,以后在解決。目前已經可以進行項目通信了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM