Kafka是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源 項目。
kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。

-
Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
-
Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
-
Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
-
Segment:partition物理上由多個segment組成,每個Segment存着message信息
-
Producer : 生產message發送到topic
-
Consumer : 訂閱topic消費message, consumer作為一個線程來消費
-
Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。

對於consumer而言,它需要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
partitions的設計目的有多個.最根本原因是kafka基於文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味着可以容納更多的consumer,有效提升並發消費的能力.(具體原理參見下文).
Distribution
Kafka和JMS(Java Message Service)實現(activeMQ)不同的是:
即使消息被消費,消息仍然不會被立即刪除.日志文件將會根據broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之后對文件內容改動的磁盤IO開支.
Kafka的特性:
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
- 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高並發:支持數千個客戶端同時讀寫
Kafka的使用場景:
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
設計原理:
kafka的設計初衷是希望作為一個統一的信息收集平台,能夠實時的收集反饋信息,並需要能夠支撐較大的數據量,且具備良好的容錯能力.
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化幾乎沒有可能.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.
2、性能
需要考慮的影響性能點很多,除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網絡IO更應該需要考慮.可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式.
負載均衡: producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".事實上,消息被路由到哪個partition上,有producer客戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現"消息均衡分發"是必要的.
其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件.
異步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未發送的消息將會丟失。
consumer端向broker發送"fetch"請求,並告知其獲取消息的offset;此后consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息.
在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端.不過在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息並處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch.
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,並間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級.

