Kafka Connect HDFS


概述

Kafka 的數據如何傳輸到HDFS?如果仔細思考,會發現這個問題並不簡單。

不妨先想一下這兩個問題?

1)為什么要將Kafka的數據傳輸到HDFS上?

2)為什么不直接寫HDFS而要通過Kafka?

HDFS一直以來是為離線數據的存儲和計算設計的,因此對實時事件數據的寫入並不友好,而Kafka生來就是為實時數據設計的,但是數據在Kafka上無法使用離線計算框架來作批量離線分析。

那么,Kafka為什么就不能支持批量離線分析呢?想象我們將Kafka的數據按天拆分topic,並建足夠多的分區,然后通過Spark-Streaming,Flink,又或者是KSql等來處理單個topic中的所有數據--這就相當於處理某一天的所有數據。這種計算的性能從原理上來說是不比Spark或者Hive離線計算差的。

而且更好的是,這樣我們就不用將kafka中的數據翻來覆去的導到hdfs,而是直接在kafka上作計算。

后面我們將對此展開更多的討論,這里先回歸正題,在常見的大數據系統架構(lambda)中,通常會將kafka中的數據導入到HDFS來作離線的數據分析。在Kafka的官方wiki中提到了這樣的一些方式來對接Hadoop生態。

https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

image

其中最常用的是Flume,尤其是在CDH集群中,能夠很方便的集成Flume和Kafka。

而HortonWorks在其3.0之后的HDP版本中去掉了Flume,原因是想將Flume放到HDF(HortonWorks Data Flow)中,這個做法還是比較失策的,雖然成全了HDF,但卻讓HDP失去了其完整性。

本案例中使用Ambari 2.7.4+HDP3.1 由於缺少了Flume組件,因此使用Kafka Connect HDFS來連接Hadoop。

下面記錄了連接過程。以下操作的基礎是,有一個搭建好的Ambari集群,並安裝了Kafka+HDFS。

參考安裝文檔:

https://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/index.html

項目github地址:

https://github.com/confluentinc/kafka-connect-hdfs


一.下載軟件包

[work@node2 ~]$ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
[work@node2 ~]$ unzip confluent-3.0.0-2.11.zip


二.快速體驗Kafka-Connect

下面的例子其實不需要下載Confluent,是Kafka2.0中自帶的FileSource和FileSink,而Confluent中也包含了這些功能,如果需要用到Kafka Connect HDFS,就需要Confluent了,這里只是用最簡單的例子快速了解Kafka-Connect的用法。

2.1 在主目錄下寫test.txt文件,內容如下

[work@node2 confluent-3.0.0]$ ls
bin  etc  README.archive  share  src  test.txt
[work@node2 confluent-3.0.0]$ cat test.txt
foo
bar
New Record
New Record


2.2 修改etc/kafka/connect-standalone.properties

[work@node2 confluent-3.0.0]$ vi etc/kafka/connect-standalone.properties

image

Ambari的kafka端口不是9092,而是6667。

Connector的rest.port默認是8083,和Ambari中安裝的Druid的端口有沖突,所以改成8822。

2.3 運行命令

[work@node2 confluent-3.0.0]$ ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka/connect-file-source.properties etc/kafka/connect-file-sink.properties


2.4 生成sink文件

[work@node2 confluent-3.0.0]$ ls
bin  etc  logs  README.archive  share  src  test.sink.txt  test.txt
[work@node2 confluent-3.0.0]$ cat test.sink.txt
foo
bar
New Record
New Record

嘗試新加一行數據

[work@node2 confluent-3.0.0]$ echo "Hello World" >> test.txt
[work@node2 confluent-3.0.0]$ cat test.sink.txt
foo
bar
New Record
New Record
Hello World


2.5 分析 etc/kafka/connect-file-source.properties 和 etc/kafka/connecfile-sink.properties

etc/kafka/connect-file-source.properties 如下

image

etc/kafka/connecfile-sink.properties 如下

image

通過Kafka Console Consumer查看 connect-test topic

[work@node2 confluent-3.0.0]$ ./bin/kafka-console-consumer  --bootstrap-server node1:6667 --topic connect-test --from-beginning --new-consumer
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"New Record"}
{"schema":{"type":"string","optional":false},"payload":"New Record"}
{"schema":{"type":"string","optional":false},"payload":"Hello World"}


2.6 Converter

從上一節中可以看到一行行json格式的數據,其中payload是原始數據。在這里connect-test這個topic有點類似於flume中的channel的角色,用來連接source和sink緩存中間數據。

當數據量非常大的情況下,這種額外的處理會造成性能和空間的浪費。

[work@node2 confluent-3.0.0]$ vi etc/kafka/connect-standalone.properties

image

修改connect的配置,數據在傳遞過程中將不再作任何處理。StringConverter源碼傳送門:

https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


三.通過Kafka將數據寫入到HDFS

image

https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/string/StringFormat.java

如果不使用Avro格式來存儲和處理數據,那么這里要加一個配置

format.class=io.confluent.connect.hdfs.string.StringFormat

但是比較遺憾的是Confluent3.0.0的版本中不包含這個類。因此這里我使用了confluent-5.3.1的版本,然后再通過如下命令安裝kafka-connect-hdfs

confluent-hub install confluentinc/kafka-connect-hdfs:latest

啟動

bin/connect-standalone etc/kafka/connect-standalone.properties  share/confluent-hub-components/confluentinc-kafka-connect-hdfs/etc/quickstart-hdfs.properties

所有寫入到test_hdfs這個topic中的數據都會寫入到hdfs中。

[work@node2 confluent-5.3.1]$ bin/kafka-console-producer --broker-list node1:6667 --topic test_hdfs
>123
>456
>789
>13
>213w
>asd
>

查看hdfs中的結果

[work@node2 ~]$ hadoop fs -ls /topics/test_hdfs/partition=0
Found 2 items
-rw-r--r--   3 work work         12 2019-11-08 10:18 /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.txt
-rw-r--r--   3 work work         12 2019-11-08 10:20 /topics/test_hdfs/partition=0/test_hdfs+0+0000000003+0000000005.txt
[work@node2 ~]$ hadoop fs -cat /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.txt
123
123
456

Connect HDFS完畢。


三.總結

優勢:

1.操作簡單,部署方便。

2.可以直接和hive的元數據集成自動生成分區。

缺點:

1.支持的數據格式少,avro在國內並不流行。

2.一個致命缺陷,不支持壓縮!!不知道是confluent的疏忽還是有特地的考慮?因為不支持壓縮,使用這個組件會浪費80%的存儲空間,無實用性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM