flume面試題


1 你是如何實現Flume數據傳輸的監控的
使用第三方框架Ganglia實時監控Flume。

2 Flume的Source,Sink,Channel的作用?你們Source是什么類型?
1、作用 (1)Source組件是專門用來收集數據的,可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy (2)Channel組件對采集到的數據進行緩存,可以存放在Memory或File中。 (3)Sink組件是用於把數據發送到目的地的組件,目的地包括Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定義。

2、我公司采用的Source類型為: (1)監控后台日志:exec (2)監控后台產生日志的端口:netcat

3 Flume 的 Channel Selectors


 

4 Flume 參數調優
Source 增加Source個數(使用Tair Dir Source時可增加FileGroups個數)可以增大Source的讀取數據的能力。 例如:當某一個目錄產生的文件過多時需要將這個文件目錄拆分成多個文件目錄,同時配置好多個 Source 以保證 Source 有足夠的能力獲取到新產生的數據。 batchSize 參數決定 Source 一次批量運輸到 Channel 的event條數,適當調大這個參數可以提高 Source 搬運 Event 到 Channel 時的性能。

Channel type 選擇 memory 時 Channel 的性能最好,但是如果 Flume 進程意外掛掉可能會丟失數據。type 選擇 file 時 Channel 的容錯性更好,但是性能上會比 memory channel 差。 使用file Channel時 dataDirs 配置多個不同盤下的目錄可以提高性能。 Capacity 參數決定 Channel 可容納最大的 event 條數。transactionCapacity 參數決定每次 Source 往 channel 里面寫的最大event 條數和每次 Sink 從channel 里面讀的最大 event 條數。transactionCapacity 需要大於 Source 和Sink的batchSize 參數。

Sink 增加 Sink 的個數可以增加 Sink 消費 event 的能力。Sink 也不是越多越好夠用就行,過多的 Sink 會占用系統資源,造成系統資源不必要的浪費。 batchSize 參數決定 Sink 一次批量從 Channel 讀取的 event 條數,適當調大這個參數可以提高 Sink 從 Channel 搬出 event 的性能。

5 Flume 的事務機制
Flume的事務機制(類似數據庫的事務機制):

Flume 使用兩個獨立的事務分別負責從 Soucrce 到 Channel(put),以及從 Channel 到Sink 的事件傳遞(take)。

比如 spooling directory source 為文件的每一行創建一個事件,一旦事務中所有的事件全部傳遞到 Channel 且提交成功,那么 Soucrce 就將該文件標記為完成。

同理,事務以類似的方式處理從 Channel 到 Sink 的傳遞過程,如果因為某種原因使得事件無法記錄,那么事務將會回滾。且所有的事件都會保持到 Channel 中,等待重新傳遞。

6 Flume 采集數據會丟失嗎?
不會,Channel 存儲可以存儲在 File 中,數據傳輸自身有事務。

 

------------------------------------------------------------------------------------------------------

 

1.Flume 采集數據會丟失嗎?

不會,Channel 存儲可以存儲在 File 中,數據傳輸自身有事務。

2.Flume 與 Kafka 的選取?

采集層主要可以使用 Flume、Kafka 兩種技術。
Flume:Flume 是管道流方式,提供了很多的默認實現,讓用戶通過參數部署,及擴展 API。
Kafka:Kafka 是一個可持久化的分布式的消息隊列。
Kafka 是一個非常通用的系統。你可以有許多生產者和很多的消費者共享多個主題Topics。相比之下,Flume 是一個專用工具被設計為旨在往 HDFS,HBase 發送數據。它對HDFS 有特殊的優化,並且集成了 Hadoop 的安全特性。所以,Cloudera 建議如果數據被多個系統消費的話,使用 kafka;如果數據被設計給 Hadoop 使用,使用 Flume。正如你們所知 Flume 內置很多的 source 和 sink 組件。然而,Kafka 明顯有一個更小的生產消費者生態系統,並且 Kafka 的社區支持不好。希望將來這種情況會得到改善,但是目前:使用 Kafka 意味着你准備好了編寫你自己的生產者和消費者代碼。如果已經存在的 Flume    Sources 和 Sinks 滿足你的需求,並且你更喜歡不需要任何開發的系統,請使用 Flume。Flume 可以使用攔截器實時處理數據。這些對數據屏蔽或者過量是很有用的。Kafka 需要外部的流處理系統才能做到。
Kafka 和 Flume 都是可靠的系統,通過適當的配置能保證零數據丟失。然而,Flume 不支持副本事件。於是,如果 Flume 代理的一個節點奔潰了,即使使用了可靠的文件管道方式,你也將丟失這些事件直到你恢復這些磁盤。如果你需要一個高可靠行的管道,那么使用Kafka 是個更好的選擇。
Flume 和 Kafka 可以很好地結合起來使用。如果你的設計需要從 Kafka 到 Hadoop 的流數據,使用 Flume 代理並配置 Kafka 的 Source 讀取數據也是可行的:你沒有必要實現自己的消費者。你可以直接利用Flume 與HDFS 及HBase 的結合的所有好處。你可以使用ClouderaManager 對消費者的監控,並且你甚至可以添加攔截器進行一些流處理。

3.數據怎么采集到 Kafka,實現方式?

使用官方提供的 flumeKafka 插件,插件的實現方式是自定義了 flume 的 sink,將數據從channle 中取出,通過 kafka 的producer 寫入到 kafka 中,可以自定義分區等。

4.flume 管道內存,flume 宕機了數據丟失怎么解決?

 1)Flume 的 channel 分為很多種,可以將數據寫入到文件。
 2)防止非首個 agent 宕機的方法數可以做集群或者主備

5.  flume 和 kafka 采集日志區別,采集日志時中間停了,怎么記錄之前的日志?

 Flume 采集日志是通過流的方式直接將日志收集到存儲層,而 kafka 是將緩存在 kafka集群,待后期可以采集到存儲層。
Flume 采集中間停了,可以采用文件的方式記錄之前的日志,而 kafka 是采用 offset 的方式記錄之前的日志。

6.flume 有哪些組件,flume 的 source、channel、sink 具體是做什么的?

1)source:用於采集數據,Source 是產生數據流的地方,同時 Source 會將產生的數據
流傳輸到 Channel,這個有點類似於 Java IO 部分的 Channel。
2)channel:用於橋接 Sources 和 Sinks,類似於一個隊列。
3)sink:從 Channel 收集數據,將數據寫到目標源(可以是下一個 Source,也可以是 HDFS
或者 HBase)。 

7.為什么使用Flume?

 

8.Flume組成架構?

關於flume事務
flume要盡可能的保證數據的安全性,其在source推送數據到channel以及sink從channel拉取數據時都是以事務方式進行的。因為在agent內的兩次數據傳遞間都會涉及到數據的傳送、從數據上游刪除數據的問題;就比如sink從channel拉取數據並提交到數據下游之后需要從channel中刪除已獲取到的批次數據,其中跨越了多個原子事件,故而需要以事務的方式將這些原子事件進一步綁定在一起,以便在其中某個環節出錯時進行回滾防止數據丟失。所以在選用file channel時一般來說是不會丟失數據的。
 

9.FlumeAgent內部原理?

 

10.Flume Event 是數據流的基本單元

它由一個裝載數據的字節數組(byte payload)和一系列可選的字符串屬性來組成(可選頭部).


 

11.Flume agent

Flume source 消耗從類似於 web 服務器這樣的外部源傳來的 events.

外部數據源以一種 Flume source 能夠認識的格式發送 event 給 Flume source.

Flume source 組件可以處理各種類型、各種格式的日志數據,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy.

flume source 是負責接收數據到 Flume Agent 的組件


2. Flume channel
當 Flume source 接受到一個 event 的時, Flume source 會把這個 event 存儲在一個或多個 channel 中.

Channel 是連接Source和Sink的組件, 是位於 Source 和 Sink 之間的數據緩沖區。

Flume channel 使用被動存儲機制. 它存儲的數據的寫入是靠 Flume source 來完成的, 數據的讀取是靠后面的組件 Flume sink 來完成的.

Channel 是線程安全的,可以同時處理幾個 Source 的寫入操作和幾個 Sink 的讀取操作。

Flume 自帶兩種 Channel:

Memory Channel

Memory Channel是內存中的隊列。

Memory Channel在不需要關心數據丟失的情景下適用。

如果需要關心數據丟失,那么Memory Channel就不應該使用,因為程序死亡、機器宕機或者重啟都會導致數據丟失。

File Channel。

File Channel將所有事件寫到磁盤。

因此在程序關閉或機器宕機的情況下不會丟失數據。

還可以有其他的 channel: 比如 JDBC channel.


 

3. Flume sink
Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者發送到另一個Flume Agent。

Sink 是完全事務性的。

在從 Channel 批量刪除數據之前,每個 Sink 用 Channel 啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume Agent,Sink 就利用 Channel 提交事務。事務一旦被提交,該 Channel 從自己的內部緩沖區刪除事件。如果寫入失敗,將緩沖區takeList中的數據歸還給Channel。

Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。


12.你是如何實現Flume數據傳輸的監控的

使用第三方框架Ganglia實時監控Flume。

13.flume 調優 :

source :
1 ,增加 source 個數,可以增大 source 讀取能力。
2 ,具體做法 : 如果一個目錄下生成的文件過多,可以將它拆分成多個目錄。每個目錄都配置一個 source 。
3 ,增大 batchSize : 可以增大一次性批處理的 event 條數,適當調大這個參數,可以調高 source 搬運數據到 channel 的性能。
channel :
1 ,memory :性能好,但是,如果發生意外,可能丟失數據。
2 ,使用 file channel 時,dataDirs 配置多個不同盤下的目錄可以提高性能。
3 ,transactionCapacity 需要大於 source 和 sink 的 batchSize 參數
sink :
增加 sink 個數可以增加消費 event 能力

 

------------------------------------------------------------------------------------------------------------------------------------

 

1 ,如何實現 flume 傳輸數據的實時監控
使用第三方框架 ganglia

2 ,flume 的 source,sink,channel 的作用,你們的 source 類型是 ?
source :搜集數據
channel :數據緩存
sink :把數據發送到目的地
常用 source 類型 :
1 ,監控文件 :exec
2 ,監控目錄 :spooldir
3 ,flume 選擇器 :
包括兩種 :
1 ,每個通道都復制一份文件,replicating 。
2 ,選擇性發往某個通道,Multiplexing 。
4 ,flume 調優 :
source :
1 ,增加 source 個數,可以增大 source 讀取能力。
2 ,具體做法 : 如果一個目錄下生成的文件過多,可以將它拆分成多個目錄。每個目錄都配置一個 source 。
3 ,增大 batchSize : 可以增大一次性批處理的 event 條數,適當調大這個參數,可以調高 source 搬運數據到 channel 的性能。
channel :
1 ,memory :性能好,但是,如果發生意外,可能丟失數據。
2 ,使用 file channel 時,dataDirs 配置多個不同盤下的目錄可以提高性能。
3 ,transactionCapacity 需要大於 source 和 sink 的 batchSize 參數
sink :
增加 sink 個數可以增加消費 event 能力
5 ,事務機制 :
channel : 是位於 source 和 sink 之間的緩沖區。
1 ,flume 自帶兩種緩沖區,file channel 和 memory channel
2 ,file channel : 硬盤緩沖區,性能低,但是安全。系統宕機也不會丟失數據。
3 ,memory channel :內存緩沖區,性能高,但是有可能丟數據,在不關心數據有可能丟失的情況下使用。
put 事務流程 : 源將數據給管道
1 ,doPut :把數據寫入臨時緩沖區 putList 。
2 ,doCommit :檢查 channel 內存隊列是否足夠合並。
3 ,doRollBack : 如果 channel 不行,我們就回滾數據。
take 事務流程 :
1 ,先將數據取到臨時緩沖區 takeList。
2 ,doCommit :如果數據全部發送成功,就清除臨時緩沖區。
3 ,doRollBack :如果數據發送過程中出現異常,doRollBack 將臨時緩沖區的數據還給 channel 隊列


6 ,flume 傳數據會丟失嗎 :
不會,因為 channel 可以存儲在 file 中,而且 flume 本身是有事務的。
可以做 sink 組,一個壞掉了,就用另一個。
7 ,個人總結 : flume 總結
transactionCapacity :事務容量大小,指的就是 putList 和 takeList 的大小
transactionCapacity > batchSize

 

-----------------------------------------------------------------------------------------------------------------------------

 

1  怎樣對Flume  數據傳輸進行監控
    使用第三方框架 Ganglia 實時監控 Flume。
2 Flume的Source ,Sink ,Channel的作用? 使用過哪些Source類型?
    作用

Source 組件是專門用來收集數據的,可以處理各種類型、各種格式的日志數據,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
Channel 組件對采集到的數據進行緩存,可以存放在 Memory 或 File 中。
Sink 組件是用於把數據發送到目的地的組件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定義。
  常用的Source 類型:

(1)監控后台日志:exec
(2)監控后台產生日志的端口:netcat  ,Exec spooldir
3 Flume 的 Channel Selectors有幾種
Channel Selectors,可以讓不同的項目日志通過不同的Channel到不同的Sink中去。
官方文檔上Channel Selectors 有兩種類型:Replicating Channel Selector (default)和Multiplexing Channel Selector
這兩種Selector的區別是:Replicating 會將source過來的events發往所有channel,而Multiplexing可以選擇該發往哪些Channel。
4 Flume參數調優
Source
增加 Source 個(使用 Tair Dir Source 時可增加 FileGroups 個數)可以增大 Source 的讀取數據的能力。例如:當某一個目錄產生的文件過多時需要將這個文件目錄拆分成多個文件目錄,同時配置好多個 Source 以保證 Source 有足夠的能力獲取到新產生的數據。batchSize 參數決定 Source 一次批量運輸到 Channel 的 event 條數,適當調大這個參數可以提高 Source 搬運 Event 到 Channel 時的性能。
Channel
type 選擇 memory 時 Channel 的性能最好,但是如果 Flume 進程意外掛掉可能會丟失數據。type 選擇 file 時 Channel 的容錯性更好,但是性能上會比 memory channel 差。使用 file Channel 時 dataDirs 配置多個不同盤下的目錄可以提高性能。
Capacity 參數決定 Channel 可容納最大的 event 條數。transactionCapacity 參數決定每次Source 往 channel 里面寫的最大 event 條數和每次 Sink 從 channel 里面讀的最大 event 條數。
transactionCapacity 需要大於 Source 和 Sink 的 batchSize 參數。
Sink
增加 Sink 的個數可以增加 Sink 消費 event 的能力。Sink 也不是越多越好夠用就行,過多的 Sink 會占用系統資源,造成系統資源不必要的浪費。
batchSize 參數決定 Sink 一次批量從 Channel 讀取的 event 條數,適當調大這個參數可以提高 Sink 從 Channel 搬出 event 的性能。

5 Flume的事務機制
Flume 的事務機制(類似數據庫的事務機制):Flume 使用兩個獨立的事務分別負責從Soucrce 到 Channel,以及從 Channel 到 Sink 的事件傳遞。比如 spooling directory source 為文件的每一行創建一個事件,一旦事務中所有的事件全部傳遞到 Channel 且提交成功,那么Soucrce 就將該文件標記為完成。同理,事務以類似的方式處理從 Channel 到 Sink 的傳遞過程,如果因為某種原因使得事件無法記錄,那么事務將會回滾。且所有的事件都會保持到Channel 中,等待重新傳遞。

6 Flume可以保證采集和發送數據的正確性嗎?
在單機 upd環境下,采集數據時,如果讀寫大於100M/s的情況下會出現丟包現象,可以對丟失率進行統計,如果在可接受范圍內不做處理,如果不能接受可以修改源碼或者借鑒flume原理自行開發收集框架;
對收集到的數據,flume可以將其存儲下磁盤中,且數據傳輸自身有事務。確保發送數據不丟失。

 

-----------------------------------------------------------------------------------------------------------------------------

 

Flume + Kafka 面試

1、flume 如何保證數據的可靠性?

  • Flume 提供三種可靠性:JDBC、FILE、MEMORY
  • Flume 使用事務的辦法來保證 event 的可靠傳遞。Source 和 Sink 分別被封裝在事務中,這些事務由保存 event 的存儲提供或者由 Channel 提供。這就保證了 event 在數據流的點對點傳輸中是可靠的。

2、kafka 數據丟失問題,及如何保證?

1、kafka 數據丟失問題

a、acks=1    的時候(只保證寫入 leader 成功),如果剛好 leader 掛了,則數據會丟失。
b、acks=0    的時候,使用異步模式的時候,該模式下 kafka 無法保證消息,有可能會丟。

2、brocker 如何保證不丟失

a、acks=all      所有副本都寫入成功並確認。
b、retries=一個合理值        kafka 發送數據失敗后的重試值。(如果總是失敗,則可能是網絡原因)
c、min.insync.replicas=2     消息至少要被寫入到這么多副本才算成功。
d、unclean.leader.election.enable=false      關閉 unclean leader 選舉,即不允許非 ISR 中的副本被選舉為 leader,以避免數據丟失。

3、consumer 如何保證不丟失?

a、如果在消息處理完成前就提交了 offset,那么就有可能造成數據的丟失。
b、enable.auto.commit=false     關閉自動提交 offset。
c、處理完數據之后手動提交。

3、kafka 工作流程原理

大致原理即可。有幾個點稍微詳細即可。

4、kafka 保證消息順序

1、全局順序
  a、全局使用一個生產者,一個分區,一個消費者。
2、局部順序
  a、每個分區是有序的,根據業務場景制定不同的 key 進入不同的分區。

5、zero copy 原理及如何使用?

  • 1、zero copy 在內核層直接將文件內容傳送給網絡 socket,避免應用層數據拷貝,減小 IO 開銷。
  • 2、java.nio.channel.FileChannel 的 transferTo() 方法實現 zero copy。

6、spark Join 常見分類以及基本實現機制

1、shuffle hash join、broadcast hash join 以及 sort merge join。

2、shuffle hash join

小表 join 大表,依次讀取小表的數據,對於每一行數據根據 join key 進行 hash,hash 到對應的 Bucket(桶),生成 hash table 中的一條記錄。
數據緩存在內存中,如果內存放不下需要 dump 到外存。
再依次掃描大表的數據,使用相同的 hash 函數映射 Hash Table 中的記錄,映射成功之后再檢查 join 條件,如果匹配成功就可以將兩者 join 在一起。

3、broadcast hash join

如果小表數據量增大,內存不能放下的時候,分別將兩個表按照 join key 進行分區,將相同 join key 的記錄重分布到同一節點,兩張表的數據會被重分布到集群中所有節點。這個過程稱為 shuffle(網絡混啟)。
每個分區節點上的數據單獨執行單機 hash join 算法。

4、sort merge join

兩張大表 join 采用了 sort merge join 算法:
    shuffle 階段:將兩張大表根據 join key 進行重新分區,兩張表數據會分布到整個集群,以便分布式並行處理。
    sort 階段:對單個分區節點的兩表數據,分別進行排序。
    merge 階段:對排好序的兩張分區表數據執行 join 操作。join 操作很簡單,分別遍歷兩個有序序列,碰到相同 join key 就 merge 輸出,否則取更小一邊。

128G 內存、多磁盤、萬兆網卡、吞吐(幾千到一萬

 

--------------------------------------------------------------------------------------------------------------

flume三個器:監控器,攔截器,選擇器

選擇器:

 

Channel選擇器是決定Source接收的一個特定事件寫入哪些Channel的組件,它們告知Channel處理器,然后由其將事件寫入到每個Channel。

 

Flume內置兩種選擇器:replicating和multiplexing。如果source的配置中沒有指定選擇器,那么會自動使用復制Channel選擇器。

 

二、Replicationg Channel Selector


三、Multiplexing Channel Selector

 

攔截器:

 

攔截器的種類介紹
1、Timestamp Interceptor(時間戳攔截器)

 

flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置。 參數 默認值 描述 type   類型名稱timestamp,也可以使用類名的全路徑 preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值

 

source連接到時間戳攔截器的配置:

 

a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting=false
 2、Host Interceptor(主機攔截器)

 

主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。時間報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置參數 默認值 描述 type   類型名稱host hostHeader host 事件投的key useIP true 如果設置為false,host鍵插入主機名 preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換host報頭的值

 

source連接到主機攔截器的配置:

 

a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP=false
a1.sources.r1.interceptors.timestamp.preserveExisting=true
3、靜態攔截器(Static Interceptor)

 

靜態攔截器的作用是將k/v插入到事件的報頭中。配置如下參數 默認值 描述 type   類型名稱static key key 事件頭的key value value key對應的value值 preserveExisting true 如果設置為true,若事件中報頭已經存在該key,不會替換value的值source連接到靜態攔截器的配置:

 

a1.sources.r1.interceptors = static
a1.sources.r1.interceptors.static.type=static
a1.sources.r1.interceptors.static.key=logs
a1.sources.r1.interceptors.static.value=logFlume
a1.sources.r1.interceptors.static.preserveExisting=false
4、正則過濾攔截器(Regex Filtering Interceptor)

 

在日志采集的時候,可能有一些數據是我們不需要的,這樣添加過濾攔截器,可以過濾掉不需要的日志,也可以根據需要收集滿足正則條件的日志。參數默認值描述 type 類型名稱REGEX_FILTER regex .* 匹配除“\n”之外的任何個字符 excludeEvents false 默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的。

 

source連接到正則過濾攔截器的配置:

 

a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type=REGEX_FILTER
a1.sources.r1.interceptors.regex.regex=(rm)|(kill)
a1.sources.r1.interceptors.regex.excludeEvents=false
這樣配置的攔截器就只會接收日志消息中帶有rm 或者kill的日志。

 

5、Regex Extractor Interceptor

 

通過正則表達式來在header中添加指定的key,value則為正則匹配的部分

 

6、UUID Interceptor

 

用於在每個events header中生成一個UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中讀取並使用。根據上面的source,攔截器的配置如下:

 

# source 攔截器
a1.sources.sources1.interceptors = i1
a1.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.sources1.interceptors.i1.headerName = uuid
a1.sources.sources1.interceptors.i1.preserveExisting = true
a1.sources.sources1.interceptors.i1.prefix = UUID-
7、Morphline Interceptor

 

Morphline攔截器,該攔截器使用Morphline對每個events數據做相應的轉換。關於Morphline的使用,可參考

 

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
8、Search and Replace Interceptor

 

此攔截器基於Java正則表達式提供簡單的基於字符串的搜索和替換功能。還可以進行回溯/群組捕捉。此攔截器使用與Java Matcher.replaceAll()方法中相同的規則

 

# 攔截器別名
a1.sources.avroSrc.interceptors = search-replace
# 攔截器類型,必須是search_replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

#刪除事件正文中的前導字母數字字符,根據正則匹配event內容
a1.sources.avroSrc.interceptors.search-replace.searchPattern = 快速褐色([az] +)跳過懶惰([az] +)
# 替換匹配到的event內容
a1.sources.avroSrc.interceptors.search-replace.replaceString = 飢餓的$ 2吃了不小心$ 1
# 設置字符集,默認是utf8
a1.sources.avroSrc.interceptors.search-replace.charset = utf8

 

 

自定義攔截器:

定義實現類MyInterceptor ,只需要實現Interceptor接口,定義內部類Builder實現Interceptor.Builder即可

 

監控器:

監控器可以看到:

source嘗試寫入channel中的event數量,成功寫入且提交的event數量;

sink嘗試從channel中拉取的event數量,成功讀取的事件數量;

channel相關信息,例如:啟動時間,停止時間,目前的event總數,容量,占用百分比


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM