rabbitmq消息消費者


pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloudparent</artifactId>
        <groupId>com.cxy</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitMqConsumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
    </dependencies>

</project>

消費者代碼:

package com.cxy.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/***
 * @ClassName: Consumer1
 * @Description:
 * @Auther: cxy
 * @Date: 2019/3/24:11:37
 * @version : V1.0
 */
public class Consumer1 {
    private  static  final  String Queue="helloworld";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory= new ConnectionFactory();
        //設置連接地址
        connectionFactory.setHost("192.168.230.134");
        //設置端口
        connectionFactory.setPort(5672);
        //設置密碼用戶名
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,每個虛擬機相當於一個小的mq
        connectionFactory.setVirtualHost("/");
        Connection connection =null;
        try {
            //建立連接
            connection = connectionFactory.newConnection();
            //建立通道,生產着和消費者都是在通道中完成
            Channel channel = connection.createChannel();
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
             參數1,聲明隊列
             參數2 是否持久化
             參數3 是否排他,是否獨戰連接,隊列只允許該鏈接中訪問,如果連接關閉,隊列也就刪除了
             參數4:是否自動刪除,如果將此參數設置true,那么就變成零時隊列
             參數5 :擴展參數,例如存活時間
           */
            channel.queueDeclare(Queue,true,false,false,null);
          /*
         String basicConsume(String queue, boolean autoAck, Consumer callback)
         參數一:隊列名稱
         參數二:自動回復
         參數三 消費者方法
          */
            DefaultConsumer defaultConsumer=new DefaultConsumer(channel) {
                //當接受到消息時候,此方法被調用
                /**
                * @Author cxy
                * @Description //TODO
                * @Date  2019/3/24
                * @Param [consumerTag, envelope, properties, body]
                * @return void
                 *
                 * consumerTag 用來標識.可以再監聽隊列時候設置
                 * envelope 信封,通過envelope可以通過這個獲取到很多東西
                 * properties 額外的消息屬性
                 * body:消息體
                **/
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //獲取交換機
                    String exchange = envelope.getExchange();
                    //消息id,用來表示那個消息消費了
                    long deliveryTag = envelope.getDeliveryTag();
                    String message=new String(body,"utf-8");
                    System.out.println("receive");
                }
            };
           channel.basicConsume(Queue,true ,defaultConsumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

由於注釋內容都寫得很詳細就沒有單獨寫文字了.運行之后可以發掘管控台中消息沒有了,

在正式開發中不會使用這種原生得代碼去使用,會采用springboot去整合相關內容,至於以上代碼為什么還要去監聽隊列,防止如果隊列不存在,程序會存在異常,所以這樣,在正式開發中

會采用手動會簽得方式,如果,沒有會簽,就會進行消息重試.保證消息不會丟失


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM