Hive表數據同步到es


1.首先服務器節點,進入到對應的數據庫。
2. 然后找到要同步的表,show create table + 表名查看一下
或者自己可以新建一個表,用來測試原表,如下

 

CREATE TABLE `wb_tmp`(                                         
`surface` string,
`radiation` string, 
`loader_id` string)                                                                  
 ROW FORMAT DELIMITED                                                                
FIELDS TERMINATED BY ','                                                                
STORED AS INPUTFORMAT                                                                   
   'org.apache.hadoop.mapred.TextInputFormat'                             
 OUTPUTFORMAT                                                            
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'                           
 LOCATION                                                                                  
   'hdfs://ffcs/user/projectquene001/publictest/' 
 TBLPROPERTIES (                                                                      
   'transient_lastDdlTime'='154762891');

其中 hdfs 地址可以通過當前數據庫其他表結構獲取hdfs路徑。

如果新建的表沒有數據可以采用2種方式加載數據

 load data local inpath '/projectquene001/wb.txt' into table projectquene001.wb_tmp;  

這種加載本地文件數據到hive表中,在beeline中識別不到本地路徑,可能是beeline的sever多台,所以識別不到,只能用hdfs系統導入,如下

 load data inpath '/user/projectquene001/publictest/wb.txt' into table wb_tmp; 

 可以本地文件上傳至hdfs系統,用 hdfs dfs -put /home/projectquene001/wb.txt /user/projectquene001/publictest

3.先設置引擎同步方式為mr

 set hive.execution.engine=mr; 

4. 建es關聯外表之前,先加載es-hadoop接口包,在hive數據庫中執行

add jar hdfs://ffcs/user/feilongv3/public/elasticsearch-hadoop-6.3.2.jar

 ps: 添加jar包 只對當前會話有效,jar包路徑可以自己用hdfs - put 命令上傳

5.然后我們開始建外表關聯es

create external table ES_WB(
surface string,
radiation string,
loader_id string)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'es_mytest/es_mytest',
'es.nodes'='192.168.12.141',
'es.port'='9200',
'es.index.auto.create' = 'true',
'es.index.refresh_interval' = '-1',
'es.index.number_of_replicas' = '0',
'es.batch.write.retry.count' = '6',
'es.batch.write.retry.wait' = '60s');

首先 es.resource  xx/yy , 其中xx是索引名稱,yy是類型。

 

注意: es.resource xx/yy 索引名(xx)不能為大寫, 因為用大寫發現同步的時候報錯,如下

所以用小寫比較安全。

 

 6.然后開始同步

 insert overwrite table es_wb select surface,radiation,loader_id from wb_tmp; 

7.最后結果成功

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM