kafka-基礎-Producer&Consumer&消息和日志


一. 回顧

producer->topic->consumer
集群運行,每個服務broker
tcp協議,Kafka提供java端,另一端可支持多種語言

topic
topic是對一組消息的歸納,每個topic分區

每個分區都有個offset
consumer需要維護offset
分區目的:1、使得每個日志的數量不會太大;2、不同分區可單獨操作,並發消費topic

分布式
每個分區,多個broker中有副本;
每個分區多個副本有一個leader,其他副本作為followers;
leader讀寫消息,followers則去復制leader;
如果leader down了,followers中的一台則會自動成為leader;
每個服務都會同時扮演兩個角色:

  • 自己分區的leader
  • 其他分區的followers

Producers
將消息發送到topic中,指定分區;
可以指定分區函數,用戶可以指定負載均衡算法;

Consumers
發布模式包括:queuing、publish-subscribe;
queuing模式:

consumers可以同時從服務器讀取消息,但是每個消息只能被一個consumer得到

publish-subscribe模式:

Consumers加入consumer組,這個組內的consumer競爭這個消息,這是分組訂閱;
所有的consumer在一個組中,是傳統的queuing模式
每個consumer一個獨立的組,是publish-subscribe模式

傳統的隊列在服務器上保存有序的消息,如果多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發布消息,但是消息是被異步的分發到各consumer上,所以當消息到達時可能已經失去了原來的順序,這意味着並發消費將導致順序錯亂。為了避免故障,這樣的消息系統通常使用“專用consumer”的概念,其實就是只允許一個消費者消費消息,當然這就意味着失去了並發性。

通過分區的概念,Kafka可以在多個consumer組並發的情況下提供較好的有序性和負載均衡。將每個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就可以順序的消費這個分區的消息。因為有多個分區,依然可以在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就允許多少並發消費。

Kafka只能保證一個分區之內消息的有序性,在不同的分區之間是不可以的,這已經可以滿足大部分應用的需求。如果需要topic中所有消息的有序性,那就只能讓這個topic只有一個分區,當然也就只有一個consumer組消費它。

二. Producer API

kafka.producer.SyncProducer
kafka.producer.async.AsyncProducer
實現了同一接口

class Producer {
	//發送到指定分區
	publicvoid send(ProducerData<K,V> producerData);
	//批量發送	
	publicvoid send(List<ProducerData<K,V>> producerData);
	publicvoid close();
}

本地緩存
本地隊列緩存,異步發送到broker;producer.type=async設置;
后台ProducerSendThread從隊列中取數據,讓EventHandler給handler發送消息
producer處理數據的不同階段,可定制handler,添加日志追蹤,監控等

Encoder序列化消息
默認是DefaultEncoder接口,可以實現這個接口

interface Encoder<T> {
	public Message toMessage(T data);
}

broker自動感知
選中的broker失敗了,會自動切換到下一個broker

消息分區
Partitioner類

interface Partitioner<T> {
	int partition(T key, int numPartitions);
}

numPartitions:可用分區數量
默認的分區策略:hash(key)%numPartitions

三. Consumer API

兩個級別:

  1. 與指定broker保持連接,接收完消息關閉,無狀態,每次讀取都帶有offset
  2. 隱藏broker細節,利用條件指定topic,比如:黑白名單、正則表達式

級別1:

