這里面介紹一下kafka connect的一些使用。
kafka connect的使用
一、在config目錄下面復制一個file-srouce.properties並且修改內容
huhx@gohuhx:~/server/kafka_2.11-1.1.0/config$ cp connect-file-source.properties connect-file-source-test.properties
huhx@gohuhx:~/server/kafka_2.11-1.1.0/config$ cp connect-standalone.properties connect-standalone-test.properties
修改huhx目錄下面的connect-standalone-test.properties文件里面的內容如下:
key.converter.schemas.enable=false value.converter.schemas.enable=false
connect-file-source-test.properties的內容如下:
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/home/huhx/Documents/linux.txt topic=connect-linux transforms=MakeMap, InsertSource transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value transforms.MakeMap.field=line transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value transforms.InsertSource.static.field=data_source transforms.InsertSource.static.value=test-file-source
指定了topic為connect-test,指定了讀取的文件為/home/huhx/Documents/linux.txt。其中linux.txt的內容如下
I love you. my name is huhx. code for me?
二、運行connect-standalone.sh命令,可以將對huhx.txt文件處理之后的內容發布到connect-test的topic上
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/connect-standalone.sh config/connect-standalone-test.properties config/connect-file-source-test.properties
運行之后,我們在connect-test主題里面可以看到如下的輸出:
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-linux --from-beginning {"line":"I love you.","data_source":"test-file-source"} {"line":"my name is huhx.","data_source":"test-file-source"} {"line":"code for me?","data_source":"test-file-source"}
關於上述配置的transforms可以參考下述的文檔https://kafka.apache.org/documentation/#connect
友情鏈接