前言
elasticsearch官方並沒有提供合適的備份工具,然而生產場景中備份卻是的確需要的。
本文介紹了使用自己寫的php腳本以及第三方工具來進行索引的備份,恢復以及刪除等操作。
全量備份
elasticdump --input http://127.0.0.1:9200/logstash-postback --output logstash-postback_2017.01.17.json --limit 1000
elasticdump安裝方法在下面
恢復
命令例子
elasticdump --input logstash-postback_2017.01.14.json --output http://127.0.0.1:9200/logstash-postback --limit 1000
詳解
elasticdump命令需要單獨安裝。
--input 是指定輸入,可指定文件,或需要備份的es集群。
--output是指定輸出,可指定文件,或需要備份的es集群。
logstash-postback_2017.01.14.json 是指定的已備份的文件。
http://127.0.0.1:9200/logstash-postback 是指定的要恢復數據的elastictic集群,IP加端口加索引名。注意,索引名是不帶時間的。
--limit 1000 一次導入一千條數據,加快進度。
elasticdump命令安裝
yum install npm
npm install elasticdump -g
命令安裝完畢,可以測試。
可能會報出nodejs的版本之類的錯誤,你需要升級一下版本。
npm install -g n
n stable
至此可以使用。
刪除索引
php delete.php --index http://127.0.0.1:9200/<index> --start 2017-01-01 --end 2017-01-02
--start 選擇刪除這個索引中指定日期的內容
--end 不傳則默認刪除一天,也就是start的那天
<index> 要刪除的索引名
cat delete.php
<?php date_default_timezone_set('Asia/Shanghai'); $longopts = array( 'index:', 'query:', 'start:', 'end:', ); $options = getopt('a', $longopts); if(!isset($options['index'])) { fwrite(STDERR, "index必須設置索引地址\n"); exit(1); } $components = parse_url($options['index']); if(!isset($components['path'])) { fwrite(STDERR, 'index不可為空'); exit(1); } $host = "{$components['scheme']}://{$components['host']}:{$components['port']}"; $index = basename($components['path']); $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}'; if(isset($options['start'])) { $start_time = strtotime($options['start']); $start = date('Y-m-d', $start_time).'T00:00:00+0800'; if(isset($options['end'])) { $end_time = strtotime($options['end']); $end = date('Y-m-d', $end_time).'T00:00:00+0800'; } else { $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800'; } $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date'; $query = '{"size":1000,"_source":false,"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}'; } $scroll_id = null; $retry = 0; $num = 0; while(true) { if(is_null($scroll_id)) { $result = post("{$host}/{$index}/_search?scroll=2m", $query); } else { $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}"); } $json = json_decode($result, true); if(!isset($json['_scroll_id'])) { fwrite(STDERR, "查詢失敗:索引-{$index} 起始-{$start} 截止-{$end}\n"); sleep(5); $retry++; if($retry>10) { exit(4); } } $scroll_id = $json['_scroll_id']; $bulk = []; foreach($json['hits']['hits'] as $row) { unset($row['_score']); $bulk[] = json_encode(['delete'=>$row]); $num++; } if(count($json['hits']['hits'])==0) { break; } $check = post("{$host}/_bulk", implode("\n", $bulk)."\n"); fwrite(STDOUT, "{$num}\n"); usleep(100000); } echo "deleted:{$num}\n"; function get($url) { $handle = curl_init(); //curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); //curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); } function post($url, $data) { $handle = curl_init(); curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); }
索引增量備份
elasticdump增量備份有BUG,無規律的丟數據。
於是我們自己寫了腳本來進行增量備份。
備份腳本分為兩個部分,PHP腳本溝通elasticsearch來進行數據讀取與備份。SHELL腳本來寫作傳參與PHP腳本
SHELL腳本如下
#!/bin/bash #如果有參數,則第一個參數為備份日期,否則默認備份昨天數據 if [ -z "$1" ];then start=$(date +%Y-%m-%d --date '-1 day 00:00:00') end=$(date +%Y-%m-%d --date 'today 00:00:00') else start=$(date +%Y-%m-%d --date $1) end=$(date +%Y-%m-%d --date "$1 +1 day 00:00:00") fi #如果是每月1號則加一天,解決時區導致的數據跨索引問題 if [ $(date +%d --date $end) -eq "01" ] then end=$(date +%Y-%m-%d --date "$end +1 day 00:00:00") fi php esdump.php --input=http://127.0.0.1:9200/logstash-event-$(date +%Y.%m --date $start) --start $start --end $end 2>> backup_error_$start.log | gzip > event/logstash-event-$(date +%Y.%m_%d --date $start).json.gz 2>> backup_error_$start.log &
注:代碼第18行的logstash-event替換為你要備份的索引名
PHP腳本如下(無需修改)
<?php date_default_timezone_set('Asia/Shanghai'); $longopts = array( 'input:', 'output:', 'query:', 'start:', 'end:', ); $options = getopt('a', $longopts); if(!isset($options['input'])) { fwrite(STDERR, "input必須設置索引地址\n"); exit(1); } $check = get($options['input'].'/_count'); if($check===false) { fwrite(STDERR, "input索引地址無效:{$options['input']}\n"); exit(2); } $check = json_decode($check, true); if(!isset($check['count'])) { fwrite(STDERR, "input索引地址無效:{$options['input']}\n"); exit(3); } $components = parse_url($options['input']); $host = "{$components['scheme']}://{$components['host']}:{$components['port']}"; $index = basename($components['path']); $query = isset($options['query']) ? $options['query'] : '{"query":{"match_all":{}}}'; if(isset($options['start'])) { $start_time = strtotime($options['start']); $start = date('Y-m-d', $start_time).'T00:00:00+0800'; if(isset($options['end'])) { $end_time = strtotime($options['end']); $end = date('Y-m-d', $end_time).'T00:00:00+0800'; } else { $end = date('Y-m-d', $start_time+86400).'T00:00:00+0800'; } $field = strpos($index, 'analysis')!==false ? 'create_time' : 'date'; $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"range":{"'.$field.'":{"gte":"'.$start.'","lt":"'.$end.'"}}}}}}'; if(strpos($index, 'eventlogs')!==false) { $query = '{"size":1000,"sort":{"'.$field.'":{"order":"asc"}},"query":{"filtered":{"filter":{"bool":{'. '"must":[{"range":{"date":{"gte":"'.$start.'","lte":"'.$end.'"}}}],'. '"must_not":[{"exists": {"field":"nsp3hq"}},{"exists": {"field":"q0i8u1"}},{"exists": {"field":"eyn916"}},{"exists": {"field":"20mqd8"}},'. '{"exists": {"field":"wwbkux"}},{"exists": {"field":"r5ua96"}},{"exists": {"field":"easiz"}},{"exists": {"field":"dexusu"}},{"exists": {"field":"earts"}},'. '{"exists": {"field":"ealu"}},{"exists": {"field":"ealf"}},{"exists": {"field":"eal"}},{"exists": {"field":"ears"}},{"exists": {"field":"ealuf"}},'. '{"exists": {"field":"ealus"}},{"exists": {"field":"eaatf"}},{"exists": {"field":"enail"}},{"exists": {"field":"enuail"}},{"exists": {"field":"test"}}]'. '}}}}}'; } } $scroll_id = null; $retry = 0; $num = 0; while(true) { if(is_null($scroll_id)) { $result = post("{$host}/{$index}/_search?scroll=2m", $query); } else { $result = get("{$host}/_search/scroll?scroll=2m&scroll_id={$scroll_id}"); } $json = json_decode($result, true); if(!isset($json['_scroll_id'])) { fwrite(STDERR, "查詢失敗:索引-{$index} 起始-{$start} 截止-{$end}\n"); sleep(5); $retry++; if($retry>10) { exit(4); } } $scroll_id = $json['_scroll_id']; foreach($json['hits']['hits'] as $row) { fwrite(STDOUT, json_encode($row)."\n"); $num++; } if(count($json['hits']['hits'])==0) { break; } usleep(100000); } //校驗條數是否一致 $query = json_decode($query, true); unset($query['size'], $query['sort']); $result = post("{$host}/{$index}/_count", json_encode($query)); $json = json_decode($result, true); if(!isset($json['count']) or intval($json['count'])!==$num) { fwrite(STDERR, "校驗失敗:索引-{$index} 起始-{$start} 截止-{$end} 記錄條數-{$json['count']} 導出條數-{$num}\n"); } function get($url) { $handle = curl_init(); //curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); //curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); } function post($url, $data) { $handle = curl_init(); curl_setopt($handle, CURLOPT_POST, 1); curl_setopt($handle, CURLOPT_HEADER, 0); curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1); curl_setopt($handle, CURLOPT_URL, $url); curl_setopt($handle, CURLOPT_POSTFIELDS, $data); return curl_exec($handle); }
備份時執行shell腳本即可備份昨天的增量數據
謝土豪
如果有幫到你的話,請贊賞我吧!
本文為kerwin原創,轉載請注明出處。
http://www.cnblogs.com/kerwinC/p/6296675.html