package rabbitmq.demo;
import com.rabbitmq.client.AMQP;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@Autowired
AmqpAdmin amqpAdmin;
/**
* 創建交換器 Exchange
*/
@Test
public void createExchange() {
// 創建Exchange 交換器
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("創建Exchange完成");
// 創建Queue 隊列,持久化
amqpAdmin.declareQueue(new Queue("amqpproduct", true));
System.out.println("創建Queue完成");
// 創建綁定規則
amqpAdmin.declareBinding(new Binding("amqpproduct", Binding.DestinationType.QUEUE,
"amqpadmin.exchange", "amqpproduct", null));
System.out.println("創建綁定規則完成");
}
/**
* 發送消息給RabbitMQ消息隊列 發送單播 點對點的消息 direct方式
*/
@Test
public void rabbitMqSendTest() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//方式一 以默認序列化的方式發送,如用json的方式,寫RabbitConfig文件
rabbitTemplate.convertAndSend("myexchange.direct", "myproduct", map);
}
/**
* 發送消息給RabbitMQ消息隊列 廣播方式 fanout方式
*/
@Test
public void rabbitMqSendTest2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//fanout廣播方式 不用填路由鍵 無效
rabbitTemplate.convertAndSend("myexchange.fanout", "", map);
}
/**
* 從RabbitMQ接收消息
*/
@Test
public void rabbitMqGetTest() {
//收取消息成功后,會從相對應的消息隊列里刪除該消息
Object o = rabbitTemplate.receiveAndConvert("myproduct");
System.out.println("接收的消息隊列數據類型:" + o.getClass());
System.out.println("接收的消息隊列數據:" + o.toString());
}
}
package rabbitmq.demo.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 用json方式替換默認的序列化
* @return
*/
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}
}
package rabbitmq.demo.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitListenerService {
/**
* 添加監聽器,自動獲取消息隊列信息
* @param o
*/
@RabbitListener(queues = {"myproduct","myproduct","myarticle.new"}) //可監聽多個queue
public void rabbitMqListenerReceive(Object o){
// 需要在啟動入口添加 @EnableRabbit 開啟基於注解的rabbitMQ模式
System.out.println("接收的消息隊列數據類型:" + o.getClass());
System.out.println("接收的消息隊列數據:" + o.toString());
}
}
#application.yml
server:
port: 8080
spring:
#給項目來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 139.199.162.126
port: 5672
username: xxxx
password: xxxx
#虛擬host 可以不設置,使用server默認host
#virtual-host: JCcccHost
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>