Java RabbitMQ配置和使用,基於SpringBoot


package rabbitmq.demo;

import com.rabbitmq.client.AMQP;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Demo tests that talk to a RabbitMQ broker through Spring AMQP.
 *
 * <p>Each test performs one broker operation: declaring an exchange/queue/binding,
 * publishing to a direct exchange, publishing to a fanout exchange, and pulling
 * a single message from a queue.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    /** Formatter for the "createTime" entry of outgoing messages; built once and reused. */
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Template used for sending and receiving messages. */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Admin handle used to declare exchanges, queues and bindings. */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Declares a direct exchange, a durable queue, and a binding between them.
     */
    @Test
    public void createExchange() {
        // Declare the exchange.
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        System.out.println("創建Exchange完成");
        // Declare the queue; the second constructor argument marks it durable.
        amqpAdmin.declareQueue(new Queue("amqpproduct", true));
        System.out.println("創建Queue完成");
        // Bind queue "amqpproduct" to the exchange with routing key "amqpproduct" and no extra arguments.
        amqpAdmin.declareBinding(new Binding("amqpproduct", Binding.DestinationType.QUEUE,
                "amqpadmin.exchange", "amqpproduct", null));
        System.out.println("創建綁定規則完成");
    }

    /**
     * Publishes a sample message to the direct exchange "myexchange.direct"
     * using routing key "myproduct" (point-to-point delivery).
     */
    @Test
    public void rabbitMqSendTest() {
        // The payload is converted by whatever MessageConverter the template is configured with;
        // per the original author's note, define a converter bean in RabbitConfig to send JSON.
        rabbitTemplate.convertAndSend("myexchange.direct", "myproduct", buildSampleMessage());
    }

    /**
     * Publishes a sample message to the fanout exchange "myexchange.fanout" (broadcast).
     */
    @Test
    public void rabbitMqSendTest2() {
        // Fanout exchanges ignore the routing key, so an empty one is passed.
        rabbitTemplate.convertAndSend("myexchange.fanout", "", buildSampleMessage());
    }

    /**
     * Pulls a single message from queue "myproduct" and prints its type and content.
     *
     * <p>If no message is waiting, a notice is printed instead of dereferencing
     * the {@code null} result.
     */
    @Test
    public void rabbitMqGetTest() {
        // Per the original author's note, a successfully received message is removed from its queue.
        Object received = rabbitTemplate.receiveAndConvert("myproduct");
        if (received == null) {
            // receiveAndConvert yields null when the queue has nothing to deliver.
            System.out.println("消息隊列中沒有可接收的消息");
            return;
        }
        System.out.println("接收的消息隊列數據類型:" + received.getClass());
        System.out.println("接收的消息隊列數據:" + received);
    }

    /**
     * Builds the sample payload shared by the send tests.
     *
     * @return a map with a random "messageId", a fixed "messageData" text,
     *         and the current local time under "createTime"
     */
    private static Map<String, Object> buildSampleMessage() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("messageId", String.valueOf(UUID.randomUUID()));
        payload.put("messageData", "test message, hello!");
        payload.put("createTime", LocalDateTime.now().format(CREATE_TIME_FORMAT));
        return payload;
    }

}
package rabbitmq.demo.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that contributes the message converter used for RabbitMQ payloads.
 */
@Configuration
public class RabbitConfig {

    /**
     * Exposes a JSON-based converter bean, intended to replace the default serialization.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
package rabbitmq.demo.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * Service that consumes messages from RabbitMQ queues via an annotated listener method.
 */
@Service
public class RabbitListenerService {

    /**
     * Listener invoked for messages arriving on the configured queues; prints the
     * runtime type and string form of what was received.
     *
     * <p>Per the original author's note, {@code @EnableRabbit} should be added on the
     * application entry point to enable annotation-driven RabbitMQ listeners.
     *
     * @param o the received object handed over by the listener infrastructure
     */
    // Several queues can be listened to at once; each queue name is listed exactly once.
    @RabbitListener(queues = {"myproduct", "myarticle.new"})
    public void rabbitMqListenerReceive(Object o) {
        System.out.println("接收的消息隊列數據類型:" + o.getClass());
        System.out.println("接收的消息隊列數據:" + o.toString());
    }
}
#application.yml
server:
  port: 8080
spring:
  # Name of this application
  application:
    name: rabbitmq-provider
  # RabbitMQ server connection settings
  rabbitmq:
    host: 139.199.162.126
    port: 5672
    username: xxxx
    password: xxxx
    # Virtual host is optional; when left unset the server's default virtual host is used
    #virtual-host: JCcccHost
        <!-- RabbitMQ support: Spring Boot AMQP starter dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM