Flume Monitoring


Flume provides a metrics framework that can expose its metrics over HTTP. To enable it, pass -Dflume.monitoring.type=http to the Flume agent at startup:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=5653 \
  -Dflume.root.logger=INFO,console

Flume then starts an HTTP server on port 5653. Requesting the following address returns the agent's metrics in JSON format:

Demo:

Request: http://flume-agent-host:5653/metrics
Result (src-1 is the custom source name):
{
  "SOURCE.src-1":{
    "OpenConnectionCount":"0",          // total number of connections currently open to clients or sinks (currently only the Avro source reports this metric)
    "Type":"SOURCE",
    "AppendBatchAcceptedCount":"1355",  // total number of batches successfully committed to the channel
    "AppendBatchReceivedCount":"1355",  // total number of event batches received
    "EventAcceptedCount":"28286",       // total number of events successfully written to the channel, for which the source returned success to the sink or RPC client that created them
    "AppendReceivedCount":"0",          // total number of events that arrived in single-event batches (equivalent to one append call in RPC terms)
    "StopTime":"0",                     // time the source was stopped, in milliseconds since the epoch
    "StartTime":"1442566410435",        // time the source was started, in milliseconds since the epoch
    "EventReceivedCount":"28286",       // total number of events the source has received so far
    "AppendAcceptedCount":"0"           // total number of individually appended events written to the channel and successfully acknowledged
  },
  "CHANNEL.ch-1":{
    "EventPutSuccessCount":"28286",     // total number of events successfully written to the channel and committed
    "ChannelFillPercentage":"0.0",      // how full the channel currently is, as a percentage
    "Type":"CHANNEL",
    "StopTime":"0",                     // time the channel was stopped, in milliseconds since the epoch
    "EventPutAttemptCount":"28286",     // total number of events the source attempted to put into the channel
    "ChannelSize":"0",                  // current number of events in the channel
    "StartTime":"1442566410326",        // time the channel was started, in milliseconds since the epoch
    "EventTakeSuccessCount":"28286",    // total number of events successfully taken by sinks
    "ChannelCapacity":"1000000",        // capacity of the channel
    "EventTakeAttemptCount":"313734329512" // total number of times sinks attempted to take events from the channel; an event is not returned every time, since the channel may have been empty
  },
  "SINK.sink-1":{
    "Type":"SINK",
    "ConnectionClosedCount":"0",        // number of connections to the next hop or storage system that were closed (e.g. closing a file on HDFS)
    "EventDrainSuccessCount":"28286",   // total number of events the sink successfully wrote out to storage
    "KafkaEventSendTimer":"482493",
    "BatchCompleteCount":"0",           // number of batches equal to the maximum batch size
    "ConnectionFailedCount":"0",        // number of connections to the next hop or storage system closed due to errors (e.g. a newly created HDFS file closed because of a timeout)
    "EventDrainAttemptCount":"0",       // total number of events the sink attempted to write out to storage
    "ConnectionCreatedCount":"0",       // number of connections created to the next hop or storage system (e.g. creating a new file on HDFS)
    "BatchEmptyCount":"0",              // number of empty batches; a large value means the source is producing data much more slowly than the sink drains it
    "StopTime":"0",
    "RollbackCount":"9",
    "StartTime":"1442566411897",
    "BatchUnderflowCount":"0"           // number of batches smaller than the sink's configured maximum batch size; a high value also indicates the sink is faster than the source
  }
}
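
As an illustration, any HTTP client can poll this endpoint periodically. Below is a minimal Java sketch; the host, port and component names come from the demo above and are placeholders for your own agent:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FlumeMetricsPoller {
    public static void main(String[] args) throws Exception {
        // assumes the agent was started with -Dflume.monitoring.type=http -Dflume.monitoring.port=5653
        URL url = new URL("http://flume-agent-host:5653/metrics");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        // the response is a JSON object keyed by component name, e.g. "SOURCE.src-1";
        // feed it into whatever JSON parser or alerting system you already use
        System.out.println(body);
    }
}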

Flume can also send its metrics to Ganglia. Only one of Ganglia or HTTP monitoring can be enabled at any time. By default Flume reports metrics to Ganglia once per minute:

Demo:

# By default Flume reports metrics in the Ganglia 3.1 format; pollFrequency is the reporting
# interval in seconds, and isGanglia3=true switches to the older Ganglia 3 format instead.
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 \
  -Dflume.monitoring.type=ganglia \
  -Dflume.monitoring.pollFrequency=45 \
  -Dflume.monitoring.isGanglia3=true \
  -Dflume.root.logger=INFO,console

Logging

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

# -Dflume.root.logger=INFO,console sends Flume's log output to the console. To write it to a log file
# instead (by default under $FLUME_HOME/logs), change console to LOGFILE; the details can be adjusted
# in $FLUME_HOME/conf/log4j.properties.
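
For reference, a sketch of the relevant lines in $FLUME_HOME/conf/log4j.properties; the property and appender names below follow the stock log4j.properties shipped with Flume 1.x, so check them against your own copy before relying on them:

# send Flume's logs to a rolling file instead of the console
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log

log4j.rootLogger=${flume.root.logger}

log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %m%n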

Custom Flume Components

Flume's pluggable architecture makes it easy to develop custom plugins. Flume itself ships with a rich set of sources, channels, sinks, interceptors and other plugins that cover most production needs; see the Flume user documentation for details.

The plugins.d Directory

plugins.d is the directory Flume reserves for custom components. At startup Flume automatically adds the files under this directory to the classpath; alternatively, you can add your own files to the classpath manually by passing --classpath,-C <cp> when launching flume-ng.
The layout is as follows:

plugins.d/xxx/lib    - the plugin's jar(s)
plugins.d/xxx/libext - the plugin's dependency jars
plugins.d/xxx/native - native libraries, e.g. .so files
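
For example, a custom interceptor plugin might be laid out as follows; the plugin directory name ext-interceptors and the jar names are purely illustrative:

plugins.d/ext-interceptors/lib/my-browser-interceptor.jar
plugins.d/ext-interceptors/libext/some-third-party-dependency.jar
plugins.d/ext-interceptors/native/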

Interceptors

Interceptors are simple pluggable components that sit between a source and the channel(s) it writes to. Events received by a source can be transformed or dropped by interceptors before they are written to the corresponding channel. Each interceptor instance processes events received by only one source. An interceptor can drop or transform events based on arbitrary criteria, and may return as many (or as few) of the events originally passed to it as it sees fit. Because interceptors must finish their work before events are written to the channel, an RPC source (or any other source that can time out) only responds to the client or sink that sent the events after the interceptors have successfully processed them. You should therefore avoid doing heavy, time-consuming work inside an interceptor; if you must, increase the relevant timeout settings accordingly. Flume ships with several interceptor types, such as the timestamp interceptor, the host interceptor and the regex filtering interceptor; see Flume Interceptors for more.

Interceptors are typically used to inspect events and, when necessary, drop them. To write an interceptor, you only need a class that implements the Interceptor interface, together with a Builder class that implements the Interceptor$Builder interface. Every Builder class must have a public no-argument constructor, which Flume uses to instantiate it. The interceptor is configured through the Context instance passed to the Builder; all required parameters are passed via that Context. Below is the implementation of the timestamp interceptor:

package org.apache.flume.interceptor;

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;

import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;

public class TimestampInterceptor implements Interceptor {

  private final boolean preserveExisting;

  /**
   * This constructor can only be called by the Builder.
   */
  private TimestampInterceptor(boolean preserveExisting) {
    this.preserveExisting = preserveExisting;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Modifies events in-place.
   */
  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    if (preserveExisting && headers.containsKey(TIMESTAMP)) {
      // we must preserve the existing timestamp
    } else {
      long now = System.currentTimeMillis();
      headers.put(TIMESTAMP, Long.toString(now));
    }
    return event;
  }

  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instances of the TimestampInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private boolean preserveExisting = PRESERVE_DFLT;

    @Override
    public Interceptor build() {
      return new TimestampInterceptor(preserveExisting);
    }

    // configuration parameters are passed in through the Context
    @Override
    public void configure(Context context) {
      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
    }

  }

  public static class Constants {
    public static String TIMESTAMP = "timestamp";
    public static String PRESERVE = "preserveExisting";
    public static boolean PRESERVE_DFLT = false;
  }

}

Notes:

  1. Both intercept() methods must be thread-safe, because if the source runs multi-threaded these methods may be called from several threads.
  2. How a custom interceptor is configured: the interceptor's type is set to XXXInterceptor$Builder (a fuller agent configuration sketch follows this list):

    # custom interceptor  --producer is the agent name  --src-1 the source name  --i1 the interceptor name
    producer.sources.src-1.interceptors = i1
    producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder
  3. Package the custom code into a jar and place it under the plugins.d/ext-interceptors(any name you like)/lib directory described earlier; Flume will automatically add the jar to the classpath at startup.
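
Putting the steps together, here is a minimal agent configuration sketch; the agent, source, channel and interceptor names, the spooling directory and the interceptor class are the illustrative ones used in this article, so substitute your own:

producer.sources = src-1
producer.channels = ch-1

producer.sources.src-1.type = spooldir
producer.sources.src-1.spoolDir = /opt/nginx/tmp_logs
producer.sources.src-1.channels = ch-1
producer.sources.src-1.interceptors = i1
producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder

producer.channels.ch-1.type = memory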

Deserializers

A source uses an embedded deserializer to read the files in the monitored directory (taking the Spooling Directory Source as the example here); the default deserializer is LineDeserializer. It reads the file line by line and wraps each line into an Event. By default the maximum length read at a time is 2048 characters, which can be changed with the following configuration parameter:

# --producer is the agent name  --src-1 the source name
producer.sources.src-1.deserializer.maxLineLength = 20480

So when using LineDeserializer you need a rough estimate of what your source files contain; otherwise, when a line exceeds the maximum length it is cut into two parts and sent to the channel as two Events, which in some scenarios effectively turns that line into an invalid message. For example, a file may record the entire content of one HTTP request per line, and we cannot know the maximum length of such a line in advance (in theory you could work around this by setting maxLineLength to a very large value). Here, though, is another very simple solution: use LineDeserializer as a reference and implement a deserializer that does not limit the line length (one wonders what consideration led Flume to design it this way). A deserializer is defined in much the same way as the interceptor above:

package org.apache.flume.serialization;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineDeserializer implements EventDeserializer {

  private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class);

  private final ResettableInputStream in;
  private final Charset outputCharset;
  private final int maxLineLength;
  private volatile boolean isOpen;

  public static final String OUT_CHARSET_KEY = "outputCharset";
  public static final String CHARSET_DFLT = "UTF-8";

  public static final String MAXLINE_KEY = "maxLineLength";
  public static final int MAXLINE_DFLT = 2048;

  LineDeserializer(Context context, ResettableInputStream in) {
    this.in = in;
    this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
    this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
    this.isOpen = true;
  }

  @Override
  public Event readEvent() throws IOException {
    ensureOpen();
    String line = readLine();
    if (line == null) {
      return null;
    } else {
      return EventBuilder.withBody(line, outputCharset);
    }
  }

  ...

  // TODO: consider not returning a final character that is a high surrogate
  // when truncating
  private String readLine() throws IOException {
    StringBuilder sb = new StringBuilder();
    int c;
    int readChars = 0;
    while ((c = in.readChar()) != -1) {
      readChars++;

      // FIXME: support \r\n
      if (c == '\n') {
        break;
      }

      sb.append((char)c);

      // enforce the maximum line length
      if (readChars >= maxLineLength) {
        logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength);
        break;
      }
    }

    if (readChars > 0) {
      return sb.toString();
    } else {
      return null;
    }
  }

  // this closely mirrors Interceptor$Builder
  public static class Builder implements EventDeserializer.Builder {

    @Override
    public EventDeserializer build(Context context, ResettableInputStream in) {
      return new LineDeserializer(context, in);
    }

  }

}
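
As a sketch of the change suggested above (this is not Flume's shipped code): a readLine() variant with the maxLineLength check removed, intended as a drop-in replacement inside a copy of the LineDeserializer shown here. Keep in mind that a very long line is then buffered entirely in memory while it is read:

  // drop-in replacement for readLine(): stop only at '\n' or end of stream, never truncate
  private String readLine() throws IOException {
    StringBuilder sb = new StringBuilder();
    int c;
    int readChars = 0;
    while ((c = in.readChar()) != -1) {
      readChars++;
      if (c == '\n') {
        break;
      }
      sb.append((char) c);
    }
    if (readChars > 0) {
      return sb.toString();
    } else {
      return null;
    }
  }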

The remaining steps are the same as for the interceptor:

# custom deserializer
producer.sources.src-1.deserializer = com.networkbench.browser.flume.interceptor.MyLineDeserializer$Builder

Sources

Flume provides a rich set of source types, such as the Avro Source, Exec Source and Spooling Directory Source.
Here is a problem encountered in actual use. In the HTTP-request-logging scenario described earlier, in order to analyze the HTTP request data promptly, we split the raw files recording HTTP requests by minute and then move them into the monitored spooling directory (e.g. /tmp-logs). For various reasons, however, duplicate file names can appear in the monitored directory:

/tmp-logs/access_2015_10_01_16_30.log.COMPLETED  # once Flume finishes processing a file it automatically renames it with the .COMPLETED suffix
/tmp-logs/access_2015_10_01_16_30.log            # a file that has just arrived

In this situation, once Flume has finished reading the later access_2015_10_01_16_30.log it tries to rename it, but the target name is already taken, so Flume throws the exception below and stops processing the other files in the monitored directory.

25 九月 2015 16:48:59,228 INFO [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348) - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
25 九月 2015 16:48:59,229 ERROR [pool-22-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source src-1: { spoolDir: /opt/nginx/tmp_logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:378)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:330)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:259)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Tracing the source code that throws the exception: SpoolDirectorySource starts a thread that polls the target files in the monitored directory; once a file has been fully read (readEvents) it is renamed (rollCurrentFile). When the rename fails, an IllegalStateException is thrown, which SpoolDirectoryRunnable catches and rethrows as a RuntimeException, causing the current thread to exit. The source shows that SpoolDirectoryRunnable runs on a single-threaded executor, so once that thread dies the remaining files in the monitored directory are no longer processed:

# SpoolDirectorySource starts the SpoolDirectoryRunnable

executor = Executors.newSingleThreadScheduledExecutor();

File directory = new File(spoolDirectory);
...

Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
// POLL_DELAY_MS defaults to 500 milliseconds
executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
...


# SpoolDirectorySource$SpoolDirectoryRunnable.run():

@Override
public void run() {
  int backoffInterval = 250;
  try {
    while (!Thread.interrupted()) {
      // reads the file content; once the file has nothing left to read this ends up calling
      // ReliableSpoolingFileEventReader.retireCurrentFile() -> ReliableSpoolingFileEventReader.rollCurrentFile()
      List<Event> events = reader.readEvents(batchSize);
      if (events.isEmpty()) {
        break;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();

      try {
        getChannelProcessor().processEventBatch(events);
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full, and cannot write data now. The " +
            "source will try again after " + String.valueOf(backoffInterval) +
            " milliseconds");
        hitChannelException = true;
        if (backoff) {
          TimeUnit.MILLISECONDS.sleep(backoffInterval);
          backoffInterval = backoffInterval << 1;
          backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
              backoffInterval;
        }
        continue;
      }
      backoffInterval = 250;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();
    }
  } catch (Throwable t) {
    // this is where the exception seen in the log above is reported
    logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
        "Uncaught exception in SpoolDirectorySource thread. " +
        "Restart or reconfigure Flume to continue processing.", t);
    hasFatalError = true;
    // the exception is rewrapped as a RuntimeException here, which kills the current thread
    Throwables.propagate(t);
  }
}

# ReliableSpoolingFileEventReader.rollCurrentFile():

private void rollCurrentFile(File fileToRoll) throws IOException {

  File dest = new File(fileToRoll.getPath() + completedSuffix);
  logger.info("Preparing to move file {} to {}", fileToRoll, dest);

  // Before renaming, check whether destination file name exists
  if (dest.exists() && PlatformDetect.isWindows()) {
    /*
     * If we are here, it means the completed file already exists. In almost
     * every case this means the user is violating an assumption of Flume
     * (that log files are placed in the spooling directory with unique
     * names). However, there is a corner case on Windows systems where the
     * file was already rolled but the rename was not atomic. If that seems
     * likely, we let it pass with only a warning.
     */
    if (Files.equal(currentFile.get().getFile(), dest)) {
      logger.warn("Completed file " + dest +
          " already exists, but files match, so continuing.");
      boolean deleted = fileToRoll.delete();
      if (!deleted) {
        logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
            ". It will likely be ingested another time.");
      }
    } else {
      String message = "File name has been re-used with different" +
          " files. Spooling assumptions violated for " + dest;
      throw new IllegalStateException(message);
    }

  // Dest file exists and not on windows
  } else if (dest.exists()) {
    // this is where the "destination file already exists" exception is thrown
    String message = "File name has been re-used with different" +
        " files. Spooling assumptions violated for " + dest;
    throw new IllegalStateException(message);

  // Destination file does not already exist. We are good to go!
  } else {
    boolean renamed = fileToRoll.renameTo(dest);
    if (renamed) {
      logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);

      // now we no longer need the meta file
      deleteMetaFile();
    } else {
      /* If we are here then the file cannot be renamed for a reason other
       * than that the destination file exists (actually, that remains
       * possible w/ small probability due to TOC-TOU conditions).*/
      String message = "Unable to move " + fileToRoll + " to " + dest +
          ". This will likely cause duplicate events. Please verify that " +
          "flume has sufficient permissions to perform these operations.";
      throw new FlumeException(message);
    }
  }
}

The call path behind the exception stack is now clear. As with the custom deserializer above, we can rewrite the relevant parts of ReliableSpoolingFileEventReader and SpoolDirectorySource, that is, build a custom spooling source that takes some corrective action when the rename in rollCurrentFile() fails, for example renaming the file to access_2015_10_01_16_30.log(2).COMPLETED (its content has already been fully read at this point) and continuing; note that the name must still end in .COMPLETED, otherwise Flume will read the file again.
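
Here is a minimal sketch of that fallback, assuming a copy of ReliableSpoolingFileEventReader: the completedSuffix field, logger and deleteMetaFile() all come from that class, and the numbered-suffix naming is just the illustrative scheme mentioned above:

  // called from a modified rollCurrentFile() when fileToRoll + completedSuffix already exists:
  // pick a numbered alternative such as access_2015_10_01_16_30.log(2).COMPLETED
  // so the spooling thread keeps running instead of dying
  private void rollToAlternateName(File fileToRoll) throws IOException {
    File alt;
    int n = 2;
    do {
      // keep the .COMPLETED suffix so Flume will not pick the file up again
      alt = new File(fileToRoll.getPath() + "(" + n + ")" + completedSuffix);
      n++;
    } while (alt.exists());

    logger.warn("Completed file for {} already exists, renaming to {} instead.",
        fileToRoll, alt);
    if (!fileToRoll.renameTo(alt)) {
      throw new FlumeException("Unable to move " + fileToRoll + " to " + alt);
    }
    // the meta file is no longer needed once the file has been rolled
    deleteMetaFile();
  }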

Once the rewrite is done, the remaining steps are the same as for the custom deserializer: package it, place it under the plugins.d directory, and configure it:

producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource

Summary

Essentially every type of Flume component can be custom-developed. I have not been using Flume for very long, and the problems I have run into so far also include the following:

Message resending

This pit was one I dug myself. I took the configuration parameter producer.sinks.sink-1.requiredAcks = 1 (the default is 1) for granted and set it to 10. I was using the Kafka sink at the time, and because one Kafka node had a problem (I have not carefully verified whether the issue would also occur with a healthy Kafka cluster), Flume kept resending data from one point in time while the newest data stayed blocked (probably buffered in the channel), so the backend kept receiving messages from that same point in time. Later I remembered that I had changed this parameter; after setting it back to 1 everything returned to normal. The official documentation describes the parameter as follows:

requiredAcks (default 1): How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (never wait for acknowledgement), 1 (wait for the leader only) and -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
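
For context, here is a sketch of the Kafka sink configuration involved; the property names follow the Flume 1.6-era Kafka sink (newer releases use kafka.producer.*-style names), and the broker list, topic and channel name are placeholders:

producer.sinks.sink-1.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.sink-1.channel = ch-1
producer.sinks.sink-1.brokerList = kafka-host1:9092,kafka-host2:9092
producer.sinks.sink-1.topic = nginx-access
# 1 = wait for the partition leader only (the default); -1 = wait for all replicas
producer.sinks.sink-1.requiredAcks = 1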

Channel overflow

The channel overflow was caused by the message resending described above. The channel in use was the File Channel, which has several configuration options worth noting:

Option                 Default       Description
transactionCapacity    10000         Maximum number of events that can be written or read in a single transaction
maxFileSize            2146435071    Maximum size (in bytes) of each data file; once a file reaches this size (or will do so with the next write) it is closed and a new data file is created in that directory. If this value is set higher than the default, the default is still used.
minimumRequiredSpace   524288000     Minimum space (in bytes) required on each volume for the channel to keep operating; if any volume holding a data directory has only this much space left, the channel stops operating to prevent corruption and avoid writing incomplete data
capacity               1000000       Maximum number of committed events the channel can hold
keep-alive             3             Maximum time (in seconds) each write or read should wait to complete

The channel overflow above was presumably caused by hitting the capacity limit.
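
For reference, a File Channel configuration sketch using the options above; the directory paths are placeholders and the numeric values are simply the defaults from the table:

producer.channels.ch-1.type = file
producer.channels.ch-1.checkpointDir = /data/flume/checkpoint
producer.channels.ch-1.dataDirs = /data/flume/data
producer.channels.ch-1.capacity = 1000000
producer.channels.ch-1.transactionCapacity = 10000
producer.channels.ch-1.keep-alive = 3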

