1. Kafka
1. 簡介
Kafka 是一種分布式的、分區的、多副本的基於發布/訂閱的消息系統。它是通過 zookeeper 進行協調,常見可以用於 web/nginx 日志、訪問日志、消息服務等。主要應用場景為:日志收集系統和消息系統。
Kafka 的主要設計目標如下:
1. 以時間復雜度為 O(1) 的方式提供持久化能力,即使對 TB 級別以上的數據也能保證常數時間的訪問性能。
2. 高吞吐率,即使在十分廉價的機器上也能實現單機支持每秒 100K 條消息的傳輸。
3. 支持 Kafka Server (即 Kafka 集群的服務器)間的消息分區,及分布式消費,同時保證每個 partition 內的消息順序傳輸。
4. 同時支持離線數據處理和實時數據處理
2. Kafka 架構
如上圖所示,一個 Kafka 集群由若干producer、若干consumer、若干broker,以及一個zookeeper集群所組成。Kafka通過zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發布到broker,consumer使用pull模式從broker訂閱並消費消息。
Kafka名詞解釋:
broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群,相當於物理層面上的一台服務器。
topic:存放同一類消息的位置,是一個概念層面上的名詞,Kafka集群可以負責多個topic的分發。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)
partition:topic在物理層面上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列,創建topic時可以指定partition數量,每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件。一般來說partition的數量大於等於broker的數量。
producer:負責發布消息到Kafka broker
consumer:消費消息,每個consumer屬於一個特定的consumer group(可為每個consumer指定group name,若不指定group name則為默認的group)。使用consumer high level API時,同一topic的一條消息只能被一個consumer group的一個consumer消費,但多個consumer group可同時消費這條消息。
consumer group:每個consumer屬於一個特定的consumer group,consumer group是實際記錄的概念。
3. Kafka數據傳輸的事務特點
1. at most once
這種模式下consumer fetch消息,先進行commit,再進行處理。如果再處理消息的過程中出現異常,下次重新開始工作就無法讀到之前已經確認而未處理的消息。
2. at least once
消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功。消費者fetch消息,然后處理消息,然后保存offset。如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是”at least once”,原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
3. exactly once
消息只會發送一次,Kafka中並沒有嚴格的去實現,我們認為這種策略在Kafka中是沒有必要的。
通常情況下,Kafka默認保證at least once。
5. Push & Pull
作為一個消息系統,Kafka遵循了傳統的方式,選擇由producer向broker push消息,並由consumer從broker中pull消息。
push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標就是以盡可能快的速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現是拒絕服務以及網絡擁塞。而pull模式可以根據consumer的消費能力以適當的速率消費消息。
6. Topic & Partition
Topic在邏輯上可以認為是一個存在的queue,每條消息都必須指定它的topic,可以簡單的理解為必須指明把這條消息放進哪個queue里。為了使Kafka的吞吐率可以水平擴展,物理上把topic分成一個或多個partition,每個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的所有消息和索引文件。
每個日志文件都是"log entries"序列,每一個log entry包含一個4字節整型值(值為N),其后跟N個字節的消息體。每條消息都有一個當前partition下唯一的64字節的offset,它指明了這條消息的起始位置,也是對數據的唯一標識,Kafka中並沒有提供額外的索引機制來存儲offset,因為在Kafka中幾乎不允許對消息進行"隨機讀寫"。磁盤上log entry的存儲格式如下:
message length:4 bytes(它的具體值為1+4+n,如下所示)
"magic" value:1 byte
crc:4 bytes
payload:n bytes
這個log entries並非由一個文件構成,而是分為多個segment,每個segment名為該segment第一條消息的offset和".kafka"組成。另外會有一個索引文件,它標明了每個segment下包含的log entry的offset范圍。
因為每條消息都被append到該partition中,是順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤比隨機寫內存還要高,這是Kafka高吞吐率的一個保證)。
每條消息被發送到topic時,會根據指定的partition規則選擇被存儲到哪一個partition。如果partition規則設計的合理,所有的消息會均勻分配到不同的partition里,這樣就實現了水平擴展。(如果一個topic對應一個文件,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題)。在創建topic時,可以在$KAFKA_HOME/config/server.properties
中指定這個partition的數量(如下所示),當然也可以在topic創建之后去修改parition數量。
# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3
在發送一條消息時,可以指定這條消息的key,producer根據這個key和partition機制來判斷這個將這條消息發送到哪個partition。paritition機制可以通過指定producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner
接口。(比如如果一個key能夠被解析為整數,那么將對應的整數與partition總數取余,可以作為該消息被發送到的partition id)
7. 歷史數據刪除機制
對於傳統的message queue而言,一般會刪除已經消費過的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,由於磁盤限制,不可能永久保留,因此Kafka提供兩種機制去刪除舊數據。一是基於時間,一是基於partition文件大小。(例如可以通過配置$KAFKA_HOME/config/server.properties
,讓Kafka刪除一周前的數據,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數據)
這里要注意,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata信息—當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息后線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
8. Consumer Group
對於程序員來說,consumer group 是消費 Kafka 消息隊列中消息的接口,每個 consumer group 可以消費多個 topic,對於每一個 topic 可以有多個消費者實體 consumer(對應多個程序或者進程)。在消費過程中,同一個 topic 中的消息只會被同一個 consumer group 中的一個 consumer 消費,而不會出現重復訂閱的情況。對於每一個 consumer group 消費 topic,可以手動commit,也可以設置參數集群自動commit(確認)消費進行的位置,保證下一次能接着從上次的位置繼續消費。
consumer group 和 topic 一樣,也是直接使用就能新建的。如果直接新建一個 consumer,而不指定具體的 consumer group,系統會自動的指定默認的 consumer group,並且從最老的數據(EARLIEST)位置開始消費。
詳情參考:
http://developer.51cto.com/art/201501/464491.htm
http://geek.csdn.net/news/detail/229569
http://www.cnblogs.com/likehua/p/3999538.html
2. PyKafka 的使用
1. 導入 pykafka 模塊
import pykafka
from pykafka import KafkaClient
2. 初始化 KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...") 可以通過 hosts 地址初始化,也可以使用 zookeeper_hosts 進行初始化:
client = KafkaClient(zookeeper_hosts = 'yq01-ps-4-m42-pc177.yq01:2181,yq01-ps-4-m42-pc186.yq01:2181,yq01-ps-4-m42-pc187.yq01:2181,yq01-ps-4-m42-pc191.yq01:2181,yq01-ps-4-m42-pc192.yq01:2181')
3. Topic 對象
client.topics 可以查看當前所有的 topic。
topic = client.topics['bjhapp_history'] #如果該 topic 存在,那么會選中對應的 topic;如果不存在,會自動新建該 topic。
Topic 對象包含的方法:
1. get_balanced_consumer(consumer_group, managed=False, **kwargs) :生成對應 consumer_group 對 topic 下消息消費的一個 balanced_consumer,與 simple_consumer 的差別在於如果有多個 consumer 進來對同一個 topic 的消息進行訂閱,balanced_consumer 會自動平衡和分配 partitions 給每個 consumer;而先進來的 simple_consumer 會對當前 topic 的partition 有100%的占有權。
參數:consumer_group:消費的 consumer_group 名
managed:是否對 consumer_group 進行管理
**kwargs:對應於 consumer 對象的眾多參數
2. get_producer(use_rdkafka=False, **kwargs):生成對應 topic 的一個異步消息 producer
3. get_simple_consumer(consumer_group=None, use_rdkafka=False, **kwargs):生成對應 consumer_group 對 topic 的一個 simple_consumer
4. get_sync_producer(**kwargs):生成對 topic 的一個同步 producer
成員變量:
name:topic 的名字
partitions:包含當前 topic 對應 partitions 的字典
4. Producer 對象
1. 同步的 producer 對象
producer = topic.get_sync_producer()
producer.produce("test")
同步的 producer 對象發布消息時,只有在確認消息成功發送到集群時才返回,因此網絡 IO 的速度會影響程序的整體速度。
2. 異步的 producer 對象
為了實現更高的吞吐量,我們推薦使用異步模式的 producer,這樣 produce() 函數能夠立即返回,並且可以批量處理更多的消息,而不用等待當前消息發布成功的確認。我們通過隊列的接口同樣可以在之后收到消息發布成功的確認,需要設置參數 delivery_reports = True。
producer = topic.get_producer(delivery_reports=True) #初始化異步的 producer
count = 0 #定義 count 變量用來存儲消息發送的條數,以便於定期檢查之前的消息是否發送成功
def produce(msg, partition_key): global count producer.produce(msg, partition_key = partition_key) count += 1 if count % 10 == 0: while True: try: old_msg, exc = producer.get_delivery_report(block = False) if exc is not None: log.warn("fail to delivery msg: %s, exc: %s, try again", \ old_msg.partition_key, exc) if type(exc) is not MessageSizeTooLarge: producer.produce(old_msg.value, partition_key = old_msg.partition_key) else: log.info("succ delivery msg: %s", old_msg.partition_key) except Queue.Empty: break;
上述代碼每嘗試發布十條消息,就對之前發送的消息的 delivery_report 進行檢查,查看其發布是否成功的狀態,通過 producer.get_delivery_report() 函數返回之前發送失敗的消息和結果,如果沒有發布成功,會嘗試重新進行發送。直到 delivery_report 的隊列為空。
要注意 producer 發布消息時是先將消息存儲在緩存區,再將緩存區的消息發布到 Kafka 集群。所以異步的 produce() 函數執行完后,依然需要一定的時間來實現消息從緩存區的發布。所以如果文件執行結束,producer 對象會自動釋放,導致消息發布不成功,返回錯誤:ReferenceError: weakly-referenced object no longer exists。解決方法,在程序的尾部讓程序等待一段時間使消息發布完成,例如:sleep(6)
5. Consumer 對象
當一個 PyKafka consumer 開始從一個 topic 中訂閱消息時,它在記錄器中的起始位置是由 auto_offset_reset 和 reset_offset_on_start 兩個參數確定的。
consumer = topic.get_simple_consumer( consumer_group = 'my_group', auto_offset_reset = OffsetType.EARLIEST, reset_offset_on_start=False )
同樣,是否 Kafka 集群保有任何之前的 consumer group/topic/partition set 的消費偏移量也會影響數據的初始訂閱點。一個 new group/topic/partition set 就是之前沒有任何 commited offsets,一個存在的就是有 commited offsets 的。這兩者的訂閱點由下面的規則決定:
1. 對於一個新的 consumer group/topic/partitions,不管參數 reset_offset_on_start 的參數是什么,都會從 auto_offset_reset 指定的位置開始消息訂閱。
2. 對於一個已經存在的 consumer group/topic/partitions,假設參數 reset_offset_on_start 為false,那么消費會從上一次消費的偏移量之后開始進行(比如上一次的消費偏移量為4,那么消費會從5開始)。假設參數為 true,會自動從 auto_offset_reset 指定的位置開始消費。
Tips:
1. No handlers could be found for logger "pykafka.simpleconsumer"
錯誤的原因是 consumer 在訂閱消息時需要有一個 logger 來記錄日志,如果有一個全局 logger 對象,會自動的寫入該全局對象中,否則會報這條信息,但是不影響消息訂閱。
2. 有的時候會出現 consumer 在訂閱消息時遲遲不能讀出現的情況,這是由於 KafkaClient 的未知原因導致的,可以嘗試在初始化 consumer 的參數中加上 consumer_timeout_ms 參數來解決問題。該參數表示 consumer 在返回 None 前嘗試等待可以消費的消息的時間。