kafka 是 linkedin 公司開發的,為解決用戶行為日志而產生;
kafka的最初目的並非消息隊列組件,而流式處理平台;
只是其中有的功能看起來像一個消息隊列組件;
1. 發布訂閱功能;
2. 存儲流式日志數據;
3. 實時處理流式數據;
kafka安裝很簡單,主要是要調整下一些配置參數。broker.id, port, zookeeper.connect, log.dirs, num.recovery.thread.per.data.dir, auto.create.topics.enable, num.partions, log.retention.ms, ...
Kafka部署環境:
1. 磁盤性能直接影響kafka吞吐量。
3. 內存同樣影響其吞吐能力,jvm不需要太多內存,但是多余的系統內存可以被用作頁面緩存(性能關鍵)。
4. 網絡同樣影響其吞吐量,在其他性能都很好時,如果帶寬受限則吞吐量受限,且kafka還會存在數據集群復制和鏡像也會占用網絡流量。
5. kafka對cpu要求相對不高,但在壓縮方面會影響cpu負載。
kafka容量估算,受限於復制系統和單個分區存儲能力;適當對操作系統調優;
kafka處理模型為(推送和拉取):生產者發送數據到kafka,發送將會按照分區規則進行指定分區,kafka會暫時將數據存儲在內存,然后在適當時機沖刷到磁盤,隨后消費者進行主動拉取式消費,自行維護數據偏移,從而完成消息從生產者轉移到消費者的傳遞。
kafka由很多broker組成,單個broker可以輕松處理數千個分區以及每秒百萬級的消息量。
kafka一般都是集群部署的,每個集群都有一個broker充當了集群控制器的角色(集群選舉產生)。控制器負責管理工作,包括將分區分配給broker和監控broker。一個分區從屬於broker,即每個分區的首領。如果一個broker失效,其他broker可以接管領導權。
kafka具有保留消息的能力,這是它和很多消息中間件差異較大地方。可以設置保留天數,可以設置到達一定數量后。
消費者群組可以作為一個統一的消費者,共享一個消息流,整個群組對同一個消息只處理一次。而普通非群組機器,則是任意消費數據。
遠程讀取比遠程生成更加安全。因為如果消費者連接到生產集群,最多也就是無法讀取數據,但數據仍然會保存在kafka里很長時間。而相反居發生網絡分區時,已經讀取了數據卻無法發送生成數據到目標機器,那么就會造成數據丟失。(MirrorMaker)
生產者主要負責數據發送,關鍵點:
1. 需要指定幾個broker地址信息,當一個不可用時,可以切到下一個。
2. 指定序列化方式,key和value序列化方式,avro;
3. 有異步發送和同步等待,差別在於是否阻塞get();
4. 異步發送可以設置回調方法;
5. acks用於設置與kafka的確認方式,取決性能和可靠性的平衡。值有0, 1, all.
6. retries 配置重試次數;
7. buffer.memory
8. compression.type snapy, gzip, lz4
9. batch.size 一個批次可使用大小;
10. 可以自定義分區器,用於指定發送的分區,實現 Partioner 接口即可。
kafka消費者,一般為群組消費,要點:
1. 群組會自動實現負載均衡;
2. 分區再均衡時,消費者將暫停消息,並收到通知,可以自行處理數據;
3. 消費者上線或下線時會觸發再均衡;
4. consumer.subscribe("test.*") 消息數據;
5. 通過 consumer.poll(100)拉取數據;
6. 可以自動提交偏移,也可以主動提交偏移;
7. fetch.min.bytes 獲取最小字節數;
8. fetch.max.wait.ms 最長等待時間;
9. session.timeout.ms 會話超時;
10. auto.offset.reset 無偏移時的消費偏移策略,earliest, latest .
11. enable.auto.commit 自動提交;
12. partition.assignment.strategy 分區分配策略;
13. max.poll.records 單次最大數;
14. __consumer_offset 特殊主題保存偏移量;
15. ...
16. 再均衡監聽器,可以進行清理工作;實現 ConsumerRebalanceListener. 可以監聽 丟失分區前事件(清理現場),也可以監聽新分區被分配時事件(初始化數據);
17.
深入kafka
1. kafka如何復制;
2. kafka如何處理來自生產者和消費者的請求;
3. kafka存儲細節;
控制器由/controller 節點控制;使用epoch來避免腦裂;
復制特性是kafka的重要特性和架構核心。
副本類型有 首領副本和跟隨者副本。首選首領是第一個副本;
處理請求流程:broker會在各監聽端口運行一個acceptor,創建連接后交給processor處理,processor數量可配置,它從客戶端獲取連接請求,把它放到請求隊列,然后從響應隊列里獲取消息,發送給客戶端;具體業務處理由io線程處理(從請求隊列獲取數據,完后放入響應隊列)
消費者可以設定,讓broker等到有足夠多數據后再返回(實現原理應該是sleep()或者wait/notify模式),也可以設置最大超時時間;
可以自定義實現與broker通信協議,但broker之間也有同樣的通信協議,客戶端不應該使用這些請求。比如新首領選舉出來,控制器會發送LeaderAndIsr請求給新首領和跟隨者。
kafka基本存儲單元是分區,無法再細分;
kafka分區分配策略會保證,分區均衡地分布在broker集群里,包括機架信息。
文件會按修改日期和設置的過期策略進行刪除,活躍片段不會刪除。
kafka的索引是為了讓broker快速定位到偏移處,索引把偏移量映射到片段和偏移量所在文件的位置;索引也被分為片段,所以在刪除消息時,也可以刪除相應索引;(索引是非必須的,如果不存在將被kafka重建)
清理使用map保存散列值和偏移量,將污濁部分清理掉。
事件刪除場景,比如將用戶從系統刪除,應用程序發送null消息,墓碑消息,該消息會被清理線程從kafka里刪除。
kafka可靠的數據傳遞
kafka可靠性保證:
1. 保證分區有序;
2. 只有當消息寫入所有副本時,才認為消息已提交,消費者才可用;
3. 只要有一個副本是活躍的,那么消息就不會丟失;
4. 消費者只能讀取到已提交的消息;
復制系統默認為3,可以自定義,但盡量為奇數;
不完全首領選舉,可以開啟但是有風險;
應用程序驗證,依次重啟broker,依次重啟消費者,依次重啟生產者,客戶端從服務器斷開,首領選舉。
構建數據管道connect
數據庫A到數據庫C的遷移;mysql到es;
跨集群數據鏡像 mirrormaker
使用場景
1. 區域集群和中心集群;
2. 冗余dr;
3. 雲遷移;
流處理:
kafka是個穩定的數據來源;
流處理的設計模式:
時間窗口,流join,
流處理的設計模式:單個事件處理,本地狀態;map或filter模式;
單機10w/30s,tps: 3300+;
實際上只成功7w/30s,即tps: 2000+, 即被內存鎖住(512M);