一、Kafka概述
1.1、定義
Kafka 是一個分布式的基於發布/訂閱模式的消息隊列(Message Queue),主要應用於 大數據實時處理領域。
1.2、消息隊列
1.2.1、消息隊列的應用場景
1.2.2、消息隊列的好處
1)解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束
2)可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
3)緩沖
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
4)靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5)異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
1.2.3、消息隊列的兩種模式
(1)點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除)
消息生產者生產消息發送到Queue中,然后消息消費者從Queue中取出並且消費消息。消息被消費以后,queue 中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。 Queue 支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。
(2)發布/訂閱模式(一對多,消費者消費數據之后不會清除消息)
消息生產者(發布)將消息發布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到 topic 的消息會被所有訂閱者消費。
二、Kafka基礎架構
1)Producer :消息生產者,就是向 kafka broker 發消息的客戶端
2)Consumer :消息消費者,向 kafka broker 取消息的客戶端
3)Consumer Group (CG):消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
4)Broker :一台 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker可以容納多個 topic。
5)Topic :可以理解為一個隊列,生產者和消費者面向的都是一個 topic
6)Partition:為了實現擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列
7)Replica:副本,為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失且 kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本, 一個 leader 和若干個 follower。
8)leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是 leader。
9)follower:每個分區多個副本中的“從”,實時從 leader 中同步數據,保持和 leader 數據的同步。leader 發生故障時,某個 follower 會成為新的 follower。
三、Kafka集群安裝
3.1、集群規划
hadoop102 | hadoop103 | hadoop103 |
zookeeper | zookeeper | zookeeper |
Kafka | Kafka | Kafka |
3.2、軟件下載
官網:http://kafka.apache.org/downloads.html
3.3、集群部署
1)解壓安裝包
[hadoop@hadoop102 software]$ tar xf kafka_2.11-0.11.0.0.tgz -C /opt/module/ [hadoop@hadoop102 software]$ ll /opt/module/kafka_2.11-0.11.0.0/ total 48 drwxr-xr-x 3 hadoop hadoop 4096 Jun 23 2017 bin drwxr-xr-x 2 hadoop hadoop 4096 Jun 23 2017 config drwxr-xr-x 2 hadoop hadoop 4096 Jan 29 15:02 libs -rw-r--r-- 1 hadoop hadoop 28824 Jun 23 2017 LICENSE -rw-r--r-- 1 hadoop hadoop 336 Jun 23 2017 NOTICE drwxr-xr-x 2 hadoop hadoop 47 Jun 23 2017 site-docs
2)修改解壓后的文件名稱
[hadoop@hadoop102 software]$ cd ../module/ [hadoop@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3)在/opt/module/kafka 目錄下創建 logs 文件夾
[hadoop@hadoop102 kafka]$ mkdir logs
4)修改配置文件
[hadoop@hadoop102 kafka]$ cd config/ [hadoop@hadoop102 config]$ vim server.properties
配置項說明:
#broker 的全局唯一編號,不能重復 broker.id=2 #刪除 topic 功能使能 delete.topic.enable=true #處理網絡請求的線程數量 num.network.threads=3 #用來處理磁盤 IO 的現成數量 num.io.threads=8 #發送套接字的緩沖區大小 socket.send.buffer.bytes=102400 #接收套接字的緩沖區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩沖區大小 socket.request.max.bytes=104857600 #kafka 運行日志存放的路徑 log.dirs=/opt/module/kafka/logs #topic 在當前 broker 上的分區個數 num.partitions=1 #用來恢復和清理 data 下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment 文件保留的最長時間,超時將被刪除 log.retention.hours=168 #配置連接 Zookeeper 集群地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5)配置環境變量
[hadoop@hadoop102 config]$ sudo vim /etc/profile #KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin [hadoop@hadoop102 config]$ source /etc/profile
6)分發安裝包
[hadoop@hadoop102 config]$ cat /usr/local/bin/xsync #!/bin/bash #1 獲取輸入參數個數,如果沒有參數,直接退出 pcount=$# if((pcount==0)); then echo no args; exit; fi #2 獲取文件名稱 p1=$1 fname=`basename $p1` echo fname=$fname #3 獲取上級目錄到絕對路徑 pdir=`cd -P $(dirname $p1); pwd` echo pdir=$pdir #4 獲取當前用戶名稱 user=`whoami` #5 循環 for((host=103; host<105; host++)); do echo ------------------- hadoop$host -------------- rsync -rvl $pdir/$fname $user@hadoop$host:$pdir done [hadoop@hadoop102 config]$ xsync /opt/module/kafka/
注意:分發之后記得配置其他機器的環境變量
7)分別在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties
中的 broker.id=3、broker.id=4。注:broker.id 不得重復
8)啟動集群
依次在 hadoop102、hadoop103、hadoop104 節點上啟動 kafka
[hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
9)關閉集群
[hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop [hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop [hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
四、Kafka 命令行操作
1)查看當前服務器中的所有 topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list __consumer_offsets
2)創建 topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first Created topic "first". [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list __consumer_offsets first
選項說明:
- --topic 定義 topic 名
- --replication-factor 定義副本數
- --partitions 定義分區數
3) 查看某個 Topic 的詳情
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first Topic:first PartitionCount:1 ReplicationFactor:3 Configs: Topic: first Partition: 0 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2
4)修改分區數
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first Topic:first PartitionCount:6 ReplicationFactor:3 Configs: Topic: first Partition: 0 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2 Topic: first Partition: 1 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4 Topic: first Partition: 2 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2 Topic: first Partition: 3 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2 Topic: first Partition: 4 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3 Topic: first Partition: 5 Leader: 3 Replicas: 3,2,4 Isr: 3,2,4
5)刪除 topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first Topic first is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list __consumer_offsets #需要 server.properties 中設置 delete.topic.enable=true 否則只是標記刪除。
6) 發送及消費消息
#發送消息 [hadoop@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first #消費消息 [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning:會把主題中以往所有的數據都讀取出來。
五、數據日志分離
1)停止kafka集群並刪除logs目錄
[hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop [hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop [hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop #清除logs目錄 [hadoop@hadoop102 kafka]$ rm -fr logs/ [hadoop@hadoop103 kafka]$ rm -fr logs/ [hadoop@hadoop104 kafka]$ rm -fr logs/
2)重裝zookeeper
zookeeper中會存在kafka相關注冊信息,需要刪除
[zk: localhost:2181(CONNECTED) 0] ls / [cluster, controller_epoch, servers, brokers, zookeeper, atguigu, admin, isr_change_notification, consumers, latest_producer_id_block, config] #停止zookeeper集群 [hadoop@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop [hadoop@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh stop [hadoop@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh stop #刪除數據目錄並重啟 [hadoop@hadoop102 zookeeper-3.4.10]$ rm -fr zkData/version-2/ [hadoop@hadoop103 zookeeper-3.4.10]$ rm -fr zkData/version-2/ [hadoop@hadoop104 zookeeper-3.4.10]$ rm -fr zkData/version-2/ [hadoop@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start [hadoop@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start [hadoop@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
3)修改kafka配置
[hadoop@hadoop102 kafka]$ vim config/server.properties log.dirs=/opt/module/kafka/data [hadoop@hadoop103 kafka]$ vim config/server.properties log.dirs=/opt/module/kafka/data [hadoop@hadoop104 kafka]$ vim config/server.properties log.dirs=/opt/module/kafka/data
4)啟動kafka集群
[hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties #創建topic測試日志與數據分離 [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first Created topic "first". [hadoop@hadoop102 kafka]$ ll total 48 drwxr-xr-x 3 hadoop hadoop 4096 Jun 23 2017 bin drwxr-xr-x 2 hadoop hadoop 4096 Feb 2 14:37 config drwxrwxr-x 3 hadoop hadoop 202 Feb 2 14:40 data drwxr-xr-x 2 hadoop hadoop 4096 Feb 2 11:32 libs -rw-r--r-- 1 hadoop hadoop 28824 Jun 23 2017 LICENSE drwxrwxr-x 2 hadoop hadoop 205 Feb 2 14:39 logs -rw-r--r-- 1 hadoop hadoop 336 Jun 23 2017 NOTICE drwxr-xr-x 2 hadoop hadoop 47 Jun 23 2017 site-docs [hadoop@hadoop102 kafka]$ ll logs/ total 68 -rw-rw-r-- 1 hadoop hadoop 7309 Feb 2 14:40 controller.log -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:39 kafka-authorizer.log -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:39 kafka-request.log -rw-rw-r-- 1 hadoop hadoop 5952 Feb 2 14:39 kafkaServer-gc.log.0.current -rw-rw-r-- 1 hadoop hadoop 18801 Feb 2 14:40 kafkaServer.out -rw-rw-r-- 1 hadoop hadoop 172 Feb 2 14:39 log-cleaner.log -rw-rw-r-- 1 hadoop hadoop 18801 Feb 2 14:40 server.log -rw-rw-r-- 1 hadoop hadoop 5897 Feb 2 14:40 state-change.log [hadoop@hadoop102 kafka]$ ll data/ total 12 -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:39 cleaner-offset-checkpoint drwxrwxr-x 2 hadoop hadoop 141 Feb 2 14:40 first-0 -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:39 log-start-offset-checkpoint -rw-rw-r-- 1 hadoop hadoop 54 Feb 2 14:39 meta.properties -rw-rw-r-- 1 hadoop hadoop 14 Feb 2 14:40 recovery-point-offset-checkpoint -rw-rw-r-- 1 hadoop hadoop 14 Feb 2 14:41 replication-offset-checkpoint [hadoop@hadoop102 kafka]$ ll data/first-0/ total 0 -rw-rw-r-- 1 hadoop hadoop 10485760 Feb 2 14:40 00000000000000000000.index -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:40 00000000000000000000.log -rw-rw-r-- 1 hadoop hadoop 10485756 Feb 2 14:40 00000000000000000000.timeindex -rw-rw-r-- 1 hadoop hadoop 0 Feb 2 14:40 leader-epoch-checkpoint