Writing data into Elasticsearch through Hive

The integration of Hive and Elasticsearch is covered in the official documentation:
ES-Hadoop Hive integration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html#hive
ES-Hadoop configuration reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

1. Create a new Hive table es_goods_order

The table's data is stored in ES. The document id of the index is taken from the goods_order_id column ('es.mapping.id' = 'goods_order_id'), and the write mode is upsert ('es.write.operation' = 'upsert'): if the id does not exist the document is inserted, otherwise the existing document is updated.

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
set username=fxin.zhao;
use temp;
CREATE EXTERNAL TABLE es_goods_order(
	goods_order_id string, 
	sale_place string,
	station_place string,
	multi_channel_id string,
	business_date string,
	discount  string,
	discount_type string,
	payment_amouunt string,
	refun_amount string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_goods_order', 
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.mapping.id' = 'goods_order_id',
'es.write.operation'='upsert'
);
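
Once the table is created, an optional sanity check (just a sketch, not part of the original walkthrough) is to confirm that the storage handler and TBLPROPERTIES were registered; this reads only Hive metadata, so it works even before the ES index exists:

SHOW CREATE TABLE es_goods_order;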

Insert data into the es_goods_order table: with a single mapper, writing 800,000 rows took about 3 minutes. The ES index is checked at import time; if it does not exist, it is created automatically.

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
insert into table es_goods_order 
select goods_order_id,
       sale_place,
       station_place,
       multi_channel_id,
       business_date,
       discount,
       discount_type,
       payment_amouunt,
       refun_amount
  from ods.goods_order
 where dt >= '2016-10-01'
   and dt <= '2016-10-04';
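
After the load finishes, a quick way to check what actually landed in ES is to count through the external table; the storage handler scans the test_crm/es_goods_order index, so the result reflects the documents stored in ES (a sanity-check sketch, not part of the original job):

select count(*) from es_goods_order;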
  • Verify that upsert actually works: write a subset of the same data again (a count check is sketched after the statement below).
insert into table es_goods_order
select goods_order_id,
       sale_place,
       station_place,
       multi_channel_id,
       business_date,
       discount,
       discount_type,
       payment_amouunt,
       refun_amount
  from ods.goods_order
 where dt = '2016-10-01'
 limit 1000;
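
Because these 1000 rows reuse goods_order_id values that are already in the index, the upsert should update the existing documents rather than add new ones. One way to confirm this (again only a sketch) is to run the same count before and after this second insert; an unchanged total means no duplicate documents were created:

-- run before and after the second insert; the count should stay the same
select count(*) from es_goods_order;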

Conclusions:
Specifying the id: use 'es.mapping.id' = 'goods_order_id' to map a column to the document id.
Updating data: use 'es.write.operation' = 'upsert' to insert new documents, or update them when the id already exists.

  • An ES-backed Hive table can also store documents as raw JSON.

hadoop fs -put 20170111202237 /tmp/fuxin.zhao/es_json

add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
-- create a temporary staging table
CREATE EXTERNAL TABLE es_json_tmp (
    json    STRING
 );
-- load the JSON file into the staging table
load data  inpath '/tmp/fuxin.zhao/es_json/20170116185548' into table es_json_tmp;


drop table es_json;
-- create the JSON-based, ES-backed Hive table
CREATE EXTERNAL TABLE es_json (
    json    STRING
 )
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_json', 
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.input.json' = 'yes',
'es.mapping.id' = 'uid'
);

-- insert the JSON documents into ES
insert into table es_json
select json
  from es_json_tmp;

The following error was thrown:

Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.10.110.125:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..

The cause is that the _id field inside the JSON documents is an ES metadata field (a reserved name) and cannot appear in the document body. The fix is to rename it, for example with vi:
vi 20170116185548
Replace every _id in the file with uid:
:1,$s/_id/uid/g
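
For illustration only (the real export file is not shown here, and every field name except uid is hypothetical), after the substitution each line of the file should be a self-contained JSON document whose uid field supplies the document id, for example:

{"uid":"10001","goods_order_id":"10001","business_date":"2016-10-01"}

With 'es.input.json' = 'yes' the string is passed to ES as-is, and 'es.mapping.id' = 'uid' extracts the document id from that field, so the insert into es_json no longer trips over the _id metadata field.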

