背景
本公司是.Net項目,在.Net可選的MQ比較少,主要Kafka和RabbitMQ,RabbitMQ我也是使用多年了,最近的Kafka廣告與流行度我也是無法無視,因此也是花了點時間收集了資料做了些對比。
如果下文有總結不到位的,或者差錯的,可以在下方評論反饋給我
RabbitMQ模型
名詞 | 描述 |
Queue | 用於存儲消息,消費者直接綁定Queue進行消費消息 |
Exchange | 生產者將消息發送到Exchange,由交換器將消息通過匹配Exchange Type、Binding Key、Routing Key后路由到一個或者多個隊列中。 |
Exchange Type | Direct、Fanout、Topic、Headers |
Routing Key | 生產者發送消息給Exchange會指定一個Routing Key。 |
Binding Key | 在綁定Exchange與Queue時會指定一個Binding Key |
1.Exchange在聲明時會綁定Queue和Binding Key,當Exchange收到消息會根據消息的
2.Routing Key與Exchange Type、Binding Key進行匹配,最后會路由到相關的隊列當中。
Fanout,將消息發送到與該交換器所綁定的所有隊列中,與Routing Key、Bind Key無關,這就是廣播模式。
Topic,通過對消息的Routing Key和Exchange、Queue進行匹配,將消息路由給一個或多個隊列,以此來達到發布/訂閱模式。
Direct,把消息路由到哪些Bind Key和Routing Key完全匹配的隊列中。
Headers,不依賴與路由鍵的匹配規則,基本用不上。
3.消費者會直接訂閱Queue里的消息進行消費,多個消費者訂閱同個Queue會形成消息競爭狀態,以此達到負載均衡作用。
Kafka模型
名詞 | 描述 |
Topic | 隊列是通過Topic進行隔離的,生產者發送消息必須指定Topic |
Broker | 一個Kafka Server的被稱為一個Broker。 |
Partition | 每個Topic可以包含多個Partition,多個Partition會平均分配給同一個Consumer Group里的不同Consumer進行消費 |
Consumer Group | 不在同一個Group 的Consumer能重復消費同一條消息(訂閱),相同Group的Consumer存在消費競爭(負載均衡) |
- Kafka與RabbitMQ比沒有Exchange的概念,生產者直接發消息Topic(隊列)。
- Kafka的訂閱者是通過消費組(Consumer Group)來體現的,每個消費組都可以重復消費Topic一份完整的消息,不同消費組之間消費進度彼此不受影響。例如Message1能被Consumer Group 1和Consumer Group2里的消費者都消費一次。
- 消費組中包含多個消費者,同個Group的消費者之間是競爭消費的關系。例如Message2只能夠被Consumer Group里某一個Consumer只消費一次。
- Kafka具有消息存儲的功能,消息被消費后不會被立即刪除,因為需要被不同的Consumer Group多次消費同個消息,因此會在Topic維護一個Consumer Offset,每消費成功Offset自增1.
功能對比
對比項 | RabbitMQ | Kafka |
吞吐量 | 低 | 高 |
有序性 | 全局有序性 | 分區有序性 |
消息可靠性 | 多策略組合 | 消息持久化 |
流處理 | 不支持 | 支持 |
時效性 | 高 | 中 |
運維便捷度 | 高 | 中 |
系統依賴 | 無 | zookeeper |
Web監控 | 自帶 | 第三方 |
優先級隊列 | 支持 | 不支持 |
死信 | 支持 | 不支持 |
客戶端支持 | 支持多種語言 | |
社區生態 | 好 | |
安全機制 | (TLS/SSL、SASL)身份認證和(讀寫)權限控制 | |
消息回溯 | 支持 | 不支持 |
對比描述
共同點
RabbitMQ與Kafka都有很好的客戶端語言支持、安全機制與生態支持。
性能
Kafka的誕生的是處理高並發日志的,吞吐量比較高,每秒請求數達到數十萬量級
RabbitMQ每秒請求數則為萬級別,有測試報告指出Kafka是RabbitMQ的10倍以上性能。
運維便捷
RabbitMQ相對比較方便,可以使用yum或者docker安裝,自帶Web管理UI,沒有額外的依賴,除了需要做鏡像隊列外需要引入HAproxy。
Kafka則需要依賴Zookeeper,也沒有自帶的管理工具,可以使用第三方的Kafka Eagle代替,Kafka Manager過於難用,另外Kafka沒有yum安裝,docker鏡像也是社區人員自己建的。
有序性
RabbitMQ理論上是全局有序性的,但是由於【發后既忘】+【自動確認】機制的原因,如果在同個隊列的多個消費者做相同的業務處理時,他們的各自的執行任務無法保證有序完成。如果確保100%有序可以使用【非自動確認】,但會影響消費性能。
Kafka支持分區有序性,如果對有序性有嚴格要求可以設置單個Partition,可是單個Partition並發性比較低,因此在多個Partition情況下可以根據業務指定key把相關的消息路由到同一個Partition,例如相同UserId行為信息可以到Partition 1進行處理。
時效性
Kafka基本上無論在客戶端還是服務端都是以【異步批量】的機制進行處理,通俗的講就是先攢起來一堆消息,到了某個閥值再發送,也會導致一些消息可靠性與消息有時效上的問題,當然可以通過各種配置策略進行解決。
消息回溯
Kafka在消費完了消息后不會立即刪除,只會修改offset,如果之前部分業務消費失敗了可以重新設置offset進行重新消費。
RabbitMQ則是[發后既忘]的機制,一但消費者確認消息則刪除,但是可以通過死信進行補償消費。此外RabbitMQ在隊列消息堆積多的情況下性能表現不佳,所以盡可能的及時消費消息。
特色功能
RabbitMQ具有死信的功能,可以通過死信形成重復消費與延時發送。
Kafka具有流處理功能,可以收集用戶的行為日志進行存儲與分析。
Kafka為什么快?
關鍵核心技術點:
- 異步批量處理
- 磁盤順序讀寫
- 操作系統PageCache緩存數據
- 零拷貝加速消費
Kafka的誕生就是為了高並發日志處理的,那么在他整個機制里使用了很多批量、異步、緩存。例如生產者客戶端,他會積累一定量(條數、大小)的消息,再批量的發給kafka broker,如果在這段時間客戶端服務掛了,就等於消息丟失了。當broker接受到了消息后,還有一堆騷操作-異步刷盤,也就是生產者發送給broker之后他是記錄在緩存的,每隔一段時間才會持久化到磁盤,假如這段真空期broker掛了,消息也是丟了。
Kafka是否消息不可靠?
Kafka快是因為犧牲了消息可靠換取回來的性能,在最早期版本的確沒提供消息可靠的策略,經過多個版本迭代后的功能完善,已經不存在這種舊觀念。那么可靠的關鍵點有以下:
生產者
設置ack:
- 0:producer不等待broker的ack,broker一接收到還沒有寫入磁盤就已經返回,可靠性最低;
- 1:producer等待broker的ack,partition的leader刷盤成功后返回ack,如果在follower同步成功之前leader故障,那么將會丟失數據,可靠性中;
- -1:producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack,數據一般不會丟失,延遲時間長但是可靠性高
消費者
設置enable.auto.commitrue,不管執行結果如何,消費者會自動提交offset。
選型總結
對於選擇Kafka還是RabbitMQ,主要考慮三個因素:吞吐量、運維能力和平台熟悉度。如果是需要流處理和高並發的日志處理,首選Kafka。但是大部分公司並沒有什么高並發的處理,因此可以着重考慮運維程度和平台熟悉度,前面提到Kafka也是有策略可以設置消息可靠的。RabbiMQ運維比較直接,包括客戶端EasyNetQ使用簡易性,基本上就是”開箱即用“。
客戶端
MQ | 客戶端名稱 | 文檔地址 |
Kafka | confluent-kafka-dotnet | https://github.com/confluentinc/confluent-kafka-dotnet/wiki |
RabbitMQ | EasyNetQ | https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction |
RabbitMQ單節點部署
安裝yum install -y rabbitmq-server
開放相關端口
firewall-cmd --permanent --add-port=15672/tcp firewall-cmd --permanent --add-port=5672/tcp firewall-cmd --reload
啟動服務
service rabbitmq-server start
設置開機啟動
chkconfig rabbitmq-server on
啟動web管理界面
rabbitmq-plugins enable rabbitmq_management
增加訪問admin用戶,默認用戶guest只能本地訪問。
rabbitmqctl add_user admin 123456
設置admin用戶為管理員角色
rabbitmqctl set_user_tags admin administrator
設置默認admin用戶訪問權限
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
重啟服務
service rabbitmq-server restart
瀏覽器訪問:http://IP:15672
RabbitMQ鏡像集群部署
1.所有實例都需要執行以上單實例部署的命令
2.所有實例服務器的打開相關端口
firewall-cmd --permanent --add-port=59984/tcp firewall-cmd --permanent --add-port=25672/tcp firewall-cmd --permanent --add-port=4369/tcp firewall-cmd --reload
3.在每台服務器添加其他rabbitmq 實例服務的hosts映射
#修復服務器hosts文件 vim /etc/hosts 192.168.88.138 server-a 192.168.88.139 server-c
先查看集群信息
rabbitmqctl cluster_status
4.同步erlang cookie
在server-c(192.168.88.139)執行遠程拷貝到server-a(192.168.88.138)
rabbitmqctl stop scp /var/lib/rabbitmq/.erlang.cookie root@192.168.88.138:/var/lib/rabbitmq/.erlang.cookie rabbitmq-server -detached
5.server-c加入RabbitMQ集群
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@server-a rabbitmqctl start_app
6.設置鏡像隊列策略
#任意節點執行 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
7.安裝HAProxy並啟動
在服務器server-b執行下面命令:
yum install haproxy -y service haproxy start chkconfig haproxy on vim /etc/haproxy/haproxy.cfg
覆蓋配置
# Global settings #--------------------------------------------------------------------- global # to have these messages end up in /var/log/haproxy.log you will # need to: # # 1) configure syslog to accept network log events. This is done # by adding the '-r' option to the SYSLOGD_OPTIONS in # /etc/sysconfig/syslog # # 2) configure local2 events to go to the /var/log/haproxy.log # file. A line like the following can be added to # /etc/sysconfig/syslog # # local2.* /var/log/haproxy.log # log 127.0.0.1 local2 chroot /var/lib/haproxy pidfile /var/run/haproxy.pid maxconn 4000 user haproxy group haproxy daemon # turn on stats unix socket stats socket /var/lib/haproxy/stats #--------------------------------------------------------------------- # common defaults that all the 'listen' and 'backend' sections will # use if not designated in their block #--------------------------------------------------------------------- defaults mode http log global option httplog option dontlognull option http-server-close option forwardfor except 127.0.0.0/8 option redispatch retries 3 timeout http-request 10s timeout queue 1m timeout connect 10s timeout client 1m timeout server 1m timeout http-keep-alive 10s timeout check 10s maxconn 3000 #--------------------------------------------------------------------- # main frontend which proxys to the backends #--------------------------------------------------------------------- ###############RabbitMQ服務################# listen rabbitmq_cluster bind 0.0.0.0:5672 mode tcp balance roundrobin server server-a 192.168.88.138:5672 check inter 5000 rise 2 fall 3 weight 1 server server-c 192.168.88.139:5672 check inter 5000 rise 2 fall 3 weight 1 ###############RabbitMQ管理界面################# listen rabbitmq_ui bind 0.0.0.0:15672 server server-a 192.168.88.138:15672 server server-c 192.168.88.139:15672
開放端口
firewall-cmd --permanent --add-port=15672/tcp firewall-cmd --permanent --add-port=5672/tcp firewall-cmd --reload
Kafka單節點部署
Zookeeper部署
下載Zookeeper並啟動
docker run -d --restart always --name zookeeper -p 2181:2181 -v /root/zookeeper/data:/data -v /root/zookeeper/conf:/conf -v /root/zookeeper/logs:/logs zookeeper:3.6.1
開放2181端口
firewall-cmd --permanent --add-port=2181/tcp firewall-cmd --reload
Kafka服務部署
下載kafka 鏡像並啟動
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.88.139:2181 -e KAFKA_ADVERTISED_HOST_NAME=192.168.88.141 -e KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka:2.12-2.5.0
創建目錄並拷貝
mkdir /root/kafka docker cp kafka:/opt/kafka/config /root/kafka/config
刪除原有的容器並重新創建
docker stop kafka docker rm kafka docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.88.139:2181 -e KAFKA_ADVERTISED_HOST_NAME=192.168.88.141 -e KAFKA_ADVERTISED_PORT=9092 -v /root/kafka/config: /opt/kafka/config wurstmeister/kafka:2.12-2.5.0
開放9092端口
firewall-cmd --permanent --add-port=9092/tcp firewall-cmd --reload
Kafka-eagle
下載jdk依賴
yum -y install java-1.8.0-openjdk*
下載kafka-eagle-bin包
wget -o kafka-eagle-bin.tar.gz https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/v2.0.1
解壓
tar -zxvf kafka-eagle-bin.tar.gz tar -zxvf kafka-eagle-bin-2.0.1/kafka-eagle-web-2.0.1-bin.tar.gz mv kafka-eagle-web-2.0.1 kafka-eagle
添加環境變量
vim /etc/profile export JAVA_HOME=/usr export KE_HOME=/etc/kafka-eagle export PATH=$PATH:$KE_HOME/bin:$JAVA_HOME/bin
生效環境變量
source /etc/profile
cd /etc/kafka-eagle/conf vim system-config.properties #注釋 #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 #cluster2.kafka.eagle.offset.storage=zk #cluster1.zk.acl.enable=false #cluster1.zk.acl.schema=digest #cluster1.zk.acl.username=test #cluster1.zk.acl.password=test123 修改 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=192.168.88.139:2181 kafka.eagle.metrics.charts=true kafka.eagle.driver=org.sqlite.JDBC kafka.eagle.url=jdbc:sqlite:/etc/kafka-eagle/db/ke.db kafka.eagle.username=root kafka.eagle.password=root
啟動kafka-eagle服務
cd /etc/kafka-eagle/bin chmod +x ke.sh ke.sh start
開啟防火牆
firewall-cmd --permanent --add-port=8048/tcp firewall-cmd --reload
瀏覽器訪問:http://IP:8048
阿里雲費用
以下截圖基本以最低配置。