Kafka實戰解惑(轉發)


原文:https://www.cnblogs.com/wxd0108/p/6475056.html

目錄

一、 kafka簡介
二、 Kafka架構方案
三、 Kafka安裝
四、 Kafka Client API
  4.1 Producers API
  4.2 Consumers API
  4.3 消息高可靠 At-Least-Once
  4.4 消息高可靠Consumer
  4.5 生產者、消費者總結
五、 Kafka運維
  5.1 Broker故障切換
  5.2 Broker動態擴容
  5.2.1 增加分區
  5.2.2 增加Broker Server
  5.3 Kafka配置優化
  5.4 數據清理
  5.4.1 數據刪除
  5.4.2 數據壓縮
  5.5 Kafka運行監控
六、Kafka其他組件](#c6)
  6.1 Kafka Connect
  6.2 Kafka Stream
  6.3 Kafka Camus
七、 Kafka典型應用場景
  7.1 ETL
八、 參考資料


一、Kafka簡介

Kafka是LinkedIn使用scala開發的一個分布式消息系統,它以水平擴展能力和高吞吐率著稱,被廣泛用於日志處理、ETL等應用場景。Kafka具有以下主要特點:

  1. 同時為發布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
  2. 可進行持久化操作。將消息持久化到磁盤,因此可用於批量消費,例如ETL以及實時應用程序。通過將數據持久化到硬盤以及replication防止數據丟失。
  3. 分布式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的,無需停機即可擴展機器。
  4. 消息被處理的狀態由消費者同步到zookeeper而非broker server中,當broker server失效時,通過副本切換機制選擇一個新的broker server,消費者從zookeeper中讀取之前消費消息的位置,不會引起消息丟失。
  5. 支持online和offline的場景。

LinkedIn有個三人小組出來創業了—正是當時開發出Apache Kafka實時信息列隊技術的團隊成員,基於這項技術Jay Kreps帶頭創立了新公司Confluent。Confluent的產品圍繞着Kafka做的,與Kafka相比,Confluent包含了更多的組件:

  1. Confluent Control Center(閉源)。管理和監控Kafka最全面的GUI驅動系統
  2. Confluent Kafka Connectors(開源)。連接SQL數據庫/Hadoop/Hive
  3. Confluent Kafka Clients(開源)。對於其他編程語言,包括C/C++,Python
  4. Confluent Kafka REST Proxy(開源)。允許一些系統通過HTTP和Kafka之間發送和接收消息。
  5. Confluent Schema Registry(開源)。幫助確定每一個應用使用正確的schema當寫數據或者讀數據到Kafka中。

二、 Kafka架構方案

從物理結構上看,整個Kafka系統由消息生產者、消息消費者、消費存儲服務器外加Zookeeper構成。其中消息生產者被稱為Producer、消息消費者被稱為Consumer、消息存儲服務器被稱為Broker。整個Kafka的架構方案非常簡單,典型的無狀態水平擴展架構,通過水平增加Broker實例實現系統的高吞吐率,而有狀態的數據則存儲到Zookeeper中。

Kafka采用Push-Pull模式,生產者發送消息時,可根據策略存儲在Kafka集群的任意一台broker上,消費者通過定時輪詢(非固定周期)的方式從Broker上取得消息。消息發送到哪一台服務器上,又從哪台服務器上獲取消息,則是由邏輯結構解決的,或者說邏輯結構建立在物理結構基礎上,對於生產者、消費者而言,只要了解邏輯結構就可以了。

從邏輯上講,一個Kafka集群中包含若干個消息隊列,每個消息隊列都有自己的名稱,在Kafka中消息隊列的名稱被稱為Topic,為了實現系統的高吞吐率,每個消息隊列被拆分成不同部分,即我們所說的分區(Partition),分區存儲在不同的Broker中。生產者發送消息時可根據一定策略發送到不同的分區中,這類似於數據庫的分庫分表操作,同樣消費者拉取消息時,也可以根據一定策略從某個分區中讀取消息。就物理結構而言,每個分區就是broker上的一個文件,試想一下並發的對多個分布在不同broker上的文件進行讀寫,性能當然顯著優於對單台broker上的文件進行讀寫,我們所說的Kfaka具有高吞吐率就是這個道理。

Kafak每個Topic的消息都存儲在日志文件中,Kafka消息日志文件由一個索引文件和若干個具體的消息文件構成。每個消息文件都由起始消息編號構成,通過索引可以快速定位消息文件進行讀寫,由於消息是順序寫入文件中,所以讀寫效率非常高。在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差不多是600MB/s,但是隨即寫的速度卻是100k/s,差了差不多6000倍。現代的操作系統都對次做了大量的優化,使用了 read-ahead 和 write-behind的技巧,讀取的時候成塊的預讀取數據,寫的時候將各種微小瑣碎的邏輯寫入組織合並成一次較大的物理寫入,很多時候線性讀寫磁盤比隨機讀取內存都快。

與其他常見的消息隊列不同,Kafka有一個叫做消費組的概念,多個消費者被邏輯上合並在一起叫做消費組。一個消息隊列理論上可擁有無限個消費組,消費組是Kafka有別於其他消息隊列的一個重要概念,同一個分區的消息只能被一個消費組內的某個消費者讀取,但其他消費組內的消費者仍然可讀取這個分區的消費。如下圖所示整個Kafka消息隊列由兩個broker server構成,server1上包含兩個分區p0、p3,server2上包含兩個分區p1、p2。現在有兩個消費組A、B,消費組A中包含兩個消費者C1、C2,消費組B中包含4個消費者C3、C4、C5、C6。那么假定P0分區上有一條消息。Consumer Group A中的C1、C2其中之一會消費這條消息,Consumer Group B中的C3、C4、C5、C6其中之一也會消費這條消息,也就是說兩個消費組A、B中的消費者都會同時消費這條消息,而組內只能有一個消費者消費這條消息。

我們所說的C1、C2只是一個邏輯上的划分就具體實現而言,C1、C2可以是一個進程內部的兩個線程,也可以是兩個獨立的進程,對於C3、C4、C5、C6也是同樣的道理。我們知道Kafka每個分區中的消息都是以順序結構保存到文件中的,那么消費者每次從什么位置讀取消息呢,奧秘就是每個消費者都保存offset到zookeeper中。

如前所述,Kafka是一個Push-Pull模式的消息隊列,並且可以有多個生產者、多個消費者,那么這些生產者和消費者是如何協同工作的呢?首先我們來看生產者怎么確定把消費發送到哪個分區上。默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。

def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那么Kafka是如何確定這條消息去往哪個分區的呢?我們來看下面的代碼:

復制代碼
if(key == null) {  // 如果沒有指定key
        val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有沒有緩存的現成的分區Id
        id match {
          case Some(partitionId) =>  
            partitionId  // 如果有的話直接使用這個分區Id就好了
          case None => // 如果沒有的話,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出所有可用分區的leader所在的broker
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 從中隨機挑一個
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
            partitionId
        }
      }
復制代碼

 

可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然后把這個分區號加入到緩存中以備后面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鍾或每次請求topic元數據時)

接下來我們來看消費者如何獲取消息。對於消費者Kafka提供的兩種分配策略: range和roundrobin,由參數 partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那么每個線程都分配哪些分區呢?

C0 消費分區 0, 1, 2, 3
C1 消費分區 4, 5, 6
C2 消費分區 7, 8, 9

為了保證高可靠,Kafka每個分區都有一定數量的副本,當故障發生時通過zookeeper選擇其一作為領導者,Kafka采用同步復制機制,寫leader完成后在寫副本。如果某個副本寫失敗,則將這個副本從當前分區一致集合中摘除,后期根據一定策略在進行異步補償,將不一致狀態變為一致狀態。極端情況下如果所有副本寫入均失敗,變為不一致狀態,如果在變成一致狀態前leader崩潰,那么消息才可能真正丟失,但極端情況很難出現,一旦出現這種極端情況,任何系統都無能為力了,所以我們說Kafka還是非常可靠的。

三、 Kafka安裝

  View Code

四、 Kafka Client API

如前所述Kafka是一個消息隊列,生產者發送消息到Kafka,消費者從Kafka中拉取消息,因此Kafka提供生產者、消費者兩類API供程序開發使用。我們先來看一個生產者、消費者的簡單例子,了解一下Kafka Client API的基本用法,而后在深入了解Kafka Client API的細節。

4.1 Producers API

  View Code

 

Kafka 0.82版之后,提供新的API,對於生產者的API來講,使用邏輯比較簡單,推薦使用新API向Kafka發送消息。向Kafka發送消息時首先需要構建一個KafkaProducer對象,並設置發送消息的一些參數。Producer端的常用配置有:

復制代碼
bootstrap.servers:Kafka集群連接串,可以由多個host:port組成
acks:broker消息確認的模式,有三種:
0:不進行消息接收確認,即Client端發送完成后不會等待Broker的確認
1:由Leader確認,Leader接收到消息后會立即返回確認信息
all:集群完整確認,Leader會等待所有in-sync的follower節點都確認收到消息后,再返回確認信息。
我們可以根據消息的重要程度,設置不同的確認模式。默認為1
retries:發送失敗時Producer端的重試次數,默認為0
batch.size:當同時有大量消息要向同一個分區發送時,Producer端會將消息打包后進行批量發送。如果設置為0,則每條消息都獨立發送。默認為16384字節。
linger.ms:發送消息前等待的毫秒數,與batch.size配合使用。在消息負載不高的情況下,配置linger.ms能夠讓Producer在發送消息前等待一定時間,
以積累更多的消息打包發送,達到節省網絡資源的目的。默認為0。 key.serializer/value.serializer:消息key/value的序列器Class,根據key和value的類型決定。 buffer.memory:消息緩沖池大小。尚未被發送的消息會保存在Producer的內存中,如果消息產生的速度大於消息發送的速度,
那么緩沖池滿后發送消息的請求會被阻塞。默認33554432字節(32MB)。
復制代碼

 

相比起Producers API的便宜使用,Consumer API的使用要復雜很多,核心問題就是如何高可靠的處理消息,保證消息不丟失。Kafka為了保證消息不丟失能被消費者成功的處理,在消費者處理消息成功后需要向Kafka發送確認確認消息被成功的消費。

  View Code

 

上面的代碼很容易看懂,但props.put("auto.commit.interval.ms", "1000")需要特殊說明一下。

4.3 消息高可靠 At-Least-Once

網上各種文章經常談到Kafka丟消息問題,那么Kakfa真的不可靠,只能用在允許有一定錯誤的系統中嗎?這個問題還得從Kaka的設計初衷來看。

Kafka最初是被LinkedIn設計用來處理log的分布式消息系統,因此它的着眼點不在數據的安全性(log偶爾丟幾條無所謂),換句話說Kafka並不能完全保證數據不丟失。盡管Kafka官網聲稱能夠保證at-least-once,但如果consumer進程數小於partition_num,這個結論不一定成立。考慮這樣一個case,partiton_num=2,啟動一個consumer進程訂閱這個topic,對應的,stream_num設為2,也就是說啟兩個線程並行處理message。如果auto.commit.enable=true,當consumer fetch了一些數據但還沒有完全處理掉的時候,剛好到commit interval出發了提交offset操作,接着consumer crash掉了。這時已經fetch的數據還沒有處理完成但已經被commit掉,因此沒有機會再次被處理,數據丟失。如果auto.commit.enable=false,假設consumer的兩個fetcher各自拿了一條數據,並且由兩個線程同時處理,這時線程t1處理完partition1的數據,手動提交offset,這里需要着重說明的是,當手動執行commit的時候,實際上是對這個consumer進程所占有的所有partition進行commit,Kafka暫時還沒有提供更細粒度的commit方式,也就是說,即使t2沒有處理完partition2的數據,offset也被t1提交掉了。如果這時consumer crash掉,t2正在處理的這條數據就丟失了。如果希望能夠嚴格的不丟數據,解決辦法有兩個:

  1. 手動commit offset,並針對partition_num啟同樣數目的consumer進程,這樣就能保證一個consumer進程占有一個partition,commit offset的時候不會影響別的partition的offset。但這個方法比較局限,因為partition和consumer進程的數目必須嚴格對應。
  2. 另一個方法同樣需要手動commit offset,另外在consumer端再將所有fetch到的數據緩存到queue里,當把queue里所有的數據處理完之后,再批量提交offset,這樣就能保證只有處理完的數據才被commit。當然這只是基本思路,實際上操作起來不是這么簡單,具體做法以后我再另開一篇。

4.4 消息高可靠Consumer

  View Code

 

上面例子中我們將自動提交改為手動提交,如果取得消息后,因為某種原因沒有進行提交,那么消息仍然保持在Kafka中,可以重復拉取之前沒有確認的消息,保證消息不會丟失,但有可能重復處理相同的消息,消費者接收到重復消息后應該通過業務邏輯保證重復消息不會帶來額外影響,這就是Kafka所說的At-Least-Once。上面的這種讀取消息的方法是單線程的,除此之外還可以用多線程方法讀取消息,每個線程從指定的分區中讀取消息。
  View Code

 

我們還可以進一步讓消費者消費某個分區的消息。

復制代碼
    public static void main(String[] args) {
        Properties props = new Properties();
        //設置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //設置consumer group name
        props.put("group.id", "manual_g4");
        //設置自動提交偏移量(offset),由auto.commit.interval.ms控制提交頻率
        props.put("enable.auto.commit", "true");
        //偏移量(offset)提交頻率
        props.put("auto.commit.interval.ms", "1000");
        //設置使用最開始的offset偏移量為該group.id的最早。如果不設置,則會是latest即該topic最新一個消息的offset
        //如果采用latest,消費者只能得道其啟動后,生產者生產的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
              for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());
              try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
}
復制代碼
4.5 生產者、消費者總結
  1. 如果consumer比partition多,是浪費,因為Kafka的設計是在一個partition上是不允許並發的,所以consumer數不要大於partition數。
  2. 如果consumer比partition少,一個consumer會對應於多個partitions,這里主要合理分配consumer數和partition數,否則會導致partition里面的數據被取的不均勻。最好partiton數目是consumer數目的整數倍,所以partition數目很重要,比如取24,就很容易設定consumer數目。
  3. 如果consumer從多個partition讀到數據,不保證數據間的順序性,Kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不同。
  4. 增減consumer,broker,partition會導致rebalance,所以rebalance后consumer對應的partition會發生變化
  5. High-level接口中獲取不到數據的時候是會block的。

五、 Kafka運維

5.1 Broker故障切換

我們在192.168.104.101、192.168.104.102兩台服務器上啟動Kafka組成一個集群,現在我們觀察一下topic t1的情況。

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

我們看到t1由兩個分區組成,分布於Leader 0、1兩個服務器上。下面我們運行消費者程序,同時運行生產者程序,我們向topic t1發送111、222、333、444、555、666的數據。

bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1 111 222 333 444 555 666

接下來我們觀察一下消費者接收消息的情況。

fetched from partition 0, offset: 3, message: fetched from partition 1, offset: 4, message: 111 fetched from partition 0, offset: 4, message: 222 fetched from partition 1, offset: 5, message: 333 fetched from partition 0, offset: 5, message: 444 fetched from partition 1, offset: 6, message: 555 fetched from partition 0, offset: 6, message: 666

可以看到消息被非常均勻的發送到兩個分區,消費者從兩個分區中拉取了消息。為了模擬故障,我們手工kill 101上的Kafka進程。這時我們在觀察Kafka的分區情況,對照之前的結果,我們發現兩個分區的Leader都變為1了,說明Kafka啟用了副本機制進行故障切換。

./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:  Topic: t1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1  Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1

我們繼續向分區發送888, 999,消費者仍然能夠接收到發送的消息,而不受故障進度的影響。邏輯上看消費者只是讀取分區上的消息,與具體的服務器沒關系。

fetched from partition 1, offset: 7, message: 999 fetched from partition 0, offset: 7, message: 888

5.2 Broker動態擴容

5.2.1 增加分區

我們為已經創建的包含兩個分區的Topic在添加一個分區。

Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3

我們觀察一下增加分區后的結果:

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0 Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: t1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

接下來,我們使用生產者程序發送數據,過了一段時間后發現生產者程序已經可以向新增分區寫入數據了。說明分區的增減對正在運行的應用程序(生產者、消費者)沒有影響, 生產者、消費者都不需要重新啟動。

5.2.2 增加Broker Server

待補充

5.3 Kafka配置優化

待補充

5.4 數據清理

5.4.1 數據刪除

消息被kafka存儲后,針對過期消息,可以通過設置策略(log.cleanup.policy=delete)進行刪除。除了在kafka中做默認設置外,也可以再 topic創建時指定參數,這樣將會覆蓋kafka的默認設置,觸發刪除動作有兩種條件:

  1. 清理超過指定時間消息,通過log.retention.hours設置過期時間。
  2. 清理大小超過預定設置消息,通過log.retention.bytes進行設置。

5.4.2 數據壓縮

kafka還可以進行數據壓縮,設置log.cleanup.policy=compact只保留每個key最后一個版本的數據。

5.5 Kafka運行監控

目前Kafka有三個常用的監控系統: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,這三個系統或多或少都有些問題,不是特別完善,推薦使用KafkaOffsetMonitor。

六、Kafka其他組件

6.1 Kafka Connect

Kafka 0.9+增加了一個新的特性 Kafka Connect ,可以更方便的創建和管理數據流管道。它為Kafka和其它系統創建規模可擴展的、可信賴的流數據提供了一個簡單的模型,通過 connectors可以將大數據從其它系統導入到Kafka中,也可以從Kafka中導出到其它系統。Kafka Connect可以將完整的數據庫注入到Kafka的Topic中,或者將服務器的系統監控指標注入到Kafka,然后像正常的Kafka流處理機制一樣進行數據流處理。而導出工作則是將數據從Kafka Topic中導出到其它數據存儲系統、查詢系統或者離線分析系統等,比如數據庫、 Elastic Search 、 Apache Ignite 等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供統一的集成API
  • 同時支持分布式模式和單機模式
  • REST 接口,用來查看和管理Kafka connectors
  • 自動化的offset管理,開發人員不必擔心錯誤處理的影響
  • 分布式、可擴展
  • 流/批處理集成

當前Kafka Connect支持兩種分發擔保:at least once (至少一次) 和 at most once(至多一次),exactly once將在未來支持,當前已有的Connectors包括:

Connector Name Owner Status
HDFS confluent-platform@googlegroups.com Confluentsupported
JDBC confluent-platform@googlegroups.com Confluentsupported
Debezium - CDC Sources debezium@gmail.com Community project
MongoDB Source a.patelli@reply.de a.topchyan@reply.de In progress
MQTT Source tomasz.pietrzak@evok.ly Community project
MySQL Binlog Source wushujames@gmail.com In progress
Twitter Source rollulus@xs4all.nl In progress
Cassandra Sink Cassandra Sink Community project
Elastic Search Sink ksenji@gmail.com Community project
Elastic Search Sink hannes.stockner@gmail.com In progress
Elastic Search Sink a.patelli@reply.de a.topchyan@reply.de In progress

我們來看一個使用Kafka Connect從一個文件讀取數據在傳輸到另一個文件的例子。

  • 首先在192.168.104.101、192.168.104.102兩台服務器上啟動Kafka。
  • 在192.168.104.102服務器的Kafka安裝目錄上,修改connect-standalone.properties文件:
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092 key.converter=org.apache.Kafka.connect.storage.StringConverter value.converter=org.apache.Kafka.connect.storage.StringConverter key.converter.schemas.enable=false value.converter.schemas.enable=false 修改connect-file-source.properties文件: file=/root/data.txt topic=t1 修改connect-file-sink.properties文件: file=/root/output.txt topics=t1
  • 在192.168.104.102服務器上啟動Kafka-connect
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 向/root/data.txt中寫入數據,echo “Kafka connect”>> data.txt,可以觀察”Kafka connect”被寫入到/root/output.txt文件中。

6.2 Kafka Stream

Kafka Streams是一套類庫,嵌入到java應用程序中,它使得Apache Kafka可以擁有流處理的能力,通過使用Kafka Stream API進行業務邏輯處理最后寫回Kakfa或者其他系統中。Kafka Stream中有幾個重要的流處理概念:嚴格區分Event time和Process Time、支持窗口函數、應用狀態管理。開發者使用Kafka Stream的門檻非常低,比如單機進行一些小數據量的功能驗證而不需要在其他機器上啟動一些服務(比如在Storm運行Topology需要啟動Nimbus和Supervisor,當然也支持Local Mode),Kafka Stream的並發模型可以對單應用多實例進行負載均衡。有了Kafka Stream可以在很多場景下代替Storm、Spark Streaming減少技術復雜度。目前Kafka Stream仍然處於開發階段,不建議生產環境使用,所以期待正式版發布吧。

6.3 Kafka Camus

Camus是Linkedin開源的一個從Kafka到HDFS的數據管道,本質上上Camus是一個運行在Hadoop中的MapReduce程序,調用一些Camus提供的API從Kafka中讀取數據然后寫入HDFS。Camus2015年已經停止維護了,gobblin是后續產品,camus功能是是Gobblin的一個子集,通過執行MapReduce任務實現從Kafka讀取數據到HDFS,而gobblin是一個通用的數據提取框架,可以將各種來源的數據同步到HDFS上,包括數據庫、FTP、Kafka等。

七、 Kafka典型應用場景

Kafka作為一個消息中間件,最長應用的場景是將數據進行加工后從源系統移動到目的系統,也就是所謂的ETL過程,ETL是一個數據從源頭到目的地的移動過程,當然其中也伴隨數據清洗。通常數據源頭是應用程序所輸出的消息、日志、生產數據庫數據。應用程序輸出消息通常由應用程序主動控制寫入Kfaka的行為,而從日志、生產數據庫到Kfaka通常由第三方獨立應用處理。從日志到Kfaka典型的技術方案如ELK,從生產數據庫到Kafka通常可采用如下三種方式:

  • 通過時間戳方式記錄數據變更並寫入kafka,如使用kettle等ETL工具。
  • 通過觸發器方式記錄數據變更並寫入kafka,如使用kettle等ETL工具。
  • 通過數據庫特有特性記錄數變更並寫入kafka,如Oracle GoldenGate,MySQL Binlog,Postgre SQL Wal,MongoDB Oplog,CouchDB Changes Feed,值得一提的是PostgreSQL 9.4后的Bottled Water是一個非常好用的方案,將PostgreSQL數據同步到Kfaka中。

數據通過Kafka移動到Hadoop通常有如下方案:

  • Kafka -> Flume -> Hadoop Hdfs
  • Kafka -> Gobblin -> Hadoop Hdfs
  • Kafka -> Kafka Hadoop Loader -> Hadoop Hdfs
  • Kafka -> KaBoom -> Hadoop Hdfs
  • Kafka -> Kafka Connect -> Hadoop Hdfs
  • Kafka -> Storm\Spark Streaming -> Hadoop Hdfs

從目前看這些方法都是常用的成熟方案,很多技術也在被一線互聯網公司所使用,比如京東內部在使用Gobblin將數據從Kafka同步到Hdfs中,但從長遠看Kafka Connect則是最佳方案,畢竟是官方標准出品而且Kafka Connect還在快速的發展。

八、 參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM