Problem 1: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired
Solution:
First raise the Flume JVM heap to 512 MB, then enlarge the memory channel settings:
capacity = 1000
transactionCapacity = 1000
keep-alive = 30
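A minimal sketch of the corresponding channel section (the agent/channel names a1/c1 are placeholder assumptions):
# Hypothetical names; all three values are raised from their defaults.
a1.channels = c1
a1.channels.c1.type = memory
# Maximum number of events held in the channel (default 100).
a1.channels.c1.capacity = 1000
# Maximum events per transaction; keep it >= the sink's batch size.
a1.channels.c1.transactionCapacity = 1000
# Seconds a put waits for free space before failing (default 3).
a1.channels.c1.keep-alive = 30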
Problem 2: Unable to deliver event. Exception follows
The request timed out, causing the event send to fail.
Solution:
Increase request-timeout; the default is 20 seconds.
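On an Avro sink this might look like the following (the agent/sink names are placeholder assumptions; the property is in milliseconds):
# Hypothetical names; raise the RPC request timeout from the 20 s default.
a1.sinks.k1.type = avro
a1.sinks.k1.request-timeout = 60000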
Problem 3: a summary of common Flume source problems
I. SpoolDirectorySource
Purpose:
Watches the configured directory for new files and reads the data out of them.
Issues:
1. Files in the spool directory must not be reopened and edited, and the spool directory must not contain subdirectories.
2. SpoolSource cannot collect data in real time.
Solution: use log4j's TimeRolling appender with the rolling interval set to one minute; this requires changing the log4j configuration file.
3. When the TimeRolling appender cannot be used and files are copied into the monitored directory by hand, large files cause read-while-write errors.
Solution: copy the file in with a .tmp suffix, configure the source to ignore files with that suffix, and strip the suffix once the copy completes (see the config sketch after this list).
4. This source needs a cleanup process to periodically remove completed files.
5. The source can optionally insert the full path of the original file into each event's header. While reading a file, the source buffers its data in memory, so make sure bufferMaxLineLength is set well above the longest line in the input.
6. The source only accepts uniquely named files in the spooling directory. If a file name repeats, or a file is modified while being read, the read fails and an exception is returned. In that scenario, give same-named files a unique marker, such as a timestamp, before copying them into the directory.
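A minimal spooling-directory source sketch for the .tmp-suffix workaround (the agent/source names and the directory path are placeholder assumptions):
# Hypothetical names; files are copied in as *.tmp and renamed once complete.
a1.sources = r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/flume-spool
# Ignore files that are still being copied in.
a1.sources.r1.ignorePattern = ^.*\.tmp$
# Optionally record each event's source file path in a header.
a1.sources.r1.fileHeader = true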
II. Exec Source
1. When log rotation happens, tail -f keeps the original file descriptor open, so the contents of the new day's log file are never picked up.
Solution: tail's --retry option periodically re-checks which file the name points to; -F is equivalent to --follow=name --retry.
Explanation: with --follow (-f), tail follows the file descriptor by default, which means it keeps tracking the end of the file even after the file is renamed. If you actually want to follow the file name rather than the descriptor (for example, across log rotation), that default is not what you want.
2. After the flume-ng process stops, the tail process keeps running.
Solution: use tail --pid $PID; --pid makes tail exit when the given process exits, so set PID to the Flume process ID.
3. There is no way to control the production rate.
Solution: implement a "simulated tail -F" in a Java program, mainly by using ExecSource and periodically rewriting the Flume configuration file.
4. When Flume is down or the command fails, there is no resume-from-checkpoint or recovery (a single point of failure).
Solution: extend TailSource yourself, configure load balancing, or use the TaildirSource added in Flume 1.7.
5. Flume does ship a fast-recovery mechanism for exec sources, but it is disabled by default!
Solution: set the three parameters restartThrottle, restart, and logStdErr (see the sketch after this list).
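A minimal exec source sketch with recovery enabled (the agent/source names and the log path are placeholder assumptions):
# Hypothetical names; -F follows the file name across rotations.
a1.sources = r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app/app.log
# Restart the command if it exits, waiting 10 s between attempts.
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
# Capture the command's stderr in Flume's log instead of dropping it.
a1.sources.r1.logStdErr = true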
Original link: https://blog.csdn.net/qq_23146763/article/details/54850881
Notes from a Flume deployment: problems encountered and their solutions
The problems hit while debugging, and how they were resolved, are recorded below:
1. [ERROR - org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:484)] Unexpected throwable while invoking!
java.lang.OutOfMemoryError: Java heap space
Cause: Flume's default maximum heap size at startup is 20 MB, so with real data volumes an OOM is very likely. Add the following to flume-env.sh under Flume's conf directory:
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
and in the flume-ng startup script change JAVA_OPTS="-Xmx20m" to JAVA_OPTS="-Xmx2048m".
Here the heap ceiling was raised to 2 GB; in production, tune it to the actual hardware.
2. [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error
java.lang.OutOfMemoryError: unable to create new native thread
Cause: when an app sends data to Flume's thrift source over short-lived connections, threads get created without bound. pstree showed the Java thread count growing with the send volume until it passed 65,500, exceeding the Linux per-process thread limit. The fix is to cap the thread count in the thrift source configuration:
agent.sources.r1.threads = 50
After restarting the agent, the Java thread count stopped growing at around 70.
3. Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
Cause: the memory channel is full. A memory channel buffers at most 100 events by default, which is clearly not enough for production; raise the capacity parameter.
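For example (the agent/channel names are placeholder assumptions):
# Hypothetical names; raise capacity well above the default of 100.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000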
4. warn: "Thrift source %s could not append events to the channel."
Cause: the Flume configuration docs show that every sink type (thrift, avro, kafka, and so on) defaults to a batch-size of 100, and the file channel and memory channel both default transactionCapacity to 100. If you change a sink's batch-size, it must stay less than or equal to the channel's transactionCapacity; otherwise the warning above appears and data cannot be sent.
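A consistent pairing might look like this (the names are placeholder assumptions; the batch-size property name varies by sink type):
# Hypothetical names; the sink batch must fit inside one channel transaction.
a1.sinks.k1.batch-size = 500
a1.channels.c1.transactionCapacity = 500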
5. The agent reports:
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Failed to send batch
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Exception thrown from remote handler
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:397)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:374)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Connection reset by peer
at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:389)
... 6 more
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:59)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
The collector reports:
2017-08-21 16:36:43,010 (New I/O worker #12) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 349070535 items! Connection closed.
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:422)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:478)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:366)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:721)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:111)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:66)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.Channels.close(Channels.java:820)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:202)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:378)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:533)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Cause: if the data flowing from agent to collector is compressed at the agent's avro sink, it must be decompressed at the collector's avro source; otherwise the data cannot be delivered.
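Matching compression settings on both ends might look like this (the agent, collector, sink, and source names are placeholder assumptions):
# Agent side: compress batches before sending (hypothetical names).
agent.sinks.k1.type = avro
agent.sinks.k1.compression-type = deflate
# Collector side: the source must declare the same compression.
collector.sources.r1.type = avro
collector.sources.r1.compression-type = deflate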
6. org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {ssp_package-0=388595} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.
2017-10-11 01:30:10,000 (PollableSourceRunner-KafkaSource-r1) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
Cause: with a kafka source configured, Flume acts as the Kafka consumer, and the consumer's default maximum fetch size is 1 MB; messages over 1 MB require raising the parameter by hand.
The Flume docs for the kafka source have no explicit fetch-size property, but the last row of the configuration table reads:
Other Kafka Consumer Properties--These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset
So Kafka's own configuration style applies here; the relevant setting is documented in Kafka's consumer configs as max.partition.fetch.bytes:
agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 10240000
This raises the consumer's fetch size to roughly 10 MB.
7. 2017-10-13 01:19:47,991 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Cause: similar to the previous item, except that with a kafka sink Flume acts as the producer, so the maximum request size must be raised on the producer side instead, again using Kafka's own configuration:
agent.sinks.k1.kafka.producer.max.request.size = 10240000
8. java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at org.mortbay.jetty.nio.SelectChannelConnector$1.acceptChannel(SelectChannelConnector.java:75)
at org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:686)
at org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192)
at org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708)
at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Cause: too many file handles in use. First check how many handles the Flume process holds:
lsof -p pid | wc -l
where pid is the Flume process ID. Then raise the limit:
vim /etc/security/limits.conf
and append at the end:
* soft nofile 4096
* hard nofile 4096
The leading * applies the limit to all users. Restart the Flume service after the change.
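The new limit only applies to sessions started after the change, so a quick verification might be (pid is a placeholder):
# Log in again, then check the open-file limit of the shell that launches Flume.
ulimit -n
# Or inspect the running Flume process directly.
cat /proc/<pid>/limits | grep 'open files'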
9. (kafka-producer-network-thread | producer-1) [ERROR - org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:130)] Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:744)
Cause: the Kafka cluster is old while Flume is new. Here Kafka was the older 0.8.2, and Flume 1.7 throws the error above against it; the only fix was to downgrade Flume to 1.6.
10. Data sunk to Kafka was not spread evenly across the partitions; it all landed in a single partition.
Cause: this is a bug left over from older Flume versions. Constructing a header key/value pair whose key is "key" in each event works around it:
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
The real reason the distribution was not random was never tracked down directly; the problem was solved by this workaround instead.
Copyright notice: this part is an original article by CSDN blogger 小麒麟666, under the CC 4.0 BY-SA license; reposts must include the original source link and this notice.
Original link: https://blog.csdn.net/lijinqi1987/article/details/77449889