使用spring-rabbit測試RabbitMQ消息確認(發送確認,接收確認)


1、首先是rabbitmq的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
    <!-- Mind the spring-rabbit.xsd version: versions before 1.4 lack many features, so use the one that matches the spring-rabbit jar in use. -->
    
    <!-- JSON message converter bean. NOTE(review): nothing in this file references it; the template below sets no message-converter attribute. Confirm whether that is intended. -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Connection settings come from property placeholders; publisher-confirms is switched on for this factory. -->
    <rabbit:connection-factory 
        id="connectionFactory"
        host="${rabbit.host}" 
        port="${rabbit.port}" 
        username="${rabbit.username}" 
        password="${rabbit.password}"
        publisher-confirms="true" 
    />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Template wired to the confirm and return callback beans. mandatory must be set to true, otherwise the return callback does not take effect. -->
    <rabbit:template id="amqpTemplate"    connection-factory="connectionFactory" 
        confirm-callback="confirmCallBackListener"
        return-callback="returnCallBackListener" 
        mandatory="true" 
    />
    
    <!-- Queue consumed by the listener container declared further down. -->
    <rabbit:queue name="CONFIRM_TEST" />
        
    <!-- Direct exchange bound to CONFIRM_TEST. No key attribute is given; presumably the binding key defaults to the queue name (TODO confirm against the Spring AMQP namespace documentation). -->
    <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
        <rabbit:bindings>
            <rabbit:binding queue="CONFIRM_TEST" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Consumer configuration: maps the listener bean to its queue. acknowledge="manual" means the listener itself must ack or nack each delivery. -->
    <rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="manual" >
        <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
    </rabbit:listener-container>

</beans>

2、發送方:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Publishing facade registered as the Spring bean {@code publishService}.
 *
 * <p>It simply forwards every call to the injected {@link AmqpTemplate}.
 */
@Service("publishService")
public class PublishService {

    /** Template used for all outgoing messages; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it through the template.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key used for the send
     * @param message    payload object handed to the template for conversion
     */
    public void send(String exchange, String routingKey, Object message) {
        this.amqpTemplate.convertAndSend(exchange, routingKey, message);
    }
}

3、消費方:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

/**
 * Consumer that acknowledges deliveries manually on the supplied channel.
 *
 * <p>A message that is processed successfully is acked; if processing throws, the
 * message is nacked without requeue.
 */
@Service("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

    /**
     * Prints the message properties and body, then acks the delivery.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on, used for ack/nack
     * @throws Exception if the ack or nack call itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("consumer--:" + message.getMessageProperties() + ":" + body);
        } catch (Exception e) {
            e.printStackTrace(); // TODO business-level error handling
            // multiple=false, requeue=false: reject only this delivery and do not requeue it.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Ack outside the try block so an ack failure is not followed by a nack of the same delivery.
        channel.basicAck(deliveryTag, false);
    }
}

4、確認後回調方:

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

/**
 * Publisher-confirm callback registered as the Spring bean {@code confirmCallBackListener}.
 */
@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback {

    /**
     * Prints the outcome of a publisher confirm to standard output.
     *
     * @param correlationData correlation data passed to the callback (may be null)
     * @param ack             the ack flag passed to the callback
     * @param cause           the cause text passed to the callback (may be null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        StringBuilder line = new StringBuilder("confirm--:correlationData:");
        line.append(correlationData);
        line.append(",ack:").append(ack);
        line.append(",cause:").append(cause);
        System.out.println(line);
    }
}

5、失敗後return回調:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;

/**
 * Return callback registered as the Spring bean {@code returnCallBackListener}.
 */
@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

    /**
     * Prints the details of a returned message to standard output.
     *
     * @param message    the returned message
     * @param replyCode  reply code passed to the callback
     * @param replyText  reply text passed to the callback
     * @param exchange   exchange name passed to the callback
     * @param routingKey routing key passed to the callback
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("return--message:" + body
                + ",replyCode:" + replyCode
                + ",replyText:" + replyText
                + ",exchange:" + exchange
                + ",routingKey:" + routingKey);
    }
}

6、測試類:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.dingcheng.confirms.publish.PublishService;  
  
/**
 * Manual exploration tests for publisher confirms and returns.
 *
 * <p>The tests make no assertions; the behaviour is observed through the lines the
 * confirm/return callbacks and the consumer print to standard output.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:application-context.xml"})
public class TestConfirm {
    /** Name of the exchange that exists in the configuration. */
    private static final String EXCHANGE = "DIRECT_EX";

    /** Name of an exchange that does not exist. */
    private static final String MISSING_EXCHANGE = EXCHANGE + "NO";

    /** Routing key under which the test queue receives messages. */
    private static final String ROUTING_KEY = "CONFIRM_TEST";

    /** Routing key that matches no queue. */
    private static final String UNROUTABLE_KEY = "";

    /** Pause after each send so the asynchronous callbacks can run before the test ends. */
    private static final long CALLBACK_WAIT_MILLIS = 1000L;

    @Autowired
    private PublishService publishService;

    /** Exchange and queue both correct: confirm callback expected with ack=true. */
    @Test
    public void test1() throws InterruptedException {
        sendAndWait("test1", EXCHANGE, ROUTING_KEY);
    }

    /** Exchange wrong, queue correct: confirm callback expected with ack=false. */
    @Test
    public void test2() throws InterruptedException {
        sendAndWait("test2", MISSING_EXCHANGE, ROUTING_KEY);
    }

    /**
     * Exchange correct, queue wrong: confirm callback expected with ack=true, and the
     * return callback expected with replyText NO_ROUTE.
     */
    @Test
    public void test3() throws InterruptedException {
        sendAndWait("test3", EXCHANGE, UNROUTABLE_KEY);
    }

    /** Exchange wrong and queue wrong: confirm callback expected with ack=false. */
    @Test
    public void test4() throws InterruptedException {
        // Uses the unroutable key so this case really differs from test2.
        sendAndWait("test4", MISSING_EXCHANGE, UNROUTABLE_KEY);
    }

    /** Builds a timestamped message, prints it, publishes it and waits for the callbacks. */
    private void sendAndWait(String testName, String exchange, String routingKey) throws InterruptedException {
        String message = "currentTime:" + System.currentTimeMillis();
        System.out.println(testName + "---message:" + message);
        publishService.send(exchange, routingKey, message);
        Thread.sleep(CALLBACK_WAIT_MILLIS);
    }
}

7、測試結果:

test1---message:currentTime:1483786948506
test2---message:currentTime:1483786948532
consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506
test3---message:currentTime:1483786948536
confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
confirm--:correlationData:null,ack:false,cause:Channel closed by application
[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  
 return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
confirm--:correlationData:null,ack:true,cause:null
test4---message:currentTime:1483786948546
confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  

8、總結如下:

如果消息沒有到exchange,則confirm回調,ack=false

如果消息到達exchange,則confirm回調,ack=true

exchange到queue成功,則不回調return

exchange到queue失敗,則回調return(需設置mandatory=true,否則不會回調,消息就丟了)

備註:需要說明,spring-rabbit和原生的rabbit-client的表現是不一樣的。測試的時候,使用原生的client,如果exchange錯誤,會直接報錯,不會回調confirmListener和returnListener。

 

源碼地址:https://github.com/qq315737546/spring-rabbit

全文地址請點擊:https://blog.csdn.net/qq315737546/article/details/54176560


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM