一個flume agent異常的解決過程記錄


今天在使用flume agent的時候,遇到了一個異常,  現把解決的過程記錄如下:

問題的背景:

我使用flume agent 來接收從storm topology發送下來的accesslog , 做本地文件落盤。flume配置文件如下:

#用於syslog和accesslog的本地文件滾動。
 
a1.sources=r1
a1.sinks = sink1
a1.channels = c1
 
#thrift source;
a1.sources.r1.type= thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 8081
 
 
 
#先用內存通道測試。
#a1.channels.c1.type = memory
#a1.channels.c1.capacity = 10000000
#a1.channels.c1.byteCapacity = 524288000
 
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/Users/dongqingswt/workdir/logs/checkpoint
a1.channels.c1.dataDirs = /Users/dongqingswt/workdir/logs/datadir
 
 
 
#使用自定義的文件滾動sink
a1.sinks.sink1.type=  com.beibei.datahamal.flume.extend.sink.roll.FileRollSink
a1.sinks.sink1.channel = c1
 
 
 
 
啟動flume agent以后, thrift source在本機建立套接字監聽,然后用flume-ng-sdk提供的RpcClientFactory 創建thrift rpc client , 進行消息發送的壓測。
發現rpcclient 發送消息到一定量(100000) 時, FileRollSink就無法從memory channel拉取數據了, 而thrift rpc client 也得到了如下異常:
Exception in thread "main" org.apache.flume.EventDeliveryException: Failed to send event.
at org.apache.flume.api.ThriftRpcClient.append(ThriftRpcClient.java:155)
at com.beibei.datahamal.flume.extend.sink.rolling.FilerRollSinkSender.main(FilerRollSinkSender.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
at org.apache.flume.api.ThriftRpcClient.append(ThriftRpcClient.java:134)
... 6 more

 查看源碼, 發現thrift client有一個20s的請求超時,這時候 , 查看thrift 錯誤日志,發現了如下異常:

 

java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method) ~[na:1.7.0_45]
at java.lang.Thread.start(Thread.java:713) ~[na:1.7.0_45]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:949) ~[na:1.7.0_45]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1360) ~[na:1.7.0_45]
at org.apache.thrift.server.TThreadedSelectorServer.requestInvoke(TThreadedSelectorServer.java:306) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:210) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.select(TThreadedSelectorServer.java:586) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:541) ~[libthrift-0.9.3.jar:0.9.3]

 

 

直觀的看這個異常,會讓人想到是堆內存不夠用, 調節-Xmx1024m 參數以后,問題依舊。

接着用jconsole attach到flume agent這個進程,查看堆內存的使用量, 遠沒有達到預設的xmx閾值。

但是·有一個表現就是線程數猛漲到2000以后,直接掉到了20左右,為什么會創建這么多線程呢, 都是什么線程呢?

 

結合thrift source 的源碼, 發現thrift server 采用的也是java nio   , 有SelectorThread做socket的read/write 就緒select 

AcceptorThread做socket的accept的select ,而socket讀就緒以后,收到的FrameBuffer會被包裝成一個Runnable丟到線程池處理(查看

TThreadedSelectorServer的304 行),代碼如下:
protected boolean requestInvoke(FrameBuffer frameBuffer) {
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
// Invoke on the caller's thread
invocation.run();
return true;
}
}

這個任務invocation的處理邏輯是在ThriftSource的ThriftSourceHandler定義的,也就是把thrift flume event直接丟到memchannel以后返回。起初 ,我懷疑是不是flume event丟到memchannel處理太慢(比如有線程死鎖),導致線程堆積, 但后面換成file channel
問題依舊,於是繼續看jconsole上thrift 的Flume 線程,因為線程工廠在創建線程的時候,指定了線程名:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
"Flume Thrift IPC Thread %d").build();
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}

所以, 查看這些線程發現這些線程:
堆棧跟蹤:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
 
說明線程池中的這些線程都在等任務工作, 既然大量的線程都在等任務,為什么還要創建這么多線程呢?是不是因為超過了單進程最大可建立的線程數呢,
結合ThriftSource源碼,發現不指定最大線程數時, thrift server的線程池的確是不停的新建線程,而maxThreads又是一個Integer.MAX_VALUE,
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}


試着在flume 配置文件中指定最大線程數:
a1.channels.r1.threads=10

問題解決。
 這個問題的解決也讓我們反思線程池的使用,不要設定太大的最大線程數。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM