Some work from a while back; now that I have some spare time, I'm writing up my notes.
Task: restore data from local files into ES. The local files are fairly large; after decompression the data set is around 10 GB.
Approach: to meet this requirement, three approaches were tried in total.
I. Using bulk: the batch import API that ES supports natively. The recommended payload size is around 10-15 MB per request; the hard cap on a single request is ES's http.max_content_length setting, which defaults to 100 MB.
II. Using Logstash: another official Elastic product, which turns the text data into an ES data source.
III. Using Java: the Spring Data Elasticsearch route. This third approach combines a thread pool + buffer queue + Spring Data's wrapper around ES; I'll cover it in a separate post later (a rough sketch of the pattern follows below).
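Since the Java approach is deferred to a later post, here is only a minimal sketch of the pattern it names: a bounded blocking queue of JSON documents drained by a small thread pool that sends bulk requests. It is written against the plain Elasticsearch 6.x high-level REST client rather than Spring Data, and the class name BulkLoader, the pool size, the batch size, and the venus/log_packet index/type are illustrative assumptions, not the author's actual code.

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical loader: a file-reading thread fills the queue with JSON lines,
// worker threads drain it in batches and index them with the bulk API.
public class BulkLoader {

    private static final int BATCH_SIZE = 1000;   // documents per bulk request (assumption)
    private static final int WORKERS = 4;         // worker thread count (assumption)

    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
    private final ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
    private final RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));

    public void start() {
        for (int i = 0; i < WORKERS; i++) {
            pool.submit(this::drainLoop);
        }
    }

    // Called by the file-reading thread for every JSON document it reads.
    public void offer(String jsonLine) throws InterruptedException {
        queue.put(jsonLine);
    }

    // Each worker repeatedly collects a batch off the queue and flushes it as one bulk request.
    private void drainLoop() {
        try {
            while (true) {
                BulkRequest bulk = new BulkRequest();
                int added = 0;
                while (added < BATCH_SIZE) {
                    String json = queue.poll(2, TimeUnit.SECONDS);  // wait briefly for more documents
                    if (json == null) break;                        // flush a partial batch on timeout
                    bulk.add(new IndexRequest("venus", "log_packet").source(json, XContentType.JSON));
                    added++;
                }
                if (added > 0) {
                    client.bulk(bulk, RequestOptions.DEFAULT);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}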
I. Using bulk (Win7 + ES 6.6.1 + JSON file)
1. Prepare the JSON data in the correct format
ES is very strict about the format of the bulk JSON file. A valid layout looks like this (one action metadata line, then the document it applies to, each on its own line):
{"index":"demo","id":0} {"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null} {"index":"demo","id":1} {"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null} {"index":"demo","id":2} {"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null}
This newline-delimited layout, one action metadata line followed by the document it applies to, is exactly what the official bulk API requires; note that the file must also end with a final newline character.
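If the raw export is one plain JSON document per line, the bulk file can be generated by prefixing every document with its action metadata line. A minimal sketch under that assumption; the input/output paths (packet_raw.json, packet_bulk.json) and the class name are hypothetical, and the index name "demo" matches the example above while the document type is taken from the /{index}/{type}/_bulk URL used in the curl command below.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class BulkFileBuilder {
    public static void main(String[] args) throws Exception {
        long id = 0;
        try (BufferedReader in = Files.newBufferedReader(Paths.get("D:/log/packet_raw.json"), StandardCharsets.UTF_8);
             BufferedWriter out = Files.newBufferedWriter(Paths.get("D:/log/packet_bulk.json"), StandardCharsets.UTF_8)) {
            String doc;
            while ((doc = in.readLine()) != null) {
                if (doc.trim().isEmpty()) continue;   // skip blank lines in the source file
                // action metadata line first, then the document itself, each on its own line
                out.write("{\"index\":{\"_index\":\"demo\",\"_id\":\"" + id++ + "\"}}");
                out.newLine();
                out.write(doc);
                out.newLine();
            }
        }
    }
}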
2. Run it from cmd (if curl is missing or errors out, download a curl build for Windows first)
curl -H "Content-Type: application/json" -XPOST localhost:9200/index/mapping/_bulk --data-binary @xxx.json
Note: when cmd starts scrolling output non-stop, the import is running successfully!
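Because the decompressed data is around 10 GB while the recommended payload is 10-15 MB per request, the bulk file has to be sent in chunks rather than in one curl call. A minimal sketch of that splitting loop, assuming plain java.net.HttpURLConnection, the same /index/mapping/_bulk endpoint as the curl command, and the hypothetical packet_bulk.json file from the previous sketch; action and document lines are kept in pairs so a chunk never ends in the middle of a document.

import java.io.BufferedReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class BulkChunkUploader {

    private static final int CHUNK_CHARS = 10 * 1024 * 1024;   // roughly 10 MB per request (assumption)

    public static void main(String[] args) throws Exception {
        StringBuilder chunk = new StringBuilder();
        try (BufferedReader in = Files.newBufferedReader(Paths.get("D:/log/packet_bulk.json"), StandardCharsets.UTF_8)) {
            String action;
            while ((action = in.readLine()) != null) {
                String doc = in.readLine();            // every action line is followed by its document line
                if (doc == null) break;
                chunk.append(action).append('\n').append(doc).append('\n');
                if (chunk.length() >= CHUNK_CHARS) {   // flush once the chunk is big enough
                    post(chunk.toString());
                    chunk.setLength(0);
                }
            }
        }
        if (chunk.length() > 0) post(chunk.toString()); // flush the final partial chunk
    }

    // POST one newline-delimited chunk to the same endpoint the curl command uses.
    private static void post(String body) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:9200/index/mapping/_bulk").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-ndjson");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("bulk chunk response: " + conn.getResponseCode());
        conn.disconnect();
    }
}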
II. Using Logstash
1. Install Logstash (just download it from the official site).
2. Go into the Logstash bin directory and create a logstash_def.conf file (the configuration file Logstash loads at startup).
3. The file contents are as follows:
input {
    file {
        path => "D:/log/packet.json"
        type => "log"
        start_position => "beginning"
        codec => json {
            charset => "UTF-8"
        }
    }
}
output {
    elasticsearch {
        hosts => "http://127.0.0.1:9200"
        index => "venus"
        document_type => "log_packet"
    }
}
4. From cmd, go into the Logstash bin directory (with ES already running).
Command: logstash -f logstash_def.conf
Note: if it fails, Logstash throws an error; otherwise it just keeps loading. To check progress, you can use the head plugin to watch the document count grow.
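If the head plugin is not installed, the growing document count can also be polled directly. A minimal sketch using the low-level REST client that ships with ES 6.x; the index name "venus" matches the config above, and the class name is hypothetical.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class CountCheck {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()) {
            // Ask ES how many documents the target index currently holds.
            Response response = client.performRequest(new Request("GET", "/venus/_count"));
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}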