1. Reading Hive data works; the approach is based on material found online. The core is implementing these two classes:
HCatInputFormat
HCatInputFormatBase
Both of these classes ultimately extend RichInputFormat:
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>
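To make the RichInputFormat contract concrete, here is a minimal, purely illustrative input format in Scala that just emits a fixed list of strings (FixedStringsInputFormat is a made-up name, not part of Flink or of the HCatalog connector); HCatInputFormatBase fills in these same lifecycle methods, only against HCatalog splits and records:

import org.apache.flink.api.common.io.RichInputFormat
import org.apache.flink.api.common.io.statistics.BaseStatistics
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.{DefaultInputSplitAssigner, GenericInputSplit, InputSplit, InputSplitAssigner}

// Toy input format: emits a fixed list of strings from split 0 only.
class FixedStringsInputFormat(values: Seq[String]) extends RichInputFormat[String, GenericInputSplit] {

  // Created in open() on the task side, so it is not serialized with the format.
  @transient private var iter: Iterator[String] = _

  override def configure(parameters: Configuration): Unit = {}

  override def getStatistics(cachedStatistics: BaseStatistics): BaseStatistics = cachedStatistics

  override def createInputSplits(minNumSplits: Int): Array[GenericInputSplit] =
    (0 until minNumSplits).map(i => new GenericInputSplit(i, minNumSplits)).toArray

  override def getInputSplitAssigner(splits: Array[GenericInputSplit]): InputSplitAssigner =
    new DefaultInputSplitAssigner(splits.asInstanceOf[Array[InputSplit]])

  override def open(split: GenericInputSplit): Unit = {
    // Only the first split produces data in this toy example.
    iter = if (split.getSplitNumber == 0) values.iterator else Iterator.empty
  }

  override def reachedEnd(): Boolean = !iter.hasNext

  override def nextRecord(reuse: String): String = iter.next()

  override def close(): Unit = {}
}

A DataSet is then created from it exactly as with HCatInputFormat below: env.createInput(new FixedStringsInputFormat(Seq("a", "b", "c"))).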
Download the jar (a Baidu or Maven search will find it), then pull these classes out of it.
Dependencies (roughly these):
<!-- Flink / Hive dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>
Reading data from Hive:
package com.coder.flink.core.FlinkHive

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.flink.api.scala._

// Read data from Hive
object ReadHive {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hive.metastore.local", "false")
    conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
    // For a highly available metastore, point this at the name service instead
    // conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")

    val env = ExecutionEnvironment.getExecutionEnvironment

    // TODO return type
    // HCatInputFormat is the class pulled out of the jar above (assumed available in this package or via import)
    val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))

    dataset.first(10).print()
    // env.execute("flink hive test")
  }
}
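Note that TamAlert is not defined above; it is simply a case class whose fields mirror the columns of the aijiami.test table. The field names below are hypothetical placeholders, so adjust them to the actual schema:

// Hypothetical schema for aijiami.test; replace the fields with the table's real columns.
case class TamAlert(id: String, eventTime: Long, level: Int, message: String)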
The good news is that Flink 1.9 ships Hive read/write support. Until then, we can read and write Hive through Hive JDBC instead, though performance will likely be slower:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.1.0</version>
</dependency>
package com.coder.flink.core.FlinkHive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class FlinkReadHive {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive");
        Statement st = con.createStatement();
        ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "," + rs.getString(2));
        }
        rs.close();
        st.close();
        con.close();
    }
}
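If the JDBC result needs to end up in a Flink job, one simple option is to collect the rows first and turn them into a DataSet with fromCollection; since every row passes through the client, this only suits small result sets. A sketch reusing the connection settings above; the object name HiveJdbcToDataSet and the assumption that the first two columns are strings are ours:

import java.sql.DriverManager

import org.apache.flink.api.scala._

import scala.collection.mutable.ArrayBuffer

object HiveJdbcToDataSet {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    val st = con.createStatement()
    val rs = st.executeQuery("SELECT * FROM ods_scenes_detail_new LIMIT 10")

    // Pull the first two columns of each row into local memory.
    val rows = ArrayBuffer[(String, String)]()
    while (rs.next()) {
      rows += ((rs.getString(1), rs.getString(2)))
    }
    rs.close(); st.close(); con.close()

    // Hand the collected rows to Flink as a DataSet.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds: DataSet[(String, String)] = env.fromCollection(rows)
    ds.print()
  }
}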
import java.sql.*;

public class HiveApp {

    private static String driver = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://Master:10000/default";
    // Anonymous access usually works; root is used here because the whole Hive installation was done as root.
    private static String user = "root";
    private static String password = "";

    public static void main(String[] args) {
        ResultSet res = null;
        try {
            /**
             * Step 1: load the JDBC driver via reflection.
             */
            Class.forName(driver);
            /**
             * Step 2: open a JDBC connection to Hive; the default port is 10000 and the default user/password are empty.
             */
            Connection conn = DriverManager.getConnection(url, user, password);
            /**
             * Step 3: create a Statement handle; all SQL operations go through it.
             */
            Statement stmt = conn.createStatement();
            /**
             * Now the SQL operations themselves.
             * Step 4.1: create the table, dropping it first if it already exists.
             */
            String tableName = "testHiveDriverTable";
            stmt.execute("drop table if exists " + tableName);
            stmt.execute("create table " + tableName + " (id int, name string)"
                    + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
            /**
             * Step 4.2: list the table we just created.
             */
            String sql = "show tables '" + tableName + "'";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            if (res.next()) {
                System.out.println(res.getString(1));
            }
            /**
             * Step 4.3: describe the table's schema.
             */
            sql = "describe " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
                System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
            /**
             * Step 4.4: load data into the Hive table.
             */
            String filepath = "/root/Documents/data/sql/testHiveDriver.txt";
            sql = "load data local inpath '" + filepath + "' into table " + tableName;
            System.out.println("Running: " + sql);
            stmt.execute(sql);
            /**
             * Step 4.5: query the data that was loaded into the table.
             */
            sql = "select * from " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
                System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
            }
            /**
             * Step 4.6: run an aggregation on the table.
             */
            sql = "select count(1) from " + tableName;
            // select count(*) triggers a MapReduce job, so the YARN resource manager must be running: start-yarn.sh
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
                System.out.println("Total lines :" + res.getString(1));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
A simple example of writing to HDFS:
package com.coder.flink.core.test_demo

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode

object WriteToHDFS {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. Define the data: stu(age, name, height)
    val stu: DataSet[(Int, String, String)] = env.fromElements(
      (19, "zhangsan", "aaaa"),
      (1449, "zhangsan", "aaaa"),
      (33, "zhangsan", "aaaa"),
      (22, "zhangsan", "aaaa")
    )

    // TODO: write to the local file system
    stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink代碼/測試數據/test001.txt", WriteMode.OVERWRITE)
    env.execute()

    // TODO: write to HDFS as plain text; the path is created automatically if it does not exist
    stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt", WriteMode.OVERWRITE)
    env.execute()

    // TODO: write a CSV file to HDFS
    // 3.1 Read a CSV file
    val inPath = "hdfs:///input/flink/sales.csv"
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds2 = env.readCsvFile[Sales](
      filePath = inPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )

    // 3.2 Write the CSV data to HDFS
    val outPath = "hdfs:///output/flink/datasink/sales.csv"
    ds2.setParallelism(1).writeAsCsv(outPath, "\n", "|", WriteMode.OVERWRITE)
    env.execute()
  }
}
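To round-trip the HDFS output back into Hive, one option is to register the written file with a Hive table over the same JDBC driver (LOAD DATA INPATH moves the file into the table's warehouse directory). A sketch only: the table name sales_from_flink is made up, and the column layout assumes the '|'-separated CSV written above:

import java.sql.DriverManager

object LoadCsvIntoHive {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    val st = con.createStatement()

    // Target table matching the CSV layout written by WriteToHDFS ('|'-separated fields).
    st.execute(
      "CREATE TABLE IF NOT EXISTS sales_from_flink " +
        "(transactionId STRING, customerId INT, itemId INT, amountPaid DOUBLE) " +
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'")

    // Move the file produced by Flink into the table's warehouse directory.
    st.execute("LOAD DATA INPATH '/output/flink/datasink/sales.csv' INTO TABLE sales_from_flink")

    st.close()
    con.close()
  }
}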