Problems Encountered While Using Flink (continuously updated)


1. Flink fails to start

Check the JobManager log:

WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@t-sha1-flk-01:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
    at java.lang.Thread.run(Thread.java:748)

Solution: the hostnames in /etc/hosts are all lowercase, but the Flink configuration files (flink-conf.yaml, masters, slaves) used uppercase hostnames. Change the hostnames in the Flink configuration files to lowercase (or use IP addresses) so they match /etc/hosts.
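
As a minimal sketch (the IP address, web UI port, and exact file contents are illustrative, not taken from the original setup), the point is that /etc/hosts and the Flink files spell the hostname identically:

# /etc/hosts
192.169.2.20   t-sha1-flk-01

# conf/masters
t-sha1-flk-01:8081

# conf/flink-conf.yaml
jobmanager.rpc.address: t-sha1-flk-01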

 

2. Job exits after running for a while

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:144)
        at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:125)
        at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
    [CIRCULAR REFERENCE:java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.]

Solution:

State is kept in memory by default (the memory state backend, which caps checkpoint size as the error above shows); store checkpoints in HDFS instead:

state.backend.fs.checkpointdir: hdfs://t-sha1-flk-01:9000/flink-checkpoints
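
Note that the checkpoint directory only tells Flink where to write; the backend itself also has to be switched away from the default memory backend. For Flink versions of this era that is typically the following pair in flink-conf.yaml (key names vary slightly across releases, so treat this as a sketch):

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://t-sha1-flk-01:9000/flink-checkpoints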

 

3. Repeated restarts after running for a long time

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).
... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink-checkpoints/8c274785f1ab027e6146a59364be645f/chk-1488/2c612f30-c57d-4ede-9025-9554ca11fd12 could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

Check the HDFS log:

WARN org.apache.hadoop.hdfs.protocol.BlockStoragePolicy: 
Failed to place enough replicas: expected size is 2 but only 0 storage types can be selected
(replication=3, selected=[], unavailable=[DISK], removed=[DISK, DISK],
policy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]})

This Flink setup uses HDFS to store checkpoints. When Flink was restarted, the old checkpoints were no longer useful, so I deleted them by hand; I am not sure whether that is related to this error. To stop the exceptions I restarted both Flink and HDFS, after which the errors no longer appeared.

However, while checking the HDFS log I noticed the following warning (the path is not specified as a URI):

WARN org.apache.hadoop.hdfs.server.common.Util:
Path /mnt/hadoop/dfs/name should be specified as a URI in configuration files.
Please update hdfs configuration

It turned out to be a misconfiguration. In hdfs-site.xml, the entry

    <property>
      <name>dfs.namenode.name.dir</name>
      <value>/mnt/hadoop/dfs/name</value>
    </property>

should be changed to:

    <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:/mnt/hadoop/dfs/name</value>
    </property>

That resolved the issue; the root cause appears to have been the incorrect hdfs-site.xml configuration.

4. Unable to load native-hadoop library for your platform

When Flink starts, the following warning sometimes appears:

WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Reference 1: http://blog.csdn.net/jack85986370/article/details/51902871

Suggested fix: edit /etc/profile and add

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

This did not resolve the problem.

5. hadoop checknative -a

WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop:  true /usr/hadoop-2.7.3/lib/native/libhadoop.so.1.0.0
zlib:    true /lib64/libz.so.1
snappy:  false 
lz4:     true revision:99
bzip2:   false 
openssl: false Cannot load libcrypto.so (libcrypto.so: cannot open shared object file: No such file or directory)!
INFO util.ExitUtil: Exiting with status 1

Reference: http://blog.csdn.net/zhangzhaokun/article/details/50951238

Solution:

cd /usr/lib64/
ln -s libcrypto.so.1.0.1e libcrypto.so

6. TaskManager exits

After Flink had been running for a while, the TaskManager exited. I captured a heap dump of the TaskManager with jvisualvm and analyzed it with MAT; the result was:

  

One instance of "org.apache.flink.runtime.io.network.buffer.NetworkBufferPool" loaded by "sun.misc.Launcher$AppClassLoader @ 0x6c01de310" occupies 403,429,704 (76.24%) bytes. The memory is accumulated in one instance of "java.lang.Object[]" loaded by "<system class loader>".

Keywords: sun.misc.Launcher$AppClassLoader @ 0x6c01de310, java.lang.Object[], org.apache.flink.runtime.io.network.buffer.NetworkBufferPool

The network buffer pool was taking up most of the heap. I found a related issue:

 https://issues.apache.org/jira/browse/FLINK-4536

which was similar to my situation: I was also using InfluxDB as a sink. After closing the InfluxDB client in the sink's close() method, the problem was resolved.
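
A minimal sketch of such a sink is shown below. It assumes the influxdb-java client (InfluxDBFactory.connect, InfluxDB.write, InfluxDB.close); the URL, credentials, and database name are placeholders and the class is not from the original post:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class InfluxDbSink extends RichSinkFunction<Point> {

    private transient InfluxDB influxDB;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One client per parallel sink instance, created on the TaskManager.
        influxDB = InfluxDBFactory.connect("http://influxdb-host:8086", "user", "password");
    }

    @Override
    public void invoke(Point point) throws Exception {
        influxDB.write("my_database", "autogen", point);
    }

    @Override
    public void close() throws Exception {
        // Releasing the client here is the fix: without it, each restart
        // leaks connections and buffers on the TaskManager.
        if (influxDB != null) {
            influxDB.close();
        }
    }
}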

In addition, $FLINK_HOME/conf/flink-conf.yaml has a setting for the TaskManager network stack, which I have not adjusted yet:

# The number of buffers for the network stack.
#
# taskmanager.network.numberOfBuffers: 2048
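
If it ever does need tuning, uncommenting the key and raising the value would look like this (4096 is purely an illustrative number, not a recommendation from this post):

taskmanager.network.numberOfBuffers: 4096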

7. Kafka partition leader switch causes Flink to restart

Symptoms:

7.1 Flink restarts; the log shows:

java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280)
  at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

7.2 The Kafka controller log shows:

INFO [SessionExpirationListener on 10], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)

7.3 Set the retries parameter

Reference: http://colabug.com/122248.html and the Kafka documentation on producer configuration (http://kafka.apache.org/082/documentation.html#producerconfigs).

With retries set, the producer retries up to 3 times when a partition leader switch occurs, instead of the error failing the sink and restarting the Flink job:

kafkaProducerConfig {
    "bootstrap.servers": "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093"
    "retries": 3
}
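
For reference, a sketch of wiring these settings into the Flink Kafka producer is shown below; the connector class (FlinkKafkaProducer08, matching the Kafka 0.8.2 documentation linked above), the topic name, and the surrounding pipeline are assumptions, not part of the original post:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaRetryExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("example record");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers",
                "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093");
        // Retry transient send failures (e.g. NotLeaderForPartitionException
        // during a partition leader switch) instead of failing the sink,
        // which would otherwise restart the whole job.
        producerProps.setProperty("retries", "3");

        stream.addSink(new FlinkKafkaProducer08<String>(
                "my-topic",               // hypothetical topic name
                new SimpleStringSchema(), // serialize String records
                producerProps));

        env.execute("Kafka producer retries example");
    }
}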

 

