rabbitMQ 消息隊列(MQ)


 
軟件開發的根本就是降低軟件開發的復雜性
 
采用可復用的軟件設計模型,采用合適的軟件架構搭建自己的系統。
 
消息隊列提供了一個異步通信協議,消息的發送者不用一直等待知道消息被成功的處理。消息被暫存於隊列中,對信息感興趣的消費者會訂閱消息,並處理他們。
 
使用消息隊列不是 殺雞用牛刀 ,而是一種未雨籌謀。隨着系統不斷升級,你將從中獲益。
 
消息隊列(MQ)使用消息將應用程序連接。通過 RabbitMQ 消息代理服務器在應用程序之間路由。
 
一種通用的軟件“總線”,解決應用程序之間繁重的消息通信工作
 
 
只要消息按照這些規則發布出去,任何消費者應用都能訂閱感興趣的消息。現在信息的生產者和消費者可以完全解耦合。
 
從任何發布者  到任何感興趣的消費者   之間的信息,通過一條軟件總線 動態的連接
 
 
Erlang   面向並發的編程語言
 
RabbitMQ 在應用程序和服務器之間 扮演着路由角色。
 
producer 創建消息,然后發布到代理服務器(RabbitMQ)
 
 
其實原理很簡單:
生產者創建消息,消費者接受消息。你的應用可以作為生產者,也可以作為消費者。
 
“信道”才能連接
 
重要的記者消費者和接受者是消息發送和接受概念的體現,而不是客戶端和服務端。
從底層開始構造,隊列
AMQP,即Advanced Message Queuing Protocol
高級消息隊列協議。
 
AMQP 消息路由必須有3個部分,交換器、隊列、綁定。
 
生產者把消息發布到交換器上,消息最終到達隊列,並被消費者接受。
綁定決定了消息如何從路由器到達特定的隊列。
 
通過AMQP 的 basic.consume 命令訂閱,會將信道置為接收模式,直到取消對隊列訂閱為止。
 
向隊列單條消息通過 AMQP 的 basic.get 命令實現。
 
消費者收到每一條消息都必須進行確認,消費者必須通過AMQP 的 basic.act 參數設置為 true 。
同時 RabbitMQ 才能安全的把消息從隊列中刪除。
 
如果應用程序有 bug 而忘記確認消息的話,Rabbit將不會給消費者發送更多的消息了。因為在上一條消息確認前,RabbitMQ 會認為這個消費者並沒有做好接受下一條消息。
 
 
 
 
 

實戰 rabbitMQ

 
一、MQ(消息隊列)
 
RabbitMQ(兔子消息隊列)
 
MQ 為 Message  Queue ,消息隊列是應用程序和應用程序之間的通信方法。
 
RabbitMQ 是一個開源的,在 AMQP 基礎上完整的,可復用的企業消息系統。
 
AMQP 
  消息隊列的一個協議,類似 tcp 、udp
 
RabbitMQ 是實現 AMQP 協議的具體一個東西。
 
多種開發語言的支持,java python .net
其實就是一種驅動。
 
rabbit.com 官網
 
MQ的其他產品還有很多:
ActiveMQ 、 Kafka (分布式消息訂閱系統)等等
 
二、搭建 rabbitMQ 環境
首先要安裝 Erlang 環境,然后在這個基礎上才是 
rabbitMQ 環境
 
用戶名、計算機名最好都是中文,不然可能安裝失敗。
 
安裝成功后 可以瀏覽器訪問
端口號: 15672
 
默認用戶  gust / gust
 
其實在 docker 上安裝鏡像就好了
 
進入界面后 ,注冊一個新號。
Demo/demo 
然后新建一個映射路徑(virtual hosts)。  /demo
 
 
docker run -d --hostname localhost --name myrabbit -p 15672:15672 -p 5672:5672 rabbitmq
 
rabitmq  端口
   5672  15672  25672
 
 
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 rabbitmq
 
 
端口號簡紹
 
5672   AMQP協議的端口 
 
25672   集群端口
 
15672  管理界面的端口
 
Docker 啟動 rabbitMQ 
 
1、啟動在 docker容器里
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
使用 docker logs 查看
docker logs some-rabbit
 
2、
 
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
 
3、
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 rabbitmq
 
docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
 
本地Mac 電腦啟動 rabbitMQ 
1、進入文件夾
/usr/local/Cellar/rabbitmq/
 
cd  3.7.8
 
2、啟動服務
sbin/rabbitmq-server 
3、瀏覽器訪問
瀏覽器 http://localhost:15672可進入rabbitmq控制終端登錄頁面
默認用戶名和密碼為 
guest/guest.
 
 
三、簡單隊列
生產者將消息發送到隊列,消費者從隊列中獲取消息。
 
1、先創建 pom.xml 文件
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>cn.itcast.rabbitmq</groupId>
   <artifactId>itcast-rabbitmq</artifactId>
   <version>0.0.1-SNAPSHOT</version>
 
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>3.4.1</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>1.7.7</version>
      </dependency>
      <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
         <version>3.3.2</version>
      </dependency>
      <dependency>
           <groupId>org.springframework.amqp</groupId>
           <artifactId>spring-rabbit</artifactId>
           <version>1.4.0.RELEASE</version>
       </dependency>
   </dependencies>
</project>
 
創建單獨的消息通道
package cn.itcast.rabbitmq.util;
 
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
 
public class ConnectionUtil {
 
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
 
        //設置服務地址
        factory.setHost("localhost”);
 
        //端口
        factory.setPort(5672);
 
        //設置賬號信息,用戶名、密碼、vhost
        factory.setVirtualHost("/taotao");
        factory.setUsername("taotao");
        factory.setPassword("taotao”);
 
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}
 
創建生產者代碼
 
package cn.itcast.rabbitmq.simple;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
public class Send {
 
    private final static String QUEUE_NAME = "test_queue";
 
    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
 
        // 從連接中創建通道
        Channel channel = connection.createChannel();
 
        // 聲明(創建)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 消息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
 
        System.out.println(" [x] Sent '" + message + "'");
 
        //關閉通道和連接
        channel.close();
        connection.close();
    }
}
 
創建消費者的代碼
package cn.itcast.rabbitmq.simple;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv {
 
    private final static String QUEUE_NAME = "test_queue";
 
    public static void main(String[] argv) throws Exception {
 
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
 
        // 監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
 
        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
 
 
四、work模式
 
交給兩個對象,一個前台、一個后台。
 
創建生產者
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
public class Send {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        for (int i = 0; i < 100; i++) {
            // 消息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
 
            Thread.sleep(i * 10);
        }
 
        channel.close();
        connection.close();
    }
}
 
創建消費者一:
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);
 
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
 
        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
創建消費者 2
package cn.itcast.rabbitmq.work;
 
import cn.itcast.rabbitmq.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
 
public class Recv2 {
 
    private final static String QUEUE_NAME = "test_queue_work";
 
    public static void main(String[] argv) throws Exception {
 
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
 
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 同一時刻服務器只會發一條消息給消費者
        channel.basicQos(1);
 
        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);
 
        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
 
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
Work 模式的 能者多勞 模式
 
 
消息的確認模式
 
消息從隊列中獲取信息,服務端如何知道消息已經被消費呢?
 
看情況選擇合適的 模式。
 
模式1-自動模式
    只要消息從隊列中獲取,無論取到后是否為成功消息,都認為消息是成功消息。
    // 監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,true, consumer);
 
模式2-手動模式
  消費者從中獲取到消息后,服務器就會標記其為不可用狀態,等待消費者反饋,如果該消息一直沒有反饋,那么一直不可用。
 
      // 監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);
如果為 true ,就是自動模式
 
如果為 false ,就是手動模式,同時還要反饋信息。
 
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 
訂閱模式
 
一個生產者,多個消費者
每個消費者都有自己的一個隊列
生產者沒有將消息直接發送到隊列,而是發送到了交換機
每個隊列都要綁定交換機
生產者發送的消息,經過交換機。實現一個消息被多個消費者獲取的目的
 
 
后面的模式都是由前面的創新而來的。
 
路由模式
 
選擇性的接受數據、消息。
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM