Writing data to Elasticsearch through Hive
For the Hive and Elasticsearch integration, see the official documentation:
ES-Hadoop Hive integration : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html#hive
ES-Hadoop configuration reference : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
1. Create a Hive table es_goods_order
Point the table's storage at ES and use the goods_order_id column as the document ID ('es.mapping.id' = 'goods_order_id');
set the write mode to upsert ('es.write.operation'='upsert'): a document is inserted if its id does not exist yet and updated if it does.
add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
set username=fxin.zhao;
use temp;
CREATE EXTERNAL TABLE es_goods_order(
goods_order_id string,
sale_place string,
station_place string,
multi_channel_id string,
business_date string,
discount string,
discount_type string,
payment_amouunt string,
refun_amount string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_goods_order',
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.mapping.id' = 'goods_order_id',
'es.write.operation'='upsert'
);
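Insert data into the es_goods_order table: with a single mapper, about 800,000 records were written in roughly 3 minutes. The ES index is checked when the data is imported and is created if it does not exist.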
add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
insert into table es_goods_order
select goods_order_id,
sale_place,
station_place,
multi_channel_id,
business_date,
discount,
discount_type,
payment_amouunt,
refun_amount
from ods.goods_order
where dt >= '2016-10-01'
and dt <= '2016-10-04';
- Verify that upsert works: write part of the same data again.
insert into table es_goods_order
select goods_order_id,
sale_place,
station_place,
multi_channel_id,
business_date,
discount,
discount_type,
payment_amouunt,
refun_amount
from ods.goods_order
where dt = '2016-10-01'
limit 1000;
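One way to check the outcome, offered as a sketch rather than the procedure actually used here: read the document count of the index before and after the second insert through the Elasticsearch _count API. If the 1000 rows were applied as updates, the count should stay the same. The host, port and resource are taken from the table definition above; the helper names count_url and es_count are hypothetical, and curl is assumed to be available on the client machine.

```shell
#!/bin/sh
# Values copied from the TBLPROPERTIES of es_goods_order.
ES_HOST="10.10.110.125"
ES_PORT="9200"
ES_RESOURCE="test_crm/es_goods_order"

# Hypothetical helper: build the _count URL for the index/type behind the Hive table.
count_url() {
  printf 'http://%s:%s/%s/_count' "$ES_HOST" "$ES_PORT" "$ES_RESOURCE"
}

# Hypothetical helper: fetch the JSON response, which carries a "count" field.
# Defined only; call it manually before and after the second INSERT.
es_count() {
  curl -s "$(count_url)"
}
```

Running es_count once before and once after the second insert and comparing the two "count" values is enough for this check.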
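Conclusions:
Specifying the ID: set it with 'es.mapping.id' = 'goods_order_id'.
Updating data: 'es.write.operation'='upsert' performs an insert, or an update if the id already exists.
- Storing JSON documents through an ES-backed Hive table.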
hadoop fs -put 20170111202237 /tmp/fuxin.zhao/es_json
add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
use temp;
-- Create a temporary staging table
CREATE EXTERNAL TABLE es_json_tmp (
json STRING
);
-- Load data into the staging table
load data inpath '/tmp/fuxin.zhao/es_json/20170116185548' into table es_json_tmp;
drop table es_json;
-- Create the Hive table that holds raw JSON
CREATE EXTERNAL TABLE es_json (
json STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'test_crm/es_json',
'es.nodes'='10.10.110.125',
'es.port'='9200',
'es.input.json' = 'yes',
'es.mapping.id' = 'uid'
);
-- Run the insert
insert into table es_json
select json
from es_json_tmp;
The insert fails with the following error:
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.10.110.125:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
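The cause is that the JSON documents contain a field named _id, which is an ES metadata field and cannot appear inside the document body. The fix is to rename that field to uid, matching 'es.mapping.id' = 'uid' in the table definition:
vi 20170116185548
Replace _id with uid throughout the file. Note that this pattern matches every occurrence of _id, including inside longer field names.
:1,$s/_id/uid/g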
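For large files, or when other field names also end in _id, a non-interactive and narrower substitution may be safer. The following is a minimal sketch using sed, assuming each line of the file is one JSON document and that the key appears as a quoted "_id" followed by a colon. The sample record and the file names sample.json and fixed.json are made up for illustration.

```shell
#!/bin/sh
# Hypothetical sample record standing in for one line of the exported JSON file.
workdir=$(mktemp -d)
printf '%s\n' '{"_id":"1001","goods_order_id":"A-1","discount":"0"}' > "$workdir/sample.json"

# Rename only the quoted key "_id" (followed by a colon) to "uid".
# Keys that merely end in _id, such as goods_order_id, are left untouched.
sed 's/"_id"[[:space:]]*:/"uid":/g' "$workdir/sample.json" > "$workdir/fixed.json"

cat "$workdir/fixed.json"
```

For the sample line this prints {"uid":"1001","goods_order_id":"A-1","discount":"0"}. The same sed expression can be pointed at the real file in place of sample.json.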