RabbitMQ消息服務用戶手冊
(UBP, Message Queue)
XXX
2016年7月
1 基礎知識
1.1 集群總體概述
Rabbitmq Broker集群是多個erlang節點的邏輯組,每個節點運行Rabbitmq應用,他們之間共享用戶、虛擬主機、隊列、exchange、綁定和運行時參數。
1.2 集群復制信息
除了message queue(存在一個節點,從其他節點都可見、訪問該隊列,要實現queue的復制就需要做queue的HA)之外,任何一個Rabbitmq broker上的所有操作的data和state都會在所有的節點之間進行復制。
1.3 集群運行前提
1、集群所有節點必須運行相同的erlang及Rabbitmq版本。
2、hostname解析,節點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。
1.4 集群互通方式
1、集群所有節點必須運行相同的erlang及Rabbitmq版本hostname解析,節點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。
1.5 端口及其用途
1、5672 客戶端連接端口。
2、15672 web管控台端口。
3、25672 集群通信端口。
1.6 集群配置方式
通過rabbitmqctl手工配置的方式。
1.7 集群故障處理
1、rabbitmq broker集群允許個體節點宕機。
2、對應集群的的網絡分區問題(network partitions)集群推薦用於LAN環境,不適用WAN環境;要通過WAN連接broker,Shovel or Federation插件是最佳解決方案(Shovel or Federation不同於集群:注Shovel為中心服務遠程異步復制機制,稍后會有介紹)。
1.8 節點運行模式
為保證數據持久性,目前所有node節點跑在disk模式,如果今后壓力大,需要提高性能,考慮采用ram模式。
1.9 集群認證方式
通過Erlang Cookie,相當於共享秘鑰的概念,長度任意,只要所有節點都一致即可。rabbitmq server在啟動的時候,erlang VM會自動創建一個隨機的cookie文件。cookie文件的位置: /var/lib/rabbitmq/.erlang.cookie 或者/root/.erlang.cookie。我們的為保證cookie的完全一致,采用從一個節點copy的方式,實現各個節點的cookie文件一致。
2 集群搭建
2.1 集群節點安裝
1、安裝依賴包
PS:安裝rabbitmq所需要的依賴包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
|
2、下載安裝包
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm |
3、安裝服務命令
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
|
4、修改集群用戶與連接心跳檢測
注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件 修改:loopback_users 中的 <<"guest">>,只保留guest 修改:heartbeat 為1 |
5、安裝管理插件
//首先啟動服務 /etc/init.d/rabbitmq-server start stop status restart //查看服務有沒有啟動: lsof -i:5672 rabbitmq-plugins enable rabbitmq_management //可查看管理端口有沒有啟動: lsof -i:15672 或者 netstat -tnlp|grep 15672 |
6、服務指令
/etc/init.d/rabbitmq-server start stop status restart 驗證單個節點是否安裝成功:http://192.168.11.71:15672/ 1. Ps:以上操作三個節點(71、72、73)同時進行操作 |
2.2 文件同步步驟
PS:選擇76、77、78任意一個節點為Master(這里選擇76為Master),也就是說我們需要把76的Cookie文件同步到77、78節點上去,進入/var/lib/rabbitmq目錄下,把/var/lib/rabbitmq/.erlang.cookie文件的權限修改為777,原來是400;然后把.erlang.cookie文件copy到各個節點下;最后把所有cookie文件權限還原為400即可。
/etc/init.d/rabbitmq-server stop //進入目錄修改權限;遠程copy77、78節點,比如: scp /var/lib/rabbitmq/.erlang.cookie 到192.168.11.77和192.168.11.78中 |
2.3 組成集群步驟
1、停止MQ服務
PS:我們首先停止3個節點的服務
rabbitmqctl stop |
2、組成集群操作
PS:接下來我們就可以使用集群命令,配置76、77、78為集群模式,3個節點(76、77、78)執行啟動命令,后續啟動集群使用此命令即可。
rabbitmq-server -detached |
3、slave加入集群操作(重新加入集群也是如此,以最開始的主節點為加入節點)
//注意做這個步驟的時候:需要配置/etc/hosts 必須相互能夠尋址到 bhz77:rabbitmqctl stop_app bhz77:rabbitmqctl join_cluster --ram rabbit@bhz76 –ram代表內存方式,不寫代表磁盤 bhz77:rabbitmqctl start_app bhz78:rabbitmqctl stop_app bhz78:rabbitmqctl join_cluster rabbit@bhz76 bhz78:rabbitmqctl start_app //在另外其他節點上操作要移除的集群節點 rabbitmqctl forget_cluster_node rabbit@bhz24 |
4、修改集群名稱
PS:修改集群名稱(默認為第一個node名稱):
rabbitmqctl set_cluster_name rabbitmq_cluster1 |
5、查看集群狀態
PS:最后在集群的任意一個節點執行命令:查看集群狀態
rabbitmqctl cluster_status |
6、管控台界面
PS: 訪問任意一個管控台節點:http://192.168.11.71:15672 如圖所示
2.4 配置鏡像隊列
PS:設置鏡像隊列策略(在任意一個節點上執行)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' |
PS:將所有隊列設置為鏡像隊列,即隊列會被復制到各個節點,各個節點狀態一致,RabbitMQ高可用集群就已經搭建好了,我們可以重啟服務,查看其隊列是否在從節點同步。
2.5 安裝Ha-Proxy
1、Haproxy簡介
HAProxy是一款提供高可用性、負載均衡以及基於TCP和HTTP應用的代理軟件,HAProxy是完全免費的、借助HAProxy可以快速並且可靠的提供基於TCP和HTTP應用的代理解決方案。
HAProxy適用於那些負載較大的web站點,這些站點通常又需要會話保持或七層處理。
HAProxy可以支持數以萬計的並發連接,並且HAProxy的運行模式使得它可以很簡單安全的整合進架構中,同時可以保護web服務器不被暴露到網絡上。
2、Haproxy安裝
PS:79、80節點同時安裝Haproxy,下面步驟統一
//下載依賴包 yum install gcc vim wget //下載haproxy wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.5.tar.gz //解壓 tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local //進入目錄、進行編譯、安裝 cd /usr/local/haproxy-1.6.5 make TARGET=linux31 PREFIX=/usr/local/haproxy make install PREFIX=/usr/local/haproxy mkdir /etc/haproxy //賦權 groupadd -r -g 149 haproxy useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy //創建haproxy配置文件 touch /etc/haproxy/haproxy.cfg |
3、Haproxy配置
PS:haproxy 配置文件haproxy.cfg詳解
vim /etc/haproxy/haproxy.cfg |
#logging options global log 127.0.0.1 local0 info maxconn 5120 chroot /usr/local/haproxy uid 99 gid 99 daemon quiet nbproc 20 pidfile /var/run/haproxy.pid
defaults log global #使用4層代理模式,”mode http”為7層代理模式 mode tcp #if you set mode to tcp,then you nust change tcplog into httplog option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 5s ##客戶端空閑超時時間為 60秒 則HA 發起重連機制 clitimeout 60s ##服務器端鏈接超時時間為 15秒 則HA 發起重連機制 srvtimeout 15s #front-end IP for consumers and producters
listen rabbitmq_cluster bind 0.0.0.0:5672 #配置TCP模式 mode tcp #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip #簡單的輪詢 balance roundrobin #rabbitmq集群節點配置 #inter 每隔五秒對mq集群做健康檢查, 2次正確證明服務器可用,2次失敗證明服務器不可用,並且配置主備機制 server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2 server bhz77 192.168.11.77:5672 check inter 5000 rise 2 fall 2 server bhz78 192.168.11.78:5672 check inter 5000 rise 2 fall 2 #配置haproxy web監控,查看統計信息 listen stats bind 192.168.11.79:8100 mode http option httplog stats enable #設置haproxy監控地址為http://localhost:8100/rabbitmq-stats stats uri /rabbitmq-stats stats refresh 5s |
4、啟動haproxy
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg //查看haproxy進程狀態 ps -ef | grep haproxy |
5、訪問haproxy
PS:訪問如下地址可以對rmq節點進行監控:http://192.168.1.27:8100/rabbitmq-stats
6、關閉haproxy
killall haproxy ps -ef | grep haproxy |
2.6 安裝KeepAlived
1、Keepalived簡介
Keepalived,它是一個高性能的服務器高可用或熱備解決方案,Keepalived主要來防止服務器單點故障的發生問題,可以通過其與Nginx、Haproxy等反向代理的負載均衡服務器配合實現web服務端的高可用。Keepalived以VRRP協議為實現基礎,用VRRP協議來實現高可用性(HA).VRRP(Virtual Router Redundancy Protocol)協議是用於實現路由器冗余的協議,VRRP協議將兩台或多台路由器設備虛擬成一個設備,對外提供虛擬路由器IP(一個或多個)。
2、Keepalived安裝
PS:下載地址:http://www.keepalived.org/download.html
//安裝所需軟件包 yum install -y openssl openssl-devel //下載 wget http://www.keepalived.org/software/keepalived-1.2.18.tar.gz //解壓、編譯、安裝 tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/ cd keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalived make && make install //將keepalived安裝成Linux系統服務,因為沒有使用keepalived的默認安裝路徑(默認路徑:/usr/local),安裝完成之后,需要做一些修改工作 //首先創建文件夾,將keepalived配置文件進行復制: mkdir /etc/keepalived cp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived/ //然后復制keepalived腳本文件: cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/ cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/ ln -s /usr/local/sbin/keepalived /usr/sbin/ ln -s /usr/local/keepalived/sbin/keepalived /sbin/ //可以設置開機啟動:chkconfig keepalived on,到此我們安裝完畢! chkconfig keepalived on |
3、Keepalived配置
PS:修改keepalived.conf配置文件
vim /etc/keepalived/keepalived.conf |
PS: 79節點(Master)配置如下
! Configuration File for keepalived
global_defs { router_id bhz79 ##標識節點的字符串,通常為hostname
}
vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##執行腳本位置 interval 2 ##檢測時間間隔 weight -20 ##如果條件成立則權重減20 }
vrrp_instance VI_1 { state MASTER ## 主節點為MASTER,備份節點為BACKUP interface eth0 ## 綁定虛擬IP的網絡接口(網卡),與本機IP地址所在的網絡接口相同(我這里是eth0) virtual_router_id 79 ## 虛擬路由ID號(主備節點一定要相同) mcast_src_ip 192.168.11.79 ## 本機ip地址 priority 100 ##優先級配置(0-254的值) nopreempt advert_int 1 ## 組播信息發送間隔,倆個節點必須配置一致,默認1s authentication { ## 認證匹配 auth_type PASS auth_pass bhz }
track_script { chk_haproxy }
virtual_ipaddress { 192.168.11.70 ## 虛擬ip,可以指定多個 } } |
PS: 80節點(backup)配置如下
! Configuration File for keepalived
global_defs { router_id bhz80 ##標識節點的字符串,通常為hostname
}
vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##執行腳本位置 interval 2 ##檢測時間間隔 weight -20 ##如果條件成立則權重減20 }
vrrp_instance VI_1 { state BACKUP ## 主節點為MASTER,備份節點為BACKUP interface eno16777736 ## 綁定虛擬IP的網絡接口(網卡),與本機IP地址所在的網絡接口相同(我這里是eno16777736) virtual_router_id 79 ## 虛擬路由ID號(主備節點一定要相同) mcast_src_ip 192.168.11.80 ## 本機ip地址 priority 90 ##優先級配置(0-254的值) nopreempt advert_int 1 ## 組播信息發送間隔,倆個節點必須配置一致,默認1s authentication { ## 認證匹配 auth_type PASS auth_pass bhz }
track_script { chk_haproxy }
virtual_ipaddress { 192.168.1.70 ## 虛擬ip,可以指定多個 } } |
4、執行腳本編寫
PS:添加文件位置為/etc/keepalived/haproxy_check.sh(79、80兩個節點文件內容一致即可)
#!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg sleep 2 if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then killall keepalived fi fi |
5、執行腳本賦權
PS:haproxy_check.sh腳本授權,賦予可執行權限.
chmod +x /etc/keepalived/haproxy_check.sh |
6、啟動keepalived
PS:當我們啟動倆個haproxy節點以后,我們可以啟動keepalived服務程序:
//啟動兩台機器的keepalived service keepalived start | stop | status | restart //查看狀態 ps -ef | grep haproxy ps -ef | grep keepalived |
7、高可用測試
PS:vip在27節點上
PS:27節點宕機測試:停掉27的keepalived服務即可。
PS:查看28節點狀態:我們發現VIP漂移到了28節點上,那么28節點的haproxy可以繼續對外提供服務!
2.7 集群配置文件
創建如下配置文件位於:/etc/rabbitmq目錄下(這個目錄需要自己創建)
環境變量配置文件:rabbitmq-env.conf
配置信息配置文件:rabbitmq.config(可以不創建和配置,修改)
rabbitmq-env.conf配置文件:
---------------------------------------關鍵參數配置-------------------------------------------
RABBITMQ_NODE_IP_ADDRESS=本機IP地址
RABBITMQ_NODE_PORT=5672
RABBITMQ_LOG_BASE=/var/lib/rabbitmq/log
RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
配置參考參數如下:
RABBITMQ_NODENAME=FZTEC-240088 節點名稱
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP
RABBITMQ_NODE_PORT=5672 監聽端口
RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目錄
RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目錄
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存儲目錄
更詳細的配置參見: http://www.rabbitmq.com/configure.html#configuration-file
配置文件信息修改:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app和rabbitmq.config配置文件配置任意一個即可,我們進行配置如下:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app
-------------------------------------關鍵參數配置----------------------------------------
tcp_listerners 設置rabbimq的監聽端口,默認為[5672]。
disk_free_limit 磁盤低水位線,若磁盤容量低於指定值則停止接收數據,默認值為{mem_relative, 1.0},即與內存相關聯1:1,也可定制為多少byte.
vm_memory_high_watermark,設置內存低水位線,若低於該水位線,則開啟流控機制,默認值是0.4,即內存總量的40%。
hipe_compile 將部分rabbimq代碼用High Performance Erlang compiler編譯,可提升性能,該參數是實驗性,若出現erlang vm segfaults,應關掉。
force_fine_statistics, 該參數屬於rabbimq_management,若為true則進行精細化的統計,但會影響性能
------------------------------------------------------------------------------------------
更詳細的配置參見:http://www.rabbitmq.com/configure.html
3 Stream調研
3.1 Stream簡介
Spring Cloud Stream是創建消息驅動微服務應用的框架。Spring Cloud Stream是基於spring boot創建,用來建立單獨的/工業級spring應用,使用spring integration提供與消息代理之間的連接。本文提供不同代理中的中間件配置,介紹了持久化發布訂閱機制,以及消費組以及分割的概念。
將注解@EnableBinding加到應用上就可以實現與消息代理的連接,@StreamListener注解加到方法上,使之可以接收處理流的事件。
3.2 官方參考文檔
原版:
http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_main_concepts
翻譯:
http://blog.csdn.net/phyllisy/article/details/51352868
3.3 API操作手冊
3.3.1 生產者示例
PS:生產者yml配置
spring: cloud: stream: instanceCount: 3 bindings: output_channel: #輸出 生產者 group: queue-1 #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式 destination: exchange-1 # kafka:發布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) binder: rabbit_cluster binders: rabbit_cluster: type: rabbit environment: spring: rabbitmq: host: 192.168.1.27 port: 5672 username: guest password: guest virtual-host: / |
PS: Barista接口為自定義管道
package bhz.spring.cloud.stream;
import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel;
/** * <B>中文類名:</B><BR> * <B>概要說明:</B><BR> * 這里的Barista接口是定義來作為后面類的參數,這一接口定義來通道類型和通道名稱。 * 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發送消息還是從中接收消息。 * @author bhz(Alienware) * @since 2015年11月22日 */ public interface Barista {
String INPUT_CHANNEL = "input_channel"; String OUTPUT_CHANNEL = "output_channel";
//注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發布的主題名為mydest。 @Output(Barista.OUTPUT_CHANNEL) MessageChannel logoutput(); } |
PS: 生產者消息投遞
package bhz.spring.cloud.stream;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service;
@Service public class RabbitmqSender {
@Autowired private Barista source;
// 發送消息 public String sendMessage(Object message){ try{ source.logoutput().send(MessageBuilder.withPayload(message).build()); System.out.println("發送數據:" + message); }catch (Exception e){ e.printStackTrace(); } return null; } } |
PS: Spring Boot應用入口
package bhz.spring.cloud.stream;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication @EnableBinding(Barista.class) public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } } |
3.3.2 消費者示例
PS:消費者yml配置
spring: cloud: stream: instanceCount: 3 bindings: input_channel: #輸出 生產者 destination: exchange-1 # kafka:發布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) group: queue-1 #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式 binder: rabbit_cluster consumer: concurrency: 1 rabbit: bindings: input_channel: consumer: transacted: true txSize: 10 acknowledgeMode: MANUAL durableSubscription: true maxConcurrency: 20 recoveryInterval: 3000 binders: rabbit_cluster: type: rabbit environment: spring: rabbitmq: host: 192.168.1.27 port: 5672 username: guest password: guest virtual-host: / |
PS: Barista接口為自定義管道
package bhz.spring.cloud.stream;
import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel;
/** * <B>中文類名:</B><BR> * <B>概要說明:</B><BR> * 這里的Barista接口是定義來作為后面類的參數,這一接口定義來通道類型和通道名稱。 * 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發送消息還是從中接收消息。 * @author bhz(Alienware) * @since 2015年11月22日 */ public interface Barista {
String INPUT_CHANNEL = "input_channel"; String OUTPUT_CHANNEL = "output_channel";
//注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發布的主題名為mydest。 @Output(Barista.OUTPUT_CHANNEL) MessageChannel logoutput();
} |
PS: 消費者消息獲取
package bhz.spring.cloud.stream;
import java.io.IOException;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binding.ChannelBindingService; import org.springframework.cloud.stream.config.ChannelBindingServiceConfiguration; import org.springframework.cloud.stream.endpoint.ChannelsEndpoint; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.RendezvousChannel; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.core.MessageReceivingOperations; import org.springframework.messaging.core.MessageRequestReplyOperations; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@EnableBinding(Barista.class) @Service public class RabbitmqReceiver {
@Autowired private Barista source;
@StreamListener(Barista.INPUT_CHANNEL) public void receiver( Message message) {
//廣播通道 //PublishSubscribeChannel psc = new PublishSubscribeChannel(); //確認通道 //RendezvousChannel rc = new RendezvousChannel(); Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); System.out.println("Input Stream 1 接受數據:" + message); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } } } |
PS: Spring Boot應用入口
package bhz.spring.cloud.stream;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication @EnableBinding(Barista.class) @EnableTransactionManagement public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } } |
4 制定擴展
4.1 延遲隊列插件
#step1:upload the ‘rabbitmq_delayed_message_exchange-0.0.1.ez’ file:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
http://www.rabbitmq.com/community-plugins.html
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange/v3.6.x#files/
#step2:PUT Directory:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins
#step3:Then run the following command:
Start the rabbitmq cluster for command ## rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
訪問地址:http://192.168.1.21:15672/#/exchanges,添加延遲隊列