springboot-25-springboot 集成 ActiveMq


消息的發布有2種形式, 隊列式(點對點) 和主題式(pub/sub) 模式, 隊列式發布后, 接收者從隊列中獲取消息后, 消息就會消失, 但任意消費者都可以從隊列中接受消息, 消息只能被接受一次

主題式則為接受后消息不消失

JMS 是(java message service) 是 基於JVM代理的規范, ActiveMQ是他的一種實現

2、JMS消息基本組件

2.1、ConnectionFactory

創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。

2.2、Destination

Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。

所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。

2.3、Connection

Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。

2.4、Session

Session是操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

2.5、消息的生產者

消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。

2.6、消息消費者

消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。

2.7、MessageListener

消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

 

1, 安裝activeMQ

1), linux安裝

訪問: http://activemq.apache.org/activemq-5111-release.html

2), docker安裝

sudo docker run -d -p 61616:61616 -p 8161:8161 cloudesire/activemq

安裝成功后, 可訪問: 

 

3) 使用springboot的內嵌 activemq

 <!--springboot 內嵌 activemq-->
        <dependency>
            <groupId>org.apacke.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>

2, 新建springboot項目

1), 添加依賴

<dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
        </dependency>

2), 添加配置:  

spring: 
  activemq:
    broker-url: tcp://192.168.50.202:61616

支持的配置有: 

  activemq:
    broker-url: tcp://192.168.50.202:61616
    user:
    password:
    in-memory:

3) 消息發送者: 

package com.wenbronk.enterprise.jms.message;

import org.springframework.jms.core.MessageCreator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 消息創建者
 * Created by wenbronk on 2017/6/13.
 */
public class CreateMessage implements MessageCreator{
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage("測試消息");
    }
}

4) 消息接受着: 

package com.wenbronk.enterprise.jms.message;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 * Created by wenbronk on 2017/6/13.
 */
@Component
public class ReceiMessage {

    /**
     * 使用 @JmsListener 指定要監聽的域, 有消息發送時就會發送到此域中
     * @param message
     */
    @JmsListener(destination = "my-destination")
    public void receiveMessage(String message) {
        System.out.println("接受到的消息是: " + message);
    }

}

5), 消息發送: 

package com.wenbronk.enterprise.jms.config;

import com.wenbronk.enterprise.jms.message.CreateMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * 消息發送及目的地定義
 * 實現commandLineRunner, 使得程序啟動后執行該類的run() 方法
 * Created by wenbronk on 2017/6/13.
 */
@Component
public class MessageConfig implements CommandLineRunner {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 消息域為  my-destination
     * 發送者為 createMessage()
     * @param args
     * @throws Exception
     */
    @Override
    public void run(String... args) throws Exception {
        jmsTemplate.send("my-destination", new CreateMessage());
    }
}

 

 

如果想自定義使用 topic 或者 queue 模式, 需要自己指定: 

MessageConfig

package com.wenbronk.enterprise.jms.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 創建 點對點模式的對象和發布訂閱模式的對象
 * Created by wenbronk on 2017/6/13.
 */
@Configuration
public class TopicConfig {
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("sample.queue");
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic("sample.topic");
    }
}

消息發布者:

package com.wenbronk.enterprise.jms.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 點對點和訂閱模式的消息發布
 * Created by wenbronk on 2017/6/13.
 */
@Component
@EnableScheduling
public class MessageProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @Scheduled(fixedDelay=3000)//每3s執行1次
    public void send() {
        //send queue.
        this.jmsMessagingTemplate.convertAndSend(this.queue, "hi,activeMQ");
        //send topic.
        this.jmsMessagingTemplate.convertAndSend(this.topic, "hi,activeMQ(topic)");
    }

}

消息接受者

package com.wenbronk.enterprise.jms.message;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 點對點和訂閱模式的消息接受着
 * Created by wenbronk on 2017/6/13.
 */
@Component
public class MessageConsumer {

    @JmsListener(destination = "sample.topic")
    public void receiveQueue(String text) {
        System.out.println("Consumer2="+text);
    }

    @JmsListener(destination = "sample.topic")
    public void receiveTopic(String text) {
        System.out.println("Consumer3="+text);
    }

}

需要在配置文件中開啟 發布訂閱模式的支持: 

spring:   
 activemq: broker
-url: tcp://192.168.50.202:61616 jms: pub-sub-domain: true

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM