原文:https://blog.csdn.net/ctwy291314/article/details/80534604
RabbitMQ安裝請參照RabbitMQ應用
不啰嗦直接上代碼
目錄結構如下:
pom.xml
-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
<modelVersion>4.0.0</modelVersion>
-
-
<groupId>com.test</groupId>
-
<artifactId>RabbitMQ_MQTT</artifactId>
-
<version>0.0.1-SNAPSHOT</version>
-
<packaging>jar</packaging>
-
-
<name>RabbitMQ_MQTT</name>
-
<url>http://maven.apache.org</url>
-
-
<parent>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-parent</artifactId>
-
<version>1.5.6.RELEASE</version>
-
<relativePath /> <!-- lookup parent from repository -->
-
</parent>
-
-
<properties>
-
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
<java.version>1.8</java.version>
-
</properties>
-
-
<dependencies>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-test</artifactId>
-
<scope>test</scope>
-
</dependency>
-
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-devtools</artifactId>
-
<optional>true</optional>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-web</artifactId>
-
</dependency>
-
<!-- <dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId>
-
<version>1.12</version> </dependency> -->
-
</dependencies>
-
<build>
-
<plugins>
-
<plugin>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-maven-plugin</artifactId>
-
<configuration>
-
<fork>true</fork>
-
</configuration>
-
</plugin>
-
</plugins>
-
</build>
-
-
-
</project>
application.properties
-
servier.port= 8080
-
-
-
spring.rabbitmq.queues=topic.1,mqtt.test.*,mqtt.test.dd
-
spring.rabbitmq.host= 127.0.0.1
-
spring.rabbitmq.port= 5672
-
spring.rabbitmq.username=guest
-
spring.rabbitmq.password=guest
-
spring.rabbitmq.publisher-confirms= true
-
spring.rabbitmq.virtual-host=/
Application.java
-
package com.gm;
-
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.boot.SpringApplication;
-
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-
import org.springframework.boot.autoconfigure.SpringBootApplication;
-
import org.springframework.context.annotation.ComponentScan;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.web.bind.annotation.PostMapping;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import com.gm.rabbit.CallBackSender;
-
-
-
-
-
-
-
public class Application {
-
-
-
private CallBackSender sender;
-
-
public static void main(String[] args) {
-
SpringApplication.run(Application.class, args);
-
}
-
-
-
public void callbak() {
-
sender.send( "topic.baqgl.admin.1", "測試消息");
-
}
-
}
RabbitConfig.java
-
package com.gm.rabbit;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
-
import org.springframework.amqp.core.AcknowledgeMode;
-
import org.springframework.amqp.core.Binding;
-
import org.springframework.amqp.core.BindingBuilder;
-
import org.springframework.amqp.core.DirectExchange;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.core.Queue;
-
import org.springframework.amqp.core.TopicExchange;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
-
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-
import org.springframework.context.annotation.Scope;
-
-
-
public class RabbitConfig {
-
-
-
private String addresses;
-
-
-
private String port;
-
-
-
private String username;
-
-
-
private String password;
-
-
-
private String virtualHost;
-
-
-
private boolean publisherConfirms;
-
-
-
private String queues;
-
-
final static String EXCHANGE_NAME = "amq.topic";
-
final static String QUEUE_NAME = "topic.baqgl.*.*";
-
final static String ROUTING_KEY = "topic.baqgl.#";
-
-
-
public ConnectionFactory connectionFactory() {
-
-
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
-
connectionFactory.setAddresses(addresses + ":" + port);
-
connectionFactory.setUsername(username);
-
connectionFactory.setPassword(password);
-
connectionFactory.setVirtualHost(virtualHost);
-
/** 如果要進行消息回調,則這里必須要設置為true */
-
connectionFactory.setPublisherConfirms(publisherConfirms);
-
return connectionFactory;
-
}
-
-
-
/** 因為要設置回調類,所以應是prototype類型,如果是singleton類型,則回調類為最后一次設置 */
-
-
public RabbitTemplate rabbitTemplate() {
-
RabbitTemplate template = new RabbitTemplate(connectionFactory());
-
return template;
-
}
-
-
-
TopicExchange exchange() {
-
return new TopicExchange(EXCHANGE_NAME);
-
}
-
-
-
public Queue queue() {
-
return new Queue(QUEUE_NAME, true);
-
}
-
-
-
public Binding binding() {
-
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);
-
}
-
-
-
-
public SimpleMessageListenerContainer messageContainer() {
-
/*Queue[] q = new Queue[queues.split(",").length];
-
for (int i = 0; i < queues.split(",").length; i++) {
-
q[i] = new Queue(queues.split(",")[i]);
-
}*/
-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
-
container.setQueues(queue());
-
container.setExposeListenerChannel( true);
-
container.setMaxConcurrentConsumers( 1);
-
container.setConcurrentConsumers( 1);
-
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-
container.setMessageListener( new ChannelAwareMessageListener() {
-
-
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
-
try {
-
System.out.println(
-
"消費端接收到消息:" + message.getMessageProperties() + ":" + new String(message.getBody()));
-
System.out.println( "topic:"+message.getMessageProperties().getReceivedRoutingKey());
-
// deliveryTag是消息傳送的次數,我這里是為了讓消息隊列的第一個消息到達的時候拋出異常,處理異常讓消息重新回到隊列,然后再次拋出異常,處理異常拒絕讓消息重回隊列
-
/*if (message.getMessageProperties().getDeliveryTag() == 1
-
|| message.getMessageProperties().getDeliveryTag() == 2) {
-
throw new Exception();
-
}*/
-
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只確認當前一個消息收到,true確認所有consumer獲得的消息
-
} catch (Exception e) {
-
e.printStackTrace();
-
-
if (message.getMessageProperties().getRedelivered()) {
-
System.out.println( "消息已重復處理失敗,拒絕再次接收...");
-
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
-
} else {
-
System.out.println( "消息即將再次返回隊列處理...");
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue為是否重新回到隊列
-
}
-
}
-
}
-
});
-
return container;
-
}
-
-
}
CallBackSender.java
-
package com.gm.rabbit;
-
-
import java.util.UUID;
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.amqp.rabbit.support.CorrelationData;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.stereotype.Component;
-
-
-
public class CallBackSender implements RabbitTemplate.ConfirmCallback {
-
-
private RabbitTemplate rabbitTemplate;
-
-
public void send(String topic, String message) {
-
rabbitTemplate.setConfirmCallback( this);
-
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
-
-
System.out.println( "消息id:" + correlationData.getId());
-
//用RabbitMQ發送MQTT需將exchange配置為amq.topic
-
this.rabbitTemplate.convertAndSend("amq.topic", topic, message, correlationData);
-
}
-
-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
System.out.println( "消息id:" + correlationData.getId());
-
if (ack) {
-
System.out.println( "消息發送確認成功");
-
} else {
-
System.out.println( "消息發送確認失敗:" + cause);
-
}
-
}
-
}
ApplicationTests.java
-
package com.gm;
-
-
import org.junit.Test;
-
import org.junit.runner.RunWith;
-
import org.springframework.boot.test.context.SpringBootTest;
-
import org.springframework.test.context.junit4.SpringRunner;
-
-
-
-
public class ApplicationTests {
-
-
-
public void contextLoads() {
-
System.out.println( "hello world");
-
}
-
-
}
TopicTest.java
-
package com.gm.rabbit;
-
-
import org.junit.Test;
-
import org.junit.runner.RunWith;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.boot.test.context.SpringBootTest;
-
import org.springframework.test.context.junit4.SpringRunner;
-
-
-
-
public class TopicTest {
-
-
-
private CallBackSender sender;
-
-
-
public void topic() throws Exception {
-
sender.send( "topic.baqgl.admin.1", "測試消息");
-
}
-
}
本文選擇的是RabbitMQ集成MQTT,並實現消息持久化,如不需要集成MQTT只需修改RabbitConfig.java中的EXCHANGE_NAME即可。
集成MQTT相關配置:
集成MQTT相關配置:
創建用戶:
-
創建賬號
-
rabbitmqctl add_user admin 123456
-
設置用戶角色
-
rabbitmqctl set_user_tags admin administrator
-
設置用戶權限
-
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
-
設置完成后可以查看當前用戶和角色(需要開啟服務)
-
rabbitmqctl list_users
安裝插件:
-
rabbitmq-plugins enable rabbitmq_management
-
rabbitmq-plugins enable rabbitmq_mqtt
默認配置。window下,rabbitmq的配置文件在C:\Users\Administrator\AppData\Roaming\RabbitMQ下。沒配置的情況下,采用如下配置:
-
[{rabbit, [{tcp_listeners, [ 5672]}]},
-
{rabbitmq_mqtt, [{default_user, << "admin">>},
-
{default_pass, << "123456">>},
-
{allow_anonymous, true},
-
{vhost, << "/">>},
-
{exchange, << "amq.topic">>},
-
{subscription_ttl, 1800000},
-
{prefetch, 10},
-
{ssl_listeners, []},
-
%% Default MQTT with TLS port is 8883
-
%% {ssl_listeners, [ 8883]}
-
{tcp_listeners, [ 1883]},
-
{tcp_listen_options, [{backlog, 128},
-
{nodelay, true}]}]}
-
].