一、kafka 簡介
kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。
1.1 kafka名詞解釋
- producer:生產者。
- consumer:消費者。
- topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分門別類,每一類的消息稱之為一個主題(Topic)。
- broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發布的消息。
每個消息(也叫作record記錄,也被稱為消息)是由一個key,一個value和時間戳構成。
1.2 kafka有四個核心API介紹
- 應用程序使用producer API發布消息到1個或多個topic中。
- 應用程序使用consumer API來訂閱一個或多個topic,並處理產生的消息。
- 應用程序使用streams API充當一個流處理器,從1個或多個topic消費輸入流,並產生一個輸出流到1個或多個topic,有效地將輸入流轉換到輸出流。
- connector API允許構建或運行可重復使用的生產者或消費者,將topic鏈接到現有的應用程序或數據系統。
1.3 kafka基基原理
通常來講,消息模型可以分為兩種:隊列和發布-訂閱式。隊列的處理方式是一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發布-訂閱模型中,消息被廣播給所有的消費者,接收到消息的消費者都可以處理此消息。Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組(consumer group)。消費者用一個消費者組名標記自己。
一個發布在Topic上消息被分發給此消費者組中的一個消費者。假如所有的消費者都在一個組中,那么這就變成了queue模型。假如所有的消費者都在不同的組中,那么就完全變成了發布-訂閱模型。更通用的, 我們可以創建一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者,一個組內多個消費者可以用來擴展性能和容錯。
並且,kafka能夠保證生產者發送到一個特定的Topic的分區上,消息將會按照它們發送的順序依次加入,也就是說,如果一個消息M1和M2使用相同的producer發送,M1先發送,那么M1將比M2的offset低,並且優先的出現在日志中。消費者收到的消息也是此順序。如果一個Topic配置了復制因子(replication facto)為N,那么可以允許N-1服務器宕機而不丟失任何已經提交(committed)的消息。此特性說明kafka有比傳統的消息系統更強的順序保證。但是,相同的消費者組中不能有比分區更多的消費者,否則多出的消費者一直處於空等待,不會收到消息。
1.4 kafka應用場景
構建實時的流數據管道,可靠地獲取系統和應用程序之間的數據。
構建實時流的應用程序,對數據流進行轉換或反應。
1.5 主題和日志 (Topic和Log)
每一個分區(partition)都是一個順序的、不可變的消息隊列,並且可以持續的添加。分區中的消息都被分了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。Kafka集群保持所有的消息,直到它們過期,無論消息是否被消費了。實際上消費者所持有的僅有的元數據就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常情況當消費者消費消息的時候,偏移量也線性的的增加。但是實際偏移量由消費者控制,消費者可以將偏移量重置為更老的一個偏移量,重新讀取消息。 可以看到這種設計對消費者來說操作自如, 一個消費者的操作不會影響其它消費者對此log的處理。 再說說分區。Kafka中采用分區的設計有幾個目的。一是可以處理更多的消息,不受單台服務器的限制。Topic擁有多個分區意味着它可以不受限的處理更多的數據。第二,分區可以作為並行處理的單元,稍后會談到這一點。
1.6 分布式(Distribution)
Log的分區被分布到集群中的多個服務器上。每個服務器處理它分到的分區。根據配置每個分區還可以復制到其它服務器作為備份容錯。 每個分區有一個leader,零或多個follower。Leader處理此分區的所有的讀寫請求,而follower被動的復制數據。如果leader宕機,其它的一個follower會被推舉為新的leader。 一台服務器可能同時是一個分區的leader,另一個分區的follower。 這樣可以平衡負載,避免所有的請求都只讓一台或者某幾台服務器處理。
二、kafka 安裝
2.1 jdk安裝
#以oracle jdk為例,下載地址http://java.sun.com/javase/downloads/index.jsp
yum -y install jdk-8u141-linux-x64.rpm
2.2 安裝zookeeper
wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz tar zxf zookeeper-3.4.9.tar.gz mv zookeeper-3.4.9 /data/zk
修改配置文件內容如下所示:
[root@localhost ~]# cat /data/zk/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888
參數說明:
server.id=host:port:port:表示了不同的zookeeper服務器的自身標識,作為集群的一部分,每一台服務器應該知道其他服務器的信息。用戶可以從“server.id=host:port:port” 中讀取到相關信息。在服務器的data(dataDir參數所指定的目錄)下創建一個文件名為myid的文件,這個
文件的內容只有一行,指定的是自身的id值。比如,服務器“1”應該在myid文件中寫入“1”。這個id必須在集群環境中服務器標識中是唯一的,且大小在1~255之間。這一樣配置中,zoo1代表第一台服務器的IP地址。第一個端口號(port)是從follower連接到leader機器的
端口,第二個端口是用來進行leader選舉時所用的端口。所以,在集群配置過程中有三個非常重要的端口:clientPort:2181、port:2888、port:3888。
關於zoo.cfg配置文件說明,參考連接https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_configuration;
如果想更換日志輸出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,還需要修改zkServer.sh文件,大概修改方式地方在125行左右,內容如下:
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi
在啟動服務之前,還需要分別在zookeeper創建myid,方式如下:
echo 1 > /data/zk/data/zookeeper/myid
啟動服務
/data/zk/bin/zkServer.sh start
驗證服務
### 查看相關端口號
[root@localhost ~]# ss -lnpt|grep java LISTEN 0 50 :::34442 :::* users:(("java",pid=2984,fd=18)) LISTEN 0 50 ::ffff:192.168.15.133:3888 :::* users:(("java",pid=2984,fd=26)) LISTEN 0 50 :::2181 :::* users:(("java",pid=2984,fd=25))
###查看zookeeper服務狀態
[root@localhost ~]# /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Mode: follower
zookeeper相關命令說明,參考https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html (文末有說明);
2.3 安裝kafka
tar zxf kafka_2.11-0.11.0.0.tgz mv kafka_2.11-0.11.0.0 /data/kafka
修改配置
[root@localhost ~]# grep -Ev "^#|^$" /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.15.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
提示:其他主機將該機器的kafka目錄拷貝即可,然后需要修改broker.id、listeners地址。有關kafka配置文件參數,參考:http://orchome.com/12;
啟動服務
/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
驗證服務
### 隨便在其中一台主機執行 /data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181 --replication-factor 1 --partitions 1 --topic test ###在其他主機查看 /data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181