從讀者的反饋談RabbitMQ
昨天發完《RabbitMQ入門-初識RabbitMQ》,我陸陸續續收到一些反饋。鑒於部分讀者希望結合實例來講
期待下篇詳細,最好結合案例。謝謝!
哪都好,唯一缺點就是不支持原生ha,配置起來太復雜
...
上篇主要介紹了什么RabbitMQ,RabbitMQ能用來做什么,一些有關RabbitMQ的基本概念,同時還簡單介紹了兩種RabbitMQ的分發消息的模型。
從這篇起,我們將改變原來的思路,針對每種模型詳細講解,並結合代碼實例了解各個模型的原理和使用場景。
Hello World模型
上篇已經簡單介紹過
-
該模型由三要素組成:P(Producer)、Q(Queue)和C(Consumer)
-
P負責發送消息,Q負責存儲消息,C負責消費消息
-
消息可以在RabbitMQ或者你的應用中傳遞流動,但是卻只能存儲在Queue中
-
Queue可以接受多個發送者發送來的消息,也可以供多個消費者消費
實例
###准備工作
首先需要本地或者遠程有一個RabbitMQ的服務,具體安裝搭建可以網上找資料,類似於一個tomcat服務器
添加Jar包,推薦使用Maven管理jar包的方式,只需要添加依賴到pom.xml文件中就ok
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
發送端
package com.ximalaya.openapi.rabbitmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.TimeoutException;
/**
* Created by jackie on 17/8/2.
*/
public class Send {
private final static String QUEUE_NAME = "hello.august";
public static void main(String[] argv)
throws java.io.IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.3.161");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("send message: " + message);
channel.close();
connection.close();
}
}
-
首先創建一個ConnectFactory,並指定服務所在的ip地址,如果你的RabbitMQ啟在本機,那setHost就可以寫成setHost("localhost")
-
從連接工廠中拿到一個連接並創建一個Channel
-
再聲明一個隊列Queue,表示消息要發到哪個Queue里面
-
定義要發送的消息message,並通過basicPublish進行消息發送
-
最后關閉Channel和Connection的連接
運行這段代碼后,我們可以看到在Queue(hello.august)中已經有了一個message,說明消息已經發送到Queue上了。
注意這是RabbitMQ的管理界面,我們可以通過這個管理應用,查看這個message的詳細信息,在該頁面底部有一個Get message的選項,點擊我們可以得到剛剛發送的消息的詳情
接收端
package com.ximalaya.openapi.rabbitmq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by jackie on 17/8/2.
*/
public class Recv {
private final static String QUEUE_NAME = "hello.august";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.3.161");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
大部分代碼和發送端相同,在最后有一個Consumer,其主要用於監聽指定ip的RabbitMQ服務上,指定的Queue上的消息。一旦發現有消息,則進行消費。
有一點需要注意,接收端同樣聲明了一個Queue,這是為什么呢?因為完全會出現一種情況,就是發送端還沒有啟動,但是接收端已經啟動了,這時候要監聽消息需要確保Queue已經存在。經過自己測試發現,如果這個Queue不存在則會創建一個同名的Queue,如果已經存在則使用存在的Queue。
這里將接收端的Queue name改為“hello11111”,運行前我們進入管理應用發現沒有這個Queue
當運行完接收端的程序后,我們再次查找名為”hello11111”的Queue,就已經存在了
我們把接收端的Queue名稱改為與發送端一致,這時候運行程序,我們看下面的動態圖片
可以發現Queue中存儲的消息有1變為0,即被消費者消費了。
至此,我們了解了RabbitMQ中的Hello World的模型。
如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。