一:介紹
1.官網
kafka.apache.org
2.產生
Kafka由 linked-in 開源
kafka-即是解決上述這類問題的一個框架,它實現了生產者和消費者之間的無縫連接。
kafka-高產出的分布式消息系統(A high-throughput distributed messaging system)
3.狀況
Apache kafka 是一個分布式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。
它現在是Apache旗下的一個開源系統,作為Hadoop生態系統的一部分,被各種商業公司廣泛應用。
它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎。
4.特性
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
- 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高並發:支持數千個客戶端同時讀寫
5.設計思想
- Consumergroup:各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費,如果一個消息可以被多個consumer消費的話,那么這些consumer必須在不同的組。
- 消息狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着如果consumer處理不好的話,broker上的一個消息可能會被消費多次。
- 消息持久化:Kafka中會把消息持久化到本地文件系統中,並且保持極高的效率。
- 消息有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。
- 批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。
- Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
- 同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。
- 分區機制partition:Kafka的broker端支持消息分區,Producer可以決定把消息發到哪個分區,在一個分區中消息的順序就是Producer發送消息的順序,一個主題中可以有多個分區,具體分區的數量是可配置的。分區的意義很重大,后面的內容會逐漸體現。
- 離線數據裝載:Kafka由於對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。
- 插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
6.應用場景
- 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
- 流式處理:比如spark streaming和storm
- 事件源
7.組件
Kafka中發布訂閱的對象是topic。
我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。
Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
- topic:消息存放的目錄即主題
- Producer:生產消息到topic的一方
- Consumer:訂閱topic消費消息的一方
- Broker:Kafka的服務實例就是一個broker
8.Kafka Topic&Partition
消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日志)組成,其組織結構如下圖所示:
我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。
Kafka需要維持的元數據只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。
把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高並發,因為可以以Partition為單位讀寫了。
二:核心組件
1.Replications、Partitions 和Leaders
通過上面介紹的我們可以知道,kafka中的數據是持久化的並且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3台不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。
Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。
關於如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大於同時運行的consumer的數量。另外一方面,建議partition的數量大於集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。
2. Producers
Producers直接發送消息到broker上的leader partition,不需要經過任何中介一系列的路由轉發。為了實現這個特性,kafka集群中的每個broker都可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的。
Producer客戶端自己控制着消息被推送到哪些partition。實現的方式可以是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的分區,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區算法。比如,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個分區。
以Batch的方式推送數據可以極大的提高處理效率,kafka Producer 可以將消息在內存中累計到一定數量后作為一個batch發送請求。Batch的數量大小可以通過Producer的參數控制,參數值可以設置為累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。通過增加batch的大小,可以減少網絡請求和磁盤IO的次數,當然具體參數設置需要在效率和時效性方面做一個權衡。
Producers可以異步的並行的向kafka發送消息,但是通常producer在發送完消息之后會得到一個future響應,返回的是offset值或者發送過程中遇到的錯誤。這其中有個非常重要的參數“acks”,這個參數決定了producer要求leader partition 收到確認的副本個數,如果acks設置數量為0,表示producer不會等待broker的響應,所以,producer無法知道消息是否發送成功,這樣有可能會導致數據丟失,但同時,acks值為0會得到最大的系統吞吐量。
若acks設置為1,表示producer會在leader partition收到消息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到消息。若設置為-1,producer會在所有備份的partition收到消息時得到broker的確認,這個設置可以得到最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。因為kafka消息支持字節數組,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前。
3.Consumers
Kafka提供了兩套consumer api,分為high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連接,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。
在kafka中,當前讀到消息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的數據。比如,consumer可以通過重設offset值來重新消費已消費過的數據。不管有沒有被消費,kafka會保存數據一段時間,這個時間周期是可配置的,只有到了過期時間,kafka才會刪除這些數據。
High-level API封裝了對集群中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,如果consumers有同一個組名,那么kafka就相當於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不同的組名,那么此時kafka就相當與一個廣播服務,會把topic中的所有消息廣播到每個consumer。
二:安裝
1.上傳
這個可以在官網上進行下載。
上傳。
2.解壓到modules
tar -zxvf kafka_2.10-0.8.1.1.tgz -C /opt/modules/
3.修改配置文件server.properties
這個配置文件用於服務器節點的配置文件。
)修改kafa收集到的日志數據存儲文件夾
地址可以先不用創建,在首次啟動kafka的時候,會自動進行創建。
)修改zookeeper
三:啟動
1.啟動zookeeper
這個是在啟動broker之前需要保證zookeeper集群是運行着的。
2.啟動broker
先啟動一下,開有沒有報錯。
如果沒有出現日志錯誤,就使用下面的命令:
nohup bin/kafka-server-start.sh config/server.properties > logs/server-start.log 2>&1 &
其中,server-start.log是自己寫的一個log文件,在原有的文件logs下面是沒有的。
3.檢驗
一個jps,一個ps查看進程。
查看端口9092是否開放
4.創建topic(使用help幫助)
5.創建一個nginxlog的topic
bin/kafka-topics.sh --create --topic nginxlog --partitions 1 --replication-factor 1 --zookeeper linux-hadoop3.ibeifeng.com:2181
說明:
創建的消息主題是nginxlog。
指定topic的分區數:partitons
指定topic的備份數:replication-factor
6.查看詳情
bin/kafka-topics.sh --describe --topic nginxlog --zookeeper linux-hadoop3.ibeifeng.com:2181
三:檢測topic
一個產生,一個在在消費的測試。
1.啟動消息生產者,將消息發送到kafka的topic上
bin/kafka-console-producer.sh --broker-list linux-hadoop3.ibeifeng.com:9092 --topic nginxlog
說明:
--broker-list是指定broker的節點地址。
2.啟動消息消費者
bin/kafka-console-consumer.sh --zookeeper linux-hadoop3.ibeifeng.com:2181 --topic nginxlog --from-beginning
--from-beginning是從開頭讀取的意思。
四:模擬產生nginx日志
1.在服務器上創建一個根目錄
2.上傳jar包
3.執行命令
java -jar data-generate-1.0-SNAPSHOT-jar-with-dependencies.jar 1000 >>nginx.log
4.查看
tail -f nginx.log
五:使用flume將模擬產生的nginx日志上傳到hdfs和kafka上
1.當前狀態
2.project_agent.conf
3.具體的代碼

1 #exec source - memory channel - kafka sink/hdfs sink 2 a1.sources = r1 3 #a1.sinks = kafka_sink hdfs_sink 4 a1.sinks = kafka_sink 5 #a1.channels = c1 c2 6 a1.channels = c1 7 8 a1.sources.r1.type = exec 9 a1.sources.r1.command = tail -F /home/beifeng/workspace/nginx.log 10 11 12 # kafka_sink 13 a1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink 14 a1.sinks.kafka_sink.topic = nginxlog 15 a1.sinks.kafka_sink.brokerList =linux-hadoop3.ibeifeng.com:9092 16 # 確認級別: 0 表示不確認 17 # 1 topic有個多個備份 replication 主 leader備份 18 # 只要求leader備份寫入成功就任務消息發送到Topic成功 19 # -1 topic有個多個備份 要求所有備份寫入成功,才算消息生產成功 20 a1.sinks.kafka_sink.requiredAcks = 1 21 a1.sinks.kafka_sink.batchSize = 20 22 a1.sinks.kafka_sink.channel = c1 23 24 # hdfs_sink 25 #a1.sinks.hdfs_sink.type = hdfs 26 #a1.sinks.hdfs_sink.hdfs.path = /flume/events/%Y%m%d 27 #a1.sinks.hdfs_sink.hdfs.filePrefix = nginx_log- 28 #a1.sinks.hdfs_sink.hdfs.fileType = DataStream 29 #a1.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true 30 #a1.sinks.hdfs_sink.hdfs.rollInterval = 0 31 #rollSize值 比hdfs block大小 小一點 (10M) 32 #a1.sinks.hdfs_sink.hdfs.rollSize = 102400 33 #a1.sinks.hdfs_sink.hdfs.rollCount = 0 34 35 36 # Use a channel which buffers events in memory 37 a1.channels.c1.type = memory 38 a1.channels.c1.capacity = 1000 39 a1.channels.c1.transactionCapacity = 100 40 41 #a1.channels.c2.type = memory 42 #a1.channels.c2.capacity = 1000 43 #a1.channels.c2.transactionCapacity = 100 44 45 # Bind the source and sink to the channel 46 #a1.sources.r1.channels = c1 c2 47 #a1.sinks.kafka_sink.channel = c1 48 #a1.sinks.hdfs_sink.channel = c2 49 #指定source 與 channel之間的關系 復制(默認) --- 多路復用模式 event header body 50 #a1.sources.r1.selector.type = replicating 51 52 ######################################################### 53 # Bind the source and sink to the channel 54 a1.sources.r1.channels = c1 55 a1.sinks.kafka_sink.channel = c1
六:具體情況
思路:由jar產生數據到nginx.log,然后使用flume讀取,傳送到kafka,這樣就有了生產的數據。
然后,消費者就可以消費jar產生的數據了。
1.執行flume
bin/flume-ng agent -n a1 -c conf/ --conf-file conf/project_agent.conf -Dflume-root-logger=INFO,console
2.啟動生產者,就是模擬問價jar
讓其不斷的產生日志到nginx.log中,代替生產者。
3.觀看啟動了消費者的窗口
就會發現不斷的產生日志被消費
七:注意
1.關閉kafka