flink1.12连接hive修改并行度


flink1.12连接hive修改并行度

在HiveTableSource 中. getDataStream 方法:

		int parallelism = 0;
			int splitNum = new HiveParallelismInference(tablePath, flinkConf)
				.infer(
					() -> HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
					() -> HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, jobConf).size())
				.limit(limit);
			// sql-client-defaults.yaml  max-parallelism: 30
			int max = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
			// 传进来的参数
			// sql-client-defaults.yaml  中的 parallelism: 20
			int hiveParallelism = source.getExecutionConfig().getParallelism();
			parallelism = Math.min(splitNum, max);
			parallelism = Math.min(parallelism, hiveParallelism);
			return source.setParallelism(parallelism);

下载 flink1.12 版本的源码.

用idea打开, 修改后打包flink-connector-hive_2.11 module即可. 上传jar包.


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM