rabbitmq是什么,怎么搭建我在這就不敘述了,這里只說怎么使用。
說個小插曲,如果有的人連接rabbitmq非常慢,經常超時或者啟動rabbitmq很慢,可以在linux環境下配置一下就行了:
第一步:打開linux輸入以下命令
[root@hadoop1 mq]# hostname
hadoop1
這里的hadoop1就是我的主機名,然后把這個主機名配置進hosts文件即可;
第二步:vim /etc/hosts 命令對hosts文件進行添加一行數據:
主機名hadoop1為例
只需在hosts文件中加入 127.0.0.1 hadoop1
然后wq保存退出即可。
第三步:重啟rabbitmq應該就可以解決連接,啟動過慢的問題了;
=====================================================
以下是使用rabbitmq的過程。
給大家推薦https://blog.csdn.net/qq_35387940/article/details/100514134這個博客,我的案例也是參照這個寫的,修改部分代碼加上自己的心得體會,有些原作者沒有的注釋我都加上了。可以先去看原作者的,然后再看這個。
一個交換機可以連接多個隊列,生產者發送信息給交換機,交換機根據生產者帶來的路由關鍵字來確認傳遞給哪個隊列,消費者可以監聽一個或多個隊列,當隊列里存在消息的時候就會消費。
第一步:添加相關依賴
<dependency>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:編寫application.yml主配置文件,有的配置看情況而定
server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2
第三步:編寫配置類
@Configuration public class DirectConfig { /** * 配置一個隊列,可以配置多個
*第一個參數是隊列的名稱 *第二個參數durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,但並不是消息的持久化 *當消息代理重啟時仍然存在,暫存隊列:當前連接有效 *第三個參數exclusive(獨有的):默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable *第四個參數autoDelete:是否自動刪除,默認是false,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 *return new Queue("TestDirectQueue",true,true,false); *一般設置一下隊列的持久化就好,其余兩個就是默認false */ @Bean public Queue myQueue1 () { return new Queue("directQueue111",true); }
@Bean
public Queue myQueue2 () {
return new Queue("directQueue222",true);
}
/** * 給交換機起名字,也可以配置多個交換機
*第一個參數是交換機的名字,
*第二個參數durable:是否持久化,默認false,持久化交換機
*第三個參數autoDelete,默認為false,同隊列解釋
如果你是topic模式的,那么交換機類型就是TopicExchange,如果是廣播(fanout)類型的就是FanoutExchange
*/
@Bean public DirectExchange myDirectExchange() { return new DirectExchange("directExchange",true,false); } @Bean public DirectExchange lonelyExchange() { return new DirectExchange("lonelyExchange",true,false); } /*綁定 將隊列和交換機綁定, 並設置用於匹配鍵:mykey 隊列在前,交換機在后
1.如果是fanout的模式的話,就沒有后面的with路由關鍵字
2.如果是topic模式的話是支持通配符的
通配符規則:
`#`:匹配一個或多個詞
`*`:匹配不多不少恰好1個詞
舉例:
`audit.#`:能夠匹配`audit.irs.corporate`隊列 或者 `audit.irs`隊列
`audit.*`:只能匹配`audit.irs`隊列
*/
@Bean public Binding directBinding(){ return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("mykey");
//return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("audit.#"); } }
第四步:寫個接口方法進行測試
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/sendDirectMessage") @ResponseBody public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //將消息攜帶綁定鍵值:路由關鍵字mykey 發送到交換機 directExchange
amqpTemplate.convertAndSend("directExchange", "mykey", map); return "ok"; }
紅框圈起來的是比較常用的3個方法,分別是:
- 指定交換機、RoutingKey和消息體
- 指定消息
- 指定RoutingKey和消息,會向默認的交換機發送消息
到此,生產者已經編寫完成,一般來說我們生產者和消費者在不同的模塊里,我也是采用這種模式,編寫兩個spring boot模塊。
第五步:編寫消費者,配置跟生產者基本一致。
依賴包不變,編寫application.yml,注意端口不能沖突
server: port: 9999 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl
第六步:編寫消費者,也就是一個監聽器,監聽自己隊列是否有數據。
@Component//必須要寫 //@RabbitListener(queues = {"directQueue111","directQueue222"})
public class DirectListener { /** * RabbitListener 注解放在類上就需要使用RabbitHandler注解放在方法上進行配合使用, * 根據傳遞過來的參數類型判斷那個方法執行,如果傳遞過來的是String類型的,就去找帶有@RabbitHandler注解的方法,參數是String的
* 如果傳遞過來的是map類型的,就找參數是map的方法
*@RabbitListener也可以直接放在方法上監聽隊列,如果該隊列存在消息,就由這個方法來接收處理 */
//@RabbitHandler
@RabbitListener(queues = {"directQueue111"}) public void process(Map map) { System.out.println("DirectReceiver111消費者收到消息 : " + map.toString()); }
@RabbitListener(queues = {"directQueue222"})
public void process(Map map) {
System.out.println("DirectReceiver222消費者收到消息 : " + map.toString());
}
}
一般情況下消息隊列的使用已經介紹完畢了,但是有的小伙伴就會問,我怎么知道我的消息是發出去了沒有?消費者收到了沒有?好的,這就涉及到了生產者的確認機制和消費者的確認機制。
我們先來說生產者的確認機制:在生產者application.yml中添加消息確認配置
server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2 #確認消息已發送到交換機(Exchange) publisher-confirms: true #確認消息已發送到隊列(Queue) publisher-returns: true
在生產者方編寫一個配置文件RabbitConfig:
@Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //Mandatory強制的 //設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數
rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("ConfirmCallback: " + "相關數據:" + correlationData); System.out.println("ConfirmCallback: " + "確認情況:" + b); System.out.println("ConfirmCallback: " + "原因:" + s); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: " + "消息:" + message); System.out.println("ReturnCallback: " + "回應碼:" + replyCode); System.out.println("ReturnCallback: " + "回應信息:" + replyText); System.out.println("ReturnCallback: " + "交換機:" + exchange); System.out.println("ReturnCallback: " + "路由鍵:" + routingKey); } }); return rabbitTemplate; } }
生產者消息確認無非就四種情況:先把結論也寫了出來
①消息推送到server,但是在server里找不到交換機 --》 觸發confirm回調失敗 ②消息推送到server,找到交換機了,但是沒找到隊列 --》 先觸發confirm回調返回true,然后再觸發return返回失敗 ③消息推送到sever,交換機和隊列啥都沒找到 --》 觸發confirm回調失敗 ④消息推送成功
①消息推送到server,但是在server里找不到交換機
寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的)
@GetMapping("/TestMessageAck") public String TestMessageAck() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: non-existent-exchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map); return "ok"; }
調用接口,查看rabbitmq-provuder項目的控制台輸出情況(原因里面有說,沒有找到交換機'non-existent-exchange'):
2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40) ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:false ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)
結論: ①這種情況觸發的是 ConfirmCallback 回調函數。
②消息推送到server,找到交換機了,但是沒找到隊列
這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作:
@Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); }
然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):
@GetMapping("/TestMessageAck2") public String TestMessageAck2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: lonelyDirectExchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("lonelyDirectExchange", "DirectRouting", map); return "ok"; }
調用接口,查看rabbitmq-provuder項目的控制台輸出情況:
ReturnCallback: 消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ReturnCallback: 回應碼:312 ReturnCallback: 回應信息:NO_ROUTE ReturnCallback: 交換機:lonelyDirectExchange ReturnCallback: 路由鍵:TestDirectRouting ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:true ConfirmCallback: 原因:null
可以看到這種情況,兩個函數都被調用了;
這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;
而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。
③消息推送到sever,交換機和隊列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回調是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是 ConfirmCallback 回調函數。
④消息推送成功
那么測試下,按照正常調用之前消息推送的接口就行,就調用下 /sendFanoutMessage接口,可以看到控制台輸出:
ConfirmCallback: 相關數據:null ConfirmCallback: 確認情況:true ConfirmCallback: 原因:null
結論: ④這種情況觸發的是 ConfirmCallback 回調函數。
以上是生產者推送消息的消息確認 回調函數的使用介紹(可以在回調函數根據需求做對應的擴展或者業務數據處理)。
下面我們繼續說一說消費者的消息確認機制:
把消費者自動確認消息改為手動確認:
server: port: 8888 spring: rabbitmq: host: 192.168.1.149 username: qjwl password: 123456 virtual-host: /qjwl template: retry: #enabled:開啟失敗重試 enabled: true #第一次重試的間隔時長 initial-interval: 10000ms #最長重試間隔,超過這個間隔將不再重試 max-interval: 300000ms #下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍 multiplier: 2
#確認消息已發送到交換機(Exchange)
publisher-confirms: true
#確認消息已發送到隊列(Queue)
publisher-returns: true
# 手動確認消息(默認是自動確認): listener: simple: acknowledge-mode: manual
還是那個在消費者方編寫監聽器:
@Component public class DirectListener {
/**
* 手動確認機制
*/
@RabbitListener(queues = {"directQueue111"})
public void userInsert(Map map, Channel channel, Message message) throws IOException {
try {
//Message中是rabbitmq管道中的所有信息,包括了消息體,隊列名稱,交換機名稱,唯一標志,狀態碼等基本信息
System.out.println(message);
//這個map就是到接收的消息
System.out.println(map.toString());
//同一時刻服務器只會發一條消息給消費者(能者多勞模式)
//channel.basicQos(1);
/**
* deliveryTag:該消息的index
* multiple:是否批量.true:將一次性ack所有小於deliveryTag的消息。
*/
//確認簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//簽收失敗
/**
* deliveryTag:該消息的index
* multiple:是否批量.true:將一次性拒絕所有小於deliveryTag的消息。
* requeue:被拒絕的是否重新入隊列
*/
//如果重新放進隊列中還是會放在隊列頭部,繼續消費者消費,如果一直消費一直錯誤就會產生堆積問題,理性使用
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
deliveryTag:該消息的index
requeue:被拒絕的是否重新入隊列
channel.basicNack 與 channel.basicReject 的區別在於basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息
如果不手動確認,也不拋出異常,消息不會自動重新推送(包括其他消費者),因為對於rabbitmq來說始終沒有接收到消息消費是否成功的確認,並且Channel是在消費端有緩存的,沒有斷開連接。
如果rabbitmq斷開,連接后會自動重新推送。如果消費端應用重啟,消息會自動重新推送。
東西呢就是這么個東西,大部分都是跟另外一篇博客一致的,只有消費者消息確認這一塊差異多點。over