新建AccessLogDriverCluster類
package com.it19gong.clickproject; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class AccessLogDriverCluster { static DBHelper db1=null; public static void main(String[] args) throws Exception { // 創建SparkConf、JavaSparkContext、SQLContext SparkConf conf = new SparkConf() .setAppName("RDD2DataFrameProgrammatically"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 第一步,創建一個普通的RDD,但是,必須將其轉換為RDD<Row>的這種格式 //獲取昨天時間 JavaRDD<String> lines = sc.textFile("hdfs://node1/data/clickLog/2019/08/31"); // 分析一下 // 它報了一個,不能直接從String轉換為Integer的一個類型轉換的錯誤 // 就說明什么,說明有個數據,給定義成了String類型,結果使用的時候,要用Integer類型來使用 // 而且,錯誤報在sql相關的代碼中 // 所以,基本可以斷定,就是說,在sql中,用到age<=18的語法,所以就強行就將age轉換為Integer來使用 // 但是,肯定是之前有些步驟,將age定義為了String // 所以就往前找,就找到了這里 // 往Row中塞數據的時候,要注意,什么格式的數據,就用什么格式轉換一下,再塞進去 JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String line) throws Exception { String itr[] = line.split(" "); String ip = itr[0]; String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]); String url = itr[6]; String upFlow = itr[9]; return RowFactory.create( ip, date, url, Integer.valueOf(upFlow) ); } }); // 第二步,動態構造元數據 // 比如說,id、name等,field的名稱和類型,可能都是在程序運行過程中,動態從mysql db里 // 或者是配置文件中,加載出來的,是不固定的 // 所以特別適合用這種編程的方式,來構造元數據 List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); // 第三步,使用動態構造的元數據,將RDD轉換為DataFrame DataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType); // 后面,就可以使用DataFrame了 studentDF.registerTempTable("log"); DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc"); db1=new DBHelper(); final String sql="insert into upflow(ip,sum) values(?,?) "; sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() { @Override public void call(Row t) throws Exception { // TODO Auto-generated method stub PreparedStatement pt = db1.conn.prepareStatement(sql); pt.setString(1,t.getString(0)); pt.setString(2,String.valueOf(t.getLong(1))); pt.executeUpdate(); } });; } }
打包
報錯
刪除apptest文件
再次打包
把打好的包拷貝出來
並且重命名
vim project.sh
/opt/modules/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class com.it19gong.clickproject.AccessLogDriverCluster --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /opt/modules/hive/conf/hive-site.xml --driver-class-path /opt/modules/hive/lib/mysql-connector-java-5.1.28.jar /home/hadoop/sparkproject.jar
把原來的包刪除
上傳新的包
執行腳本
mysql數據多了兩條
打開azkaban的頁面,這里再次提醒要用谷歌瀏覽器
新建spark.job文件
#command.job type=command command=bash project.sh
打包成zip包
上傳zip包
開始執行
mysql數據庫多了兩天數據
到此為止整個項目結束了,由於本次項目中途事情比較多,所以從開始到結束花的時間比較長,請諒解!!!