RabbitMQ在普通MAVEN項目中的使用


五、在普通的Maven應用中使用MQ

  rabbitmq的隊列結構

 

5.1簡單模式

5.1.1 消息生產者
  • 創建Maven項目

  • 添加RabbitMQ連接所需要的依賴

  • <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>

    在resources目錄下創建log4j.properties

  • log4j.rootLogger=DEBUG,A1
    log4j.logger.com.taotao = DEBUG
    log4j.logger.org.mybatis = DEBUG
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n

  • 創建MQ連接幫助類

  • package com.qfedu.mq.utils;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Helper that opens connections to the RabbitMQ broker used by the examples.
     *
     * <p>The broker address, virtual host and credentials are hard-coded here for the
     * tutorial; a real application should read them from external configuration.
     */
    public final class ConnectionUtil {

        /** Static helper only; not meant to be instantiated. */
        private ConnectionUtil() {
        }

        /**
         * Opens a new connection to the broker.
         *
         * @return a newly created connection; the caller is responsible for closing it
         * @throws IOException propagated from {@code ConnectionFactory.newConnection()}
         * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            // 1. Create the connection factory.
            ConnectionFactory factory = new ConnectionFactory();
            // 2. Set the broker connection details (ip, port, virtual host, username, password).
            factory.setHost("47.96.11.185");
            factory.setPort(5672);
            factory.setVirtualHost("host1");
            factory.setUsername("ytao");
            factory.setPassword("admin123");
            // 3. Obtain the connection to the broker from the factory.
            return factory.newConnection();
        }

    }

    消息生產者發送消息

  • package com.qfedu.mq.service;
    
    import com.qfedu.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * Simple-mode producer: publishes a single text message directly to a queue
     * through the default exchange.
     */
    public class SendMsg {

        /**
         * Sends one message to {@code queue1} and then releases the channel and connection.
         *
         * @param args unused
         * @throws Exception if connecting, publishing or closing fails
         */
        public static void main(String[] args) throws Exception{

            String msg = "Hello HuangDaoJun!";

            // try-with-resources closes the channel and then the connection even when
            // publishing throws, so neither is leaked on failure.
            try (Connection connection = ConnectionUtil.getConnection();
                 Channel channel = connection.createChannel()) {

                // Optionally declare the queue from Java code (creates it on the broker):
                //   arg 1: name of the queue
                //   arg 2: whether the queue is durable (persisted)
                //   arg 3: whether it is exclusive (private to the current connection)
                //   arg 4: auto-delete (the queue is destroyed once it has no connections,
                //          whether or not it still holds data)
                //   arg 5: additional queue arguments
                // channel.queueDeclare("queue1",false,false,false,null);

                //   arg 1: exchange name; "" when sending directly to a queue
                //   arg 2: target queue name -- must be the queue the simple-mode consumer
                //          reads ("queue1"), otherwise the message never reaches it
                //   arg 3: properties of this message (e.g. an expiration time)
                //   arg 4: message content; encoded explicitly as UTF-8 so the bytes do not
                //          depend on the platform default charset
                channel.basicPublish("", "queue1", null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("發送:" + msg);
            }
        }

    }
    5.1.2 消息消費者
    • 創建Maven項目

    • 添加依賴

    • log4j.properties

    • ConnectionUtil.java

    • 消費者消費消息

    • package com.qfedu.mq.service;
      
      import com.qfedu.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Simple-mode consumer: prints every message delivered from {@code queue1}.
       */
      public class ReceiveMsg {

          /**
           * Registers a consumer on {@code queue1}. Neither the channel nor the connection
           * is closed here (presumably so that deliveries keep arriving).
           *
           * @param args unused
           * @throws IOException propagated from the RabbitMQ client calls
           * @throws TimeoutException propagated from {@code ConnectionUtil.getConnection()}
           */
          public static void main(String[] args) throws IOException, TimeoutException {
              Connection connection = ConnectionUtil.getConnection();
              Channel channel = connection.createChannel();

              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                              AMQP.BasicProperties properties, byte[] body) throws IOException {
                      // body is the data fetched from the queue; decode it as UTF-8
                      // explicitly instead of relying on the platform default charset.
                      String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                      System.out.println("接收:"+msg);
                  }
              };

              // Second argument true = automatic acknowledgement.
              channel.basicConsume("queue1",true,consumer);
          }
      }

      5.2 工作模式----    一個生產者多個消費者

      • 5.2.1 發送者
      • public class SendMsg {

            /**
             * Work-mode producer: reads lines from standard input and publishes each one to
             * {@code queue2} until the user types {@code quit} or input ends.
             *
             * @param args unused
             * @throws Exception if connecting, publishing or closing fails
             */
            public static void main(String[] args) throws Exception{
                System.out.println("請輸入消息:");
                Scanner scanner = new Scanner(System.in);

                // One connection/channel is reused for all messages instead of being
                // re-created per line; try-with-resources closes both even on failure.
                try (Connection connection = ConnectionUtil.getConnection();
                     Channel channel = connection.createChannel()) {
                    // hasNextLine() ends the loop cleanly at end of input instead of
                    // letting nextLine() throw NoSuchElementException.
                    while (scanner.hasNextLine()) {
                        String msg = scanner.nextLine();
                        if ("quit".equals(msg)) {
                            break;
                        }

                        // Default exchange (""), routed straight to queue2; UTF-8 keeps the
                        // payload independent of the platform default charset.
                        channel.basicPublish("", "queue2", null,
                                msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        System.out.println("發送:" + msg);
                    }
                }
            }

        }

        5.2.2 消費者1

      • public class ReceiveMsg {

            /**
             * Work-mode consumer 1: prints messages delivered from {@code queue2}; the
             * message {@code wait} makes it pause for ten seconds to simulate slow work.
             *
             * @param args unused
             * @throws Exception if connecting or registering the consumer fails
             */
            public static void main(String[] args) throws Exception {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer1接收:"+msg);
                        if("wait".equals(msg)){
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so the calling thread can
                                // still observe the interruption.
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue2",true,consumer);
            }
        }

        5.2.3 消費者2

      • public class ReceiveMsg {

            /**
             * Work-mode consumer 2: prints every message delivered from {@code queue2}.
             *
             * @param args unused
             * @throws IOException propagated from the RabbitMQ client calls
             * @throws TimeoutException propagated from {@code ConnectionUtil.getConnection()}
             */
            public static void main(String[] args) throws IOException, TimeoutException {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer2接收:"+msg);
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue2",true,consumer);
            }
        }

        5.3 訂閱模式

        5.3.1 發送者 發送消息到交換機
      • public class SendMsg {

            /**
             * Publish/subscribe producer: reads lines from standard input and publishes each
             * one to the exchange {@code ex1} (empty routing key) until the user types
             * {@code quit} or input ends.
             *
             * @param args unused
             * @throws Exception if connecting, publishing or closing fails
             */
            public static void main(String[] args) throws Exception{
                System.out.println("請輸入消息:");
                Scanner scanner = new Scanner(System.in);

                // One connection/channel is reused for all messages instead of being
                // re-created per line; try-with-resources closes both even on failure.
                try (Connection connection = ConnectionUtil.getConnection();
                     Channel channel = connection.createChannel()) {
                    // hasNextLine() ends the loop cleanly at end of input instead of
                    // letting nextLine() throw NoSuchElementException.
                    while (scanner.hasNextLine()) {
                        String msg = scanner.nextLine();
                        if ("quit".equals(msg)) {
                            break;
                        }

                        // Publish to exchange ex1; UTF-8 keeps the payload independent of
                        // the platform default charset.
                        channel.basicPublish("ex1", "", null,
                                msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        System.out.println("發送:" + msg);
                    }
                }
            }

        }

        5.3.2 消費者1

      • public class ReceiveMsg1 {

            /**
             * Publish/subscribe consumer 1: prints messages delivered from {@code queue3};
             * the message {@code wait} makes it pause for ten seconds.
             *
             * @param args unused
             * @throws Exception if connecting or registering the consumer fails
             */
            public static void main(String[] args) throws Exception {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer1接收:"+msg);
                        if("wait".equals(msg)){
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so the calling thread can
                                // still observe the interruption.
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue3",true,consumer);
            }
        }

        5.3.3 消費者2

      • public class ReceiveMsg2 {

            /**
             * Publish/subscribe consumer 2: prints every message delivered from
             * {@code queue4}.
             *
             * @param args unused
             * @throws IOException propagated from the RabbitMQ client calls
             * @throws TimeoutException propagated from {@code ConnectionUtil.getConnection()}
             */
            public static void main(String[] args) throws IOException, TimeoutException {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer2接收:"+msg);
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue4",true,consumer);
            }
        }

        5.4 路由模式

        5.4.1 發送者 發送消息到交換機
      • public class SendMsg {

            /**
             * Routing-mode producer: reads lines from standard input and publishes each one
             * to the exchange {@code ex2}, using routing key {@code a} or {@code b} according
             * to the message's first letter, until the user types {@code quit} or input ends.
             * Lines starting with neither letter are not published.
             *
             * @param args unused
             * @throws Exception if connecting, publishing or closing fails
             */
            public static void main(String[] args) throws Exception{
                System.out.println("請輸入消息:");
                Scanner scanner = new Scanner(System.in);

                // One connection/channel is reused for all messages instead of being
                // re-created per line; try-with-resources closes both even on failure.
                try (Connection connection = ConnectionUtil.getConnection();
                     Channel channel = connection.createChannel()) {
                    // hasNextLine() ends the loop cleanly at end of input instead of
                    // letting nextLine() throw NoSuchElementException.
                    while (scanner.hasNextLine()) {
                        String msg = scanner.nextLine();
                        if ("quit".equals(msg)) {
                            break;
                        }

                        // Pick the routing key from the message prefix.
                        final String routingKey;
                        if (msg.startsWith("a")) {
                            routingKey = "a";
                        } else if (msg.startsWith("b")) {
                            routingKey = "b";
                        } else {
                            routingKey = null;
                        }

                        if (routingKey == null) {
                            // Nothing was published, so do not report the message as sent.
                            System.out.println("未發送(消息須以 a 或 b 開頭):" + msg);
                            continue;
                        }

                        // UTF-8 keeps the payload independent of the platform default charset.
                        channel.basicPublish("ex2", routingKey, null,
                                msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        System.out.println("發送:" + msg);
                    }
                }
            }

        }

        5.4.2 消費者1

      • public class ReceiveMsg1 {

            /**
             * Routing-mode consumer 1: prints messages delivered from {@code queue5}; the
             * message {@code wait} makes it pause for ten seconds.
             *
             * @param args unused
             * @throws Exception if connecting or registering the consumer fails
             */
            public static void main(String[] args) throws Exception {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer1接收:"+msg);
                        if("wait".equals(msg)){
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so the calling thread can
                                // still observe the interruption.
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue5",true,consumer);
            }
        }

        5.4.3 消費者2

      • public class ReceiveMsg2 {

            /**
             * Routing-mode consumer 2: prints every message delivered from {@code queue6}.
             *
             * @param args unused
             * @throws IOException propagated from the RabbitMQ client calls
             * @throws TimeoutException propagated from {@code ConnectionUtil.getConnection()}
             */
            public static void main(String[] args) throws IOException, TimeoutException {
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // body is the data fetched from the queue; decode it as UTF-8
                        // explicitly instead of relying on the platform default charset.
                        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Consumer2接收:"+msg);
                    }
                };

                // Second argument true = automatic acknowledgement.
                channel.basicConsume("queue6",true,consumer);
            }
        }

         

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM