rabbitmq 參考手冊


 

 

 

 

RabbitMQ消息服務用戶手冊

(UBP, Message Queue)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

XXX

2016年7月

 

 

1    基礎知識

1.1   集群總體概述

Rabbitmq Broker集群是多個erlang節點的邏輯組,每個節點運行Rabbitmq應用,他們之間共享用戶、虛擬主機、隊列、exchange、綁定和運行時參數。

1.2   集群復制信息

除了message queue(存在一個節點,從其他節點都可見、訪問該隊列,要實現queue的復制就需要做queue的HA)之外,任何一個Rabbitmq broker上的所有操作的data和state都會在所有的節點之間進行復制。

1.3   集群運行前提

1、集群所有節點必須運行相同的erlang及Rabbitmq版本。

2、hostname解析,節點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。

1.4   集群互通方式

1、集群所有節點必須運行相同的erlang及Rabbitmq版本hostname解析,節點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。

1.5   端口及其用途

1、5672 客戶端連接端口。       

2、15672 web管控台端口。  

3、25672 集群通信端口。

1.6   集群配置方式

通過rabbitmqctl手工配置的方式。

1.7   集群故障處理

1、rabbitmq broker集群允許個體節點宕機。

2、對應集群的的網絡分區問題(network partitions)集群推薦用於LAN環境,不適用WAN環境;要通過WAN連接broker,Shovel or Federation插件是最佳解決方案(Shovel or Federation不同於集群:注Shovel為中心服務遠程異步復制機制,稍后會有介紹)。

1.8   節點運行模式

為保證數據持久性,目前所有node節點跑在disk模式,如果今后壓力大,需要提高性能,考慮采用ram模式。

1.9   集群認證方式

通過Erlang Cookie,相當於共享秘鑰的概念,長度任意,只要所有節點都一致即可。rabbitmq server在啟動的時候,erlang VM會自動創建一個隨機的cookie文件。cookie文件的位置: /var/lib/rabbitmq/.erlang.cookie 或者/root/.erlang.cookie。我們的為保證cookie的完全一致,采用從一個節點copy的方式,實現各個節點的cookie文件一致。

2    集群搭建

2.1   集群節點安裝

1、安裝依賴包

PS:安裝rabbitmq所需要的依賴包

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

 

 

2、下載安裝包

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

 

3、安裝服務命令

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

 

 

4、修改集群用戶與連接心跳檢測

注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件

修改:loopback_users 中的 <<"guest">>,只保留guest

修改:heartbeat 為1

 

5、安裝管理插件

//首先啟動服務

/etc/init.d/rabbitmq-server start stop status restart

//查看服務有沒有啟動: lsof -i:5672

rabbitmq-plugins enable rabbitmq_management

//可查看管理端口有沒有啟動: lsof -i:15672 或者 netstat -tnlp|grep 15672

 

6、服務指令

/etc/init.d/rabbitmq-server start stop status restart

驗證單個節點是否安裝成功:http://192.168.11.71:15672/

1.  Ps:以上操作三個節點(71、72、73)同時進行操作

2.2   文件同步步驟

PS:選擇76、77、78任意一個節點為Master(這里選擇76為Master),也就是說我們需要把76的Cookie文件同步到77、78節點上去,進入/var/lib/rabbitmq目錄下,把/var/lib/rabbitmq/.erlang.cookie文件的權限修改為777,原來是400;然后把.erlang.cookie文件copy到各個節點下;最后把所有cookie文件權限還原為400即可。

/etc/init.d/rabbitmq-server stop

//進入目錄修改權限;遠程copy77、78節點,比如:

scp /var/lib/rabbitmq/.erlang.cookie 到192.168.11.77和192.168.11.78中

 

2.3   組成集群步驟

1、停止MQ服務

PS:我們首先停止3個節點的服務

rabbitmqctl stop

2、組成集群操作

PS:接下來我們就可以使用集群命令,配置76、77、78為集群模式,3個節點(76、77、78)執行啟動命令,后續啟動集群使用此命令即可。

rabbitmq-server -detached

3、slave加入集群操作(重新加入集群也是如此,以最開始的主節點為加入節點)

//注意做這個步驟的時候:需要配置/etc/hosts 必須相互能夠尋址到

bhz77:rabbitmqctl stop_app

bhz77:rabbitmqctl join_cluster --ram rabbit@bhz76 –ram代表內存方式,不寫代表磁盤

bhz77:rabbitmqctl start_app

bhz78:rabbitmqctl stop_app

bhz78:rabbitmqctl join_cluster rabbit@bhz76

bhz78:rabbitmqctl start_app

//在另外其他節點上操作要移除的集群節點

rabbitmqctl forget_cluster_node rabbit@bhz24

4、修改集群名稱

PS:修改集群名稱(默認為第一個node名稱):

rabbitmqctl set_cluster_name rabbitmq_cluster1

 

5、查看集群狀態

PS:最后在集群的任意一個節點執行命令:查看集群狀態

rabbitmqctl cluster_status

 

6、管控台界面

PS: 訪問任意一個管控台節點:http://192.168.11.71:15672 如圖所示

 

2.4   配置鏡像隊列

PS:設置鏡像隊列策略(在任意一個節點上執行)

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

PS:將所有隊列設置為鏡像隊列,即隊列會被復制到各個節點,各個節點狀態一致,RabbitMQ高可用集群就已經搭建好了,我們可以重啟服務,查看其隊列是否在從節點同步。

 

2.5   安裝Ha-Proxy

1、Haproxy簡介

HAProxy是一款提供高可用性、負載均衡以及基於TCP和HTTP應用的代理軟件,HAProxy是完全免費的、借助HAProxy可以快速並且可靠的提供基於TCP和HTTP應用的代理解決方案。

HAProxy適用於那些負載較大的web站點,這些站點通常又需要會話保持或七層處理。

HAProxy可以支持數以萬計的並發連接,並且HAProxy的運行模式使得它可以很簡單安全的整合進架構中,同時可以保護web服務器不被暴露到網絡上。

2、Haproxy安裝

PS:79、80節點同時安裝Haproxy,下面步驟統一

//下載依賴包

yum install gcc vim wget

//下載haproxy

wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.5.tar.gz

//解壓

tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local

//進入目錄、進行編譯、安裝

cd /usr/local/haproxy-1.6.5

make TARGET=linux31 PREFIX=/usr/local/haproxy

make install PREFIX=/usr/local/haproxy

mkdir /etc/haproxy

//賦權

groupadd -r -g 149 haproxy

useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy

//創建haproxy配置文件

touch /etc/haproxy/haproxy.cfg

 

3、Haproxy配置

PS:haproxy 配置文件haproxy.cfg詳解

vim /etc/haproxy/haproxy.cfg

 

#logging options

global

    log 127.0.0.1 local0 info

    maxconn 5120

    chroot /usr/local/haproxy

    uid 99

    gid 99

    daemon

    quiet

    nbproc 20

    pidfile /var/run/haproxy.pid

 

defaults

    log global

    #使用4層代理模式,”mode http”為7層代理模式

    mode tcp

    #if you set mode to tcp,then you nust change tcplog into httplog

    option tcplog

    option dontlognull

    retries 3

    option redispatch

    maxconn 2000

    contimeout 5s

     ##客戶端空閑超時時間為 60秒 則HA 發起重連機制

     clitimeout 60s

     ##服務器端鏈接超時時間為 15秒 則HA 發起重連機制

     srvtimeout 15s   

#front-end IP for consumers and producters

 

listen rabbitmq_cluster

    bind 0.0.0.0:5672

    #配置TCP模式

    mode tcp

    #balance url_param userid

    #balance url_param session_id check_post 64

    #balance hdr(User-Agent)

    #balance hdr(host)

    #balance hdr(Host) use_domain_only

    #balance rdp-cookie

    #balance leastconn

    #balance source //ip

    #簡單的輪詢

    balance roundrobin

    #rabbitmq集群節點配置 #inter 每隔五秒對mq集群做健康檢查, 2次正確證明服務器可用,2次失敗證明服務器不可用,並且配置主備機制

        server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2

        server bhz77 192.168.11.77:5672 check inter 5000 rise 2 fall 2

        server bhz78 192.168.11.78:5672 check inter 5000 rise 2 fall 2

#配置haproxy web監控,查看統計信息

listen stats

    bind 192.168.11.79:8100

    mode http

    option httplog

    stats enable

    #設置haproxy監控地址為http://localhost:8100/rabbitmq-stats

    stats uri /rabbitmq-stats

    stats refresh 5s

 

4、啟動haproxy

/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

//查看haproxy進程狀態

ps -ef | grep haproxy

 

5、訪問haproxy

PS:訪問如下地址可以對rmq節點進行監控:http://192.168.1.27:8100/rabbitmq-stats

 

6、關閉haproxy

killall haproxy

ps -ef | grep haproxy

 

2.6   安裝KeepAlived

 

1、Keepalived簡介

Keepalived,它是一個高性能的服務器高可用或熱備解決方案,Keepalived主要來防止服務器單點故障的發生問題,可以通過其與Nginx、Haproxy等反向代理的負載均衡服務器配合實現web服務端的高可用。Keepalived以VRRP協議為實現基礎,用VRRP協議來實現高可用性(HA).VRRP(Virtual Router Redundancy Protocol)協議是用於實現路由器冗余的協議,VRRP協議將兩台或多台路由器設備虛擬成一個設備,對外提供虛擬路由器IP(一個或多個)。

 

2、Keepalived安裝

PS:下載地址:http://www.keepalived.org/download.html

//安裝所需軟件包

yum install -y openssl openssl-devel

//下載

wget http://www.keepalived.org/software/keepalived-1.2.18.tar.gz

//解壓、編譯、安裝

tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/

cd keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalived

make && make install

//將keepalived安裝成Linux系統服務,因為沒有使用keepalived的默認安裝路徑(默認路徑:/usr/local),安裝完成之后,需要做一些修改工作

//首先創建文件夾,將keepalived配置文件進行復制:

mkdir /etc/keepalived

cp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived/

//然后復制keepalived腳本文件:

cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/

cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/

ln -s /usr/local/sbin/keepalived /usr/sbin/

ln -s /usr/local/keepalived/sbin/keepalived /sbin/

//可以設置開機啟動:chkconfig keepalived on,到此我們安裝完畢!

chkconfig keepalived on

 

 

3、Keepalived配置

PS:修改keepalived.conf配置文件

vim /etc/keepalived/keepalived.conf

 

 

PS: 79節點(Master)配置如下

! Configuration File for keepalived

 

global_defs {

   router_id bhz79  ##標識節點的字符串,通常為hostname

 

}

 

vrrp_script chk_haproxy {

    script "/etc/keepalived/haproxy_check.sh"  ##執行腳本位置

    interval 2  ##檢測時間間隔

    weight -20  ##如果條件成立則權重減20

}

 

vrrp_instance VI_1 {

    state MASTER  ## 主節點為MASTER,備份節點為BACKUP

    interface eth0 ## 綁定虛擬IP的網絡接口(網卡),與本機IP地址所在的網絡接口相同(我這里是eth0)

    virtual_router_id 79  ## 虛擬路由ID號(主備節點一定要相同)

    mcast_src_ip 192.168.11.79 ## 本機ip地址

    priority 100  ##優先級配置(0-254的值)

    nopreempt

    advert_int 1  ## 組播信息發送間隔,倆個節點必須配置一致,默認1s

authentication {  ## 認證匹配

        auth_type PASS

        auth_pass bhz

    }

 

    track_script {

        chk_haproxy

    }

 

    virtual_ipaddress {

        192.168.11.70  ## 虛擬ip,可以指定多個

    }

}

 

 

 

 

 

 

 

PS: 80節點(backup)配置如下

! Configuration File for keepalived

 

global_defs {

   router_id bhz80  ##標識節點的字符串,通常為hostname

 

}

 

vrrp_script chk_haproxy {

    script "/etc/keepalived/haproxy_check.sh"  ##執行腳本位置

    interval 2  ##檢測時間間隔

    weight -20  ##如果條件成立則權重減20

}

 

vrrp_instance VI_1 {

    state BACKUP  ## 主節點為MASTER,備份節點為BACKUP

    interface eno16777736 ## 綁定虛擬IP的網絡接口(網卡),與本機IP地址所在的網絡接口相同(我這里是eno16777736)

    virtual_router_id 79  ## 虛擬路由ID號(主備節點一定要相同)

    mcast_src_ip 192.168.11.80  ## 本機ip地址

    priority 90  ##優先級配置(0-254的值)

    nopreempt

    advert_int 1  ## 組播信息發送間隔,倆個節點必須配置一致,默認1s

authentication {  ## 認證匹配

        auth_type PASS

        auth_pass bhz

    }

 

    track_script {

        chk_haproxy

    }

 

    virtual_ipaddress {

        192.168.1.70  ## 虛擬ip,可以指定多個

    }

}

 

 

 

 

 

 

4、執行腳本編寫

PS:添加文件位置為/etc/keepalived/haproxy_check.sh(79、80兩個節點文件內容一致即可)

#!/bin/bash

COUNT=`ps -C haproxy --no-header |wc -l`

if [ $COUNT -eq 0 ];then

    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

    sleep 2

    if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then

        killall keepalived

    fi

fi

 

5、執行腳本賦權

PS:haproxy_check.sh腳本授權,賦予可執行權限.

chmod +x /etc/keepalived/haproxy_check.sh

 

6、啟動keepalived

PS:當我們啟動倆個haproxy節點以后,我們可以啟動keepalived服務程序:

//啟動兩台機器的keepalived

service keepalived start | stop | status | restart

//查看狀態

ps -ef | grep haproxy

ps -ef | grep keepalived

 

7、高可用測試

PS:vip在27節點上

 

PS:27節點宕機測試:停掉27的keepalived服務即可。

 

PS:查看28節點狀態:我們發現VIP漂移到了28節點上,那么28節點的haproxy可以繼續對外提供服務!

 

 

 

2.7   集群配置文件

創建如下配置文件位於:/etc/rabbitmq目錄下(這個目錄需要自己創建)

環境變量配置文件:rabbitmq-env.conf

配置信息配置文件:rabbitmq.config(可以不創建和配置,修改)

rabbitmq-env.conf配置文件:

---------------------------------------關鍵參數配置-------------------------------------------

RABBITMQ_NODE_IP_ADDRESS=本機IP地址

RABBITMQ_NODE_PORT=5672

RABBITMQ_LOG_BASE=/var/lib/rabbitmq/log

RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia

 

配置參考參數如下:

RABBITMQ_NODENAME=FZTEC-240088 節點名稱

RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP

RABBITMQ_NODE_PORT=5672 監聽端口

RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目錄

RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目錄

RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存儲目錄

更詳細的配置參見: http://www.rabbitmq.com/configure.html#configuration-file

 

配置文件信息修改:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app和rabbitmq.config配置文件配置任意一個即可,我們進行配置如下:

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app

-------------------------------------關鍵參數配置----------------------------------------

tcp_listerners 設置rabbimq的監聽端口,默認為[5672]。
disk_free_limit 磁盤低水位線,若磁盤容量低於指定值則停止接收數據,默認值為{mem_relative, 1.0},即與內存相關聯1:1,也可定制為多少byte.
vm_memory_high_watermark,設置內存低水位線,若低於該水位線,則開啟流控機制,默認值是0.4,即內存總量的40%。
hipe_compile 將部分rabbimq代碼用High Performance Erlang compiler編譯,可提升性能,該參數是實驗性,若出現erlang vm segfaults,應關掉。
force_fine_statistics, 該參數屬於rabbimq_management,若為true則進行精細化的統計,但會影響性能

------------------------------------------------------------------------------------------

更詳細的配置參見:http://www.rabbitmq.com/configure.html

 

3    Stream調研

3.1   Stream簡介

Spring Cloud Stream是創建消息驅動微服務應用的框架。Spring Cloud Stream是基於spring boot創建,用來建立單獨的/工業級spring應用,使用spring integration提供與消息代理之間的連接。本文提供不同代理中的中間件配置,介紹了持久化發布訂閱機制,以及消費組以及分割的概念。
將注解@EnableBinding加到應用上就可以實現與消息代理的連接,@StreamListener注解加到方法上,使之可以接收處理流的事件。

 

3.2   官方參考文檔

原版:

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_main_concepts

翻譯:

http://blog.csdn.net/phyllisy/article/details/51352868

3.3   API操作手冊

3.3.1    生產者示例

PS:生產者yml配置

spring: 

  cloud: 

    stream:

      instanceCount: 3 

      bindings:

        output_channel:       #輸出 生產者 

          group: queue-1  #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式

          destination: exchange-1 # kafka:發布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) 

          binder: rabbit_cluster

      binders: 

        rabbit_cluster: 

          type: rabbit 

          environment: 

            spring: 

              rabbitmq: 

                host: 192.168.1.27    

                port: 5672           

                username: guest

                password: guest

                virtual-host: /

 

PS: Barista接口為自定義管道

package bhz.spring.cloud.stream;

 

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

 

/**

 * <B>中文類名:</B><BR>

 * <B>概要說明:</B><BR>

 * 這里的Barista接口是定義來作為后面類的參數,這一接口定義來通道類型和通道名稱。

 * 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發送消息還是從中接收消息。

 * @author bhz(Alienware)

 * @since 2015年11月22日

 */

public interface Barista {

     

    String INPUT_CHANNEL = "input_channel"; 

    String OUTPUT_CHANNEL = "output_channel"; 

 

 

    //注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 

    @Input(Barista.INPUT_CHANNEL) 

    SubscribableChannel loginput(); 

    //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發布的主題名為mydest。 

    @Output(Barista.OUTPUT_CHANNEL)

    MessageChannel logoutput();       

 

PS: 生產者消息投遞

package bhz.spring.cloud.stream;

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Service;

 

@Service 

public class RabbitmqSender { 

 

    @Autowired 

    private Barista source; 

 

    // 發送消息 

    public String sendMessage(Object message){ 

        try{ 

            source.logoutput().send(MessageBuilder.withPayload(message).build());

            System.out.println("發送數據:" + message);

        }catch (Exception e){ 

            e.printStackTrace(); 

        } 

        return null; 

    } 

 

PS: Spring Boot應用入口

package bhz.spring.cloud.stream;

 

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

 

@SpringBootApplication 

@EnableBinding(Barista.class) 

public class ProducerApplication { 

    public static void main(String[] args) { 

        SpringApplication.run(ProducerApplication.class, args); 

    } 

 

3.3.2    消費者示例

PS:消費者yml配置

spring: 

  cloud: 

    stream:

      instanceCount: 3

      bindings:

        input_channel:       #輸出 生產者 

          destination: exchange-1 # kafka:發布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) 

          group: queue-1  #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式

          binder: rabbit_cluster

          consumer:

            concurrency: 1

      rabbit: 

        bindings:

          input_channel:

            consumer:

              transacted: true

              txSize: 10

              acknowledgeMode: MANUAL

              durableSubscription: true

              maxConcurrency: 20

              recoveryInterval: 3000           

      binders: 

        rabbit_cluster: 

          type: rabbit 

          environment: 

            spring: 

              rabbitmq: 

                host: 192.168.1.27    

                port: 5672           

                username: guest

                password: guest

                virtual-host: /             

 

PS: Barista接口為自定義管道

package bhz.spring.cloud.stream;

 

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

 

/**

 * <B>中文類名:</B><BR>

 * <B>概要說明:</B><BR>

 * 這里的Barista接口是定義來作為后面類的參數,這一接口定義來通道類型和通道名稱。

 * 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發送消息還是從中接收消息。

 * @author bhz(Alienware)

 * @since 2015年11月22日

 */

public interface Barista {

     

    String INPUT_CHANNEL = "input_channel"; 

    String OUTPUT_CHANNEL = "output_channel"; 

 

    //注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 

    @Input(Barista.INPUT_CHANNEL) 

    SubscribableChannel loginput(); 

    //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發布的主題名為mydest。 

    @Output(Barista.OUTPUT_CHANNEL)

    MessageChannel logoutput(); 

     

 

PS: 消費者消息獲取

package bhz.spring.cloud.stream;

 

import java.io.IOException;

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.binding.ChannelBindingService;

import org.springframework.cloud.stream.config.ChannelBindingServiceConfiguration;

import org.springframework.cloud.stream.endpoint.ChannelsEndpoint;

import org.springframework.integration.channel.PublishSubscribeChannel;

import org.springframework.integration.channel.RendezvousChannel;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

import org.springframework.messaging.core.MessageReceivingOperations;

import org.springframework.messaging.core.MessageRequestReplyOperations;

import org.springframework.messaging.support.ChannelInterceptor;

import org.springframework.stereotype.Service;

 

import com.rabbitmq.client.Channel;

 

 

@EnableBinding(Barista.class)

@Service

public class RabbitmqReceiver { 

 

    @Autowired 

    private Barista source; 

   

    @StreamListener(Barista.INPUT_CHANNEL) 

    public void receiver( Message message) { 

   

     //廣播通道

    //PublishSubscribeChannel psc = new PublishSubscribeChannel();

    //確認通道

     //RendezvousChannel rc = new RendezvousChannel();

    Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);

    Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

    System.out.println("Input Stream 1 接受數據:" + message);

     try {

             channel.basicAck(deliveryTag, false);

         } catch (IOException e) {

             e.printStackTrace();

         }

    } 

 

PS: Spring Boot應用入口

package bhz.spring.cloud.stream;

 

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.transaction.annotation.EnableTransactionManagement;

 

@SpringBootApplication 

@EnableBinding(Barista.class)

@EnableTransactionManagement

public class ConsumerApplication { 

    public static void main(String[] args) { 

        SpringApplication.run(ConsumerApplication.class, args); 

    } 

4    制定擴展

4.1   延遲隊列插件

#step1:upload the ‘rabbitmq_delayed_message_exchange-0.0.1.ez’ file:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

http://www.rabbitmq.com/community-plugins.html

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange/v3.6.x#files/

 

#step2:PUT Directory:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins

#step3:Then run the following command:

Start the rabbitmq cluster for command ## rabbitmq-server -detached

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

訪問地址:http://192.168.1.21:15672/#/exchanges,添加延遲隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM