本節內容:
- 消息中間件
- 消息中間件特點
- 消息中間件的傳遞模型
- Kafka介紹
- 安裝部署Kafka集群
- 安裝Yahoo kafka manager
- kafka-manager添加kafka cluster
一、消息中間件
消息中間件是在消息的傳輸過程中保存消息的容器。消息中間件在將消息從消息生產者到消費者時充當中間人的作用。隊列的主要目的是提供路由並保證消息的傳送;如果發送消息時接收者不可用,消息對列會保留消息,直到可以成功地傳遞它為止,當然,消息隊列保存消息也是有期限的。
二、消息中間件特點
1. 采用異步處理模式
消息發送者可以發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道(主題或者隊列)上,消息接收者則訂閱或者監聽該通道。一條消息可能最終轉發給一個或多個消息接收者,這些接收者都無需對消息發送者做出同步回應。整個過程是異步的。
- 比如用戶信息注冊。注冊完成后過段時間發送郵件或者短信。
2. 應用程序和應用程序調用關系為松耦合關系
- 發送者和接收者不必要了解對方、只需要確認消息
- 發送者和接收者不必同時在線
比如在線交易系統為了保證數據的最終一致,在支付系統處理完成后會把支付結果放到信息中間件里通知訂單系統修改訂單支付狀態。兩個系統通過消息中間件解耦。
三、消息中間件的傳遞模型
1. 點對點模型(PTP)
點對點模型用於消息生產者和消息消費者之間點對點的通信。消息生產者將消息發送到由某個名字標識的特定消費者。這個名字實際上對應於消費服務中的一個隊列(Queue),在消息傳遞給消費者之前它被存儲在這個隊列中。隊列消息可以放在內存中也可以是持久的,以保證在消息服務出現故障時仍然能夠傳遞消息。
點對點模型特性:
- 每個消息只有一個消費者
- 發送者和接受者沒有時間依賴
- 接受者確認消息接受和處理成功
2. 發布—訂閱模型(Pub/Sub)
發布者/訂閱者模型支持向一個特定的消息主題生產消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息。在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便能夠讓消費者訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在這種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。
其實消息中間件,像MySQL其實也可以作為消息中間件,只要你把消息中間件原理搞清楚,你會發現目前所有的存儲,包括NoSQL,只要支持順序性東西的,就可以作為一個消息中間件。就看你怎么去利用它了。就像redis里面那個隊列list,就可以作為一個消息隊列。
發布—訂閱模型特性:
- 每個消息可以有多個訂閱者
- 客戶端只有訂閱后才能接收到消息
- 持久訂閱和非持久訂閱
(1) 發布者和訂閱者有時間依賴
接收者和發布者只有建立訂閱關系才能收到消息。
(2) 持久訂閱
訂閱關系建立后,消息就不會消失,不管訂閱者是否在線。
(3) 非持久訂閱
訂閱者為了接收消息,必須一直在線
當只有一個訂閱者時約等於點對點模式。
大部分情況下會使用持久訂閱。常用的消息隊列有Kafka、RabbitMQ、ActiveMQ、metaq等。
四、Kafka介紹
Kafka是一種分布式消息系統,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎,具有高水平擴展和高吞吐量。
目前越來越多的開源分布式處理系統如Apache flume、Apache Storm、Spark、Elasticsearch都支持與Kafka集成。
五、安裝部署Kafka集群
1. 環境信息
主機名 | 操作系統版本 | IP地址 | 安裝軟件 |
log1 | CentOS 7.0 | 114.55.29.86 | JDK1.7、kafka_2.11-0.9.0.1 |
log2 | CentOS 7.0 | 114.55.29.241 | JDK1.7、kafka_2.11-0.9.0.1 |
log3 | CentOS 7.0 | 114.55.253.15 | JDK1.7、kafka_2.11-0.9.0.1 |
2. 安裝JDK1.7
3台機器都需要安裝JDK1.7。

[root@log1 local]# mkdir /usr/java [root@log1 local]# tar zxf jdk-7u80-linux-x64.gz -C /usr/java/ [root@log1 local]# vim /etc/profile export JAVA_HOME=/usr/java/jdk1.7.0_80 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar [root@log1 local]# source /etc/profile
3. 安裝集群
需要先安裝好Zookeeper集群,見之前的文章《Zookeeper介紹及安裝部署》。
(1)創建消息持久化目錄
[root@log1 ~]# mkdir /kafkaLogs
(2)下載解壓kafka,版本是kafka_2.11-0.9.0.1
[root@log1 local]# wget http://mirrors.cnnic.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz [root@log1 local]# tar zxf kafka_2.11-0.9.0.1.tgz
(3)修改配置
[root@log1 local]# cd kafka_2.11-0.9.0.1/config/ [root@log1 config]# vim server.properties
- 修改broker.id
- 修改kafka監聽地址
注意: advertised.host.name參數用來配置返回的host.name值,把這個參數配置為IP地址。這樣客戶端在使用java.net.InetAddress.getCanonicalHostName()獲取時拿到的就是ip地址而不是主機名。
- 修改消息持久化目錄
- 修改zk地址
- 添加啟用刪除topic配置
- 關閉自動創建topic
是否允許自動創建topic。如果設為true,那么produce,consume或者fetch metadata一個不存在的topic時,就會自動創建一個默認replication factor和partition number的topic。默認是true。
auto.create.topics.enable=false
(4)把log1的配置好的kafka拷貝到log2和log3上
[root@log1 local]# scp -rp kafka_2.11-0.9.0.1 root@114.55.29.241:/usr/local/ [root@log1 local]# scp -rp kafka_2.11-0.9.0.1 root@114.55.253.15:/usr/local/
(5)log2和log3主機上創建消息持久化目錄
[root@log2 ~]# mkdir /kafkaLogs [root@log3 ~]# mkdir /kafkaLogs
(6)修改log2配置文件中的broker.id為1,log3主機的為2
[root@log2 config]# vim server.properties
4. 啟動集群
log1主機啟動kafka:

[root@log1 ~]# cd /usr/local/kafka_2.11-0.9.0.1/ [root@log1 kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &
log2主機啟動kafka:

[root@log2 ~]# cd /usr/local/kafka_2.11-0.9.0.1/ [root@log2 kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &
log3主機啟動kafka:

[root@log3 ~]# cd /usr/local/kafka_2.11-0.9.0.1/ [root@log3 kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &
5. 腳本定期清理logs下的日志文件
默認kafka是按天切割日志的,而且不刪除:
這里寫一個簡單的腳本來清理這些日志,主要是清理server.log和controller.log。

[root@log1 ~]# cd /usr/local/kafka_2.11-0.9.0.1/ [root@log1 kafka_2.11-0.9.0.1]# vim clean_kafkalog.sh #!/bin/bash ###Description:This script is used to clear kafka logs, not message file. ###Written by: jkzhao - jkzhao@wisedu.com ###History: 2016-04-18 First release. # log file dir. logDir=/usr/local/kafka_2.11-0.9.0.1/logs # Reserved 7 files. COUNT=7 ls -t $logDir/server.log* | tail -n +$[$COUNT+1] | xargs rm -f ls -t $logDir/controller.log* | tail -n +$[$COUNT+1] | xargs rm -f ls -t $logDir/state-change.log* | tail -n +$[$COUNT+1] | xargs rm -f ls -t $logDir/log-cleaner.log* | tail -n +$[$COUNT+1] | xargs rm –f
賦予腳本執行權限:
[root@log1 kafka_2.11-0.9.0.1]# chmod +x clean_kafkalog.sh
周期性任務策略:每周日的0點0分去執行這個腳本。
[root@log1 logs]# crontab -e 0 0 * * 0 /usr/local/kafka_2.11-0.9.0.1/clean_kafkalog.sh
把清理日志的腳本拷貝到第二台和第三台主機:
[root@log1 kafka_2.11-0.9.0.1]# scp -p clean_kafkalog.sh root@114.55.29.241:/usr/local/kafka_2.11-0.9.0.1 [root@log1 kafka_2.11-0.9.0.1]# scp -p clean_kafkalog.sh root@114.55.253.15:/usr/local/kafka_2.11-0.9.0.1
6. 停止kafka命令
[root@log1 ~]# /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh
7. 測試集群
(1)log1主機上創建一個名為test的topic
[root@log1 kafka_2.11-0.9.0.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(2)log2和log3主機上利用命令行工具創建一個consumer程序
[root@log2 kafka_2.11-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning [root@log2 kafka_2.11-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
(3)log1主機上利用命令行工具創建一個producer程序
[root@log1 kafka_2.11-0.9.0.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
log1主機上終端輸入message,然后到log2和log3主機的終端查看:
8. 創建生產環境topic
如果kafka集群是3台,我們創建一個名為business的Topic,如下:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic business
注意:為Topic創建分區時,--partitions(分區數)最好是broker數量的整數倍,這樣才能使一個Topic的分區均勻的分布在整個Kafka集群中。
9. Kafka常用命令
(1)啟動kafka
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
(2)查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
(3)控制台消費
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning
(4)刪除topic
- 刪除kafka存儲目錄(server.properties文件log.dirs配置,默認為"/tmp/kafka-logs")相關topic目錄
- 如果配置了delete.topic.enable=true直接通過命令刪除,如果命令刪除不掉,直接通過zookeeper-client 刪除掉"/brokers/topics/"目錄下相關topic節點。
注意: 如果你要刪除一個topic並且重建,那么必須重新啟動kafka,否則新建的topic在zookeeper的/brokers/topics/test-topic/目錄下沒有partitions這個目錄,也就是沒有分區信息。
六、安裝Yahoo kafka manager
1. Yahoo kafka manager介紹
項目地址:https://github.com/yahoo/kafka-manager
Requirements:
- Kafka 0.8.1.1 or 0.8.2.*
- sbt 0.13.x
- Java 8+
Kafka Manager是一個管控台,這款工具主要支持以下幾個功能:
- 管理多個不同的集群;
- 很容易地檢查集群的狀態(topics, brokers, 副本的分布, 分區的分布);
- 選擇副本;
- 產生分區分配(Generate partition assignments)基於集群的當前狀態;
- 重新分配分區。
2. 環境信息
主機名 | 操作系統版本 | IP地址 | 安裝軟件 |
console | CentOS 7.0 | 114.55.29.246 | JDK1.8、kafka-manager-1.3.0.6.zip |
Kafka Manager可以裝在任何一台機器上,我這里部署在一台單獨的機器上。
3. 安裝jdk1.8

[root@console local]# tar zxf jdk-8u73-linux-x64.gz -C /usr/java/ [root@console ~]# vim /etc/profile export JAVA_HOME=/usr/java/jdk1.8.0_73 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar [root@console ~]# source /etc/profile
4. 安裝sbt0.13.9

[root@console ~]# curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo [root@console ~]# yum install -y sbt
5. 構建kafka manager包
[root@console ~]# git clone https://github.com/yahoo/kafka-manager.git
[root@console ~]# unzip -oq kafka-manager-upgrade-to-90.zip [root@console ~]# mv kafka-manager-upgrade-to-90 kafka-manager [root@console ~]# cd kafka-manager [root@console kafka-manager]# sbt clean dist The command below will create a zip file which can be used to deploy the application.
使用sbt編譯打包的時候時間可能會比較長。
這個需要翻牆才能完成。配置代理:
[root@console ~]# vim /usr/share/sbt-launcher-packaging/conf/sbtconfig.txt -Dhttp.proxyHost=proxy -Dhttp.proxyPort=8080
再次運行這個命令,依然需要等待較長的時間,有可能還會失敗。如果失敗就多次嘗試打包:
打包完成后會創建一個zip壓縮包,而這個壓縮包可以用來部署該應用。生成的包會在kafka-manager/target/universal 下面。生成的包只需要java環境就可以運行了,在以后部署到其他機器上不需要安裝sbt進行打包構建了。
6. 安裝kafka manager

[root@console kafka-manager]# cp target/universal/kafka-manager-1.3.0.6.zip ~/ [root@console kafka-manager]# cd [root@console ~]# unzip -oq kafka-manager-1.3.0.6.zip
7. 配置kafka-manager
[root@console ~]# cd kafka-manager-1.3.0.6/ [root@console kafka-manager-1.3.0.6]# vim conf/application.conf
設置zkhosts:
kafka-manager.zkhosts="114.55.29.246:2181,114.55.29.86:2181,114.55.29.241:2181"
8. 啟動kafka-manager
[root@console kafka-manager-1.3.0.6]# bin/kafka-manager
默認監聽的端口是9000。你也可以在啟動時指定配置文件和監聽端口:
# bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
啟動並置於后台運行:
[kmanager@console kafka-manager-1.3.0.6]$ nohup bin/kafka-manager > /dev/null 2>&1 &
七、kafka-manager添加kafka cluster
瀏覽器輸入地址訪問:http://114.55.29.246:9000/
注意:安裝完成后需要手動添加Cluster。添加Cluster是指添加一個已有的Kafka集群進入監控列表,而非通過Kafka Manager部署一個新的Kafka Cluster,這一點與Cloudera Manager不同。