一、kafka
1.1kafka是什么
在流式計算中,Kafka一般用來緩存數據,Storm通過消費Kafka的數據進行計算。
1)Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。
2)Kafka最初是由LinkedIn公司開發,並於 2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待的平台。
3)Kafka是一個分布式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
4)無論是kafka集群,還是producer和consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
1.2消息通信圖
點對點模式(一對一,消費者主動拉取數據,輪詢機制,消息收到后消息清除,ack確認機制)
點對點模型通常是一個基於拉取
或者輪詢
的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。
這個模型的特點是發送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監聽者也是如此。
發布/訂閱模式(一對多,數據生產后,推送給所有訂閱者)
發布訂閱模型則是一個基於推送的消息傳送模型。
發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處於離線狀態。
1.3消息隊列作用
1)程序解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。
許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3)峰值處理能力:
(大白話,就是本來公司業務只需要5台機器,但是臨時的秒殺活動,5台機器肯定受不了這個壓力,我們又不可能將整體服務器架構提升到10台,那在秒殺活動后,機器不就浪費了嗎?因此引入消息隊列)
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。
如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。
使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
4)可恢復性:
系統的一部分組件失效時,不會影響到整個系統。
消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
5)順序保證:
在大多使用場景下,數據處理的順序都很重要。
大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
6)緩沖:
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
7)異步通信:
很多時候,用戶不想也不需要立即處理消息。比如發紅包,發短信等流程。
消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
1.4 kafka架構
1)Producer :消息生產者,就是向kafka broker發消息的客戶端。
2)Consumer :消息消費者,向kafka broker取消息的客戶端
3)Topic :主題,可以理解為一個隊列。
4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制-給consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。
5)Broker :一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
1.4分布式模型
Kafka每個主題的多個分區日志分布式地存儲在Kafka集群上,同時為了故障容錯,每個(partition)分區都會以副本的方式復制到多個消息代理節點上。
其中一個節點會作為主副本(Leader),其他節點作為備份副本(Follower,也叫作從副本)。主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步數據。當主副本出現故障時,備份副本中的一個副本會被選擇為新的主副本。因為每個分區的副本中只有主副本接受讀寫,所以每個服務器端都會作為某些分區的主副本,以及另外一些分區的備份副本,這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的。
Kafka的生產者和消費者相對於服務器端而言都是客戶端。
Kafka生產者客戶端發布消息到服務端的指定主題,會指定消息所屬的分區。
生產者發布消息時根據消息是否有鍵,采用不同的分區策略。消息沒有鍵時,通過輪詢方式進行客戶端負載均衡;消息有鍵時,根據分區語義(例如hash)確保相同鍵的消息總是發送到同一分區。
Kafka的消費者通過訂閱主題來消費消息,並且每個消費者都會設置一個消費組名稱。因為生產者發布到主題的每一條消息都只會發送給消費者組的一個消費者。
所以,如果要實現傳統消息系統的“隊列”模型,可以讓每個消費者都擁有相同的消費組名稱,這樣消息就會負責均衡到所有的消費者;如果要實現“發布-訂閱”模型,則每個消費者的消費者組名稱都不相同,這樣每條消息就會廣播給所有的消費者。
分區是消費者現場模型的最小並行單位。
如下圖(圖1)所示,生產者發布消息到一台服務器的3個分區時,只有一個消費者消費所有的3個分區。在下圖(圖2)中,3個分區分布在3台服務器上,同時有3個消費者分別消費不同的分區。假設每個服務器的吞吐量時300MB,在下圖(圖1)中分攤到每個分區只有100MB,而在下圖(圖2)中,集群整體的吞吐量有900MB。可以看到,增加服務器節點會提升集群的性能,增加消費者數量會提升處理性能。
同一個消費組下多個消費者互相協調消費工作,Kafka會將所有的分區平均地分配給所有的消費者實例,這樣每個消費者都可以分配到數量均等的分區。Kafka的消費組管理協議會動態地維護消費組的成員列表,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發再平衡操作。
Kafka的消費者消費消息時,只保證在一個分區內的消息的完全有序性,並不保證同一個主題匯中多個分區的消息順序。而且,消費者讀取一個分區消息的順序和生產者寫入到這個分區的順序是一致的。比如,生產者寫入“hello”和“Kafka”兩條消息到分區P1,則消費者讀取到的順序也一定是“hello”和“Kafka”。如果業務上需要保證所有消息完全一致,只能通過設置一個分區完成,但這種做法的缺點是最多只能有一個消費者進行消費。一般來說,只需要保證每個分區的有序性,再對消息假設鍵來保證相同鍵的所有消息落入同一分區,就可以滿足絕大多數的應用。
1.5kafka部署啟動
配置jdk環境
下載網址
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
找到
jdk-8u201-linux-x64.tar.gz
解壓縮,配置java環境變量
tar -zxvf jdk-8u201-linux-x64.tar.gz
PATH="$PATH:/opt/jdk1.8.0_201/bin"
配置zookeeper環境,配置環境變量
tar -zxvf zookeeper-3.4.14.tar.gz
PATH="$PATH:/opt/jdk1.8.0_201/bin:/opt/zookeeper-3.4.14/bin"
zookeeper端口解釋
1、2181
2、3888
3、2888
二、3個端口的作用
1、2181:對cline端提供服務
2、3888:選舉leader使用
3、2888:集群內機器通訊使用(Leader監聽此端口)
部署時注意
1、單機單實例,只要端口不被占用即可
2、單機偽集群(單機,部署多個實例),三個端口必須修改為組組不一樣
如:myid1 : 2181,3888,2888
myid2 : 2182,3788,2788
myid3 : 2183,3688,2688
3、集群(一台機器部署一個實例)
四、集群為大於等於3個基數,如 3、5、7....,不宜太多,集群機器多了選舉和數據同步耗時時長長,不穩定。目前覺得,三台選舉+N台observe很不錯。
1.6啟動安裝zookeeper
本文以standalone模式運行,並非集群模式
1.解壓縮zk壓縮包,配置好環境變量
2.在zk解壓縮包目錄下創建 zkData目錄
3.修改zk解壓縮包目錄下conf/zoo_sample.cfg為zoo.cfg
4.編輯zoo.cfg配置文件,修改代碼
zookeeper-3.4.14/conf/zoo.cfg修改如下參數
dataDir=/opt/zookeeper-3.4.14/zkData
server.2=192.168.119.10:2888:3888 #修改為你自己服務器的ip
參數解釋
Server.A=B:C:D。
A是一個數字,表示這個是第幾號服務器;
B是這個服務器的ip地址;
C是這個服務器與集群中的Leader服務器交換信息的端口;
D是萬一集群中的Leader服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。
集群模式下配置一個文件myid,這個文件在dataDir目錄下,這個文件里面有一個數據就是A的值,Zookeeper啟動時讀取此文件,拿到里面的數據與zoo.cfg里面的配置信息比較從而判斷到底是哪個server。
啟動zk服務端
zkServer.sh start #啟動
zkServer.sh status #檢查狀態
1.7kafka部署
下載二進制kafka代碼包
wget http://apache.claz.org/kafka/2.2.0/kafka_2.11-2.2.0.tgz
解壓縮
tar -xf kafka_2.11-2.2.0.tgz
修改kafka服務端配置文件
/opt/kafka_2.11-2.2.0/config/server.properties
#創建kafka日志文件夾
mkdir -p /opt/kafka_2.11-2.2.0/logs
/opt/kafka_2.11-2.2.0/config/server.properties修改如下參數
如果修改了kafka的啟動地址參數,注意可能出現的權限問題,或者刪除logs目錄下的數據文件
9092是kafka服務端
#broker的全局唯一編號,不能重復
broker.id=0
#是否允許刪除topic
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的最大緩沖區大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/opt/kafka_2.11-2.2.0/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址,確保zk正確啟動2181已經打開
zookeeper.connect=192.168.119.10:2181
修改linux的PATH環境變量,支持kafka命令
[root@localhost bin]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/jdk1.8.0_201/bin:/opt/zookeeper-3.4.14/bin:/opt/kafka_2.11-2.2.0/bin
啟動kafka服務端,指定配置文件,后台啟動
[root@localhost kafka_2.11-2.2.0]# kafka-server-start.sh config/server.properties &
看到如下提示,代表kafka啟動成功
[2019-04-12 23:53:33,229] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 23:53:33,229] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 23:53:33,231] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
1.8kafka命令行操作
查看當前服務器中的所有topic
[root@localhost kafka_2.11-2.2.0]# kafka-topics.sh --zookeeper 192.168.119.10:2181 --list
創建topic
[root@localhost kafka_2.11-2.2.0]# kafka-topics.sh --zookeeper 192.168.119.10:2181 --create --replication-factor 1 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分區數
刪除topic
kafka-topics.sh --zookeeper 192.168.119.10:2181 --delete --topic first
需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。
發送消息,9092是kafka的服務端口
[root@localhost kafka_2.11-2.2.0]# kafka-console-producer.sh --broker-list 192.168.119.10:9092 --topic first
>hello kafka
>chaoge niubi
消費消息,注意kafka的版本,以及新參數特性
[root@localhost kafka_2.11-2.2.0]# kafka-console-consumer.sh --bootstrap-server 192.168.119.10:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。
broker
topic
partition
三者包含關系
二、python操作kafka
環境准備
[root@localhost pykafka]# python3 -V
Python 3.6.7
啟動好zk,kafka,確保2181端口,9092端口啟動
Python模塊安裝
pip3 install kafka-python
生產者
[root@localhost pykafka]# cat pro.py
import time
from kafka import KafkaProducer
#連接上kafka服務端9092端口
producer = KafkaProducer(bootstrap_servers = ['192.168.119.10:9092'])
# 注冊一個主題,名字topic
topic = 'oldboy'
#每秒鍾,寫入一個消息數據
def test():
print ('begin produce..')
n = 1
try:
while (n<=100):
#向主題oldboy中發送byte數據
producer.send(topic, str(n).encode())
print("send" + str(n))
n += 1
time.sleep(0.5)
except KafkaError as e:
print(e)
finally:
#關閉連接
producer.close()
print('done')
if __name__ == '__main__':
test()
消費者
[root@localhost pykafka]# cat consumer.py
from kafka import KafkaConsumer
#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('oldboy', group_id = 'oldboy_group', bootstrap_servers = ['192.168.119.10:9092'])
try:
for msg in consumer:
print(msg)
print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition,msg.offset, msg.key, msg.value))
except KeyboardInterrupt as e:
print(e)