RabbitMQ是一個開源的,實現AMQP協議的,可復用企業消息隊列系統。
類似的系統還有ActiveMQ(實現JMS)和Kafka(分布式)。RabbitMQ支持主流的操作系統,支持多種開發語言,能降低系統間訪問的耦合度,便於數據同步。
RabbitMQ提供如下5種隊列模型(遠程調用不是消息隊列)。
1.Simple
2.Work. 工作模式,一個消息只能被一個消費者獲取。
3.Publish/Subscribe. 訂閱模式,消息被路由投遞給多個隊列,一個消息被多個消費者獲取。ExchangeType為fanout。
4.Routing. 路由模式,一個消息被多個消費者獲取。並且消息的目的queue可被生產者指定。ExchangeType為direct。
5.Topic. 通配符模式,一個消息被多個消費者獲取。消息的目的queue可用BindingKey以通配符(#:一個或多個詞,*:一個詞)的方式指定。ExchangeType為topic。
6.PRC. 遠程調用
相關名詞:
1. Server:RabbitMQ服務器,
2. VirtualHost:權限控制的基本單位,一個VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user使用。
3. Connection:生產者/消費者和RabbitMQ服務器的TCP連接。
4. Channel:創建完Connection后,需創建信道才能執行AMQP命令。一個Connection可以創建多個Channel。
5. Exchange:路由。接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType有fanout、direct和topic三種,對應路由使用上述3/4/5號模型。
6. (Message)Queue:消息隊列,用於存儲還未被消費者消費的消息。
7. Message:由Header和Body組成。Header是生產者添加的相關屬性:是否持久化、被哪個MessageQueue接收、優先級等。而Body是傳輸的數據。
8. Binding:消息被復制傳遞時,一個消費者對應一個消息隊列,消費者綁定MessageQueue到Exchange,可指定多個Bindingkey。生產者在發送Message時,可以在header指定RoutingKey,Exchange匹配RoutingKey和Bindingkey將Message路由到相應的Queue。
9. Command:AMQP命令,生產者/消費者通過Command完成與RabbitMQ服務器交互。Publish:發送消息,txSelect:開啟事務,txCommit:提交事務。
程序員應該用代碼來表達自己的思想,下面用代碼展示以上五種模式:
首先創建工廠類,獲取Connection
public class ConnectionUtil { public static Connection getConnection() throws Exception { //connection工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("zx"); factory.setPassword("zx"); factory.setVirtualHost("/zx"); // 通過工廠獲取連接 Connection connection = factory.newConnection(); return connection; } }
1.Simple
生產者
public class Send { private final static String QUEUE_NAME = "queue_simple"; public static void main(String[] argv) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 (不存在則創建) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 發送消息 String message = "Hello World"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 關閉通道和連接 channel.close(); connection.close(); } }
消費者
public class Recv { private final static String QUEUE_NAME = "queue_simple"; public static void main(String[] argv) throws Exception { // 獲取連接 Connection connection = ConnectionUtil.getConnection(); // 創建通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); //true 自動確認消息, 下有詳解 // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //阻塞或輪詢 String message = new String(delivery.getBody()); System.out.println("獲取:" + message); } } }
2.Work
生產者
public class Send { private final static String QUEUE_NAME = "queue_work"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); Thread.sleep(i * 10); } channel.close(); connection.close(); } }
消費者1
public class Recv1 { private final static String QUEUE_NAME = "queue_work"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 開啟Qos, 同一時刻服務器只發送一條消息. 可以嘗試注釋該行, 會發現消息會被平均分配給兩個消費者 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); // 模擬handling Thread.sleep(100); // 手動確認消息接收. 在basicConsume方法中, true為自動, false為手動 /* 消息確認方式: * 1. 自動確認. 只要消息從隊列中移除, 服務端認為消息被成功消費 * 2. 手動確認. 消費者獲取消息后, 服務器將該消息標記為不可用, 並等待反饋. 如果消費者一直不反饋, 則該消息將一直處於不可用狀態 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "queue_work"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 模擬handling Thread.sleep(200); // ACK channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
3.Publish/Subscribe
生產者
public class Send { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "Hello world"; // 與前面不同, 生產者將消息發送給exchange, 而非隊列. 若發消息時還沒消費者綁定queue與該exchange, 消息將丟失 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); } }
消費者1
public class Recv1 { private final static String QUEUE_NAME = "queue_fanout_1"; private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機. 綁定也可在rabbitMQ的管理界面進行 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); Thread.sleep(100); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "queue_fanout_2"; private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); Thread.sleep(200); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
4.Routing
生產者
public class Send { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String message = "Hello world"; // 發送消息, RoutingKey為 insert channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); channel.close(); connection.close(); } }
消費者1
public class Recv1 { private final static String QUEUE_NAME = "queue_direct_1"; private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機, BindingKey為 delete update channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); Thread.sleep(100); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "queue_direct_2"; private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機, BindingKey為 insert delete update channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); Thread.sleep(200); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.Topic
生產者
public class Send { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String message = "Hello world"; // 發送消息, 指定RoutingKey channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes()); channel.close(); connection.close(); } }
消費者1
public class Recv1 { private final static String QUEUE_NAME = "queue_topic_1"; private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:" + message); Thread.sleep(100); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2
public class Recv2 { private final static String QUEUE_NAME = "queue_topic_2"; private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機. 通配符! channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取:'" + message); Thread.sleep(200); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
可以看出,上面調用的關系比較復雜。
幸運的是,Spring提供了對rabbitMQ的封裝,將復雜的關系設置整合到配置文件中。
依賴於兩個組件,抽象層spring-amqp和實現層spring-rabbit。
於是代碼簡化為:
生產者
public class Send { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml"); //拿模板的bean RabbitTemplate template = ctx.getBean(RabbitTemplate.class); //發消息 String msg = "Hello world"; template.convertAndSend(msg); //該函數還能指定routing-key Thread.sleep(1000); ctx.close(); } }
消費者
public class Recv { public void listen(String msg) { System.out.println("獲取" + msg); } }
非常漂亮的封裝。配置文件如下
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- connection工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="zx" password="zx" virtual-host="/zx" /> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 聲明隊列 (auto表示需要時創建)--> <rabbit:queue name="myQueue" auto-declare="true"/> <!-- 聲明fanout類型的exchange (auto表示需要時創建) --> <rabbit:fanout-exchange name ="fanoutExchange" auto-declare="true" durable="true" > <!-- durable是否持久化, 安全性還是性能的權衡 --> <!-- 注意, 在生產者/消費者 分離的系統中, exchange和queue也分離, 綁定應該交給運維在rabbit管理界面進行, 而不是配置下面的bindings屬性 --> <!-- 小細節, rabbit管理界面綁定時界面屬性中binding key被寫成了routing key? --> <rabbit:bindings> <rabbit:binding queue="myQueue"/> <!-- 還能指定通過pattern屬性指定bindingType --> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 定義Rabbit模板的bean,指定 exchange或queue --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" /> <!-- 還能指定routing-key屬性 --> <bean id="recv" class="com.zx.rabbitmq.spring.Recv" /> <!-- 設置消費者要監聽的隊列, 並指定有消息時執行的方法 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="recv" method="listen" queue-names="myQueue" /> </rabbit:listener-container> </beans>