Importing txt or JSON text files into Elasticsearch


This is something I worked on a while ago; now that I have some free time, I'm writing it up.


Task: restore data from local files to ES. The local files are large: about 10 GB of data once decompressed.


Approach: I tried three different ways of meeting this requirement.

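I. bulk: the batch import mechanism built into ES. The recommended file size is around 10-15 MB per request. I believe the upper limit is about 200 MB, but I'm not sure (the request size is capped by the http.max_content_length setting, which defaults to 100mb).

II. Logstash: another official product from the ES stack, which turns the data file into a data source for ES.

III. Java: the Spring Data Elasticsearch approach. This third approach combines a thread pool, a buffer queue, and Spring Data's wrapper around ES. I'll write it up separately later.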
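Since a 10 GB file is far larger than the 10-15 MB per request suggested for bulk, the data has to be sent in pieces. The following is a minimal sketch of one way to do that, not the method used in the original project: it assumes the file is already in bulk format (an action line followed by a document line, as described in section I below) and groups lines into batches without ever separating a pair. The function name `split_bulk_lines` and the `max_bytes` parameter are hypothetical.

```python
from typing import Iterable, Iterator, List


def split_bulk_lines(
    lines: Iterable[str], max_bytes: int = 10 * 1024 * 1024
) -> Iterator[List[str]]:
    """Yield batches of bulk lines of roughly max_bytes each (UTF-8 encoded).

    Assumption: lines come in (action, document) pairs and carry no trailing
    newline. A pair is never split across two batches. A single pair larger
    than max_bytes is still emitted, as a batch of its own.
    """
    batch: List[str] = []
    size = 0
    it = iter(lines)
    for action in it:
        doc = next(it, None)
        if doc is None:
            raise ValueError("odd number of lines: action line without a document")
        # +2 accounts for the newline that will follow each of the two lines
        pair_size = len(action.encode("utf-8")) + len(doc.encode("utf-8")) + 2
        if batch and size + pair_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.extend([action, doc])
        size += pair_size
    if batch:
        yield batch
```

Each batch can then be written to its own file (joining the lines with "\n" and ending with a final "\n") and posted with one bulk request per file.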


I. Using bulk (Win7 + ES 6.6.1 + JSON text)

1. Prepare the JSON data in the correct format

ES is very strict about the format of the JSON text. A valid file looks like this:

{"index":{"_index":"demo","_id":"0"}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null}
{"index":{"_index":"demo","_id":"1"}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null}
{"index":{"_index":"demo","_id":"2"}}
{"id":null,"dev_id":"1","rcv_time":1557303257,"date":null,"dname":null,"logtype":"1","pri":null,"mod":"pf","sa":null,"sport":null,"ttype":null,"da":null,"dport":null,"code":null,"proto":null,"policy":null,"duration":"0","rcvd":null,"sent":null,"fwlog":null,"dsp_msg":"包過濾日志","failmsg":null,"custom":null,"smac":null,"dmac":null,"type":null,"in_traffic":"52","out_traffic":"52","gen_time":"1557303257","src_ip":"710191296","dest_ip":"896426877","src_port":"51411","dest_port":"443","protocol_id":"1","action_id":"2","filter_policy_id":"0","sat_ip":"0","sat_port":"0","i_ip":"0","i_port":"0","insert_time":"0","p_ip":"0","p_port":"0","rulename_id":"3","min_id":"25955054","svm":null,"dvm":null,"repeat_num":null,"event_type_id":216001001,"event_level_id":1,"org_log":"devid=2 date=\"2019/05/08 16:14:17\" dname=venus logtype=1 pri=5 ver=0.3.0 rule_name=網關產品線 mod=pf sa=192.168.84.42 sport=51411 type=NULL da=125.99.110.53 dport=443 code=NULL proto=IPPROTO_TCP policy=允許 duration=0 rcvd=52 sent=52 fwlog=0 dsp_msg=\"包過濾日志\"","stauts":"success","failMsg":null}

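The format above is the standard one the official documentation requires: each document line is preceded by an action line, each JSON object sits on a single line, and every line, including the last one, ends with a newline.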
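If the source data is a plain list of records rather than a ready-made bulk file, the action lines have to be generated. Here is a minimal sketch of how that might be done; the function name `to_bulk_ndjson` and the choice of sequential string ids are assumptions for illustration, not something taken from the original project. It relies only on the documented bulk layout, in which an action line of the form {"index":{...}} precedes each document.

```python
import json
from typing import Any, Dict, Iterable


def to_bulk_ndjson(
    docs: Iterable[Dict[str, Any]], index: str, start_id: int = 0
) -> str:
    """Serialize documents as a bulk request body (newline-delimited JSON).

    Assumption: ids are simply consecutive integers starting at start_id.
    On ES 6.x the mapping type is taken from the request URL, so it is not
    repeated in the action line here.
    """
    lines = []
    for offset, doc in enumerate(docs):
        action = {"index": {"_index": index, "_id": str(start_id + offset)}}
        lines.append(json.dumps(action, separators=(",", ":")))
        # ensure_ascii=False keeps non-ASCII text readable in the file
        lines.append(json.dumps(doc, ensure_ascii=False, separators=(",", ":")))
    # every line, including the last one, must end with a newline
    return "".join(line + "\n" for line in lines)
```

The returned string can be written to a file with UTF-8 encoding and sent with the curl command shown in the next step.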

2. Run from cmd (if the curl command is not available, search online for a curl build for Windows and install it)

curl -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/index/mapping/_bulk" --data-binary @xxx.json

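Note: in the URL, index and mapping stand for your index name and mapping type. If the cmd window starts scrolling rapidly with output, the request went through. That output is the bulk response; its "errors" field is false when every document was indexed.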
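Scrolling output alone does not prove that every document was accepted, because a bulk request can succeed as a whole while individual items fail. The sketch below shows one way to check a saved response (for example, curl output redirected to a file). The function name `failed_items` is hypothetical; the fields it reads ("errors", "items", and the per-item "status" and "error") are part of the bulk response.

```python
import json
from typing import Any, Dict, List


def failed_items(response_text: str) -> List[Dict[str, Any]]:
    """Return one summary dict per failed item in a bulk response."""
    response = json.loads(response_text)
    if not response.get("errors"):
        return []  # top-level flag says nothing failed
    failures = []
    for item in response.get("items", []):
        # each item has a single key named after the action (index, create, ...)
        for action, result in item.items():
            if "error" in result:
                failures.append(
                    {
                        "action": action,
                        "id": result.get("_id"),
                        "status": result.get("status"),
                        "error": result["error"],
                    }
                )
    return failures
```

An empty list means the whole batch was indexed; otherwise the returned ids identify which lines of the file need attention.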


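II. Using Logstash

1. Install Logstash (download it from the official site)

2. Go to the bin directory of the Logstash installation and create a file named logstash_def.conf (the configuration file Logstash will load at startup)

3. The file contents are as follows: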

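input {
    file {
        # Source file to read
        path => "D:/log/packet.json"
        type => "log"
        # Read from the start of the file; this only applies to files
        # Logstash has not seen before
        start_position => "beginning"
        # Parse each line as a JSON document
        codec => json {
            charset => "UTF-8"
        }
    }
}

output {
    elasticsearch {
        hosts => "http://127.0.0.1:9200"
        # Target index
        index => "venus"
        # Mapping type (ES 6.x)
        document_type => "log_packet"
    }
}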
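With this configuration the file input reads packet.json line by line and the json codec decodes each line as one document, so the file should contain only document lines. If the data on hand is a bulk-format file from section I, its action lines would otherwise be indexed as documents of their own. The sketch below filters them out; the function name `documents_only` is hypothetical, and the test for what counts as an action line is a heuristic assumption (a single-key object whose key is a bulk action name and whose value is an object).

```python
import json
from typing import Iterable, Iterator

# action names defined by the bulk API
BULK_ACTIONS = {"index", "create", "update", "delete"}


def documents_only(bulk_lines: Iterable[str]) -> Iterator[str]:
    """Yield only the document lines of a bulk-format file."""
    for line in bulk_lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        obj = json.loads(line)
        if isinstance(obj, dict) and len(obj) == 1:
            key, value = next(iter(obj.items()))
            # heuristic: looks like {"index": {...}}, so treat it as an action line
            if key in BULK_ACTIONS and isinstance(value, dict):
                continue
        yield line
```

A real document that happens to consist of a single "index" object would be dropped by this heuristic, so it is only suitable for data like the log records shown earlier.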

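4. In cmd, go to the bin directory under Logstash (ES must already be running)

Command: logstash -f logstash_def.conf

Note: if something is wrong, Logstash throws an error. Otherwise it keeps running and loading data, since the file input continues to watch the file. To check progress, use the head plugin to watch the document count grow.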
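As an alternative to the head plugin, the document count can also be polled over HTTP. The following is a rough sketch that assumes ES is reachable at http://127.0.0.1:9200 without authentication, as in the configuration above. The helper names `parse_count` and `fetch_count` are hypothetical; the _count endpoint and the "count" field in its response are part of the ES REST API.

```python
import json
from urllib.request import urlopen


def parse_count(body: str) -> int:
    """Extract the document count from a _count response body."""
    return int(json.loads(body)["count"])


def fetch_count(index: str, base_url: str = "http://127.0.0.1:9200") -> int:
    """Ask ES how many documents the given index currently holds."""
    with urlopen(f"{base_url}/{index}/_count") as resp:
        return parse_count(resp.read().decode("utf-8"))
```

Calling fetch_count("venus") a few times while Logstash is running should show the number rising until the whole file has been loaded.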

 

