Flume#描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 44444 a1.sources.r1.interceptors = i1 i2 i3 i4 i5 i6 a1.sources.r1.interceptors.i1.type = timestamp #ip是攔截者所在機器的ip a1.sources.r1.初探


flume中的一些重要概念

1. flume Event:flume 事件,被定義為一個具有有效荷載的字節數據流和可選的字符串屬性集。(json格式的字符串,由headers和body兩部分組成)

 

2. flume Agent:flume 代理,是一個進程承載從外部源事件流到下一個目的地的過程。包含source channel和sink。

3. Source:數據源,消耗外部傳遞給他的事件,外部源將數據按照flume Source 能識別的格式將Flume 事件發送給flume Source。

4. Channel:數據通道,是一個被動的存儲,用來保持事件,直到由一個flume Sink消耗。

5. Sink : 數據匯聚點,代表外部數據存放位置。發送flume event到指定的外部目標.

 

配置文件模版:在 conf目錄里面新建一個文件,文件名自定義

#example.conf:單節點Flume配置
#命名Agent a1的組件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#描述/配置Source
a1.sources.r1.type = netcat   Source 類型(還有其他很多)
a1.sources.r1.bind = 0.0.0.0  綁定ip

a1.sources.r1.port = 44444    端口號

#描述Sink
a1.sinks.k1.type = logger       sink類型

#描述內存Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#為Channle綁定Source和Sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意:
(1)一個配置文件中可以配置多個Agent,一個Agent中可以包含多個Source、Sink、Channel。
(2)一個Source 可以綁定到多個通道,但一個Sink只能綁定到一個通道

Source類型:

Avro Source 序列化

Exec Source 命令輸出作為源

#描述/配置Source
a1.sources.r1.type = exec
a1.sources.r1.command = ping 192.168.242.102 

Spooling Directory Source  

這個Source允許你將將要收集的數據放置到"自動搜集"目錄中。這個Source將監視該目錄,並將解析新文件的出現。事件處理邏輯是可插拔的,當一個文件被完全讀入通道,它會被重命名或可選的直接刪除。
要注意的是,放置到自動搜集目錄下的文件不能修改,如果修改,則flume會報錯。另外,也不能產生重名的文件,如果有重名的文件被放置進來,則flume會報錯

#描述/配置Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata

NetCat Source  監聽一個指定端口,並將接收到的數據的每一行轉換為一個事件

Sequence Generator Source 簡單的序列發生器,不斷的產生事件,值是從0開始每次遞增1。

HTTP Source 接受HTTP的GET和POST請求作為Flume的事件,其中GET方式應該只用於試驗(默認jsonhandler,文件上傳 blobhandler)

#描述/配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 66666

 

主要用來進行測試

通過flume的工具啟動agent

$ bin/flume-ng agent --conf ../conf --conf-file ../conf/example.conf --name a1 -Dflume.root.logger=INFO,console

 

Interceptor

 

#描述/配置Source

a1.sources.r1.type  =  http

a1.sources.r1.port  =  44444

a1.sources.r1.interceptors = i1 i2 i3 i4 i5 i6

a1.sources.r1.interceptors.i1.type = timestamp  時間戳攔截器

#ip是攔截者所在機器的ip

a1.sources.r1.interceptors.i2.type = host     host攔截器

a1.sources.r1.interceptors.i3.type = static      靜態攔截器

a1.sources.r1.interceptors.i3.key = country

a1.sources.r1.interceptors.i3.value = China

a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder UUid攔截器

a1.sources.r1.interceptors.i5.type = search_replace  查找替換攔截器

#將所有的數字替換成*

a1.sources.r1.interceptors.i5.searchPattern = [0-9]  

a1.sources.r1.interceptors.i5.replaceString = *

a1.sources.r1.interceptors.i6.type = regex_filter       正則攔截器

#只要是a開頭的拋棄

a1.sources.r1.interceptors.i6.regex = ^a.*      

a1.sources.r1.interceptors.i6.excludeEvents = true

 

 

 

 

Processor

......

#描述Sink

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover  失敗重連

a1.sinkgroups.g1.processor.priority.k1=5

a1.sinkgroups.g1.processor.priority.k2=10

......

#為Channel綁定Source和Sink

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c1

------------------------------------------------------

#描述Sink 負載均衡方式

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.selector = random


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM