1. 配置表支持事務
- (1)改配置文件hive-site.xml 或者 臨時設置參數 命令行
<!-- Turn on Hive's concurrency support (set to true for transactional tables). -->
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>

<!-- Dynamic partitioning runs in nonstrict mode. -->
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>

<!-- Use the DB-backed transaction manager. -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>

<!-- Enable the compaction initiator. -->
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>

<!-- Number of compactor worker threads: must be greater than 0;
     per the original note, ideally equal to the table's bucket count. -->
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>

<!-- Enforce bucketing on insert.
     NOTE(review): the Hive documentation lists this setting as no longer
     required from Hive 2.0 on; confirm against the Hive version in use. -->
<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>
- (2)建表時 分區 分桶 stored as orc tblproperties('transactional'='true')
2. 版本問題導jar包
把${HIVE_HOME}/hcatalog/share/hcatalog下的所有包,拷貝入${FLUME_HOME}/lib
3. copyhive文件(這步好像不必要的??)
將hive.xml和hive-env.sh放到${HIVE_HOME}/conf下
4. 修改hdfs目錄權限(這步不知道是不是必要的)
# Make /tmp/hive on HDFS world-accessible.
# The mode and the path are separate arguments, so a space is required between them.
hadoop fs -chmod 777 /tmp/hive
# Apply the same mode to /tmp/hive on the local filesystem.
chmod 777 /tmp/hive
5.建表
正確的建表實例
-- Transactional table flume_hive.flume_hive: partitioned by a string column,
-- bucketed on nid into 3 buckets, stored as ORC with 'transactional'='true'.
-- NOTE(review): ORC is a binary format, so the ROW FORMAT DELIMITED clause
-- presumably has no effect on how rows are stored; it is kept as written.
CREATE TABLE flume_hive.flume_hive (
    nid   INT,
    name  STRING,
    phone STRING
)
PARTITIONED BY (time STRING)
CLUSTERED BY (nid) INTO 3 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
6.配置flume的配置文件
配置文件flume_hive.conf
# ---- Agent "agent3": names of its source, channel and sink ----
agent3.sources = source3
agent3.channels = channel3
agent3.sinks = sink3

# ---- Source: spooling-directory source ----
agent3.sources.source3.type = spooldir
agent3.sources.source3.spoolDir = /soft/flume/logstohive
agent3.sources.source3.fileHeader = false
# Interceptor i1 adds a timestamp to every event.
agent3.sources.source3.interceptors = i1
agent3.sources.source3.interceptors.i1.type = timestamp
# Wire the source to the channel.
agent3.sources.source3.channels = channel3

# ---- Channel: file channel (events are kept on disk) ----
agent3.channels.channel3.type = file
# Directory for the file channel's checkpoint files.
agent3.channels.channel3.checkpointDir = /soft/flume/tmp/point
# Directory for the file channel's data files.
# (The checkpoint directory above is a subdirectory of this path.)
agent3.channels.channel3.dataDirs = /soft/flume/tmp

# ---- Sink: Hive sink ----
agent3.sinks.sink3.type = hive
agent3.sinks.sink3.hive.metastore = thrift://hadoop1:9083
agent3.sinks.sink3.hive.database = flume_hive
agent3.sinks.sink3.hive.table = flume_hive
# Partition value built from time escape sequences (year-month-day-hour-minute).
agent3.sinks.sink3.hive.partition = %y-%m-%d-%H-%M
# false: do not substitute local time for the escape sequences above.
agent3.sinks.sink3.useLocalTimeStamp = false
# Rounding of the timestamp: unit is minute, value is 10.
agent3.sinks.sink3.round = true
agent3.sinks.sink3.roundValue = 10
agent3.sinks.sink3.roundUnit = minute
# Delimited-text serializer: comma-separated input mapped onto nid, name, phone.
agent3.sinks.sink3.serializer = DELIMITED
agent3.sinks.sink3.serializer.delimiter = ","
agent3.sinks.sink3.serializer.serdeSeparator = ','
agent3.sinks.sink3.serializer.fieldnames = nid,name,phone
agent3.sinks.sink3.batchSize = 90
# Wire the sink to the channel.
agent3.sinks.sink3.channel = channel3
7.啟動
- 先啟動hive
# Launch the Hive command-line client.
# NOTE(review): this presumably opens an interactive session, so the metastore
# command below would have to be run from another terminal; confirm.
hive
# Start the Hive metastore service on port 9083.
hive --service metastore -p 9083
(這個端口號要配置到flume文件中,可用 netstat -tulpn | grep 9083 查看端口是否監聽)
- 然后啟動flume
- 拷貝文件
數據內容
[root@hadoop1 flume]# cat flume_hive.csv
1001,aaa,12312453359,
1002,bbb,12678723873,
1003,ccc,12736732989,
1004,ddd,12327836839,
1005,eee,23728179728,
1006,fff,12387623878,
[root@hadoop1 flume]# cat flume_hive1.csv
1007,aaa,12312453359,
1008,bbb,12678723873,
1009,ccc,12736732989,
1010,ddd,12327836839,
1011,eee,23728179728,
1012,fff,12387623878,