class SimpleConsumer {
	//向一個broker發出讀取請求
	public ByteBufferMessageSet fetch(FetchRequest request);
	//向一個broker發出讀取請求,並得到一個相應集
	public MultiFetchResponse multifetch(List<FetchRequest> fetches);
	/**
	* 得到指定時間之前的offsets
	* 返回值是offsets列表,以倒序排序
	* @param time: 時間,毫秒,
	* 如果指定為OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
	* 如果指定為OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
	*/
	publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

低級別為了一些對維持消費狀態有特殊需求的場景,如Hadoop consumer這樣的離線consumer

級別2:

ConsumerConnector connector = Consumer.create(consumerConfig);
	interface ConsumerConnector {
	/**
	* 這個方法可以得到一個流的列表,每個流都是MessageAndMetadata的迭代,通過MessageAndMetadata可以拿到消息和其他的元數據(目前之后topic)
	* Input: a map of <topic, #streams>
	* Output: a map of <topic, list of message streams>
	*/
	public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
	/**
	* 你也可以得到一個流的列表,它包含了符合TopicFiler的消息的迭代,
	* 一個TopicFilter是一個封裝了白名單或黑名單的正則表達式。
	*/
	public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
	/* 提交目前消費到的offset */
	public commitOffsets()
	/* 關閉連接 */
	public shutdown()
}

圍繞KafkaStream展開,每個流代表一系列從一個或多個分區多和broker上匯聚來的消息,每個流由一個線程處理,可以在創建的時候通過參數指定想要的幾個流;
一個流是多個分區多個broker的合並,但是每個分區的消息只會流向一個流。
createMessageStreams將consumer注冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整,會消耗更多時間;
API鼓勵每次調用創建更多的topic流以減少這種調整。
createMessageStreamsByFilter方法注冊監聽可以感知新的符合filter的tipic。

四. 消息和日志

具有N個字節的消息的格式如下

如果版本號是0
1. 1個字節的 "magic" 標記
2. 4個字節的CRC32校驗碼
3. N - 5個字節的具體信息

如果版本號是1
1. 1個字節的 "magic" 標記
2. 1個字節的參數允許標注一些附加的信息比如是否壓縮了,解碼類型等
3. 4個字節的CRC32校驗碼
4. N - 6 個字節的具體信息

my_topic日志,由my_topic_0和my_topic_1兩個目錄組成,目錄中存放具體的數據文件;
數據文件:日志實體

消息長度: 4 bytes (value: 1+4+n)
版本號: 1 byte
CRC校驗碼: 4 bytes
具體的消息: n bytes

每個消息都可以由一個64位的整數offset標注
offset標注了這條消息在發送到這個分區的消息流中的起始位置
每個日志文件的名稱都是這個文件第一條日志的offset,第一個日志文件的名字就是00000000000.kafka
每相鄰的兩個文件名字的差就是一個數字S,S差不多就是配置文件中指定的日志文件的最大容量
消息的格式都由一個統一的接口維護,所以消息可以在producer,broker和consumer之間無縫的傳遞

寫操作
消息被不斷的追加到最后一個日志的末尾,當日志的大小達到一個指定的值時就會產生一個新的文件。對於寫操作有兩個參數,一個規定了消息的數量達到這個值時必須將數據刷新到硬盤上,另外一個規定了刷新到硬盤的時間間隔,這對數據的持久性是個保證,在系統崩潰的時候只會丟失一定數量的消息或者一個時間段的消息。

讀操作
讀操作需要兩個參數:一個64位的offset和一個S字節的最大讀取量。S通常比單個消息的大小要大,但在一些個別消息比較大的情況下,S會小於單個消息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的消息。可以配置單個消息的最大值,這樣服務器就會拒絕大小超過這個值的消息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的消息而無限次的重試。
在實際執行讀取操縱時,首先需要定位數據所在的日志文件,然后根據offset計算出在這個日志中的offset(前面的的offset是整個分區的offset),然后在這個offset的位置進行讀取。定位操作是由二分查找法完成的,Kafka在內存中為每個文件維護了offset的范圍。
下面是發送給consumer的結果的格式:

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

刪除
日志管理器允許定制刪除策略;
目前的策略是刪除修改時間在N天之前的日志(按時間刪除),也可以使用另外一個策略:保留最后的N GB數據的策略(按大小刪除)。
為了避免在刪除時阻塞讀操作,采用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查找功能實際是在一個靜態的快照副本上進行的,這類似於Java的CopyOnWriteArrayList。

可靠性保證
日志文件有一個可配置的參數M,緩存超過這個數量的消息將被強行刷新到硬盤。
一個日志矯正線程將循環檢查最新的日志文件中的消息確認每個消息都是合法的。
合法的標准為:所有文件的大小的和最大的offset小於日志文件的大小,並且消息的CRC32校驗碼與存儲在消息實體中的校驗碼一致。如果在某個offset發現不合法的消息,從這個offset到下一個合法的offset之間的內容將被移除。
有兩種情況必須考慮:

  1. 當發生崩潰時有些數據塊未能寫入
  2. 寫入了一些空白數據塊

inode,但無法保證更新inode和寫入數據的順序;
inode保存的大小信息被更新了,但寫入數據時發生了崩潰,就產生了空白數據塊。
CRC校驗碼可以檢查這些塊並移除,當然因為崩潰而未寫入的數據塊也就丟失了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM