RabbitMQ學習總結 第二篇:快速入門HelloWorld


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

1、相關概念

RabbitMQ是一個消息代理,事實上,它接收生產者產生的消息,然后將消息傳遞給消費者。在這個過程中,它可以路由,可以緩沖,或者更具你設定的規則來將消息持久化。RabbitMQ和消息傳輸過程中一般會用一些術語:

  • 生產者(Producing ):

意思無非是指發送消息的那一端,如果一個程序發送消息,那么它就將被稱為生產者,這里用大寫的P來表示。

  • 隊列(queue ):

相當於郵箱的名字,它活動在RabbitMQ服務器里邊。雖然消息流會通過RabbitMQ和你的應用程序,但是只會被存儲在隊列中。隊列是不受任何限制的,它可以盡可能多的去存儲你需要存儲的消息(本質上來說它是個無限緩沖)。可以多個生產者向同一個消息隊列發送消息,也可以多個消費者同時從一個消息隊列中來接收消息。消息隊列可以如下圖模型。

  • 消費者(Consuming ):

即接收端,消費者主要是等待接收消息的程序,用下圖表示:

注意:在大多數應用場景中,生產者、消費者以及RabbitMQ服務是不會同時運行在一台機器上的。

下邊將會實現兩個Java程序,一個只發送生產者一條消息的生產者,一個接收消息、並打印消息的消費者。

在下邊的對話中,”P”是我們的生產者,”C”是我們的消費者,中間的是矩形是隊列(BabbitMQ維護的消息緩沖)

2、Java客戶端的包

RabbitMQ遵守AMQP協議,AMQP協議是一個開放、通用的消息協議。關於AMQP協議的客戶端,有多種語言的實現版本。本文使用的是RabbitMQ提供的Java客戶端。

可在http://www.rabbitmq.com/java-client.html這里下載,然后將相對應的Jar包拷貝你的工作目錄下。RabbitMQ的Java客戶端在Maven庫中也有,groupId是com.rabbitmq,artifactId是amqp-client。

3、發送端

我們將消息發送器起名為Send,消息接收器起名為Recv。發送器將會連接RabbitMQ,發送一條消息,然后退出。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws java.io.IOException {
    /*連接可以抽象為socket連接,為我們維護協議版本信息和協議證書等。這里我們連接
    上了本機的消息服務器實體(localhost)。如果我們想連接其它主機上的RabbitMQ服務,只需要修改一下主機名或是IP就可以了*/
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    /*接下創建channel(信道),這是絕大多數API都能用到的。為了發送消息,你必須要聲明一個消息消息隊列,然后向該隊列里推送消息*/
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    /*聲明一個冪等的隊列(只有在該隊列不存在時,才會被創建)。消息的上下文是一個
      字節數組,你可以指定它的編碼。*/
    channel.close();
    connection.close();
  }
}

注:若RabbitMQ服務磁盤空間不足的話,運行會出錯,參數設置為:disk_free_limit,更多的參數配置可在這里找到http://www.rabbitmq.com/configure.html#config-items

4、接收端

RabbitMQ會往接收器上推消息,與只發送一條消息的發送端不同,這里我們將監聽消息並將消息打印出來。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
    /*這里怎么打開連接和信道,以及聲明用於接收消息的隊列,這些步驟與發送端基本上是一樣的*/
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    /*確保這里的隊列是存在的*/
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    /*這里用到了額外的類QueueingConsumer來緩存服務器將要推過來的消息。我們通知服務器向接收端推送消息,
   然后服務器將會向客戶端異步推送消息,這里提供了一個可以回調的對象來緩存消息,直到我們做好准備來使用
   它,這個類就是QueueingConsumer
*/ QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }

消息接收完后QueueingConsumer.nextDelivery()將會發生阻塞,暫停運行,直到有其他的消息推過來。

如果你需要檢查和驗證隊列的話,需要使用rabbitmqctl list_queues。

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-one-java.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM