Importing data into Hive with Flume (1.9.0), file-based ingestion


1. Configure Hive to support transactional tables

  • (1) Edit hive-site.xml, or set the parameters temporarily on the Hive command line:
<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>
<property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
</property>
<property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
    <!-- must be greater than 0; ideally equal to the number of buckets -->
</property>
<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>
  • (2) When creating the table, it must be partitioned, bucketed, and declared stored as orc tblproperties('transactional'='true')
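The properties from (1) can also be applied per session from the Hive CLI instead of editing hive-site.xml. A sketch (note that on Hive 2.x and later, hive.enforce.bucketing was removed and bucketing is always enforced):

```sql
-- Session-level equivalents of the hive-site.xml properties above
SET hive.support.concurrency = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
SET hive.enforce.bucketing = true;
```

Session-level settings are convenient for testing, but the compactor settings in particular belong in hive-site.xml so they survive restarts.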

2. Copy jars to work around version issues

Copy all the jars under ${HIVE_HOME}/hcatalog/share/hcatalog into ${FLUME_HOME}/lib.
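The copy step can be sketched as below. To keep the sketch self-contained it uses throwaway temp directories and stand-in jar files; on a real cluster, point HIVE_HOME and FLUME_HOME at your actual installs (the jar names shown are assumptions for illustration):

```shell
# Stand-in directories; replace with your real install paths.
HIVE_HOME=$(mktemp -d)
FLUME_HOME=$(mktemp -d)
mkdir -p "$HIVE_HOME/hcatalog/share/hcatalog" "$FLUME_HOME/lib"

# Stand-ins for the real hive-hcatalog jars (names are illustrative).
touch "$HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-core.jar" \
      "$HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-streaming.jar"

# The actual step: copy every hcatalog jar into Flume's lib directory.
cp "$HIVE_HOME"/hcatalog/share/hcatalog/*.jar "$FLUME_HOME/lib/"
ls "$FLUME_HOME/lib"
```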

3. Copy the Hive config files (this step may be unnecessary)

Put hive-site.xml and hive-env.sh under ${HIVE_HOME}/conf.

4. Change permissions on the HDFS scratch directory (unclear whether this step is necessary)

hadoop fs -chmod 777 /tmp/hive
chmod 777 /tmp/hive

5. Create the table

A working create-table example:

create table flume_hive.flume_hive(nid int,name string,phone string)
partitioned by(time string)
clustered by(nid) into 3 buckets
row format delimited fields terminated by ','
stored as orc tblproperties('transactional'='true');
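To confirm the table was actually created as transactional before pointing Flume at it, its properties can be inspected from the Hive CLI (a sketch; the exact output wording varies by Hive version):

```sql
-- transactional=true should appear under Table Parameters
DESCRIBE FORMATTED flume_hive.flume_hive;

-- or list the table properties directly
SHOW TBLPROPERTIES flume_hive.flume_hive;
```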

6. Write the Flume configuration file

Configuration file flume_hive.conf:

# Define the agent and the names of its source, channel and sink
agent3.sources = source3
agent3.channels = channel3
agent3.sinks = sink3
# Define the source
agent3.sources.source3.type = spooldir
agent3.sources.source3.spoolDir = /soft/flume/logstohive
agent3.sources.source3.fileHeader=false
# Interceptor that stamps each event with a timestamp header
agent3.sources.source3.interceptors = i1
agent3.sources.source3.interceptors.i1.type=timestamp

# Use a file (disk-backed) channel
agent3.channels.channel3.type = file
# Path for the file channel's checkpoint files
agent3.channels.channel3.checkpointDir=/soft/flume/tmp/point
# Path for the file channel's data files
agent3.channels.channel3.dataDirs=/soft/flume/tmp

# Define the sink
agent3.sinks.sink3.type = hive
agent3.sinks.sink3.hive.metastore = thrift://hadoop1:9083
agent3.sinks.sink3.hive.database = flume_hive
agent3.sinks.sink3.hive.table = flume_hive
agent3.sinks.sink3.hive.partition = %y-%m-%d-%H-%M
agent3.sinks.sink3.useLocalTimeStamp = false
agent3.sinks.sink3.round = true
agent3.sinks.sink3.roundValue = 10
agent3.sinks.sink3.roundUnit = minute
agent3.sinks.sink3.serializer = DELIMITED
agent3.sinks.sink3.serializer.delimiter = ","
agent3.sinks.sink3.serializer.serdeSeparator = ','
agent3.sinks.sink3.serializer.fieldnames = nid,name,phone
agent3.sinks.sink3.batchSize = 90

# Wire the source and sink to the channel
agent3.sources.source3.channels = channel3
agent3.sinks.sink3.channel = channel3

7. Start everything

  • Start the Hive metastore first: hive --service metastore -p 9083 (this port must match the one in the Flume sink config; netstat -tulpn | grep 9083 verifies the port is listening)
  • Then start the Flume agent
  • Copy data files into the spooling directory
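The three startup steps can be sketched as the following commands (paths and the config-file location are assumptions; adjust them to your install, and note the agent name passed to -n must match the agent3 prefix used in flume_hive.conf):

```shell
# 1) Start the Hive metastore on the port referenced by the sink config
hive --service metastore -p 9083 &

# 2) Start the Flume agent from ${FLUME_HOME}
bin/flume-ng agent -n agent3 -c conf -f flume_hive.conf \
    -Dflume.root.logger=INFO,console &

# 3) Drop a data file into the spooling directory to trigger ingestion
cp flume_hive.csv /soft/flume/logstohive/
```

Once the spooling directory source has consumed a file it renames it with a .COMPLETED suffix, which is a quick way to check that ingestion ran.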

Sample data:

[root@hadoop1 flume]# cat flume_hive.csv 
1001,aaa,12312453359,
1002,bbb,12678723873,
1003,ccc,12736732989,
1004,ddd,12327836839,
1005,eee,23728179728,
1006,fff,12387623878,
[root@hadoop1 flume]# cat flume_hive1.csv 
1007,aaa,12312453359,
1008,bbb,12678723873,
1009,ccc,12736732989,
1010,ddd,12327836839,
1011,eee,23728179728,
1012,fff,12387623878,
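Note that each sample row ends with a trailing comma, producing a fourth, empty field beyond the three configured fieldnames (nid,name,phone). The sketch below is not Flume's actual serializer code; it only illustrates, under that assumption, how a DELIMITED serializer maps each comma-separated event body onto the configured field names, ignoring surplus fields:

```python
# Field names as configured on the sink (serializer.fieldnames)
FIELDNAMES = ["nid", "name", "phone"]

def parse_line(line: str) -> dict:
    """Split a CSV event body on ',' and map values to the configured
    field names. Surplus trailing fields (e.g. from the trailing comma
    in the sample data) are dropped; missing ones become empty strings."""
    values = line.rstrip("\n").split(",")
    return {name: (values[i] if i < len(values) else "")
            for i, name in enumerate(FIELDNAMES)}

# The trailing comma yields a fourth empty value, which is ignored.
print(parse_line("1001,aaa,12312453359,"))
# → {'nid': '1001', 'name': 'aaa', 'phone': '12312453359'}
```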

