Rabbitmq基本API使用


 

 

一、生產者
  1. 創建ConnectionFactory工廠(地址、用戶名、密碼、vhost)
  2. 創建Connection
  3. 創建信道(Channel)
  4. 創建 exchange(指定 名稱、類型-DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");、是否持久化)
  5. 發送消息(指定:exchange、發送的routingKey , 發送到的消息 )
       
基礎的生產者: 

 

public class TestProducer {


    public final static String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args)
            throws IOException, TimeoutException {
        /* 創建連接,連接到RabbitMQ*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.112.131");
        connectionFactory.setVirtualHost("my_vhost");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = connectionFactory.newConnection();


        /*創建信道*/
        Channel channel = connection.createChannel();
        /*創建交換器*/
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);


        /*日志消息級別,作為路由鍵使用*/
        String[] routekeys = {"king","queue","prince"};
        for(int i=0;i<3;i++){
            String routekey = routekeys[i%3];
            String msg = "Hellol,RabbitMq"+(i+1);
            /*發布消息,需要參數:交換器,路由鍵,其中以日志消息級別為路由鍵*/
            channel.basicPublish(EXCHANGE_NAME,routekey,null,
                    msg.getBytes());
            System.out.println("Sent "+routekey+":"+msg);
        }
        channel.close();
        connection.close();
    }
}
View Code

 

二、消費者
  1. 創建ConnectionFactory工廠(地址、用戶名、密碼、vhost)
  2. 創建Connection
  3. 創建信道(Channel)
  4. 聲明一個 exchange(指定 名稱、類型、是否持久化)
  5. 創建一個隊列(指定:名稱,是否持久化,是否獨占,是否自動刪除,其他參數)
  6. 隊列、exchange通過routeKey進行綁定
  7. 消費者接收消息(隊列名稱,是否自動ACK)
    基本的消費者:

    

public class TestConsumer {
    public static void main(String[] argv)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.112.131");
        factory.setVirtualHost("my_vhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 打開連接和創建頻道,與發送端一樣
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(TestProducer.EXCHANGE_NAME,
                "direct");
        /*聲明一個隊列*/
        String queueName = "focuserror";
        channel.queueDeclare(queueName,false,false,
                false,null);

        /*綁定,將隊列和交換器通過路由鍵進行綁定*/
        String routekey = "king";/*表示只關注error級別的日志消息*/
        channel.queueBind(queueName,TestProducer.EXCHANGE_NAME,routekey);
        System.out.println("waiting for message........");
        /*聲明了一個消費者*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received["+envelope.getRoutingKey()
                        +"]"+message);
            }
        };
        /*消費者正式開始在指定隊列上消費消息*/
        channel.basicConsume(queueName,true,consumer);
    }
}
View Code

 

三、消息持久化
  1.  exchange 需要持久化
  2. 發送消息設置參數為 MessageProperties.PERSISTENT_TEXT_PLAIN
  3. 隊列需要設置參數為持久化
1、//TODO 創建持久化交換器 durable=true
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);

2、//TODO 發布持久化的消息(delivery-mode=2)
channel.basicPublish(EXCHANGE_NAME,routekey,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        msg.getBytes());

3、//TODO 聲明一個持久化隊列(durable=true)
// autoDelete=true 消費者停止了,則隊列會自動刪除
//exclusive=true獨占隊列,只能有一個消費者消費
String queueName = "msgdurable";
channel.queueDeclare(queueName,true,false,
        false,null);
四、如何支持事務(防止投遞消息的時候消息丟失-效率特別低,不建議使用,可以使用生產者ACK機制)
  1.    啟動事務
  2. 成功提交
  3. 失敗則回滾
//TODO
//加入事務
channel.txSelect();
try {
    for(int i=0;i<3;i++){
        String routekey = routekeys[i%3];
        // 發送的消息
        String message = "Hello World_"+(i+1)
                +("_"+System.currentTimeMillis());
        channel.basicPublish(EXCHANGE_NAME, routekey, true,
                null, message.getBytes());
        System.out.println("----------------------------------");
        System.out.println(" Sent Message: [" + routekey +"]:'"
                + message + "'");
        Thread.sleep(200);
    }
    //TODO
    //事務提交
    channel.txCommit();
} catch (IOException e) {
    e.printStackTrace();
    //TODO
    //事務回滾
    channel.txRollback();
} catch (InterruptedException e) {
    e.printStackTrace();
}
View Code
五、消費消息手動ACK,如果異常則使用拒絕的方式,然后異常消息推送到-死信隊列
       批量ack的時候如果其中有一個消息出現異常,則會導致消息丟失(日志處理的時候可以使用批量)
 
1 /*消費者正式開始在指定隊列上消費消息,第二個參數false為手動應答*/
channel.basicConsume(queueName,false,consumer);

2 收到消息以后,手動應答數據接收成功
channel.basicAck(
        envelope.getDeliveryTag(),false);
3 收到消息,如果處理失敗則拒絕消息:DeliveryTag是消息在隊列中的標識
channel.basicReject(
        envelope.getDeliveryTag(),false);
4 決絕的參數說明
//TODO Reject方式拒絕(這里第2個參數決定是否重新投遞),不要重復投遞,因為消息重復投遞后處理可能依然異常
//channel.basicReject(envelope.getDeliveryTag(),false);

//TODO Nack方式的拒絕(第2個參數決定是否批量,第3個參數是否重新投遞)
channel.basicNack(envelope.getDeliveryTag(), false, true);
View Code

 六、創建隊列的參數解析:場景,延遲隊列,保存帶有時效性的訂單,一旦訂單過期,則信息會轉移到死信隊列

//TODO /*自動過期隊列--參數需要Map傳遞*/
String queueName = "setQueue";
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-expires",10*1000);//消息在隊列中保存10秒后被刪除
//TODO 隊列的各種參數
/*加入隊列的各種參數*/
// autoDelete=true 消費者停止了,則隊列會自動刪除
//exclusive=true獨占隊列,只能有一個消費者消費
channel.queueDeclare(queueName,true,true, false,arguments);
七、發送消息以后帶有應答的隊列
  1. 聲明一個回應隊列
  2. 聲明一個回應消息的消費者
  3. 聲明一個屬性對象(指定隊列,會唯一的id)
  4. 生產者發送消息給消費者(帶着回應隊列)
  5. 消費者接收到消息以后根據對應的信息,給予回應
  生產者端:
 
  
1//TODO 響應QueueName ,消費者將會把要返回的信息發送到該Queue
String responseQueue = channel.queueDeclare().getQueue();
//TODO 消息的唯一id
String msgId = UUID.randomUUID().toString();

2/*聲明了一個消費者*/
final Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received["+envelope.getRoutingKey()
                +"]"+message);
    }
};
//TODO 消費者應答隊列上的消息
channel.basicConsume(responseQueue,true,consumer);
3//TODO 設置消息中的應答屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .replyTo(responseQueue)
        .messageId(msgId)
        .build();
4、
String msg = "Hello,RabbitMq";
//TODO 發送消息時,把響應相關屬性設置進去
channel.basicPublish(EXCHANGE_NAME,"error",
        properties,
        msg.getBytes());
View Code

 消費者端:

  

String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
        +"]"+message);
//TODO 從消息中拿到相關屬性(確定要應答的消息ID,)
AMQP.BasicProperties respProp
        = new AMQP.BasicProperties.Builder()
        .replyTo(properties.getReplyTo())
        .correlationId(properties.getMessageId())
        .build();
//TODO 消息消費時,同時需要生作為生產者生產消息(以OK為標識)
channel.basicPublish("", respProp.getReplyTo() ,
        respProp ,
        ("OK,"+message).getBytes("UTF-8"));

 

 

 

八、死信隊列 -  下列消息會放到死信隊列
  1. 消息被否定確認,使用 channel.basicNack 或 channel.basicReject ,並且此時requeue 屬性被設置為false。
  2. 消息在隊列的存活時間超過設置的TTL時間。
  3. 消息隊列的消息數量已經超過最大隊列長度

   

 測試,消費者被拒絕的時候消息會進到死信隊列中:
 
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(WillMakeDlxConsumer.BUS_EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);
        //TODO 綁定死信交換器
        /*聲明一個隊列,並綁定死信交換器*/


        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", WillMakeDlxConsumer.DLX_EXCHANGE_NAME);
//        //TODO 死信路由鍵,會替換消息原來的路由鍵
//        args.put("x-dead-letter-routing-key", "deal");


        channel.queueDeclare(WillMakeDlxConsumer.BUS_QUEUE_NAME,false,false,
                false,
                args);


        /*綁定,將隊列和交換器通過路由鍵進行綁定*/
        channel.queueBind(WillMakeDlxConsumer.BUS_QUEUE_NAME,
                WillMakeDlxConsumer.BUS_EXCHANGE_NAME,"#");


        System.out.println("waiting for message........");


        //聲明死信隊列
        channel.exchangeDeclare(WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "topic",true);
        channel.queueDeclare(WillMakeDlxConsumer.DLX_QUEUE_NAME, true, false, false, null);
        //路由鍵為 # 代表可以路由到所有消息
        channel.queueBind(WillMakeDlxConsumer.DLX_QUEUE_NAME ,WillMakeDlxConsumer.DLX_EXCHANGE_NAME,  "#");


        /*聲明了一個消費者*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                //TODO
                //TODO 如果是king的消息確認
                if(envelope.getRoutingKey().equals("king")){
                    System.out.println("Received["
                            +envelope.getRoutingKey()
                            +"]"+message);
                    channel.basicAck(envelope.getDeliveryTag(),
                            false);
                }else{
                    //TODO 如果是其他的消息拒絕(rqueue=false),成為死信消息
                    System.out.println("Will reject["
                            +envelope.getRoutingKey()
                            +"]"+message);
                    channel.basicReject(envelope.getDeliveryTag(),
                            false);
                }
            }
        };
        /*消費者正式開始在指定隊列上消費消息*/
        channel.basicConsume(WillMakeDlxConsumer.BUS_QUEUE_NAME ,false,consumer);
View Code

過期消息進入到死信隊列:

public static void main(String[] argsv) throws IOException, TimeoutException {


    Connection connection = getConnection();

    // 創建一個信道
    Channel channel = connection.createChannel();
    // 指定轉發
    channel.exchangeDeclare(WillMakeDlxConsumer.BUS_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

    Map<String,Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", WillMakeDlxConsumer.DLX_EXCHANGE_NAME);
    //設置隊列中消息過期時間,過期則進入死信隊列
    args.put("x-message-ttl", 10*1000);


    channel.queueDeclare(WillMakeDlxConsumer.BUS_QUEUE_NAME,true,false,
            false,
            args);


    /*綁定,將隊列和交換器通過路由鍵進行綁定*/
    channel.queueBind(WillMakeDlxConsumer.BUS_QUEUE_NAME,
            WillMakeDlxConsumer.BUS_EXCHANGE_NAME,"#");


    System.out.println("waiting for message........");


    //聲明死信隊列
    channel.exchangeDeclare(WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "topic",true);
    channel.queueDeclare(WillMakeDlxConsumer.DLX_QUEUE_NAME, true, false, false, null);
    //路由鍵為 # 代表可以路由到所有消息
    channel.queueBind(WillMakeDlxConsumer.DLX_QUEUE_NAME ,WillMakeDlxConsumer.DLX_EXCHANGE_NAME,  "#");


    /*日志消息級別,作為路由鍵使用*/
    String[] routekeys = {"king","mark","james"};
    for(int i=0;i<10;i++){
        String routekey = routekeys[i%3];
        String msg = "Hellol,RabbitMq"+(i+1);
        /*發布消息,需要參數:交換器,路由鍵,其中以日志消息級別為路由鍵*/
        channel.basicPublish(WillMakeDlxConsumer.BUS_EXCHANGE_NAME,routekey,null,
                msg.getBytes());
        System.out.println("Sent "+routekey+":"+msg);
    }
    // 關閉頻道和連接
    channel.close();
    connection.close();
}
View Code

 

 

九、消費者批量預取消費-每次服務器給消費者推送多少數據進行處理

//TODO 如果是兩個消費者(QOS ,批量)則輪詢獲取數據

//TODO 150條預取(150都取出來 150, 210-150  60  )
channel.basicQos(150,true);
/*消費者正式開始在指定隊列上消費消息*/
channel.basicConsume(queueName,false,consumer);
//TODO 自定義消費者批量確認
//BatchAckConsumer batchAckConsumer = new BatchAckConsumer(channel);
//channel.basicConsume(queueName,false,batchAckConsumer);
View Code

 

 

十、生產者投遞消息確認模式,如果失敗了則可以重新投遞
 
   同步確認:
  
// 啟用發送者確認模式
channel.confirmSelect();


//所有日志嚴重性級別
for(int i=0;i<2;i++){
    // 發送的消息
    String message = "Hello World_"+(i+1);
    //參數1:exchange name
    //參數2:routing key
    channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, true,null, message.getBytes());
    System.out.println(" Sent Message: [" + ROUTE_KEY +"]:'"+ message + "'");
    //TODO
    //確認是否成功(true成功)
    if(channel.waitForConfirms()){
        System.out.println("send success");
    }else{
        //如果失敗則,可以重新投遞減小消息丟失的幾率
        System.out.println("send failure");
    }
}
View Code

 異步確認-添加監聽器:

  

//TODO
// 啟用發送者確認模式
channel.confirmSelect();
//TODO
// 添加發送者確認監聽器
channel.addConfirmListener(new ConfirmListener() {
    //TODO 成功
    public void handleAck(long deliveryTag, boolean multiple)
            throws IOException {
        System.out.println("send_ACK:"+deliveryTag+",multiple:"+multiple);
    }
    //TODO 失敗
    public void handleNack(long deliveryTag, boolean multiple)
            throws IOException {
        System.out.println("Erro----send_NACK:"+deliveryTag+",multiple:"+multiple);
    }
});


//TODO
// 添加失敗者通知
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode, String replyText,
                             String exchange, String routingKey,
                             AMQP.BasicProperties properties,
                             byte[] body)
            throws IOException {
        String message = new String(body);
        System.out.println("RabbitMq路由失敗:  "+routingKey+"."+message);
    }
});




String[] routekeys={"king","mark"};
//TODO 6條
for(int i=0;i<20;i++){
    String routekey = routekeys[i%2];
    //String routekey = "king";
    // 發送的消息
    String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
    channel.basicPublish(EXCHANGE_NAME, routekey, true,
            MessageProperties.PERSISTENT_BASIC, message.getBytes());
}
View Code

 

十一、基本信息-創建鏈接

  

public class BasicMq {

    public static final String  MQ_IP = "192.168.112.131";
    public static final String  USER = "admin";
    public static final String  PWD = "admin";
    public static final String  VHOST = "my_vhost";

    /**
     *
     * @return
     */
    public static final Connection getConnection()   {
        /* 創建連接,連接到RabbitMQ*/
        ConnectionFactory connectionFactory = null;
        try {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(BasicMq.MQ_IP);
            connectionFactory.setUsername( BasicMq.USER);
            connectionFactory.setPassword( BasicMq.PWD );
            connectionFactory.setVirtualHost( BasicMq.VHOST );

            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null ;
    }


    public static final Channel getChannel() throws IOException {
        return getConnection().createChannel();
    }
}
View Code

 

十二、異常整理
  1.  如果生產者聲明exchange為 durable=true,那么消費者對應的exchange也必須為durable=true
  2. 消費者原來是durable=false,修改后變為durable=true,那么因為服務器上已經有這個隊列,但是參數不一致會異常,需要刪除服務器上的對應隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM