一、RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
二、目錄結構

三、是使用springboot搭建rabbitmq我們需要基本的依賴包
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
四、這里我們主要介紹6中模式的配置和使用
1)默認的模式(這種方式不是沒有exchange,而是使用默認的exchange。默認為Direct)


聲明方式:
/** * 第一種:使用默認的交換機(direct模式) */ @Configuration public class QueueConfiguration { /** * 聲明隊列:隊列有五個參數(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * name:隊列名稱 * durable:持久性 * exclusive:排他性(獨立性) * autoDelete:自動刪除 * arguments:其他相關參數 * @return */ @Bean public Queue queue() { return new Queue("queue", false); } }
(1)簡單:只有一個listener在監聽queue,這樣消息只能傳到這個隊列
(2)進階:如果存在多個listener監聽這個queue,rabbitmq會優雅的平均分配給listener
(3)arguments(參數配置)
x-message-ttl(Time-To-Live):消息存活時間,單位毫秒
x-expires:隊列沒有訪問超時時,自動刪除(包含沒有消費的消息),單位毫秒。
x-max-length:限制隊列最大長度(新增后擠出最早的),單位個數。
x-max-length-bytes :限制隊列最大容量
x-dead-letter-exchange:死信交換機,將刪除/過期的數據,放入指定交換機。
x-dead-letter-routing-key:死信路由,將刪除/過期的數據,放入指定routingKey
x-max-priority:隊列優先級。
x-queue-mode:對列模式,默認lazy(將數據放入磁盤,消費時放入內存)。
x-queue-master-locator:鏡像隊列
2)主題模式/通配符模式(topicExchange)

聲明方式:
/** * 第二種:topic交換機模式(主題模式) */ @Configuration public class TopicExchangeConfiguration { @Bean public Queue queue1() { return new Queue("queue1", false); } @Bean public Queue queue2() { return new Queue("queue2", false); } /** * 聲明交換機類型:存在4個參數(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) * 這里的參數基本和queue一樣的理解 * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange("topic", false, false); } /** * 綁定隊列到交換機上面 * @return */ @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(topicExchange()).with("*.topic"); } /** * 這里存在兩種匹配符 * *:代表一個單位的字符(1.topic) * #:代表多個單位的字符(2.2.topic) * @return */ @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(topicExchange()).with("#.topic"); } }
通配符:
*:代表一個單位的字符(1.topic)
#:代表多個單位的字符(2.2.topic)
3)直連模式(directExchange)

聲明方式:
/** * 第三種:Direct模式(直連模式,默認交換機也是這種類型) */ @Configuration public class DirectExchangeConfiguration { @Bean public Queue queue3() { return new Queue("queue3", false); } @Bean public Queue queue4() { return new Queue("queue4", false); } /** * 參數和topic的交換機類型一樣 * @return */ @Bean public DirectExchange directExchange() { return new DirectExchange("direct", false, false); } @Bean public Binding binding3() { return BindingBuilder.bind(queue3()).to(directExchange()).with("direct.3"); } @Bean public Binding binding4() { return BindingBuilder.bind(queue4()).to(directExchange()).with("direct.4"); } }
4)發布/訂閱模式(fanout模式)

聲明方式:
/** * 第四種:fanout模式(發布/訂閱模式) */ @Configuration public class FanoutExchangeConfiguration { @Bean public Queue queue5() { return new Queue("queue5", false); } @Bean public Queue queue6() { return new Queue("queue6", false); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout", false, false); } /** * 這里的綁定不需要routingKey * @return */ @Bean public Binding binding5() { return BindingBuilder.bind(queue5()).to(fanoutExchange()); } /** * 相比於topic,fanout只能全部發送,topic可以更具匹配規則進行 * @return */ @Bean public Binding binding6() { return BindingBuilder.bind(queue6()).to(fanoutExchange()); } }
說明:fanout模式是不需要綁定routingKey,這種方式也是廣播形式的主要方式
5)消息頭模式(headers模式)
/** * 第五種:headers模式(消息頭模式) */ @Configuration public class HeadersExchangeConfiguration { @Bean public Queue queue7() { return new Queue("queue7", false); } @Bean public Queue queue8() { return new Queue("queue8", false); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers", false, false); } /** * 確認header是否存在 * @return */ @Bean public Binding binding7() { return BindingBuilder.bind(queue7()).to(headersExchange()).where("header").exists(); } @Bean public Binding binding8() { return BindingBuilder.bind(queue8()).to(headersExchange()).where("header").exists(); } }
說明:這種方式主要是限定headers,方便通過其他方式攜帶數據。
6)rpc:

聲明方式(大同小異):
@Configuration public class RpcConfiguration { @Bean public Queue rpc() { return new Queue("rpc", false); } @Bean public DirectExchange rpcExchange() { return new DirectExchange("rpcExchange", false, false); } @Bean public Binding rpcBinding() { return BindingBuilder.bind(rpc()).to(rpcExchange()).with("rpcRoutingKey"); } }
lisntener:
@Component @RabbitListener(queues = "rpc") public class RpcListener { @RabbitHandler public String rpcListener(String text, Channel channel, Message message) throws IOException { System.out.println("rpcServer:" + text); MessageProperties messageProperties = message.getMessageProperties(); channel.basicAck(messageProperties.getDeliveryTag(), false); return "success"; } }
注意這里是有返回數據的。
客戶端(publish)
這里推送存在兩種方式,同步和異步
a、同步:主題這里默認超時是5秒,可以通過rabbitTemplate設置setReceiveTimeout超時時間。
String message = (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time);
System.out.println("rpcClient:" + message);
b、異步:
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time); System.out.println("rpcClient:" + future.get());
注意:AsyncRabbitTemplate是需要手動去配置的。並且需要配置AbstractMessageListenerContainer
如果沒有配置AbstractMessageListenerContainer,則需要配置amq.rabbitmq.reply-to(amq.*需要權限才可以配置)
這里是spring對rabbitmq在源碼部分對其進行的判斷,如果不理解可以自己跟convertSendAndReceive函數
@Bean public AsyncRabbitTemplate asyncRabbitTemplate(DirectMessageListenerContainer container) { AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container); return asyncRabbitTemplate; } @Bean public DirectMessageListenerContainer directMessageListenerContainer(ConnectionFactory connectionFactory) { DirectMessageListenerContainer container = new DirectMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("rpc"); //這里我改成手動了,但是沒有好的方式去獲取channel,然后ack.所以我這里使用的自動。 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //這里可以使用默認的執行器:SimpleAsyncTaskExecutor(但是,這里不是采用的線程池而是直接new Thread) container.setTaskExecutor(new ThreadPoolExecutor(5, 60, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3000))); return container; }
五、消息發送者
1)yaml配置
server: port: 9001 spring: rabbitmq: host: 192.168.5.100 port: 5672 username: guest password: guest publisher-confirms: true publisher-returns: true template: #參數意義:true當沒有合適的queue直接返回到ReturnCallback # false沒有合適的直接丟棄 mandatory: true
2)如果配置了publisher-confirms、publisher-returns為true.並且加入template.mandatory為true。可以配置如下
@Component public class RabbitmqPublisherConfiguration { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public RabbitTemplate rabbitTemplate() { //1、設置publisher-confirms為true //2、發布確認,只是在exchange范圍 //3、如果沒有exchange,則false.如果過為true,則說明發送到exchange成功 rabbitTemplate.setConfirmCallback((correlationData, ack, s) -> { if (ack) { System.out.println("send success"); } else { System.out.println("send fail"); } }); //1、設置publisher-returns為true //2、如果沒有發布成功,則將消息返回。當然這只是在接受消息層,不是exchange。 rabbitTemplate.setReturnCallback((message, id, reason, exchange, routingKey) -> { StringBuffer buffer = new StringBuffer(); buffer.append("----------------------------------------\n"); buffer.append("接受消息: {0},失敗!\n"); buffer.append("消息ID: {1}\n"); buffer.append("原因: {2}\n"); buffer.append("exchange: {3}\n"); buffer.append("routingKey: {4}\n"); buffer.append("----------------------------------------"); MessageFormat messageFormat = new MessageFormat(buffer.toString()); String text = messageFormat.format(new Object[]{new String(message.getBody()), id, reason, exchange, routingKey}); System.out.println(text); }); return rabbitTemplate; } }
a、ConfirmCallback:只是針對exchange,如果消息可以通過exchange,則發送成功。反之則失敗
b、ReturnCallback:這個只是針對於routingKey,是否通過。如果這個routingKey不存在,則將消息返回。反之則發送。
3)消息發送
@Component @EnableScheduling public class RabbitmqPublisher { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(cron = "0/15 * * * * ?") public void execute() { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); String time = formatter.format(LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault())); //默認 rabbitTemplate.convertAndSend("queue", time); //主題模式 rabbitTemplate.convertAndSend("topic", "1.topic", time); rabbitTemplate.convertAndSend("topic", "2.2.topic", time); //直連模式 rabbitTemplate.convertAndSend("direct", "direct.3", time); rabbitTemplate.convertAndSend("direct", "direct.4", time); //廣播模式 rabbitTemplate.convertAndSend("fanout", "", time); //headers模式 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("header", "header"); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); Message message = MessageBuilder.withBody(time.getBytes()).andProperties(messageProperties).build(); rabbitTemplate.convertAndSend("headers", "", message); } }
六、消息監聽者
1)yaml配置
server: port: 9002 spring: rabbitmq: host: 192.168.5.100 port: 5672 username: guest password: guest listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual
說明:如果配置acknowledge-mode: manual(手動模式),則需要手動確認消息。如果沒有則不需要手動確認,否則會報錯。
需要在每個listener下面加上
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
listener的對手動對消息的處理方式有3種:Ack、Nack、Reject
Ack:確認收到消息
Nack:不確認收到消息
Reject:拒接消息
2)listener
@Component public class RabbitmqListener { //1.默認隊列 @RabbitListener(queues = "queue") public void queueDouble1(String text, Channel channel, Message message) throws IOException { System.out.println("queueDouble1:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "queue") public void queueDouble2(String text, Channel channel, Message message) throws IOException { System.out.println("queueDouble2:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } //2.主題隊列 @RabbitListener(queues = "queue1") public void queue1(String text, Channel channel, Message message) throws IOException { System.out.println("queue1:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "queue2") public void queue2(String text, Channel channel, Message message) throws IOException { System.out.println("queue2:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } //3.直連隊列 @RabbitListener(queues = "queue3") public void queue3(String text, Channel channel, Message message) throws IOException { System.out.println("queue3:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "queue4") public void queue4(String text, Channel channel, Message message) throws IOException { System.out.println("queue4:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } //4.廣播隊列 @RabbitListener(queues = "queue5") public void queue5(String text, Channel channel, Message message) throws IOException { System.out.println("queue5:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "queue6") public void queue6(String text, Channel channel, Message message) throws IOException { System.out.println("queue6:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } //5.消息頭隊列 @RabbitListener(queues = "queue7") public void queue7(String text, Channel channel, Message message) throws IOException { System.out.println("queue7:" + text); System.out.println("header7:" + message.getMessageProperties().getHeaders().get("header")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "queue8") public void queue8(String text, Channel channel, Message message) throws IOException { System.out.println("queue8:" + text); System.out.println("header8:" + message.getMessageProperties().getHeaders().get("header")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
2)也可以寫成,另外一種方式
@Component @RabbitListener(queues = "queue") public class RabbitmqHandlerListener { @RabbitHandler public void messageHandler(String text, Channel channel, Message message) throws IOException { System.out.println("queueDouble3:" + text); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
七、測試


1)默認:

均勻的分配到每一個節點
2)主題(topic):

只要符合規則就接受
3)直連(direct)

和模式方式一樣,一對一。多個均勻分布
4)廣播(fanout)

5)消息頭(headers)

八、當然例子也可以參考官網:https://www.rabbitmq.com/getstarted.html