對於JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once)。在kafka中稍有不同:
- at most once:最多一次,這個和JMS中"非持久化"消息類似.發送一次,無論成敗,將不會重發.
- at least once:消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功.
- exactly once:消息只會發送一次.
at most once:消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理.那么此后"未處理"的消息將不能被fetch到,這就是"at most once".
at least once:消費者fetch消息,然后處理消息,然后保存offset,如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態.
exactly once:kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的.
通常情況下“at-least-once"是我們首選.(相比at most once而言,重復接收數據總比丟失數據要好).
6、復制備份
kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定,leader處理所有的read-write請求,follower需要和leader保持同步。Follower和consumer一樣,消費消息並保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除,當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它,即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.(不同於其他分布式存儲,比如hbase需要"多數派"存活才行)
當leader失效時,需在followers中選取出新的leader,可能此時follower落后於leader,因此需要選擇一個"up-to-date"的follower,選擇follower時需要兼顧一個問題,就是新leaderserver上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力,在選取新leader,需要考慮到"負載均衡".
如果一個topic的名稱為"my_topic",它有2個partitions,那么日志將會保存在my_topic_0和my_topic_1兩個目錄中;日志文件中保存了一序列"log entries"(日志條目),每個log entry格式為"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每個日志都有一個offset來唯一的標記一條消息,offset的值為8個字節的數字,表示此消息在此partition中所處的起始位置,每個partition在物理存儲層面,有多個log file組成(稱為segment),segmentfile的命名為"最小offset".kafka,例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)
1) 、Broker node registry: 當一個kafkabroker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.
格式: /broker/ids/[0...N] -->host:port; 其中[0..N]表示broker id,每個broker的配置文件中都需要指定一個數字類型的id(全局不可重復),znode的值為此broker的host:port信息.
2)、 Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號.
3) 、Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".
一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.
4) 、Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.
格式:/consumers/[group_id]/ids/[consumer_id] 仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.
5)、 Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.
格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.
6)、 Partition Owner registry: 用來標記partition被哪個consumer消費.臨時znode
格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id 當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
-
- Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接並發送消息.
- Broker端使用zookeeper用來注冊broker信息,已經監測partitionleader存活性.
- Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息.
主要配置:
- borker配置
- Consumer主要配置
- Producer主要配置
以上是關於kafka一些基礎說明,在其中我們知道如果要kafka正常運行,必須配置zookeeper,否則無論是kafka集群還是客戶端的生存者和消費者都無法正常的工作的,以下是對zookeeper進行一些簡單的介紹:
zookeeper集群
一、下載zookeeper
二、安裝zookeeper
將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下。然后按照如下值修改其中的配置: # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/wwb/zookeeper /data dataLogDir=/home/wwb/zookeeper/logs # the port at which the clients will connect clientPort=2181 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. #http://zookeeper.apache.org/doc/ ... html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=192.168.0.1:3888:4888 server.2=192.168.0.2:3888:4888 server.3=192.168.0.3:3888:4888
tickTime:這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。
clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
syncLimit:這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒
server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號
注意:dataDir,dataLogDir中的wwb是當前登錄用戶名,data,logs目錄開始是不存在,需要使用mkdir命令創建相應的目錄。並且在該目錄下創建文件myid,serve1,server2,server3該文件內容分別為1,2,3。
針對服務器server2,server3可以將server1復制到相應的目錄,不過需要注意dataDir,dataLogDir目錄,並且文件myid內容分別為2,3
三、依次啟動server1,server2,server3的zookeeper.
/home/wwb/zookeeper/bin/zkServer.sh start,出現類似以下內容 JMX enabled by default Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
四、測試zookeeper是否正常工作,在server1上執行以下命令
/home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現類似以下內容 JLine support is enabled 2013-11-27 19:59:40,560 - INFO [main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1429cdb49220000, negotiatedtimeout = 30000
WatchedEvent state:SyncConnected type:None path:null [zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#
即代表集群構建成功了,如果出現錯誤那應該是第三部時沒有啟動好集群,
運行,先利用 ps aux | grep zookeeper 查看是否有相應的進程的,沒有話,說明集群啟動出現問題,可以在每個服務器上使用 ./home/wwb/zookeeper/bin/zkServer.sh stop。再依次使用./home/wwb/zookeeper/binzkServer.sh start,這時在執行4一般是沒有問題,如果還是有問題,那么先stop再到bin的上級目錄執行./bin/zkServer.shstart試試。
注意:zookeeper集群時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。
kafka集群
broker.id=1 port=9091 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=./logs num.partitions=2 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=536870912 num.replica.fetchers=2 log.cleanup.interval.mins=10 zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183 zookeeper.connection.timeout.ms=1000000 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics kafka.csv.metrics.reporter.enabled=false
> cd kafka01 > ./sbt update > ./sbt package > ./sbt assembly-package-dependency 在第二個命令時可能需要一定時間,由於要下載更新一些依賴包。所以請大家 耐心點。
>JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
- kafka02操作步驟與kafka01雷同,不同的地方如下
- kafka03操作步驟與kafka01雷同,不同的地方如下
>bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic
>bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181 topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0
>bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic my test message1 my test message2 ^C
>bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic ... my test message1 my test message2 ^C
>pkill -9 -f config/server.properties
>bin/kafka-list-top.sh --zookeeper192.168.0.1:2181 topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0 發現topic還正常的存在
>bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C 說明一切都是正常的。
-
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中該方法的參數Map的key為topic名稱,value為topic對應的分區數,譬如說如果在kafka中不存在相應的topic時,則會創建一個topic,分區數為value,如果存在的話,該處的value則不起什么作用
-
關於生產者向指定的分區發送數據,通過設置partitioner.class的屬性來指定向那個分區發送數據,如果自己指定必須編寫相應的程序,默認是kafka.producer.DefaultPartitioner,分區程序是基於散列的鍵。
-
在多個消費者讀取同一個topic的數據,為了保證每個消費者讀取數據的唯一性,必須將這些消費者group_id定義為同一個值,這樣就構建了一個類似隊列的數據結構,如果定義不同,則類似一種廣播結構的。
-
在consumerapi中,參數設計到數字部分,類似Map<String,Integer>,
-
numStream,指的都是在topic不存在的時,會創建一個topic,並且分區個數為Integer,numStream,注意如果數字大於broker的配置中num.partitions屬性,會以num.partitions為依據創建分區個數的。
-
producerapi,調用send時,如果不存在topic,也會創建topic,在該方法中沒有提供分區個數的參數,在這里分區個數是由服務端broker的配置中num.partitions屬性決定的