Flink開發中的問題


1. 流與批處理的區別

  • 流處理系統

流處理系統,其節點間數據傳輸的標准模型是:當一條數據被處理完成后,序列化到緩存中,然后立刻通過網絡傳輸到下一個節點,由下一個節點繼續處理。

  • 批處理系統

批處理系統,其節點間數據傳輸的標准模型是:當一條數據被處理完成后,序列化到緩存中,並不會立刻通過網絡傳輸到下一個節點,當緩存寫滿,就持久化到本地硬盤上,當所有數據都被處理完成后,才開始將處理后的數據通過網絡傳輸到下一個節點。

  • flink的流處理和批處理

Flink的執行引擎采用了一種十分靈活的方式,同時支持了這兩種數據傳輸模型:
• Flink以固定的緩存塊為單位進行網絡數據傳輸,用戶可以通過設置緩存塊超時值指定緩存塊的傳輸時機。如果緩存塊的超時值為0,則Flink的數據傳輸方式類似上文所提到流處理系統的標准模型,此時系統可以獲得最低的處理延遲
• 如果緩存塊的超時值為無限大,則Flink的數據傳輸方式類似上文所提到批處理系統的標准模型,此時系統可以獲得最高的吞吐量
• 同時緩存塊的超時值也可以設置為0到無限大之間的任意值。緩存塊的超時閾值越小,則Flink流處理執行引擎的數據處理延遲越低,但吞吐量也會降低,反之亦然。通過調整緩存塊的超時閾值,用戶可根據需求靈活地權衡系統延遲和吞吐量

原文鏈接:https://blog.csdn.net/shujuelin/article/details/89351157

2. 恢復作業 checkpoint

檢查點(checkpoint)的目錄是依賴JobID的,每次運行任務都是一個唯一的JobID(好像不能手動設置),所以要找到上一次任務的JobID才能找到檢查點。
保存點(savepoint)需要手動觸發,而且在指定目錄下還生成一個唯一的子目錄。

# savepoint
flink run -s /tmp/state.backend/s1/savepoint-17b840-2cfe3bd5bc0c -c flink.HelloWorld target/scala-flink-0.1.jar

# checkpoint
flink run -s /tmp/state.backend/17b840a3d2221b1400ec03f7e3949b17/chk-960 -c flink.HelloWorld target/scala-flink-0.1.jar

檢查點和保存點的恢復方法一樣的

3. 用流處理批數據,最后一個窗口不計算

  • 現象

    用流處理,處理kafka里面的數據時,最后一個窗口會不關閉.導致最后的數據會丟失.

  • 原因

    最后一個窗口的水位線還沒到 窗口關閉時間.

  • 解決方案

    自定義觸發器.以機器時鍾為准,5秒觸發一次.

5. flink 消費kafka的多個topic

  1. 傳入 List topics , kafka 支持 多個topic.

  2. 多個kafka消費,然后用union 連接.

8.Flink state 調優跟注意點

https://blog.csdn.net/qq_31866793/article/details/97272103

9 Flink1.8.0重大更新-Flink中State的自動清除詳解

https://blog.csdn.net/u013411339/article/details/90625604

10. 內存溢出

  • 現象

    yang gc 時間達到30秒,fullgc 很少發生.

11 linux 內存過多

運行sync將dirty的內容寫回硬盤
sync

通過修改proc系統的drop_caches清理free的cache
echo 1 > /proc/sys/vm/drop_caches

13 ask timeout

增加參數

akka.ask.timeout: 100s
web.timeout: 300000

參看:https://www.cnblogs.com/createweb/p/12027737.html

14 Container exited with a non-zero exit code 143

at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

16 Flink 清理過期 Checkpoint 目錄的正確姿勢

https://www.jianshu.com/p/165a1bf33e4a

17 flink 內存越來越大,越來越慢

將窗口滑動時間由1分鍾改為10分鍾

18. flink 與 kafka

consumer.setStartFromEarliest();     //從最早的數據開始消費
consumer.setStartFromLatest();       //從最新的數據開始消費
consumer.setStartFromTimestamp(...); //從根據指定的時間戳(ms)處開始消費
consumer.setStartFromGroupOffsets(); //默認從提交的 offset 開始消費

反序列化用 KafkaDeserializationSchema 可以獲取到topic的信息

public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
}

21 集群啟動

./hadoop-daemon.sh start journalnode
./hadoop-daemon.sh start zkfc
./hadoop-daemon.sh start datanode
./hadoop-daemon.sh start namenode
./yarn-daemon.sh start nodemanager
./yarn-daemon.sh start resourcemanager

./bin/kafka-server-start.sh -daemon ./config/server.properties
./zkServer.sh start




21 flink Reduce、GroupReduce、GroupCombine筆記

reduce

應用於分組DataSet的Reduce轉換使用用戶定義的reduce函數將每個組減少為單個元素。對於每組輸入元素,reduce函數連續地將元素對組合成一個元素,直到每個組只剩下一個元素。

注意,對於ReduceFunction,返回對象的key字段應與輸入值匹配。這是因為reduce是可隱式組合(combine)的,並且從combine運算符發出的對象在傳遞給reduce運算符時再次按key分組。

GroupReduce

應用於分組DataSet的GroupReduce調用用戶定義的group-reduce函數轉換每個分組。
這與Reduce的區別在於用戶定義的函數會立即獲得整個組。在組的所有元素上使用Iterable調用該函數,並且可以返回任意數量的結果元素

GroupCombine 分組連接 (少用)

該策略可能不會一次處理所有數據,而是以多個步驟處理

GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入類型I組合到任意輸出類型O.
相反,GroupReduce中的組合步驟僅允許從輸入類型I到輸出類型I的組合。這是因為reduce步驟中,GroupReduceFunction期望輸入類型為I.

在一些應用中,期望在執行附加變換(例如,減小數據大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。

注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理所有數據,而是以多個步驟處理。
它也可以在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會導致輸出的是部分結果,
所以GroupCombine是不能替代GroupReduce操作的,盡管它們的操作內容可能看起來都一樣。

22flink 歷史服務器

修改歷史服務器配置

org.apache.flink.configuration.HistoryServerOptions

historyserver.web.tmpdir  文件地址.

23 Could not deploy Yarn job cluster

新增:

flink-conf.yaml:rest.port: 8082

24 Flink:Could not forward element to next operator

前后時間窗口不一致導致的.

25flink報錯org.apache.commons.cli.Option.builder

刪除$FLINK_HOME/lib下面的/commons-cli-1.4.jar

26 Flink中的序列化失敗問題

聲明為@transent

27 Line could not be encoded

Caused by: java.lang.RuntimeException: Line could not be encoded: [49, 56, 49, 49, 90, 77, 119, 66, 54, 48, 54, 71, 48, 53, 55, 50, 48, 49, 53, 48, 56, 48, 53, 49, 56, 52, 52, 48, 56, 109, 49, 106, 124, -26, -84, -94, -24, -65, -114, -28, -67, -65, -25]
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:127)
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:38)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.charset.MalformedInputException: Input length = 1
	at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
	at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:816)
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:117)
	... 6 more

解決方案:

Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true);
TextValueInputFormat inputFormat = new TextValueInputFormat(new Path(path));
inputFormat.setSkipInvalidLines(true);

28 Embedded metastore is not allowed

解決方案:flink 集成 hive 時 不支持embedded metastore的,配置hive時 需要起一個hive metastore 並在conf文件配置 hive.metastore.uris

29 flink實戰--開發中常見的錯誤與問題

https://blog.csdn.net/aa518189/article/details/103622261

30 Exceeded checkpoint tolerable failure threshold.

重啟


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM