1. 自定義Sink寫入hbase?
使用的是原生的hbase客戶端,可以自己控制每多少條記錄刷新一次。遇到了幾個坑導致數據寫不到hbase里邊去:
- 集群hbase版本和客戶端版本不一致(版本1和版本2相互之間會有沖突)
- Jar包沖突
例如protobuf-java版本沖突,常見的是兩個關鍵錯誤,java.io.IOException: java.lang.reflect.InvocationTargetException 和 Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.protobuf.ProtobufUtil。
2. Flink 消費Kafka偏移量
Flink讀寫Kafka,如果使用Consumer08的話,偏移量會提交Zk,下邊這個配置可以寫在Conf文件中,提交偏移量的Zk可以直接指定。Consumer09以后版本就不向Zk提交了,Kafka自己會單獨搞一個Topic存儲消費狀態。
1 xxxx08 { 2 bootstrap.servers = "ip:9092" 3 zookeeper.connect = "ip1:2181,ip2/vio" 4 group.id = "group1" 5 auto.commit.enable = true 6 auto.commit.interval.ms = 30000 7 zookeeper.session.timeout.ms = 60000 8 zookeeper.connection.timeout.ms = 30000 9 }
1 final Properties consumerProps = ConfigUtil 2 .getProperties(config, “xxxx08");// 使用自己編寫的Util函數讀取配置 3 4 final FlinkKafkaConsumer08<String> source = 5 new FlinkKafkaConsumer08<String>(topic, new SimpleStringSchema(), consumerProps);
3. Flink 的日志打印
Flink打印日志的時候,日志打印到哪,日志文件是不是切塊,並不是在工程resource下配置文件里指定的!!!而是在flink/conf中指定的,比如我安裝的Flink On Yarn模式,只需要在安裝的機器上flink/conf文件夾下修改對應的配置文件即可,如下:
具體可以參考:Flink日志配置
4. Flink 的akka時間超時
這個問題比較常見,碰見過兩次,總結下:
首先是集群機器負載比較高,有的機器負載百分之幾萬都有,在這時候taskmanager、jobmanager就會報akka超時的異常,可以適當增大akka超時時間扛過這段時間;
然后最常見的是程序里調用外部接口,延遲較高,有的是5秒甚至10秒,這種時候akka就會超時
5. Flink 的讀HDFS寫Kafka
flink讀hdfs的時候用了DataSet,自己在中間map里邊已經寫到kafka里邊了,所以不想要sink,但flink要求必須有sink,所以只能加個.output(new DiscardingOutputFormat<>()),這樣對程序不會造成影響。
6. 本地測試Flink
本地測試Flink偶爾會報錯,記錄下:
(1)本地Apache flink集群沒有運行,會報下面連接被拒絕的錯誤,你只需要啟動它:./bin/start-cluster.sh
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel $ AnnotatedConnectException:拒絕連接:localhost / 127.0.0.1:8081
7. Flink on YARN
8. Flink並行度設置

java.io.IOException: Insufficient number of network buffers: required 4, but only 0 available. The total number of network buffers is currently set to 6472 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:748)
並行度設置不合理,按報的錯設置即可:
source.parallelism = 40
map.parallelism = 40
sink.parallelism = 40
9. Flink常見報錯
- java.lang.Exception: Container released on a lost node
異常原因是 Container 運行所在節點在 YARN 集群中被標記為 LOST,該節點上的所有 Container 都將被 YARN RM 主動釋放並通知 AM,JobManager 收到此異常后會 Failover 自行恢復(重新申請資源並啟動新的 TaskManager),遺留的 TaskManager 進程可在超時后自行退出
- Could not build the program from JAR file.
這個問題的迷惑性較大,很多時候並非指定運行的 JAR 文件問題,而是提交過程中發生了異常,需要根據日志信息進一步排查。
10. Flink消費Kafka單條數據過大起Lag
可以在kafka consumer中設置下列參數:
pro.put("fetch.message.max.bytes", "8388608"); pro.put("max.partition.fetch.bytes", "8388608");