1.安裝erlang
下載地址:http://www.erlang.org/downloads
設置ERLANG環境變量
2.安裝RabbitMQ
下載地址: http://www.rabbitmq.com/download.html
輸入命令安裝各種管理插件:
D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin>rabbitmq-plugins enable rabbitmq_management
重啟服務
net stop rabbitmq && net start rabbitmq
登錄
http://127.0.0.1:15672 默認用戶名密碼 guest guest
常用命令(RabbitMQ命令在sbin目錄下D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin,記得設置環境變量)
rabbitmqctl delete_vhost test_vhosts 刪除虛擬機test_vhosts
3. RabbitMQ知識整理
來自(https://blog.csdn.net/dreamchasering/article/details/77653512)
什么是MQ?
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。
RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。
1、隊列、生產者、消費者
隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息並消費。
多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。
2、Exchange、Binding
剛才我們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關聯起來。
3、Exchange Type、Bingding key、routing key
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。
附上一張RabbitMQ的結構圖:
最后來具體解析一下幾個問題:
1、可以自動創建隊列,也可以手動創建隊列,如果自動創建隊列,那么是誰負責創建隊列呢?是生產者?還是消費者?
如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產者發送的消息就會丟失。所以,為了數據不丟失,消費者和生產者都可以創建隊列。那么如果創建一個已經存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是隊列屬性並不會改變。
隊列對於負載均衡的處理是完美的。對於多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。
2、RabbitMQ的消息確認機制
默認情況下,如果消息已經被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。
如果一個隊列沒有消費者,那么,如果這個隊列有數據到達,那么這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被立即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。
那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
RabbitMQ Server會把這個信息發送到下一個消費者。
如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發送數據給它,因為Server認為這個消費者處理能力有限。
而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的均衡消費者的負載。
4.JAVA demo
引入RabbitMQ客戶端
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
3.1 使用默認配置直接發送消息到隊列
生產者
import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * 默認發送,直接將消息發送到某個隊列,默認交換機type為direct * * @author * @date 2019/01/10 11:17:10 */ public class ProducterDirectDemo { public static void main(String[] args) throws IOException, TimeoutException { String queneName = "testQuene"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("test_vhosts"); // 創建與RabbitMQ服務器的TCP連接 connection = factory.newConnection(); // 創建一個頻道 channel = connection.createChannel(); // 聲明默認的隊列 channel.queueDeclare(queneName, true, false, true, null); while (true) { channel.basicPublish("", queneName, null, UUID.randomUUID().toString().getBytes()); Thread.sleep(1000); } } catch (Exception ex) { ex.printStackTrace(); } finally { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } } }
消費者
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 接收默認消息 * * @author * @date 2019/01/10 11:14:32 */ public class ConsumerDirectDemo { public static void main(String[] args) { String queneName = "testQuene"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("test_vhosts"); connection = factory.newConnection(); channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message); } }; // channel綁定隊列,autoAck為true表示一旦收到消息則自動回復確認消息 channel.basicConsume(queneName, true, consumer); } catch (Exception ex) { ex.printStackTrace(); } } }
3.2 設置交換器,隊列,路由發送消息
生產者
import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 指定交換機,隊列,路由key方式 * * @author * @date 2019/01/10 11:19:38 */ public class ProducterAllDemo { public static void main(String[] args) throws IOException, TimeoutException { String queneName = "firstQueue"; String exchangeName = "amq.fanout"; String routingKey = "test1"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("test_vhosts"); // 創建與RabbitMQ服務器的TCP連接 connection = factory.newConnection(); // 創建一個頻道 channel = connection.createChannel(); // 聲明交換機類型 channel.exchangeDeclare("amq.fanout", "fanout", true); // 聲明默認的隊列 (也可不申明隊列,使用默認隊列) channel.queueDeclare(queneName, true, false, true, null); // String queue = channel.queueDeclare().getQueue(); // 將隊列與交換機綁定 channel.queueBind(queneName, exchangeName, routingKey); // 指定一個隊列 // channel.queueDeclare(queneName, false, false, false, null); while (true) { channel.basicPublish(exchangeName, routingKey, null, UUID.randomUUID().toString().getBytes()); Thread.sleep(1000); } } catch (Exception ex) { ex.printStackTrace(); } finally { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } } }
消費者
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * * @author * @date 2019/01/10 11:19:42 */ public class ConsumerAllDemo { public static void main(String[] args) { String queneName = "firstQueue"; String exchangeName = "amq.fanout"; String routingKey = "test1"; Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("test_vhosts"); connection = factory.newConnection(); channel = connection.createChannel(); // 聲明交換機類型 channel.exchangeDeclare(exchangeName, "fanout", true); // 聲明默認的隊列(也可不申明隊列,使用默認隊列) channel.queueDeclare(queneName, true, false, true, null); // String queue = channel.queueDeclare().getQueue(); // 將隊列與交換機綁定 channel.queueBind(queneName, exchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message); } }; // channel綁定隊列、消費者,autoAck為true表示一旦收到消息則自動回復確認消息 channel.basicConsume(queneName, true, consumer); } catch (Exception ex) { ex.printStackTrace(); } } }