簡介
RabbitMQ是一個Message Broker,核心思想就是接受消息,轉發消息。
實現的協議:AMQP。
術語
(Jargon)
P,Producing,制造和發送信息的一方。
Queue,消息隊列。
C,Consuming,接收消息的一方。
Simple Demo

發送方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //(如果沒有就)創建Queue String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以byte的方式發布 System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();
接收方
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);//看一下Queue是否存在 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery();//阻塞,直到接收到一條消息 String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
訂閱/發布Demo
發送消息給多個訂閱者
核心思想:消息發送給exchange,每個接收方
創建匿名Queue綁定到exchange,exchange發送消息給每個接收方。
Exchanges
在RabbitMQ完整的模型中,消息只能發送給一個exchange。
exchange一方面接收消息,另一方面push給queues。

exchange類型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 廣播消息給已知隊列

發送方
String EXCHANGE_NAME = "logs"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 設置 exchange 類型 channel.exchangeDeclare(EXCHANGE_NAME /*exchange名稱*/, "fanout"/*類型*/); // 發布消息時,指定 exchange 名稱 channel.basicPublish( EXCHANGE_NAME , "", null, message.getBytes()); channel.close(); connection.close();
接收方(可多個同時運行)
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 設置exchange名稱和類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 創建一個臨時的、帶有隨機名稱的Queue,用來與 exchange 綁定 String queueName = channel.queueDeclare().getQueue(); hannel.queueBind(queueName, EXCHANGE_NAME, ""); // 綁定 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
Install
@Windows
1 先安裝Erlang。
2 官方網下載 .exe。
管理
命令行管理
http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
WebUI管理
> rabbitmq-plugins enable rabbitmq_management
重啟后訪問 http://localhost:15672/
guest,guest
角色
management
policymaker
monitoring
administrator
添加用戶並分配角色
> rabbitmqctl add_user name pass
> rabbitmqctl set_user_tags name administrator
插件管理
啟用插件
> rabbitmq-plugins enable plugin-name
配置文件
etc\rabbitmq.config
Refs