RabbitMQ入門-從HelloWorld開始


從讀者的反饋談RabbitMQ

昨天發完《RabbitMQ入門-初識RabbitMQ》,我陸陸續續收到一些反饋。鑒於部分讀者希望結合實例來講

期待下篇詳細,最好結合案例。謝謝!
哪都好,唯一缺點就是不支持原生ha,配置起來太復雜
...

上篇主要介紹了什么RabbitMQ,RabbitMQ能用來做什么,一些有關RabbitMQ的基本概念,同時還簡單介紹了兩種RabbitMQ的分發消息的模型。
從這篇起,我們將改變原來的思路,針對每種模型詳細講解,並結合代碼實例了解各個模型的原理和使用場景。
Hello World模型

上篇已經簡單介紹過

  • 該模型由三要素組成:P(Producer)、Q(Queue)和C(Consumer)

  • P負責發送消息,Q負責存儲消息,C負責消費消息

  • 消息可以在RabbitMQ或者你的應用中傳遞流動,但是卻只能存儲在Queue中

  • Queue可以接受多個發送者發送來的消息,也可以供多個消費者消費

實例

###准備工作

首先需要本地或者遠程有一個RabbitMQ的服務,具體安裝搭建可以網上找資料,類似於一個tomcat服務器

添加Jar包,推薦使用Maven管理jar包的方式,只需要添加依賴到pom.xml文件中就ok

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>

發送端

package com.ximalaya.openapi.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeoutException;

/**
 * Created by jackie on 17/8/2.
 */
public class Send {
    private final static String QUEUE_NAME = "hello.august";

    public static void main(String[] argv)
            throws java.io.IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.3.161");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("send message: " + message);

        channel.close();
        connection.close();
    }
}

  • 首先創建一個ConnectFactory,並指定服務所在的ip地址,如果你的RabbitMQ啟在本機,那setHost就可以寫成setHost("localhost")

  • 從連接工廠中拿到一個連接並創建一個Channel

  • 再聲明一個隊列Queue,表示消息要發到哪個Queue里面

  • 定義要發送的消息message,並通過basicPublish進行消息發送

  • 最后關閉Channel和Connection的連接

運行這段代碼后,我們可以看到在Queue(hello.august)中已經有了一個message,說明消息已經發送到Queue上了。

注意這是RabbitMQ的管理界面,我們可以通過這個管理應用,查看這個message的詳細信息,在該頁面底部有一個Get message的選項,點擊我們可以得到剛剛發送的消息的詳情

接收端

package com.ximalaya.openapi.rabbitmq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by jackie on 17/8/2.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello.august";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.3.161");
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

大部分代碼和發送端相同,在最后有一個Consumer,其主要用於監聽指定ip的RabbitMQ服務上,指定的Queue上的消息。一旦發現有消息,則進行消費。
有一點需要注意,接收端同樣聲明了一個Queue,這是為什么呢?因為完全會出現一種情況,就是發送端還沒有啟動,但是接收端已經啟動了,這時候要監聽消息需要確保Queue已經存在。經過自己測試發現,如果這個Queue不存在則會創建一個同名的Queue,如果已經存在則使用存在的Queue。
這里將接收端的Queue name改為“hello11111”,運行前我們進入管理應用發現沒有這個Queue

當運行完接收端的程序后,我們再次查找名為”hello11111”的Queue,就已經存在了

我們把接收端的Queue名稱改為與發送端一致,這時候運行程序,我們看下面的動態圖片

可以發現Queue中存儲的消息有1變為0,即被消費者消費了。
至此,我們了解了RabbitMQ中的Hello World的模型。
如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM