紀錄:Solr6.4.2+Flume1.7.0 +morphline+kafka集成


     當前大多數企業版hadoop的solr版本都還停留在solr4.x,由於這個版本的solr本身的bug較多,使用起來會出很多奇怪的問題。如部分更新日期字段失敗的問題

     最新的solr版本不僅修復了以前的一些常見bug,還提供了更簡便易用的功能,如ManagedSchema替代schema.xml來管理索引的schema。

    由於solr自帶的接口和入庫工具需要一些定制開發,所以通常用flume來作為數據采集的工具。數據流圖如下:

image

具體見前文:《json數據處理實戰:Kafka+Flume+Morphline+Solr+Hue數據組合索引

在Cloudera等企業版hadoop中,Solr和Flume已經集成,並能互通。如果你目前的情況是使用Cloudera企業版,請看上面這篇文章。

然而由於集成的版本跟不上開源社區最新版本,還是很嫌棄的。於是就有了下面的配置最新版本的Solr和Flume互通:

 

1.Solr最新版服務部署及入門:

見solr官網quickstart。

http://lucene.apache.org/solr/quickstart.html

說明:創建Solr集合的部分,不是本章重點,所以這里沒有介紹。

另,本例中和前文不同,使用的不是SolrCloud模式,而是單機的Solr。

 

2.Flume最新版部署及入門

下載地址:http://flume.apache.org/download.html

入門介紹:https://cwiki.apache.org//confluence/display/FLUME/Getting+Started

詳細配置介紹:http://flume.apache.org/FlumeUserGuide.html

詳細配置介紹中,需要關注的是KafkaSource和MorphlineSolrSink。

最終的flume.conf配置為:

kafka2solr.sources = source_from_kafka
kafka2solr.channels = customer_doc_channel
kafka2solr.sinks = solr_sink1

# For each one of the sources, the type is defined  
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = customer_doc_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.useFlumeEventFormat=false
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = tablecardLogin
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = catering_customer_core_070327
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=earliest

# Other config values specific to each type of channel(sink or source)  
# can be defined as well  
kafka2solr.channels.customer_doc_channel.type = file
kafka2solr.channels.customer_doc_channel.capacity=10000000
kafka2solr.channels.customer_doc_channel.checkpointDir = /home/arli/data/flume-ng/customer_doc/checkpoint
kafka2solr.channels.customer_doc_channel.dataDirs = /home/arli/data/flume-ng/customer_doc/data

kafka2solr.sinks.solr_sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solr_sink1.channel = customer_doc_channel
kafka2solr.sinks.solr_sink1.batchSize = 5000
kafka2solr.sinks.solr_sink1.batchDurationMillis = 2000
kafka2solr.sinks.solr_sink1.morphlineFile = /home/arli/flume-config/morphlines.conf
kafka2solr.sinks.solr_sink1.morphlineId=morphline1
kafka2solr.sinks.solr_sink1.isIgnoringRecoverableExceptions=true
#kafka2solr.sinks.solr_sink1.isProductionMode=true

 

3.新建一個Flume配置目錄,下面四個文件是比較重要的。

image

flume.conf 來自上一節的配置。

flume-env.sh 來自安裝目錄conf下的flume-env.sh.template。需要改動。

log4j.properties 在調試過程中可以開啟更低級別的日志打印。

morphline.conf 參考Morphline的文檔:http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

 

4.接下來詳細介紹上面的后三個配置文件。

1)flume-env.sh

image

需要改動的地方如上:

#默認的內存是不夠的。需要擴大內存。
export JAVA_OPTS="-Xms100m -Xmx500m -Dcom.sun.management.jmxremote"

#Flume官方下載的包少了一些Solr相關的包,需要把solr的lib目錄加到flume的classpath下。
FLUME_CLASSPATH="/xxx/solr-6.4.2/contrib/morphlines-core/lib/*:/xxxi/solr-6.4.2/dist/*:/xxx/solr-6.4.2/dist/solrj-lib/*:/xxx/solr-6.4.2/server/solr-webapp/webapp/WEB-INF/lib/*"

 

2)log4j.properties

100MB改成10MB,以防打日志太多日志文件過大。

image

在調試階段,加上如下兩行會省心很多,調試完再去掉。

log4j.logger.org.apache.flume.sink.solr=DEBUG
log4j.logger.org.kitesdk.morphline=TRACE

 

3)morphline.conf

大部分和前文:《json數據處理實戰:Kafka+Flume+Morphline+Solr+Hue數據組合索引》雷同。由於我使用的是單機版本的Solr,所以在配置時如下。

注意solrUrl和solrHomeDir的配置,在官網中沒有介紹(因為morphline是cloudera開發並開源的,cloudera的solr默認是solrCloud),但是在源碼閱讀時可以看到這兩個單機solr配置參數。

SOLR_LOCATOR : {
   solrUrl : "http:\/\/localhost:8983\/solr\/catering_customer_core1"
   solrHomeDir : "/xxx/server/solr/catering_customer_core1/conf"
}

morphlines : [
  {
    #customer morphline
    id : morphline1
    
    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    
    commands : [                 
    { 
        readJson {}
    } 
    
    {
        tryRules 
        {
            catchExceptions : false
            throwExceptionIfAllRulesFailed : true
            rules : [
            {
              commands : [
                { 
                    contains {topic : [tablecardLogin] } 
                }
                
                #field need to be indexed from json.
                {
                    extractJsonPaths {
                      flatten : false
                      paths : {
                        account:/account         
                        customer_id:/customerId
                        history_signin_dates:/opt_time
                        history_signin_timestamps:/opt_time
                        name:/name
                        sex:/sex
                        }
                    }
                }
              ]
              
            }
            

            # if desired, the last rule can serve as a fallback mechanism 
            # for records that don't match any rule:
            {
                    commands : [
                    { logWarn { format : "Ignoring record with unsupported input format: {}", args : ["@{}"] } }
                    { dropRecord {} }    
                    ]
            }
          ]
        }
    }

    {
        convertTimestamp {
            field : history_signin_dates
            inputFormats : ["yyyy-MM-dd HH:mm:ss"]
            inputTimezone : Asia/Shanghai
            outputFormat : "yyyy-MM-dd'T'HH:mm:ss'Z/SECOND'"
            outputTimezone : Asia/Shanghai
            }
    }

    {
        convertTimestamp {
            field : history_signin_timestamps
            inputFormats : ["yyyy-MM-dd HH:mm:ss"]
            inputTimezone : Asia/Shanghai
            outputFormat : "unixTimeInMillis"
            outputTimezone : UTC
            }
    }

    {
        java {
            imports : "import java.util.*;import org.kitesdk.morphline.api.Command;import org.kitesdk.morphline.api.Record;"
            code:     """
                            Object customerId = record.getFirstValue("customer_id");
                            Object account = record.getFirstValue("account");
                            record.put("id", account + "@" + customerId);
                            return child.process(record);
                    """
            }
    }
    
    {sanitizeUnknownSolrFields {solrLocator : ${SOLR_LOCATOR}}}

    #將數據導入到solr中
    {loadSolr {solrLocator : ${SOLR_LOCATOR}}}

     ]
  }
]

 

4.Morphline中的sanitizeUnknownSolrFields命令需要有schema.xml才能使用。

Solr6.4.2的schema默認是用managed-schema文件管理的。如果上面配置中的solrHomeDir目錄下沒有shema.xml文件,則會報錯。

好在managed-schema和之前schema.xml文件內容幾乎一致。執行如下命令即可。

cp managed-schema schema.xml

 

5.解決Flume1.7.0和solr6.4.2的jar包沖突問題。

Flume1.7在編譯時使用的是Solr4.10.1的包,而其中lib目錄下,Solrj依賴的httpcore-4.1.3包已與最新的Solrj不兼容,因此在solr目錄dist/solrj-lib下找到對應的包然后替換。

image

另外還需要清理的兩種包:1.Flume的lib目錄老的solr版本相關的包,2.若存在kite-morphline-solr-core(因為solr自己發布的版本已經包含了等價的solr-morphline-core包)則需要清理。(由於本文在寫作時相應的包都已經清理了,所以記錄的不夠細節,望見諒。)

 

6.啟動flume。調試時可以先在控制台啟動,去掉最后的&。

bin/flume-ng agent --conf ~/flume-config/ -f ~/flume-config/flume.conf  -n kafka2solr &


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM