[Original] Big Data Basics: Flume (2), Application: kafka-kudu


Application 1: syncing Kafka data to Kudu

 

1 Prepare the Kafka topic

# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --create --topic test_sync --partitions 2 --replication-factor 2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "test_sync".
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --describe --topic test_sync
Topic:test_sync PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test_sync        Partition: 0    Leader: 112     Replicas: 112,111       Isr: 112,111
        Topic: test_sync        Partition: 1    Leader: 110     Replicas: 110,112       Isr: 110,112
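The WARNING printed when the topic is created comes from Kafka's topic-name collision check: metric names replace '.' with '_', so test_sync would report under the same metric name as a topic called test.sync. The JDK-only sketch below models that normalization; it is not Kafka's code, and TopicNames and its methods are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper modeled on Kafka's topic-name collision check:
// metric names replace '.' with '_', so "test.sync" and "test_sync"
// would report under the same metric name.
final class TopicNames {

  // The name a topic collapses to once '.' is replaced by '_'.
  static String metricSafeName(String topic) {
    return topic.replace('.', '_');
  }

  // Names containing '.' or '_' are the ones that trigger the warning.
  static boolean hasCollisionChars(String topic) {
    return topic.indexOf('.') >= 0 || topic.indexOf('_') >= 0;
  }

  // Two different names collide if they collapse to the same metric name.
  static boolean collide(String a, String b) {
    return !a.equals(b) && metricSafeName(a).equals(metricSafeName(b));
  }

  // Groups of (distinct) existing topic names that would share a metric name.
  static List<List<String>> collisions(List<String> topics) {
    Map<String, List<String>> byMetricName = new LinkedHashMap<>();
    for (String topic : topics) {
      byMetricName.computeIfAbsent(metricSafeName(topic), k -> new ArrayList<>()).add(topic);
    }
    List<List<String>> groups = new ArrayList<>();
    for (List<String> group : byMetricName.values()) {
      if (group.size() > 1) {
        groups.add(group);
      }
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> topics = Arrays.asList("test_sync", "test.sync", "orders");
    System.out.println(collisions(topics)); // [[test_sync, test.sync]]
  }
}
```

So test_sync itself is fine as long as no dotted twin such as test.sync exists; the warning only suggests sticking to one of the two characters across all topic names.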

2 Prepare the Kudu table

impala-shell

CREATE TABLE test.test_sync (
id int,
name string,
description string,
create_time timestamp,
update_time timestamp,
primary key (id)
)
PARTITION BY HASH (id) PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='$kudu_master:7051');
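In a STORED AS KUDU table, Impala maps its own types onto Kudu types: int becomes INT32, string becomes STRING, and timestamp becomes UNIXTIME_MICROS (microseconds since the Unix epoch). This matters for the custom producer later, because it writes Kudu column types, not Impala ones. The lookup below is a hand-written sketch that covers only common scalar types; ImpalaKuduTypes is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup: the Kudu column type Impala uses for common Impala types
// in a STORED AS KUDU table (DECIMAL and other types are deliberately left out).
final class ImpalaKuduTypes {

  private static final Map<String, String> IMPALA_TO_KUDU = new HashMap<>();

  static {
    IMPALA_TO_KUDU.put("tinyint", "INT8");
    IMPALA_TO_KUDU.put("smallint", "INT16");
    IMPALA_TO_KUDU.put("int", "INT32");
    IMPALA_TO_KUDU.put("bigint", "INT64");
    IMPALA_TO_KUDU.put("boolean", "BOOL");
    IMPALA_TO_KUDU.put("float", "FLOAT");
    IMPALA_TO_KUDU.put("double", "DOUBLE");
    IMPALA_TO_KUDU.put("string", "STRING");
    IMPALA_TO_KUDU.put("timestamp", "UNIXTIME_MICROS");
  }

  static String kuduType(String impalaType) {
    String kudu = IMPALA_TO_KUDU.get(impalaType.toLowerCase(Locale.ROOT));
    if (kudu == null) {
      throw new IllegalArgumentException("Type not covered by this sketch: " + impalaType);
    }
    return kudu;
  }

  // Maps "name type" column definitions (as written in the CREATE TABLE) to name -> Kudu type.
  static Map<String, String> kuduSchema(String... columnDefs) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (String def : columnDefs) {
      String[] parts = def.trim().split("\\s+");
      schema.put(parts[0], kuduType(parts[1]));
    }
    return schema;
  }

  public static void main(String[] args) {
    System.out.println(kuduSchema("id int", "name string", "description string",
        "create_time timestamp", "update_time timestamp"));
    // {id=INT32, name=STRING, description=STRING, create_time=UNIXTIME_MICROS, update_time=UNIXTIME_MICROS}
  }
}
```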

3 Prepare Kudu support for Flume

3.1 Download the jars

# wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-flume-sink/1.7.0-cdh5.16.1/kudu-flume-sink-1.7.0-cdh5.16.1.jar
# mv kudu-flume-sink-1.7.0-cdh5.16.1.jar $FLUME_HOME/lib/

# wget http://central.maven.org/maven2/org/json/json/20160810/json-20160810.jar
# mv json-20160810.jar $FLUME_HOME/lib/
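Both download URLs follow the standard Maven 2 repository layout: group path (dots turned into slashes), artifactId, version, then artifactId-version.jar. The sketch below (MavenUrls is a hypothetical helper) rebuilds them from the coordinates, which makes it easy to fetch a different Kudu/CDH version. If the central.maven.org host does not respond, the same layout applies under https://repo1.maven.org/maven2.

```java
// Hypothetical helper: builds a jar URL in the standard Maven 2 repository layout,
// <repo>/<groupId with '.' -> '/'>/<artifactId>/<version>/<artifactId>-<version>.jar
final class MavenUrls {

  static String jarUrl(String repo, String groupId, String artifactId, String version) {
    String base = repo.endsWith("/") ? repo : repo + "/";
    return base + groupId.replace('.', '/') + "/" + artifactId + "/" + version + "/"
        + artifactId + "-" + version + ".jar";
  }

  public static void main(String[] args) {
    // The two jars downloaded in section 3.1.
    System.out.println(jarUrl("https://repository.cloudera.com/artifactory/cloudera-repos",
        "org.apache.kudu", "kudu-flume-sink", "1.7.0-cdh5.16.1"));
    System.out.println(jarUrl("http://central.maven.org/maven2", "org.json", "json", "20160810"));
  }
}
```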

3.2 Development

Source repository: https://github.com/apache/kudu/tree/master/java/kudu-flume-sink

By default, kudu-flume-sink uses this producer:

org.apache.kudu.flume.sink.SimpleKuduOperationsProducer

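  // Each Flume event is turned into exactly one Kudu Insert.
  public List<Operation> getOperations(Event event) throws FlumeException {
    try {
      // New Insert on the target table (the table is handed over in initialize()).
      Insert insert = table.newInsert();
      PartialRow row = insert.getRow();
      // The whole event body is stored, unparsed, as raw bytes in the payload column.
      row.addBinary(payloadColumn, event.getBody());

      return Collections.singletonList((Operation) insert);
    } catch (Exception e) {
      throw new FlumeException("Failed to create Kudu Insert object", e);
    }
  }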

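In other words, it writes each message body as-is into a single payload column. It does not parse the message, so it cannot fill a table like test.test_sync, whose fields live in separate columns.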

 

To support JSON-formatted messages, you have to write your own producer:

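package com.cloudera.kudu;
import org.apache.kudu.flume.sink.KuduOperationsProducer;
public class JsonKuduOperationsProducer implements KuduOperationsProducer {
  // configure(Context), initialize(KuduTable), getOperations(Event) and close() are implemented in the full code linked below
}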

Full code: https://www.cnblogs.com/barneywill/p/10573221.html

Package it as a jar and put it under $FLUME_HOME/lib.
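The core of a JSON producer is turning one parsed JSON object into typed values for the table's columns. The linked implementation is not reproduced here; what follows is a JDK-only sketch of that mapping step under stated assumptions: the event body has already been parsed into a Map (for example with the org.json jar from 3.1), JSON keys equal column names, and timestamps arrive as "yyyy-MM-dd HH:mm:ss" in UTC. JsonRowMapper and its ColumnType enum are hypothetical stand-ins; a real producer would write the values with PartialRow setters such as addInt, addString and addLong on the row of table.newInsert().

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the JSON -> column mapping a JSON producer needs.
// A real KuduOperationsProducer would write these values into the PartialRow
// of table.newInsert() instead of returning a Map.
final class JsonRowMapper {

  // Local stand-in for the Kudu types Impala uses for test.test_sync.
  enum ColumnType { INT32, STRING, UNIXTIME_MICROS }

  // Assumption: timestamps arrive as "yyyy-MM-dd HH:mm:ss" strings in UTC.
  private static final DateTimeFormatter TIMESTAMP_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  private final Map<String, ColumnType> schema;

  JsonRowMapper(Map<String, ColumnType> schema) {
    this.schema = new LinkedHashMap<>(schema);
  }

  // json: the event body already parsed into a Map, keyed by column name.
  // Columns whose key is absent (or maps to null) are skipped, i.e. left unset.
  Map<String, Object> toRow(Map<String, ?> json) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, ColumnType> column : schema.entrySet()) {
      Object raw = json.get(column.getKey());
      if (raw != null) {
        row.put(column.getKey(), convert(raw, column.getValue()));
      }
    }
    return row;
  }

  private static Object convert(Object raw, ColumnType type) {
    switch (type) {
      case INT32:
        // Accept any JSON number type, or a numeric string.
        return raw instanceof Number ? ((Number) raw).intValue() : Integer.parseInt(raw.toString());
      case STRING:
        return raw.toString();
      case UNIXTIME_MICROS: {
        // Kudu UNIXTIME_MICROS = microseconds since the Unix epoch.
        Instant instant = LocalDateTime.parse(raw.toString(), TIMESTAMP_FORMAT)
            .toInstant(ZoneOffset.UTC);
        return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000;
      }
      default:
        throw new IllegalArgumentException("Unsupported column type: " + type);
    }
  }
}
```

Skipping absent keys keeps optional columns such as description NULL, but a message without id would still be rejected by Kudu, since id is the primary key.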

4 Prepare the Flume configuration (conf/order.properties)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.1:9092
a1.sources.r1.kafka.topics = test_sync
a1.sources.r1.kafka.consumer.group.id = flume-consumer

# Describe the sink
# (for debugging, a1.sinks.k1.type = logger prints events to the log instead)
a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
a1.sinks.k1.producer = com.cloudera.kudu.JsonKuduOperationsProducer
a1.sinks.k1.masterAddresses = 192.168.0.1:7051
# Kudu tables created through Impala are named impala::<database>.<table>
a1.sinks.k1.tableName = impala::test.test_sync
a1.sinks.k1.batchSize = 50

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
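With a memory channel, one transaction holds at most transactionCapacity events, so the Kafka source's batchSize (5000) and the sink's batchSize (50) should not exceed it, and transactionCapacity itself should not exceed capacity. Below is a small JDK-only checker for those rules of thumb, as a sketch (FlumeConfCheck is a hypothetical name). It reads the file with java.util.Properties, which keeps the last value when a key such as a1.sinks.k1.type is repeated.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical checker for memory-channel sizing rules of thumb:
//   transactionCapacity <= capacity, and each batchSize <= transactionCapacity.
final class FlumeConfCheck {

  // Parses properties text; a repeated key keeps its last value.
  static Properties load(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  static List<String> check(Properties p, String agent, String source, String channel, String sink) {
    long capacity = number(p, agent + ".channels." + channel + ".capacity");
    long txCapacity = number(p, agent + ".channels." + channel + ".transactionCapacity");
    long sourceBatch = number(p, agent + ".sources." + source + ".batchSize");
    long sinkBatch = number(p, agent + ".sinks." + sink + ".batchSize");

    List<String> problems = new ArrayList<>();
    if (txCapacity > capacity) {
      problems.add("transactionCapacity " + txCapacity + " > capacity " + capacity);
    }
    if (sourceBatch > txCapacity) {
      problems.add("source batchSize " + sourceBatch + " > transactionCapacity " + txCapacity);
    }
    if (sinkBatch > txCapacity) {
      problems.add("sink batchSize " + sinkBatch + " > transactionCapacity " + txCapacity);
    }
    return problems;
  }

  // This sketch requires each key to be set explicitly instead of guessing Flume's defaults.
  private static long number(Properties p, String key) {
    String value = p.getProperty(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing property: " + key);
    }
    return Long.parseLong(value.trim());
  }
}
```

For the configuration above, check(load(text), "a1", "r1", "c1", "k1") should return an empty list.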

5 Start Flume

bin/flume-ng agent --conf conf --conf-file conf/order.properties --name a1

6 Verify in Kudu

impala-shell

select * from test.test_sync limit 10;
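The query only returns rows once JSON messages have reached the test_sync topic. Below is a hypothetical generator for such messages. It assumes the custom producer maps top-level JSON keys to column names and parses timestamps written as "yyyy-MM-dd HH:mm:ss"; check the linked producer for what it actually expects.

```java
// Hypothetical test-data generator: prints one JSON object per line for test.test_sync.
// Assumption: JSON keys equal column names and timestamps are "yyyy-MM-dd HH:mm:ss" strings.
final class TestMessages {

  // Minimal JSON string escaping (quotes, backslashes, control characters).
  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.append('"').toString();
  }

  static String row(int id, String name, String description, String createTime, String updateTime) {
    return "{\"id\":" + id
        + ",\"name\":" + quote(name)
        + ",\"description\":" + quote(description)
        + ",\"create_time\":" + quote(createTime)
        + ",\"update_time\":" + quote(updateTime) + "}";
  }

  public static void main(String[] args) {
    int count = args.length > 0 ? Integer.parseInt(args[0]) : 10;
    for (int i = 1; i <= count; i++) {
      System.out.println(row(i, "name" + i, "test row " + i,
          "2019-03-21 10:00:00", "2019-03-21 10:00:00"));
    }
  }
}
```

Each printed line can become one Kafka message, for example: javac TestMessages.java && java TestMessages 10 | bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic test_sync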

 

Reference: https://kudu.apache.org/2016/08/31/intro-flume-kudu-sink.html
