Java+Maven 簡單使用RabbitMQ(demo)


添加依賴

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>

消息的發送者(生產者)

public static void main(String[] args) throws Exception {
        //創建連接工廠,並設置連接信息
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("ip地址");
        f.setPort(5672);//可選,5672是默認端口
        f.setUsername("admin");
        f.setPassword("admin");

        /*
         * 與rabbitmq服務器建立連接,
         * rabbitmq服務器端使用的是nio,會復用tcp連接,
         * 並開辟多個信道與客戶端通信
         * 以減輕服務器端建立連接的開銷
         */
        Connection c = f.newConnection();
        //建立信道
        Channel ch = c.createChannel();

        /*
         * 聲明隊列,會在rabbitmq中創建一個隊列
         * 如果已經創建過該隊列,就不能再使用其他參數來創建
         *
         * 參數含義:
         *   -queue: 隊列名稱
         *   -durable: 隊列持久化,true表示RabbitMQ重啟后隊列仍存在
         *   -exclusive: 排他,true表示限制僅當前連接可用
         *   -autoDelete: 當最后一個消費者斷開后,是否刪除隊列
         *   -arguments: 其他參數
         */
        ch.queueDeclare("task_queue", true,false,false,null);

        /*
         * 發布消息
         * 這里把消息向默認交換機發送.
         * 默認交換機隱含與所有隊列綁定,routing key即為隊列名稱
         *
         * 參數含義:
         *     -exchange: 交換機名稱,空串表示默認交換機"(AMQP default)",不能用 null
         *     -routingKey: 對於默認交換機,路由鍵就是目標隊列名稱
         *     -props: 其他參數,例如頭信息
         *     -body: 消息內容byte[]數組
         */
//        ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
//        System.out.println("消息已發送");

        while (true) {
            //控制台輸入的消息發送到rabbitmq
            System.out.print("輸入消息: ");
            String msg = new Scanner(System.in).nextLine();
            //如果輸入的是"exit"則結束生產者進程
            if ("exit".equals(msg)) {
                break;
            }
            //參數:exchage,routingKey,props,body;
            //MessageProperties.PERSISTENT_TEXT_PLAIN持久化的設置
            ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
            System.out.println("消息已發送: "+msg);
        }


        c.close();
    }

消費者

 public static void main(String[] args) throws Exception {
        //連接工廠
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("ip地址");
        f.setUsername("admin");
        f.setPassword("admin");
        //建立連接
        Connection c = f.newConnection();
        //建立信道
        Channel ch = c.createChannel();
        //聲明隊列,如果該隊列已經創建過,則不會重復創建
        ch.queueDeclare("task_queue",true,false,false,null);
        System.out.println("等待接收數據");

        //收到消息后用來處理消息的回調對象
        DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println("收到: "+msg);
                //遍歷字符串中的字符,每個點使進程暫停一秒
                for (int i = 0; i < msg.length(); i++) {
                    if (msg.charAt(i)=='.') {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    }
                }
                System.out.println("處理結束");
                //參數1:消息標簽,參數2:是否確認多條消息
                ch.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };

        //消費者取消時的回調對象
        CancelCallback cancel = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        //一次只能接受一條數據
        ch.basicQos(1);

        //第二個參數為消息回執,消息確認處理完成,為true為自動確認,只要消息發送到消費者即消息處理成功;為false為,手動發送確認回執,服務器才認為這個消息處理成功
        ch.basicConsume("task_queue", false, callback, cancel);
    }

以上的ip地址為你MQ安裝的ip地址

以上內容摘自https://blog.csdn.net/weixin_38305440/article/details/102810522,此處做下記錄方便以后查詢使用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM