import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
import org.apache.log4j.Logger;
/**
* RabbitMq通用管理工具類<br>
* RabbitMQ是AMQP(高級消息隊列協議)的標准實現
* 1.單發送單(多)接收模式;<br>
* 2.fanout發布訂閱模式(fanout);<br>
* 3.routing路由模式(direct);<br>
* 4.topic通配符模式(topic)<br>
* Broker:簡單來說就是消息隊列服務器實體;
Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務;
Exchange:交換機,決定了消息路由規則,它指定消息按什么規則,路由到哪個隊列;
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列;
Binding:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞;
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離;
producer:消息生產者,就是投遞消息的程序;
consumer:消息消費者,就是接受消息的程序;
* 消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
* 1.將交換機置為可持久;
2.將通道置為可持久;
3.消息發送時設置可持久;
當我們"生產"了一條可持久化的消息,嘗試中斷MQ服務,啟動消費者獲取消息,消息依然能夠恢復。相反,則拋出異常;
* 消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel;
(2)客戶端聲明一個exchange,並設置相關屬性;
(3)客戶端聲明一個queue,並設置相關屬性;
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系;
(5)客戶端投遞消息到exchange, exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里
* exchange也有幾個類型,
完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為"abc",那么客戶端提交的消息,只有設置了key為"abc"的才會投遞到隊列。
對key進行模式匹配后進行投遞的叫做Topic交換機,符號"#"匹配一個或多個詞,符號"*"匹配正好一個詞。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。
還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
* Exchange
Durability 持久性,這是exchange的可選屬性,如果你Durability設置為false,那些當前會話結束的時候,該exchange也會被銷毀;
Auto delete 當沒有隊列或者其他exchange綁定到此exchange的時候,該exchange被銷毀;
Internal 表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定;
無法聲明2個名稱相同 但是類型卻不同的exchange;
* Queue
Durability 和exchange相同,未持久化的隊列,服務重啟后銷毀;
Auto delete 當沒有消費者連接到該隊列的時候,隊列自動銷毀;
Exclusive 使隊列成為私有隊列,只有當前應用程序可用,當你需要限制隊列只有一個消費者,這是很有用的;
擴展屬性如下對應源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的參數,
Message TTL 當一個消息被推送在該隊列的時候 可以存在的時間 單位為ms,(對應擴展參數argument "x-message-ttl" );
Auto expire 在隊列自動刪除之前可以保留多長時間(對應擴展參數argument "x-expires");
Max length 一個隊列可以容納的已准備消息的數量(對應擴展參數argument "x-max-length");
一旦創建了隊列和交換機,就不能修改其標志了。例如,如果創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建;
* RabbitMQ消息模型的核心理念是:發布者(producer)不會直接發送任何消息給隊列。
* 事實上,發布者(producer)甚至不知道消息是否已經被投遞到隊列。
* 發布者(producer)只需要把消息發送給一個exchange。
* exchange非常簡單,它一邊從發布者方接收消息,一邊把消息推入隊列。
* exchange必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。
* 這些規則是通過exchange type來定義的;
* @author
*/
public class RabbitUtil1 {
/**日志對象**/
private final static Logger log = Logger.getLogger(RabbitUtil1.class);
/**RabbitMq連接工廠對象**/
private volatile static ConnectionFactory factory = null;
public static boolean stopRabbitFlag=false;
/**構造方法**/
public RabbitUtil1() {
this(ConnectionFactory.DEFAULT_HOST,
ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST,
ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);
}
/**
* 構造方法
* @param serverHost Rabbit服務主機
*/
public RabbitUtil1(String serverHost) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST,
ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);
}
/**
* 構造方法
* @param serverHost Rabbit服務主機
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, String username, String password) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST, username, password);
}
/**
* 構造方法
* @param serverHost Rabbit服務主機
* @param vhost 虛擬host(類似權限組)
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, String vhost, String username,
String password) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT, vhost, username,
password);
}
/**
* 構造方法
* @param serverHost Rabbit服務主機
* @param port Rabbit端口
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, int port, String username,
String password) {
this(serverHost, port, ConnectionFactory.DEFAULT_VHOST, username,
password);
}
/**
* 構造方法(初始化單例RabbitConnectionFactory)
* @param serverHost Rabbit服務主機
* @param port Rabbit端口
* @param vhost 虛擬host(類似權限組)
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, int port, String vhost,
String username, String password) {
if (null == factory) {
synchronized (ConnectionFactory.class) {
if (null == factory) {
factory = new ConnectionFactory();
factory.setHost(serverHost);
factory.setPort(port);
factory.setVirtualHost(vhost);
factory.setUsername(username);
factory.setPassword(password);
log.info(">>>>>>Singleton ConnectionFactory Create Success>>>>>>");
}
}
}
if(stopRabbitFlag){
stopRabbitFlag=false;
}
}
/**
* 創建連接
* @return 連接
* @throws Exception
*/
private Connection buildConnection() throws Exception {
return factory.newConnection();
}
/**
* 創建信道
* @param connection 連接
* @return 信道
* @throws Exception 運行時異常
*/
private Channel buildChannel(Connection connection) throws Exception {
return connection.createChannel();
}
/**
* 關閉連接和信道
* @param connection rabbitmq連接
* @param channel rabbitmq信道
*/
private void close(Connection connection, Channel channel) {
try {
if (null != channel) {
channel.close();
}
if (null != connection) {
connection.close();
}
} catch (Exception e) {
log.error(">>>>>>關閉RabbitMq的connection或channel發生異常>>>>>>", e);
}
}
/**
* 發送direct類型消息
* @param exchangeName exchange名稱
* @param routingKey 路由key字符串
* @param message 待發送的消息
* @throws Exception 運行時異常
*/
public void sendDirect(String exchangeName, String routingKey, String message) throws Exception {
Connection connection = null;
Channel channel = null;
try {
connection = buildConnection();
channel = buildChannel(connection);
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
log.info("消息(" + message + "發布成功");
} finally {
close(connection, channel);
}
}
/**
* 接收direct類型消息(手動發送ack)
* @param queueName 隊列名稱
* @param processer 回調處理接口
* @throws Exception 運行時異常
*/
public void receiveDirect(String queueName, RabbitCallback processer) throws Exception {
receiveDirect(queueName, false, processer);
}
/**
* 接收direct類型消息
* @param queueName 隊列名稱
* @param autoAck 是否自動發送ack true-是 false-否
* @param processer 回調處理接口
* @throws Exception 運行時異常
*/
public void receiveDirect(String queueName, boolean autoAck, RabbitCallback processer)
throws Exception {
basicConsume(queueName, autoAck, processer);
}
/**
* 循環獲取消息並處理
* @param queueName 隊列名稱
* @param autoAck 是否自動發送ack true-是 false-否
* @param processer 回調處理接口
* @throws Exception 運行時異常
*/
private void basicConsume(String queueName, final boolean autoAck, final RabbitCallback processer)
throws Exception {
final Channel channel = buildChannel(buildConnection());
/**channel.basicQos(1)
* 保證接收端僅在發送了ack之后才會接收下一個消息,
* 在這種情況下發送端會嘗試把消息發送給下一個接收端
*/
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String responseMsg = new String(body, "UTF-8");
System.out.println("Consumer===");
if (stopRabbitFlag){
if(channel!=null){
try {
channel.basicNack(deliveryTag, false, true);//回復處理失敗,客戶需要關閉
channel.close();//關閉客戶端
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}else {//若服務停止標志為false,需要處理業務,調用回調函數處理
boolean success = processer.process(responseMsg);
if (!autoAck) {
if (success) {
channel.basicAck(deliveryTag, false);
} else {
channel.basicNack(deliveryTag, false, true);
}
}
}
}
};
channel.basicConsume(queueName, autoAck, consumer);
}
}