SpringBoot整合RabbitMQ(一)


      在RabbitMQ的環境搭建成功后,創建SpringBoot項目,通過一個簡單的案例來詳細的說明下RabbitMQ

的生產者消費者的模式。下面結合SpringBoot項目,來具體的說明下這部分的具體應用。

一、pom引入RabbitMQ

         創建項目成功后,我們需要在pom.xml的文件里面來引入rabbitmq的jar,pom.xml文件詳細的內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

二、生產者思路

        RabbitMQ是MQ中核心的組件技術棧,生產者消費者的模型中也是非常重要的部分,在RabbitMQ中生產者

的應用程序並不關心隊列,生產者的任務只需要把MQ的消息發送到Exchange,而不關心並且也不知道Queue的

存在,至於Queue這部分是需要消費者來進行關心的。所以在操作RabbitMQ上,它的步驟具體總結如下:

  • 先創建連接工廠的對象,也就是ConnectionFactory
  • 然后配置連接MQ的地址,端口,賬戶以及密碼,和虛擬主機這部分
  • 連接工廠接着創建連接對象,也就是Connection
  • 接着使用連接對象來創建信道,也就是Channel
  • 最后通過生產者的模式把MQ的消息發送到Exchange

生產者涉及到的源代碼具體如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //配置連接mq的地址信息
        connectionFactory.setHost("101.43.158.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        //連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過connection來創建Channel
        Channel channel = connection.createChannel();

        //通過channel來發送具體的數據信息
        String msg = "Hello RabbitMQ";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish("saas", "", null, msg.getBytes());
            System.out.println("發送數據成功:"+i);
        }
        //發送消息成功后,關閉具體的連接
        channel.close();
        connection.close();
    }
}

三、消費者思路

         在消費者中,和生產者前面的代碼基本是一致的,但是這部分需要特別強調的是在消費者的設計中,

它不需要關心Exchange,而消費者核心需要關注的是Queue這部分,也就是消費者的應用程序主要是通過

Queue里面讀取到MQ的數據,這部分代碼具體如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    //定義exchange
    private static final String EXCHANGE = "saas";
    //定義隊列
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            //創建連接工廠
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //配置連接mq的地址信息
            connectionFactory.setHost("101.43.158.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
//        connectionFactory.setVirtualHost("/");

            //連接工廠創建連接
            Connection connection=connectionFactory.newConnection();

            //通過connection來創建Channel
            Channel channel=connection.createChannel();

            //設置exchange類型為fanout
            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

        /*
         定義一個隊列
         * 一個隊列來接收數據后,消費端才可以從隊列里面來接收具體的數據
         * param1:隊列名稱
         * param2:是否持久化
         * param3:隊列是否獨占此連接
         * param4:隊列不再使用時是否自動刪除此隊列
         * param5:隊列參數
         * */
            channel.queueDeclare(queueName,true,false,true,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //創建一個消費者來消費數據
//        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
            //2.0以后的版本修改為DefaultConsumer
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {

                    String message=new String(body);
                    System.out.println("接收到的消息為:"+message);
                };
            };
            // 監聽隊列,從隊列中獲取數據
            System.out.println("消費者程序啟動成功,准備接收生產者的數據:\n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

四、程序驗證

        完成代碼成功后,下來執行具體的程序來進行驗證。這部分需要特別的強調下,在執行消費者程序前

不需要單獨的再創建exchange和queue,要不就會出現消費者的程序無法啟動,這點需要特別的注意,另外

一個就是被操作的賬戶必須就具備管理員的權限。如上代碼執行后,消費者以及生產者輸出結果信息如下:

如上,就可以看到消費者接收到生產者發送的數據。程序執行后,可以在RabbitMQ的WEB控制台可以看到具體

的數據,主要是Queue的部分,具體如下:

如上,可以看到控制台中消費者的數據。感謝您的閱讀,或許會持續更新RabbitMQ的技術棧的體系文章。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM