spark利用sparkSQL將數據寫入hive兩種通用方式實現及比較


1.寫在前面

在利用spark計算引擎將kafka或其他源數據組件的數據入hive形成數倉的過程中有兩種方式,一種方式是利用spark Rdd的API將數據寫入hdfs形成hdfs文件,之后再將文件和hdfs文件和hive表做加載映射。第二種方式是利用sparkSQL將獲取的數據Rdd轉換成dataFrame,再將dataFrame寫成緩存表,最后利用sparkSQL直接插入hive表中。這兩種方式各有各自的優點。但大多數開發者更傾向於后者一次編碼一步到位的方式。而對於利用sparkSQL寫hive表官方有兩種常見的API,第一種是利用JavaBean做映射,第二種是利用StructType創建Schema做映射,下面根據代碼來分析這兩種API 。

2.樣例數據

原始數據:
tom,1
jim,2
lily,3
lucy,4
寫入hive數據
字段 : word num
值 : tom 1
jim 2
lily 3
lucy 4

3.JavaBean做映射方式

	    String hiveTableColumns = "word,num";
            dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    JavaRDD<TestBean> beanJavaRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, TestBean>() {
                        @Override
                        public Iterable<TestBean> call(Iterator<String> iterator) throws Exception {
                            List<TestBean> beans = new ArrayList<>();
                            while (iterator.hasNext()){
                                String message = iterator.next().toString();
                                TestBean bean = new TestBean();
                                bean.setWord(message.split(",",-1)[0]);
                                bean.setNum(message.split(",",-1)[1]);
                                beans.add(bean);
                            }
                            return beans;
                        }
                    });

                    DataFrame dataFrame = session.createDataFrame(beanJavaRDD, TestBean.class);
                    dataFrame.registerTempTable("temp_test");

                    session.sql("insert into test partition(create_time_p=" + new SimpleDateFormat("yyyyMMdd").format(new Date())
                            + ") select " + hiveTableColumns + " from temp_test");
                }
            });
public class TestBean implements Serializable {
    private String word;
    private String num;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getNum() {
        return num;
    }

    public void setNum(String num) {
        this.num = num;
    }
}

3.利用StructType創建Schema做映射方式

	    String hiveTableColumns = "word,num";
            dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    JavaRDD<Row> rowJavaRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
                        @Override
                        public Iterable<Row> call(Iterator<String> iterator) throws Exception {
                            List<Row> rowList = new ArrayList<>();
                            while (iterator.hasNext()){
                                String message = iterator.next().toString();
                                rowList.add(RowFactory.create(message.split(",", -1)));

                            }
                            return rowList;
                        }
                    });

                    DataFrame dataFrame = session.createDataFrame(rowJavaRDD, getSchema(hiveTableColumns.split(",",-1)));
                    dataFrame.registerTempTable("temp_test");

                    session.sql("insert into " + databaseAndTableName + " partition(create_time_p=" + new SimpleDateFormat("yyyyMMdd").format(new Date())
                            + ") select " + hiveTableColumns + " from temp_test");
                }
            });
      public static StructType getSchema(String[] columns) {
        List<StructField> schemaFields = new ArrayList<>();
        for (int i = 0; i < columns.length - 1; i++) {
            schemaFields.add(DataTypes.createStructField(columns[i], DataTypes.StringType, true));
        }
        return DataTypes.createStructType(schemaFields);
    }

4.對比這兩種方式

這兩種方式實現方式都相對簡單,也比較簡潔。對於很多大數據初學者可能首先會想到第一種方式。但是第一種方式不具備通用性,也就是新增一種類型數據。又需要新建bean,然后這里JavaRDD<TestBean> beanJavaRDD需要動態,這里DataFrame dataFrame = session.createDataFrame(beanJavaRDD, TestBean.class);也比較麻煩。最后發現根本無法通用多種類型的數據,如果數據有幾百種類,這種方式就不夠通用,每一類數據都需要對應的程序。而第二種方式就可以通用了,只需要將數據的字段抽取配置,一個類是可以兼容無論多少種數據的。所以在開發過程中還是推薦第二種方式。但是第一種方式也有自己的優點,不會出現字段與值對應錯亂的問題。而第二種方式可能稍不小心會出現字段與值錯亂的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM