Hands-On Project: Learning Flink from 0 to 1 (23) - Reading from and Writing to Hive with Flink


1. Reading is done; it was pieced together from material found online. The core is implementing these two classes:

HCatInputFormat
HCatInputFormatBase

Under the hood, both of these classes extend RichInputFormat:

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>

Search Baidu for the jar that contains them, download it, and copy the classes out into your project.


 

Dependencies (roughly these):

<!-- Flink/Hive dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>
 
<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
 
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
 
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>
 
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>
 
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>
 
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>

Reading data from Hive:

package com.coder.flink.core.FlinkHive
 
 
import org.apache.flink.api.scala.ExecutionEnvironment
 
import org.apache.hadoop.conf.Configuration
import org.apache.flink.api.scala._
 
 
//Read data from Hive
object ReadHive {
  def main(args: Array[String]): Unit = {
 
      val conf = new Configuration()
      conf.set("hive.metastore.local", "false")
 
      conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
       //for a highly available metastore, point this at the nameserver address
//      conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")
 
      val env = ExecutionEnvironment.getExecutionEnvironment
 
      //todo return type: TamAlert is the author's record class; HCatInputFormat is the class copied from the jar mentioned above
      val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))
 
      dataset.first(10).print()
//      env.execute("flink hive test")
 
 
  }
 
}
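
If you do not need a custom record class such as TamAlert, the HCatInputFormatBase API also exposes getFields(...) and asFlinkTuples() for projecting a few columns and reading them as Flink tuples (see the Flink HCatalog connector docs). Below is a minimal sketch under that assumption; the column names "id" and "name" are made up for illustration, and HCatInputFormat is again the class copied into this package:

package com.coder.flink.core.FlinkHive

import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration

//Minimal sketch: project two columns of the Hive table and read them as Flink tuples
object ReadHiveAsTuples {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hive.metastore.local", "false")
    conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")

    val env = ExecutionEnvironment.getExecutionEnvironment

    //getFields() projects the listed columns, asFlinkTuples() switches the output type to tuples
    val inputFormat = new HCatInputFormat[(Int, String)]("aijiami", "test", conf)
      .getFields("id", "name")
      .asFlinkTuples()

    val tuples: DataSet[(Int, String)] = env.createInput(inputFormat)
    tuples.first(10).print()
  }
}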

The good news is that Flink 1.9 added built-in Hive read/write support. Until then, we can also go through Hive JDBC to read and write Hive, though performance will likely be slower:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.1.0</version>
</dependency>

Reading Hive over JDBC:

package com.coder.flink.core.FlinkHive;
 
import java.sql.*;
 
public class FlinkReadHive {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
 
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami","hive","hive");
        Statement st = con.createStatement();
        ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10");
        while (rs.next()){
            System.out.println(rs.getString(1) + "," + rs.getString(2));
        }
        rs.close();
        st.close();
        con.close();
 
 
    }
}
A second, fuller Hive JDBC example (create a table, load data, and query it):

public class HiveApp {
     
    private static String driver = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://Master:10000/default";
    private static String user = "root"; // anonymous access normally works; root is used here because the whole Hive installation was done as root
    private static String password = "";
 
    public static void main(String[] args) {
        ResultSet res = null;
         
        try {
            /**
             * Step 1: load the JDBC driver via reflection.
             */
            Class.forName(driver);
             
            /**
             * Step 2: open a JDBC connection to Hive; the default port is 10000 and the default username and password are empty.
             */
            Connection conn = DriverManager.getConnection(url, user, password); 
             
            /**
             * Step 3: create a Statement handle; all SQL operations are issued through it.
             */
            Statement stmt = conn.createStatement();
             
            /**
             * Now the SQL operations themselves.
             * Step 4.1: create the table, dropping it first if it already exists.
             */
            String tableName = "testHiveDriverTable";
            stmt.execute("drop table if exists " + tableName );
            
             
            stmt.execute("create table " + tableName + " (id int, name string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
            /**
             *  Step 4.2: look up the table that was just created.
             */
            String sql = "show tables '" + tableName + "'";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            if (res.next()) {
              System.out.println(res.getString(1));
            }
            /**
             *  Step 4.3: query the schema of the created table.
             */
            sql = "describe " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
          
            /**
             *  Step 4.4: load data into the Hive table.
             */
            String filepath = "/root/Documents/data/sql/testHiveDriver.txt";
            sql = "load data local inpath '" + filepath + "' into table " + tableName;
            System.out.println("Running: " + sql);
            stmt.execute(sql);
          
            /**
             *  Step 4.5: query the data loaded into the table.
             */
            sql = "select * from " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
            }
          
            /**
             *  Step 4.6: run an aggregate over the table.
             */
            sql = "select count(1) from " + tableName;   // select count(*) spawns a MapReduce job, so the YARN resource manager must be running: start-yarn.sh
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
           
            while (res.next()) {
              System.out.println("Total lines :" + res.getString(1));
            }    
             
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
         
         
 
    }
 
}
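
Since the point of this post is also to write into Hive, the same Hive JDBC approach can be wrapped in a Flink sink. Below is a minimal sketch, not an official connector, assuming the hive-jdbc dependency above plus flink-streaming-scala on the classpath; the target table flink_sink_demo, its two columns and the HiveServer2 address are made up for illustration, and row-by-row INSERT over JDBC will be slow, as noted earlier:

package com.coder.flink.core.FlinkHive

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

//Minimal sketch of a sink that writes each record into Hive via Hive JDBC
class HiveJdbcSink extends RichSinkFunction[(Int, String)] {

  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    conn = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    //assumes a table created beforehand, e.g. CREATE TABLE flink_sink_demo (id INT, name STRING)
    stmt = conn.prepareStatement("INSERT INTO TABLE flink_sink_demo VALUES (?, ?)")
  }

  override def invoke(value: (Int, String)): Unit = {
    stmt.setInt(1, value._1)
    stmt.setString(2, value._2)
    stmt.execute() //each insert starts a Hive job, hence the poor performance mentioned above
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

object WriteHiveJdbcDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements((1, "zhangsan"), (2, "lisi"))
      .addSink(new HiveJdbcSink)
    env.execute("flink write hive via jdbc")
  }
}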

 

A simple example of writing to HDFS:

package com.coder.flink.core.test_demo
 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
 
object WriteToHDFS {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2. Define the data: stu(age, name, height)
    val stu: DataSet[(Int, String, String)] = env.fromElements(
      (19, "zhangsan","aaaa"),
      (1449, "zhangsan","aaaa"),
      (33, "zhangsan","aaaa"),
      (22, "zhangsan","aaaa")
    )
 
    //todo write to the local filesystem
    stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink代碼/測試數據/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
 
    //todo write a text file to HDFS; the path is created automatically if it does not exist
    stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
    //todo write a CSV file to HDFS
    //3.1 Read the CSV file
    val inPath = "hdfs:///input/flink/sales.csv"
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds2 = env.readCsvFile[Sales](
      filePath = inPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )
    //3.2 Write the CSV data to HDFS
    val outPath = "hdfs:///output/flink/datasink/sales.csv"
    ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n", fieldDelimiter = "|", writeMode = WriteMode.OVERWRITE) // the last argument must be named, since positional arguments cannot follow named ones
 
    env.execute()
  }
}
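
To turn the HDFS write above into a Hive write, one simple option is to register the written file in Hive with LOAD DATA over the same Hive JDBC connection shown earlier. A minimal sketch under these assumptions: the table flink_hdfs_demo already exists with a row format that matches the written file, and the path and HiveServer2 address are the ones used above:

package com.coder.flink.core.test_demo

import java.sql.DriverManager

//Minimal sketch: after WriteToHDFS has produced /output/flink/datasink/test001.txt,
//move that file into a Hive table with LOAD DATA over Hive JDBC
object LoadHdfsFileIntoHive {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    val stmt = conn.createStatement()
    //without LOCAL the path is resolved on HDFS, and the file is moved into the table's location
    stmt.execute("LOAD DATA INPATH '/output/flink/datasink/test001.txt' INTO TABLE flink_hdfs_demo")
    stmt.close()
    conn.close()
  }
}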

 

