json數據處理實戰:Kafka+Flume+Morphline+Solr+Hue數據組合索引


背景:Kafka消息總線的建成,使各個系統的數據得以在kafka節點中匯聚,接下來面臨的任務是最大化數據的價值,讓數據“慧”說話。

環境准備:

Kafka服務器*3。

CDH 5.8.3服務器*3,安裝Flume,Solr,Hue,HDFS,Zookeeper服務。

Flume提供了可擴展的實時數據傳輸通道,Morphline提供了輕量級的ETL功能,SolrCloud+Hue提供了高性能搜索引擎和多樣的數據展現形式。

12.20補充:(Hue的另外一種代替方式:Banana。)

2017.3.28補充:如果不使用CDH,而是使用開源的Flume+Solr=>紀錄:Solr6.4.2+Flume1.7.0 +kafka集成

一.環境安裝(略)

 

二.修改CDH默認配置:

1.在Flume配置界面配置Flume依賴Solr。

image

2.在Solr配置界面配置Solr使用Zookeeper存儲配置文件,使用HDFS存儲索引文件。

image

3.在Hue配置界面配置Hue依賴Solr

image

4.配置Hue界面可以被外網訪問。

image

 

三.按場景配置各CDH服務及開發代碼。

Kafka Topic: eventCount

Topic數據格式:

{
    "timestamp": "1481077173000",
    "accountName": "旺小寶",
    "tagNames": [
        "incoming"
    ],
    "account": "WXB",
    "eventType": "phone",
    "eventTags": [
        {
            "value": 1,
            "name": "incoming"
        }
    ]
}

 

1.Solr創建對應Collection。

1)登錄任意CDH節點。生成collection配置文件骨架。

$ solrctl instancedir --generate $HOME/solr_configs

2)找到文件夾中的schema.xml文件,修改collection的schema。

第一步:修改field(先不要動type和dynamicField這些)。schema.xml中預定義了很多field,field對應的是json中需要被索引的字段。除了name=id,_root_,_version_不能去掉之外,其他的field可以去掉。

(Notice:json中的timestamp對應的是下面的eventTime,而下面的timestamp是flume接受kafka數據的時間。這是通過Morphline配置實現的轉換)

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
   
   <!-- points to the root document of a block of nested documents. Required for nested
        document support, may be removed otherwise
   -->
   <field name="_root_" type="string" indexed="true" stored="false"/>
   <field name="account" type="string" indexed="true" stored="true"/>
   <field name="accountName" type="string" indexed="true" stored="true"/>
   <field name="subaccount" type="string" indexed="true" stored="true"/>
   <field name="subaccountName" type="string" indexed="true" stored="true"/>
   <field name="eventTime" type="tlong" indexed="false" stored="true"/>
   <field name="eventType" type="string" indexed="true" stored="true"/>
   <field name="eventTags" type="string" indexed="true" stored="true" multiValued="true"/>
   <field name="_attachment_body" type="string" indexed="false" stored="true"/>
   <field name="timestamp" type="tlong" indexed="false" stored="true"/>
   <field name="_version_" type="long" indexed="true" stored="true"/>

第二步:去掉所有copy field。

第三步:添加動態字段dynamicFiled。

<dynamicField name="tws_*" type="text_ws" indexed="true" stored="true" multiValued="true"/>

3) 上傳配置,創建collection

$ solrctl instancedir --create event_count_records solr_configs
$ solrctl collection --create event_count_records -s 3 -c event_count_records

 

2.Flume配置

創建一個新的角色組kafka2solr,修改代理名稱為kafka2solr,並為該角色組分配服務器。

# 配置 source  channel sink 的名字
kafka2solr.sources = source_from_kafka
kafka2solr.channels = mem_channel
kafka2solr.sinks = solrSink

# 配置Source類別為kafka
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = mem_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest

#配置channel type為memory,通常生產環境中設置為file或者直接用kafka作為channel
kafka2solr.channels.mem_channel.type = memory
kafka2solr.channels.mem_channel.keep-alive = 60
  
 
# Other config values specific to each type of channel(sink or source)  
# can be defined as well  
# In this case, it specifies the capacity of the memory channel  
kafka2solr.channels.mem_channel.capacity = 10000 
kafka2solr.channels.mem_channel.transactionCapacity = 3000  

# 配置sink到solr,並使用morphline轉換數據
kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solrSink.channel = mem_channel
kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
kafka2solr.sinks.solrSink.morphlineId=morphline1
kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true

 

3.Flume-NG的Solr接收器配置

SOLR_LOCATOR : {
  # Name of solr collection
  collection : event_count_records
  
  # ZooKeeper ensemble 
  #CDH的專有寫法,開源版本不支持。
  zkHost : "$ZK_HOST"
  }

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    
    commands : [   
{
  #Flume傳過來的kafka的json數據是用二進制流的形式,需要先讀取json
   readJson{}
}

{
 #讀出來的json字段必須轉換成filed才能被solr索引到
extractJsonPaths {
 flatten:true
 paths:{
account:/account
accountName:/accountName
subaccount:/subaccount
subaccountName:/subaccountName
eventTime:/timestamp
eventType:/eventType
eventTags:"/eventTags[]/name"
#按分鍾存timestamp
eventTimeInMinute_tdt:/timestamp
#按小時存timestamp
eventTimeInHour_tdt:/timestamp
#按天存timestamp
eventTimeInDay_tdt:/timestamp
#_tdt后綴會被動態識別為日期類型的索引字段
#按不同時間間隔存索引以增加查詢性能
}
 
}
}

#轉換long型時間為Date格式
{convertTimestamp {
  field : eventTimeInMinute_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/MINUTE'"
  outputTimezone : Asia/Shanghai
}}

{convertTimestamp {
  field : eventTimeInHour_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/HOUR'"
  outputTimezone : Asia/Shanghai
}}
{convertTimestamp {
  field : eventTimeInDay_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z/DAY'"
  outputTimezone : Asia/Shanghai
}}

#kafka中的json數據傳到flume中時會被放入_attachment_body字段,readJson后會變成JsonNode對象,需要toString之后才能保存
{toString { field : _attachment_body }}


#為每一條記錄生成一個UUID
{generateUUID {
  field : id
}}

#對未定義的Solr字段加tws前綴,根據schema.xml中定義的tws_*為text_ws類型,會動態未未定義的字段建索引。
          {  
        sanitizeUnknownSolrFields {  
          # Location from which to fetch Solr schema  
          solrLocator : ${SOLR_LOCATOR} 
          renameToPrefix:"tws_"
        }  
      }  

#將數據導入到solr中                
      {loadSolr {solrLocator : ${SOLR_LOCATOR}}}
    ]
  }
]

重啟被影響的Flume節點,數據開始導入solr。

 

3.通過Hue查詢Solr中的數據。

Solr+Hue實戰

 (作者卡爾:http://www.cnblogs.com/arli/p/6158771.html )


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM