參考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
源碼:https://github.com/zuzhaoyue/JavaDemo
先決條件
本教程假定RabbitMQ 在標准端口(5672)上的本地主機上安裝並運行。如果您使用不同的主機,端口或證書,則連接設置需要進行調整。
介紹
RabbitMQ是一個消息代理:它接受和轉發消息。你可以把它想象成一個郵局:當你把你想要發送的郵件放在郵箱里時,你可以確定郵遞員最終會將郵件遞交你的收件人。與此類似,RabbitMQ既是郵政信箱、又是郵局和郵遞員。
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊 - 消息。
RabbitMQ和一般的消息傳遞會使用一些術語:
-
生產(Producing)意味着發送。一個發送消息的程序就是一個生產者:
-
隊列(queue)相當於上面例子里的郵箱。盡管消息流經RabbitMQ和您的應用程序,但它們只能存儲在隊列中。一個隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩沖區。可以同時有許多生產者(producer)向一個隊列發送消息,當然也可以同時有許多消費者嘗試從一個隊列接收數據。以下是隊列的表示方式:
-
消費(consuming)與接受(receiving)意思差不多。一個消費者的主要功能是等待接收信息:
請注意,生產者,消費者和broker不必駐留在同一主機上;
在本教程中,我們將用Java編寫兩個程序; 一個生產者,用於發送單個消息。一個消費者,用於接收消息並將其打印出來。
在下圖中,“P”是我們的生產者,“C”是我們的消費者。中間的盒子是一個隊列
![(P)→[|||]→(C)](/image/aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvcHl0aG9uLW9uZS5wbmc=.png)
知道了上面這些后就可以寫代碼了~
發出
![(P) - > [|||]](/image/aHR0cHM6Ly93d3cucmFiYml0bXEuY29tL2ltZy90dXRvcmlhbHMvc2VuZGluZy5wbmc=.png)
我們會調用消息發布者(生產者)來發送,調用消息使用者(消費者) Recv來消費。發布者將連接到RabbitMQ,發送一條消息,然后退出。
發布者的代碼如下:
/** * Created by zuzhaoyue on 18/5/15. */ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello1"; public static void main(String[] argv) throws Exception { //創建一個連接服務器的連接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //為了發送信息,我們必須聲明一個隊列,然后才可以向這個隊列中發消息,這個聲明是冪等的,也就是說僅會在這個隊列不存在時,才會創建一個隊列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); //最后,我們關閉連接和隊列 channel.close(); connection.close(); } }
接收
Rabbitmq會推送消息給消費者,因此與發布單個消息的發布者不同,我們不會關閉消費者,而是讓它一直進行以監聽消息並將其打印出來。

代碼如下
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by zuzhaoyue on 18/5/15. */ public class Recv { private final static String QUEUE_NAME = "hello1"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //以下的defaultconsumer實現了consumer這個接口,這個接口被用來緩沖服務器推送過來的信息 //一開始的set up和剛剛的send.java里的相似:1.打開一個連接,2.聲明一個隊列(這個隊列名要和剛剛的隊列名相同) //注意:我們在這里聲明隊列,因為我們可能在生產者之前開始消費 //我們告訴服務器從隊列向我們傳送消息,既然它會異步傳送,我們以對象的形式提供一個回調,來緩沖這些消息,直到我們准備使用它們,這正是defaultconsumer做的事情 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; String result = channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println("result:" + result); } }
執行send.java的main()方法后,訪問127.0.0.1:15672
顯示:
說明隊列中有了一個消息,點開后我們可以看到該消息內容是hello world,如下圖:
然后運行recv.java的main()方法,打開消費者,頁面顯示如下:
該消息已經被成功消費。