flink1.12連接hive修改並行度


flink1.12連接hive修改並行度

在HiveTableSource 中. getDataStream 方法:

		int parallelism = 0;
			int splitNum = new HiveParallelismInference(tablePath, flinkConf)
				.infer(
					() -> HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
					() -> HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, jobConf).size())
				.limit(limit);
			// sql-client-defaults.yaml  max-parallelism: 30
			int max = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
			// 傳進來的參數
			// sql-client-defaults.yaml  中的 parallelism: 20
			int hiveParallelism = source.getExecutionConfig().getParallelism();
			parallelism = Math.min(splitNum, max);
			parallelism = Math.min(parallelism, hiveParallelism);
			return source.setParallelism(parallelism);

下載 flink1.12 版本的源碼.

用idea打開, 修改后打包flink-connector-hive_2.11 module即可. 上傳jar包.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM