Flink批处理读写Hive




import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @Auther WeiJiQian
 * @描述  
 */
public class FlinkReadHiveAndWriteHive {


    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
        String name = "myhive";      // Catalog名称,定义一个唯一的名称表示
        String defaultDatabase = "test";  // 默认数据库名称
        String hiveConfDir = "/data/apache-hive-2.3.6-bin/conf";  // hive-site.xml路径
        String version = "2.3.6";       // Hive版本号

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        StatementSet statementSet = tableEnv.createStatementSet();

        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
		// 把stu77 的数据写入stu88 里面,中间可以加自己的逻辑.
        Table sqlResult = tableEnv.sqlQuery("select name,age from test.stu77");
        statementSet.addInsert("test.stu88",sqlResult);
        statementSet.execute();
    }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM