32.電視采集項目流程spark篇通過Azkaban調度spark


新建AccessLogDriverCluster類

 

 

 

package com.it19gong.clickproject;



import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;



public class AccessLogDriverCluster {
    static DBHelper db1=null;
    public static void main(String[] args) throws Exception {
        
        // 創建SparkConf、JavaSparkContext、SQLContext
                SparkConf conf = new SparkConf() 
                        .setAppName("RDD2DataFrameProgrammatically");  
                JavaSparkContext sc = new JavaSparkContext(conf);
                SQLContext sqlContext = new SQLContext(sc);
            
                // 第一步,創建一個普通的RDD,但是,必須將其轉換為RDD<Row>的這種格式
                //獲取昨天時間
                JavaRDD<String> lines = sc.textFile("hdfs://node1/data/clickLog/2019/08/31");
                
                // 分析一下
                // 它報了一個,不能直接從String轉換為Integer的一個類型轉換的錯誤
                // 就說明什么,說明有個數據,給定義成了String類型,結果使用的時候,要用Integer類型來使用
                // 而且,錯誤報在sql相關的代碼中
                // 所以,基本可以斷定,就是說,在sql中,用到age<=18的語法,所以就強行就將age轉換為Integer來使用
                // 但是,肯定是之前有些步驟,將age定義為了String
                // 所以就往前找,就找到了這里
                // 往Row中塞數據的時候,要注意,什么格式的數據,就用什么格式轉換一下,再塞進去
                JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Row call(String line) throws Exception {
                        String itr[] = line.split(" ");
                           
                            String ip = itr[0];
                            String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
                            String url = itr[6];
                            String upFlow = itr[9];
                        
                        return RowFactory.create(
                                ip,
                                date,
                                url,
                                Integer.valueOf(upFlow)
                                );      
                    }
                    
                });
                
                // 第二步,動態構造元數據
                // 比如說,id、name等,field的名稱和類型,可能都是在程序運行過程中,動態從mysql db里
                // 或者是配置文件中,加載出來的,是不固定的
                // 所以特別適合用這種編程的方式,來構造元數據
                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true)); 
                structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));  
                StructType structType = DataTypes.createStructType(structFields);
                
                // 第三步,使用動態構造的元數據,將RDD轉換為DataFrame
                DataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType);
            
                // 后面,就可以使用DataFrame了
                studentDF.registerTempTable("log");  
                
                DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc"); 
                
                db1=new DBHelper();
                final String sql="insert into upflow(ip,sum) values(?,?) ";
                sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() {
                    
                    @Override
                    public void call(Row t) throws Exception {
                        // TODO Auto-generated method stub
                        PreparedStatement pt = db1.conn.prepareStatement(sql);
                        pt.setString(1,t.getString(0));
                        pt.setString(2,String.valueOf(t.getLong(1)));
                        pt.executeUpdate();
                    }
                });;
                
        
    }

}

 

 

 

打包

 

 

 

 

報錯

 

 

 

刪除apptest文件

 

 

 

再次打包

 

 

 

 

 

把打好的包拷貝出來

 

 

 

 

並且重命名

 

 

 

 

 

 

 

vim project.sh

 

/opt/modules/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class com.it19gong.clickproject.AccessLogDriverCluster --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /opt/modules/hive/conf/hive-site.xml --driver-class-path /opt/modules/hive/lib/mysql-connector-java-5.1.28.jar /home/hadoop/sparkproject.jar

 

 

 

把原來的包刪除

 

 

 

上傳新的包

 

 

 

執行腳本

 

 

 

 

 

mysql數據多了兩條

 

 

 打開azkaban的頁面,這里再次提醒要用谷歌瀏覽器

 

 

 

 

 

 

 新建spark.job文件

#command.job
type=command
command=bash project.sh

 

 

 

打包成zip包

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 上傳zip包

 

 

 

 

 

 

開始執行

 

 

 

 

 

 

 

 

 

 

 

 

mysql數據庫多了兩天數據

 

 

到此為止整個項目結束了,由於本次項目中途事情比較多,所以從開始到結束花的時間比較長,請諒解!!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM