1. 通常利用SparkSQL將離線或實時流數據的SparkRDD數據寫入Hive,一般有兩種方法。第一種是利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes來映射拆分RDD的值;第二種方法是利用rdd和Java bean來反射的機制。下面對兩種方法做代碼舉例
2. 利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes來映射拆分RDD的值
JavaRDD<Row> resultRdd = rdd.map(new Function<String[], Row>() {
@Override
public Row call(String[] line) throws Exception {
if (line != null && line.length > 0) {
return helper.createRow(line);
}
return null;
}
});
StructType structType = helper.createSchame();
Dataset<Row> dataFrame = session.createDataFrame(resultRdd, structType);
DataFrameWriter<Row> writer = dataFrame.coalesce(1).write().format(TableHelperInter.TABLE_FORMAT_TYPE).mode(SaveMode.Append);
String tableName = hiveDataBaseName + "." + helper.getTableName();
writer.insertInto(tableName);
這種方法的有點是寫入簡單,不必去考慮字段映射有誤,但缺點是需要去寫一個TableHelperInter,而且這種方式對字段的類型要求嚴格,在做字段類型和字段校驗時比對時一旦字段過多會及其復雜,所以不推崇這種寫法
3. 利用rdd和Java bean來反射
來一個完整的程序
public class SparkSQLTest {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("yarn").setAppName("SparkSQL_test");
JavaSparkContext sc = new JavaSparkContext(conf);
String line = "1102,jason,20,male,15927384023,developer,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";
String line2 = "1103,jason1,21,male,15927352023,developer1,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";
List<String> list = new ArrayList<String>();
list.add(line);
list.add(line2);
JavaRDD<String> rdd = sc.parallelize(list);
JavaRDD<Person> rddResult = rdd.map(new Function<String, Person>() {
@Override
public Person call(String s) throws Exception {
String[] message = s.split(",");
Person person = new Person();
person.setNo(message[0]);
person.setName(message[1]);
person.setAge(message[2]);
person.setGender(message[3]);
person.setPhone(message[4]);
person.setJob(message[5]);
person.setCol7(message[6]);
person.setCol8(message[7]);
person.setCol9(message[8]);
person.setCol10(message[9]);
person.setCol11(message[10]);
person.setCol12(message[11]);
person.setCol13(message[12]);
person.setCol14(message[13]);
person.setCol15(message[14]);
person.setCol16(message[15]);
person.setCol17(message[16]);
person.setCol18(message[17]);
person.setCol19(message[18]);
person.setCol20(message[19]);
person.setCreate_time_p(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()));
return person;
}
});
//這行代碼必須在實例SparkSession不然會出錯
SparkSession.clearDefaultSession();
SparkSession session = SparkSession.builder()
.config("hive.metastore.uris", "localhost:9083")
.config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
.config("hive.exec.dynamic.partition", true)
.config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate();
Dataset dataset = session.createDataFrame(rddResult,Person.class);
dataset.registerTempTable("person_temp_table");
session.sql("insert into qwrenzixing.person_table20 partition (create_time_p="+DateTimeFormatter.ofPattern("yyyyMMdd")
.format(LocalDate.now())+") select no,name,age,gender,phone,job,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20 from person_temp_table");
}
}
這種方法比較簡潔,為了避免去做繁瑣的字段比對和校驗。可以將字段類型以string寫入hive。同時通過SparkSession操作SQL的方法是spark2.0后的。這里是將dataset寫成一張臨時表,再將臨時表的值查詢出來insert into到hive表中。但將DataSet通過SparkSQL寫成一張臨時表的操作,Spark原生提供了四個關於這種操作API
dataset.registerTempTable("temp_table");
dataset.createGlobalTempView("temp_table");
dataset.createOrReplaceTempView("temp_table");
dataset.createTempView("temp_table");
4. 關於這四個將DataSet寫成一張臨時表的作用和坑點
1>. dataset.registerTempTable("temp_table")
這個方法建議在離線,批處理中使用,在實時流式計算中會導致后續寫入hive值與字段不匹配亂序的問題
2>. dataset.createGlobalTempView("temp_table")
這個方法是創建一個全局臨時表,意思就是別的spark-submit也可以用,這種場景很少,而且無法用在實時流式計算中,因為創建一次表后不能再創建會包表已經存在的錯誤
3>. dataset.createOrReplaceTempView("temp_table");
這個其實比較好理解,如果存在就覆蓋
4>. dataset.createTempView("temp_table");
這個方法當spark程序沒有結束時不能重復創建
這里的創建臨時表在spark程序結束后臨時表不存在,所以spark streaming程序要特別注意用法
5. 關於Spark SQL的一個坑點
在mysql中insert into有兩種方式
INSERT INTO table_name VALUES (value1, value2,....)
INSERT INTO table_name (column1, column2,...) VALUES (value1, value2,....)
要注意第二種寫法在SparkSQL會報錯,SparkSQL不支持這種寫法,只支持第一種寫法。這個是為什么其實也很好理解,每個人想法不一樣。第一次使用要避免這個坑點
最后附上我在利用SparkSQL將kafka數據寫入hive的重要環節的代碼:
String tableName = hiveDataBaseName + ".test_data";
Dataset dataFrame = session.createDataFrame(resultRdd, SJGJEntity.class);
// createOrReplaceTempView API方式將數據寫入hive 不存在值與字段名錯亂的問題
dataFrame.createOrReplaceTempView("temp_table");
session.sql("insert into " + tableName + " partition(create_time_p=" + DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
+ ") select base_name,base_num,serviceCode,phoneno,called_phoneno,call_time,call_longth,lac,ci,xpoint,ypoint,imei,imsi,insert_time,call_address," + "source_table,mark_type,companyId,type,createKafkaTime from temp_table");