1,什么是RabbitMq
RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
2,幾種MQ對比
RabbitMQ 是用Erlang 語言進行開發的,一款設計之初就是抗高並發的語言
3,RabbitMQ 安裝
1.下載並安裝erlang,下載地址:http://www.erlang.org/download 2.配置erlang環境變量信息 新增環境變量ERLANG_HOME=erlang的安裝地址 將%ERLANG_HOME%\bin加入到path中 3.下載並安裝RabbitMQ,下載地址:http://www.rabbitmq.com/download.html 注意: RabbitMQ 它依賴於Erlang,需要先安裝Erlang。
RabbitMQ 管理平台地址 http://127.0.0.1:15672
默認賬號:guest/guest 用戶可以自己創建新的賬號
https://blog.csdn.net/qq_35098526/article/details/80009424 安裝之后啟動不了,可以在sbin 里面:
輸入:rabbitmq-plugins enable rabbitmq_management (先定位到rabbitmq安裝目錄)命令,出現plugins安裝成功的提示。
過程:
Microsoft Windows [Version 10.0.17134.950] C:\Program Files\RabbitMQ Server> C:\Program Files\RabbitMQ Server>cd rabbitmq_server-3.7.8 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8>cd sbin C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.8\sbin>rabbitmq-plugins enable rabbitmq_management Enabling plugins on node rabbit@DESKTOP-2MDM24J: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@DESKTOP-2MDM24J... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins.
4,RabbitMQ 五種隊列形式
1.點對點隊列,也可以叫做簡單隊列
生產者投遞的消息,每次只准一個消費者來消費,如果消費者集群的話,消息會被均攤。
例如:50 個消息,2個消費者,消費者1會消費奇數,消費者2會消費偶數,兩個消費者不受影響,各自消費各自的消息
producer:
public class Producer { private static final String QUEUE_NAME = "rabbitmq_simple_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 3.創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "Hello, World :" + i; System.out.println(msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } channel.close(); connection.close(); } }
consumer1:
public class Consumer { private static final String QUEUE_NAME = "rabbitmq_simple_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("consumer1"); Connection connection = MQConnectionUtils.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 3.創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消費者獲取消息:" + msgString); } }; // 3.監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自動簽收的應答模式 } }
consumer2:
public class Consumer2 { private static final String QUEUE_NAME = "rabbitmq_simple_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("consumer2"); Connection connection = MQConnectionUtils.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 3.創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消費者獲取消息:" + msgString); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 3.監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自動簽收的應答模式 } }
2,工作隊列模式,也可以叫做公平隊列模式
點對點簡單隊列弊端:消費者集群的話,消息會被均攤處理,但是不同的消費者處理消息的能力是不同的,consumer1 每秒處理1個消息,consumer2 美妙處理3個消息,如果消息均攤,consumer1的效率則被浪費。
公平消費模式:誰處理的快,並且采用手動簽收,告知RabbitMQ之后,RabbitMQ 再給分發消息。這樣,誰處理的快,誰就會處理的多。
producer:
public class Producer { // 公平隊列名稱 private static final String QUEUE_NAME = "rabbitmq_fair_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectionUtils.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 保證消費者只能取一個/每次 channel.basicQos(1); //每次只給消費者1條消息,等消費完成,手動ack 應答之后,再給下一條 for (int i = 0; i < 50; i++) { String msg = "Hello, World: " + i; System.out.println(msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } channel.close(); connection.close(); } }
consumer1:
public class Consumer { private static final String QUEUE_NAME = "rabbitmq_fair_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("consumer01"); Connection connection = MQConnectionUtils.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 3.創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消費者獲取消息:" + msgString); try { Thread.sleep(200); } catch (Exception e) { } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 3.監聽隊列 channel.basicConsume(QUEUE_NAME, false, consumer); //false 代表使用手動消息應答,需要使用channel.basicAck(envelope.getDeliveryTag(),false) 告知消息中間件 } }
consumer2:
public class Consumer2 { private static final String QUEUE_NAME = "rabbitmq_fair_queue_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("consumer02"); Connection connection = MQConnectionUtils.getConnection(); // 創建通道 final Channel channel = connection.createChannel(); // 3.創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消費者獲取消息:" + msgString); try { Thread.sleep(1000); //讓這個消費者處理消息的能力更差一點 } catch (Exception e) { } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 3.監聽隊列 channel.basicConsume(QUEUE_NAME, false, consumer); } }
3,發布訂閱模式,采用fanout 扇形交換機,
高級隊列模式中,有交換機,生產者將消息發給交換機,在根據交換機的類型,發給定的的隊列,然后發給指定的消費者消費
producer:
public class Producer { // 定義交換機名稱 private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one"; // 定義交換機類型 private static final String EXCHANGE_TYPE = "fanout"; public static void main(String[] args) throws IOException, TimeoutException { // 和rabbitmq 建立連接 Connection connection = MQConnectionUtils.getConnection(); // 創建channel Channel channel = connection.createChannel(); // 創建交換機 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String message = "pub/sub"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); } }
郵件消費者:
ublic class EmailConsumer { private static final String QUEUE_NAME = "rabbitmq_pubsub_email_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("郵件消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }
短信消費者:
// 信息消費者 public class TextConsumer { private static final String QUEUE_NAME = "rabbitmq_pubsub_text_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }
4,路由模式:采用direct 交換機
producer:
public class Producer { // 定義交換機名稱 private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one"; // 定義交換機類型 private static final String EXCHANGE_TYPE = "direct"; // 定義路由 private static final String ROUTINGKEY = "info"; public static void main(String[] args) throws IOException, TimeoutException { // 和rabbitmq 建立連接 Connection connection = MQConnectionUtils.getConnection(); // 創建channel Channel channel = connection.createChannel(); // 創建交換機 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String message = "pub/sub"; channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes()); channel.close(); connection.close(); } }
郵件消費者:
public class EmailConsumer { private static final String QUEUE_NAME = "rabbitmq_direct_email_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one"; private static final String ROUTINGKEY_INFO = "info"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("郵件消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }
短信消費者:
public class TextConsumer { private static final String QUEUE_NAME = "rabbitmq_direct_text_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one"; // 設定路由 private static final String ROUTINGKEY_INFO = "info"; private static final String ROUTINGKEY_WARN = "warn"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 綁定路由 //info 和 warn 路由的都能接收到 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_WARN); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }
5,通配符模式,采用topic 交換機 # 代表任意 * 代表一個
producer:
public class Producer { // 定義交換機名稱 private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one"; // 定義交換機類型 private static final String EXCHANGE_TYPE = "topic"; // 定義路由 private static final String ROUTINGKEY = "routingkey.info.error.warn"; public static void main(String[] args) throws IOException, TimeoutException { // 和rabbitmq 建立連接 Connection connection = MQConnectionUtils.getConnection(); // 創建channel Channel channel = connection.createChannel(); // 創建交換機 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String message = "pub/sub"; channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes()); channel.close(); connection.close(); } }
郵件消費者:
public class EmailConsumer { private static final String QUEUE_NAME = "rabbitmq_topic_email_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one"; private static final String ROUTINGKEY = "routingkey.#"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("郵件消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }
短信消費者:
public class TextConsumer { private static final String QUEUE_NAME = "rabbitmq_topic_text_queue_one"; private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one"; private static final String ROUTINGKEY = "routingkey.info.*"; // 設定路由 public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消費者。。。"); Connection connection = MQConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 定義隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將隊列和交換機進行綁定 綁定路由 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息 :" + msg); } }; // 消費者監聽隊列消息 true 代表自動簽收 channel.basicConsume(QUEUE_NAME, true, consumer); } }