rabbitmq+java入門(一)hello world


 參考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

源碼:https://github.com/zuzhaoyue/JavaDemo

先決條件

本教程假定RabbitMQ 在標准端口(5672)上的本地主機上安裝並運行。如果您使用不同的主機,端口或證書,則連接設置需要進行調整。

介紹

RabbitMQ是一個消息代理:它接受和轉發消息。你可以把它想象成一個郵局:當你把你想要發送的郵件放在郵箱里時,你可以確定郵遞員最終會將郵件遞交你的收件人。與此類似,RabbitMQ既是郵政信箱、又是郵局和郵遞員。

RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受、存儲和轉發二進制數據塊 - 消息

RabbitMQ和一般的消息傳遞會使用一些術語:

  • 生產(Producing)意味着發送。一個發送消息的程序就是一個生產者

     

  • 隊列(queue)相當於上面例子里的郵箱盡管消息流經RabbitMQ和您的應用程序,但它們只能存儲在隊列中一個隊列只受主機內存和磁盤限制的約束,它本質上是一個很大的消息緩沖區。可以同時有許多生產者(producer)向一個隊列發送消息,當然也可以同時有許多消費者嘗試從一個隊列接收數據以下是隊列的表示方式:

     

  • 消費(consuming)與接受(receiving)意思差不多。一個消費者的主要功能是等待接收信息:

     

請注意,生產者,消費者和broker不必駐留在同一主機上; 

在本教程中,我們將用Java編寫兩個程序; 一個生產者,用於發送單個消息。一個消費者,用於接收消息並將其打印出來。

在下圖中,“P”是我們的生產者,“C”是我們的消費者。中間的盒子是一個隊列

(P)→[|||]→(C)

知道了上面這些后就可以寫代碼了~

發出

(P) - > [|||]

我們會調用消息發布者(生產者)來發送,調用消息使用者(消費者) Recv來消費發布者將連接到RabbitMQ,發送一條消息,然后退出。

發布者的代碼如下:

/**
 * Created by zuzhaoyue on 18/5/15.
 */
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
    private final static String QUEUE_NAME = "hello1";

    public static void main(String[] argv) throws Exception {
        //創建一個連接服務器的連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //為了發送信息,我們必須聲明一個隊列,然后才可以向這個隊列中發消息,這個聲明是冪等的,也就是說僅會在這個隊列不存在時,才會創建一個隊列。

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        //最后,我們關閉連接和隊列
        channel.close();
        connection.close();
    }

}

 

 

接收

Rabbitmq會推送消息給消費者,因此與發布單個消息的發布者不同,我們不會關閉消費者,而是讓它一直進行以監聽消息並將其打印出來。

(C)
 

代碼如下

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by zuzhaoyue on 18/5/15.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello1";
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //以下的defaultconsumer實現了consumer這個接口,這個接口被用來緩沖服務器推送過來的信息
        //一開始的set up和剛剛的send.java里的相似:1.打開一個連接,2.聲明一個隊列(這個隊列名要和剛剛的隊列名相同)
        //注意:我們在這里聲明隊列,因為我們可能在生產者之前開始消費
        //我們告訴服務器從隊列向我們傳送消息,既然它會異步傳送,我們以對象的形式提供一個回調,來緩沖這些消息,直到我們准備使用它們,這正是defaultconsumer做的事情
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        String  result = channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println("result:" + result);

    }
}

 

執行send.java的main()方法后,訪問127.0.0.1:15672

顯示:

說明隊列中有了一個消息,點開后我們可以看到該消息內容是hello world,如下圖:

然后運行recv.java的main()方法,打開消費者,頁面顯示如下:

該消息已經被成功消費。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM