場景描述:前段時間,將實時數據通過kafka+flume+morphline的方式接入到solr中。新進來的數據已經可以在solr中看到了,但是以前的歷史數據還沒有導入solr。
CDH提供利用MapReduceIndexerTool來將HDFS的數據導入到solr。
歷史數據格式類似如下按年/月/日保存在HDFS上每天一個文件:
-/user/data/2016
-11
-1
-data.txt
-2
-data.txt
-12
-1
-data.txt
-2
-data.txt
文件的格式為一行一行的json。
思路:
先對2016目錄下的所有子目錄遍歷文件,
再對文件進行批量的索引操作。
使用命令:(jar包在/opt/cloudera/parcels/CDH/jars下)
hadoop jar search-mr-1.0.0-cdh5.8.0-job.jar org.apache.solr.hadoop.HdfsFindTool -find hdfs://cdh-master/user/kafkadata/eventCount/2016/11 -type f | sudo -u xuyali hadoop --config /etc/hadoop/conf.solrindexer/ jar search-mr-1.0.0-cdh5.8.0-job.jar org.apache.solr.hadoop.MapReduceIndexerTool --log4j log4j.properties --morphline-file morphline.conf --zk-host cdh-master:2181/solr --collection event_count_records --output-dir hdfs://cdh-master/user/hdfs/test/ --verbose --go-live --input-list -
參考:cdh官方文檔——batch indexing solr
*注意:官方文檔中用的配置是mapreduce1,可以用yarn的客戶端配置來代替該配置。
morphline.conf
SOLR_LOCATOR : { # Name of solr collection collection : event_count_records # ZooKeeper ensemble #CDH的專有寫法,開源版本不支持。 zkHost : "$ZK_HOST" } morphlines : [ { id : morphline1 importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { readLine { charset : UTF-8 } } {setValues:{_attachment_body : "@{message}"}} {java:{ imports:"import java.io.*;import org.kitesdk.morphline.base.Fields;" code:""" String message=(String)record.getFirstValue(Fields.ATTACHMENT_BODY); if(message.contains("'")) { return true; } InputStream inputStream = new ByteArrayInputStream(message.getBytes()); record.removeAll(Fields.ATTACHMENT_BODY); record.put(Fields.ATTACHMENT_BODY, inputStream); return child.process(record); """ }} { #Flume傳過來的kafka的json數據是用二進制流的形式,需要先讀取json readJson{} } { #讀出來的json字段必須轉換成filed才能被solr索引到 extractJsonPaths { flatten:true paths:{ account:/account accountName:/accountName subaccount:/subaccount subaccountName:/subaccountName eventTime:/timestamp eventType:/eventType eventTags:"/eventTags[]/name" #按UTC時間存timestamp eventTimeInMinuteUTC_tdt:/timestamp #按China時間存timestamp eventTimeInMinuteChina_tdt:/timestamp #按UTC時間存timestamp eventTimeInHourUTC_tdt:/timestamp #_tdt后綴會被動態識別為日期類型的索引字段 #按不同時間間隔存索引以增加查詢性能 } } } #轉換long型時間為Date格式 {convertTimestamp { field : eventTimeInMinuteChina_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'" outputTimezone : Asia/Shanghai }} {convertTimestamp { field : eventTimeInMinuteUTC_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'" outputTimezone : UTC }} {convertTimestamp { field : eventTimeInHourUTC_tdt inputFormats : ["unixTimeInMillis"] inputTimezone : UTC outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'" outputTimezone : UTC }} #kafka中的json數據傳到flume中時會被放入_attachment_body字段,readJson后會變成JsonNode對象,需要toString之后才能保存 {toString { field : _attachment_body }} #為每一條記錄生成一個UUID {generateUUID { field : id }} sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } #對未定義的Solr字段加tws前綴,根據schema.xml中定義的tws_*為text_ws類型,會動態未未定義的字段建索引。 #將數據導入到solr中 {loadSolr {solrLocator : ${SOLR_LOCATOR}}} ] } ]
log4j.properties:
log4j.rootLogger=WARN, A1 log4j.logger.org.apache.flume.sink=INFO #log4j.logger.org.apache.flume.sink.solr=DEBUG log4j.logger.org.apache.solr=INFO #log4j.logger.org.apache.solr.hadoop=DEBUG log4j.logger.org.kitesdk.morphline=TRACE #log4j.logger.org.apache.solr.morphline=DEBUG log4j.logger.org.apache.solr.update.processor.LogUpdateProcessor=WARN log4j.logger.org.apache.solr.core.SolrCore=WARN log4j.logger.org.apache.solr.search.SolrIndexSearcher=ERROR # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
map數等於要被批量索引的文件數。
yarn的客戶端配置可以作下修改,設置reduce的個數,每個map占用的內存cpu等(map數不能修改)。
任務完成提示:
*批量索引的效率並不一定總是比實時索引高,但優點是不吃solr服務性能——沒有調用solr接口,而是直接生成索引文件后移至solr collection目錄下。
*調試morphline.conf bug時先用小點的單個文件,如果morphline寫的有錯,一個文件的任務失敗會導致整個任務失敗。