package rabbitmq.demo; import com.rabbitmq.client.AMQP; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法 @Autowired AmqpAdmin amqpAdmin; /** * 創建交換器 Exchange */ @Test public void createExchange() { // 創建Exchange 交換器 amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("創建Exchange完成"); // 創建Queue 隊列,持久化 amqpAdmin.declareQueue(new Queue("amqpproduct", true)); System.out.println("創建Queue完成"); // 創建綁定規則 amqpAdmin.declareBinding(new Binding("amqpproduct", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqpproduct", null)); System.out.println("創建綁定規則完成"); } /** * 發送消息給RabbitMQ消息隊列 發送單播 點對點的消息 direct方式 */ @Test public void rabbitMqSendTest() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //方式一 以默認序列化的方式發送,如用json的方式,寫RabbitConfig文件 rabbitTemplate.convertAndSend("myexchange.direct", "myproduct", map); } /** * 發送消息給RabbitMQ消息隊列 廣播方式 fanout方式 
*/ @Test public void rabbitMqSendTest2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //fanout廣播方式 不用填路由鍵 無效 rabbitTemplate.convertAndSend("myexchange.fanout", "", map); } /** * 從RabbitMQ接收消息 */ @Test public void rabbitMqGetTest() { //收取消息成功后,會從相對應的消息隊列里刪除該消息 Object o = rabbitTemplate.receiveAndConvert("myproduct"); System.out.println("接收的消息隊列數據類型:" + o.getClass()); System.out.println("接收的消息隊列數據:" + o.toString()); } }
package rabbitmq.demo.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration class that contributes the message converter bean used for
 * RabbitMQ payloads.
 */
@Configuration
public class RabbitConfig {

    /**
     * Exposes a JSON-based converter as the {@link MessageConverter} bean,
     * replacing the default serialization with JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
package rabbitmq.demo.service; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitListenerService { /** * 添加監聽器,自動獲取消息隊列信息 * @param o */ @RabbitListener(queues = {"myproduct","myproduct","myarticle.new"}) //可監聽多個queue public void rabbitMqListenerReceive(Object o){ // 需要在啟動入口添加 @EnableRabbit 開啟基於注解的rabbitMQ模式 System.out.println("接收的消息隊列數據類型:" + o.getClass()); System.out.println("接收的消息隊列數據:" + o.toString()); } }
# application.yml
server:
  port: 8080
spring:
  # Give the project a name
  application:
    name: rabbitmq-provider
  # RabbitMQ server configuration
  rabbitmq:
    host: 139.199.162.126
    port: 5672
    username: xxxx
    password: xxxx
    # Virtual host; optional - when not set, the server's default host is used
    #virtual-host: JCcccHost
<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>