基於Spark的電影推薦系統(推薦系統~2)


第四部分-推薦系統-數據ETL

  • 本模塊完成數據清洗,並將清洗后的數據load到Hive數據表里面去

前置准備:

spark +hive

vim $SPARK_HOME/conf/hive-site.xml 
	<?xml version="1.0"?>
		<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
	<configuration>
	<property>
			<name>hive.metastore.uris</name>
			<value>thrift://hadoop001:9083</value>
	</property>
	</configuration>  
  • 啟動Hive metastore server

[root@hadoop001 conf]# nohup hive --service metastore &

[root@hadoop001 conf]# netstat -tanp | grep 9083
tcp 0 0 0.0.0.0:9083 0.0.0.0:* LISTEN 24787/java
[root@hadoop001 conf]#

測試:
[root@hadoop001 ~]# spark-shell --master local[2]

scala> spark.sql("select * from liuge_db.dept").show;
+------+-------+-----+                                                          
|deptno|  dname|  loc|
+------+-------+-----+
|     1|  caiwu| 3lou|
|     2|  renli| 4lou|
|     3|  kaifa| 5lou|
|     4|qiantai| 1lou|
|     5|lingdao|4 lou|
+------+-------+-----+

==》保證Spark SQL 能夠訪問到Hive 的元數據才行。

然而我們采用的是standalone模式:需要啟動master worker
[root@hadoop001 sbin]# pwd
/root/app/spark-2.4.3-bin-2.6.0-cdh5.7.0/sbin
[root@hadoop001 sbin]# ./start-all.sh

[root@hadoop001 sbin]# jps
26023 Master
26445 Worker

Spark常用端口

8080	spark.master.ui.port	Master WebUI
8081	spark.worker.ui.port	Worker WebUI
18080	spark.history.ui.port	History server WebUI
7077	SPARK_MASTER_PORT	    Master port
6066	spark.master.rest.port	Master REST port
4040	spark.ui.port	        Driver WebUI

這個時候打開:http://hadoop001:8080/

在這里插入圖片描述

開始項目Coding

IDEA+Scala+Maven進行項目的構建

步驟一: 新建scala項目后,可以參照如下pom進行配置修改

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.csylh</groupId>
  <artifactId>movie-recommend</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.3</spark.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.1</version>
    </dependency>
    <!--// 0.10.2.1-->

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.39</version>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

  </dependencies>

  <build>
    <!--<sourceDirectory>src/main/scala</sourceDirectory>-->
    <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.3</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.13</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

步驟二:新建com.csylh.recommend.dataclearer.SourceDataETLApp

import com.csylh.recommend.entity.{Links, Movies, Ratings, Tags}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Description:
  *    hadoop001  file:///root/data/ml/ml-latest 下的文件
  *    ====>  SparkSQL ETL
  *    ===>  load data to Hive數據倉庫
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 13:48
  */
object SourceDataETLApp{
  def main(args: Array[String]): Unit = {
    // 面向SparkSession編程
    val spark = SparkSession.builder()
//          .master("local[2]")
      .enableHiveSupport() //開啟訪問Hive數據, 要將hive-site.xml等文件放入Spark的conf路徑
      .getOrCreate()

    val sc = spark.sparkContext

    // 設置RDD的partitions 的數量一般以集群分配給應用的CPU核數的整數倍為宜, 4核8G ,設置為8就可以
    // 問題一:為什么設置為CPU核心數的整數倍?
    // 問題二:數據傾斜,拿到數據大的partitions的處理,會消耗大量的時間,因此做數據預處理的時候,需要考量會不會發生數據傾斜
    val minPartitions = 8

    //  在生產環境中一定要注意設置spark.sql.shuffle.partitions,默認是200,及需要配置分區的數量
    val shuffleMinPartitions = "8"
    spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)
    /**
      * 1
      */
    import spark.implicits._
    val links = sc.textFile("file:///root/data/ml/ml-latest/links.txt",minPartitions) //DRIVER
      .filter(!_.endsWith(",")) //EXRCUTER
      .map(_.split(",")) //EXRCUTER
      .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt)) //EXRCUTER
      .toDF()
    println("===============links===================:",links.count())
    links.show()

    // 把數據寫入到HDFS上
    links.write.mode(SaveMode.Overwrite).parquet("/tmp/links")

    // 將數據從HDFS加載到Hive數據倉庫中去
    spark.sql("drop table if exists links")
    spark.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")
    spark.sql("load data inpath '/tmp/links' overwrite into table links")

    /**
      * 2
      */
    val movies = sc.textFile("file:///root/data/ml/ml-latest/movies.txt",minPartitions)
      .filter(!_.endsWith(","))
      .map(_.split(","))
      .map(x => Movies(x(0).trim.toInt, x(1).trim.toString, x(2).trim.toString))
      .toDF()
    println("===============movies===================:",movies.count())
    movies.show()

    // 把數據寫入到HDFS上
    movies.write.mode(SaveMode.Overwrite).parquet("/tmp/movies")

    // 將數據從HDFS加載到Hive數據倉庫中去
    spark.sql("drop table if exists movies")
    spark.sql("create table if not exists movies(movieId int,title String,genres String) stored as parquet")
    spark.sql("load data inpath '/tmp/movies' overwrite into table movies")

    /**
      * 3
      */
    val ratings = sc.textFile("file:///root/data/ml/ml-latest/ratings.txt",minPartitions)
      .filter(!_.endsWith(","))
      .map(_.split(","))
      .map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt))
      .toDF()
    println("===============ratings===================:",ratings.count())

    ratings.show()

    // 把數據寫入到HDFS上
    ratings.write.mode(SaveMode.Overwrite).parquet("/tmp/ratings")

    // 將數據從HDFS加載到Hive數據倉庫中去
    spark.sql("drop table if exists ratings")
    spark.sql("create table if not exists ratings(userId int,movieId int,rating Double,timestamp int) stored as parquet")
    spark.sql("load data inpath '/tmp/ratings' overwrite into table ratings")

    /**
      * 4
      */
    val tags = sc.textFile("file:///root/data/ml/ml-latest/tags.txt",minPartitions)
      .filter(!_.endsWith(","))
      .map(x => rebuild(x))  // 注意這個坑的解決思路
      .map(_.split(","))
      .map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toString, x(3).trim.toInt))
      .toDF()

    tags.show()

    // 把數據寫入到HDFS上
    tags.write.mode(SaveMode.Overwrite).parquet("/tmp/tags")

    // 將數據從HDFS加載到Hive數據倉庫中去
    spark.sql("drop table if exists tags")
    spark.sql("create table if not exists tags(userId int,movieId int,tag String,timestamp int) stored as parquet")
    spark.sql("load data inpath '/tmp/tags' overwrite into table tags")
  }
  /**
    * 該方法是用於處理不符合規范的數據
    * @param input
    * @return
    */
  private def rebuild(input:String): String ={
    val a = input.split(",")

    val head = a.take(2).mkString(",")
    val tail = a.takeRight(1).mkString
    val tag = a.drop(2).dropRight(1).mkString.replaceAll("\"","")
    val output = head + "," + tag + "," + tail
    output
  }
}

再有一些上面主類引用到的case 對象,你可以理解為Java 實體類

package com.csylh.recommend.entity

/**
  * Description: 數據的schema
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 13:46
  */
case class Links(movieId:Int,imdbId:Int,tmdbId:Int)
package com.csylh.recommend.entity

/**
  * Description: TODO
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 14:09
  */
case class Movies(movieId:Int,title:String,genres:String)

package com.csylh.recommend.entity

/**
  * Description: TODO
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 14:10
  */
case class Ratings(userId:Int,movieId:Int,rating:Double,timestamp:Int)

package com.csylh.recommend.entity

/**
  * Description: TODO
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 14:11
  */
case class Tags(userId:Int,movieId:Int,tag:String,timestamp:Int)

步驟三:將創建的項目進行打包上傳到服務器
mvn clean package -Dmaven.test.skip=true

[root@hadoop001 ml]# ll -h movie-recommend-1.0.jar 
-rw-r--r--. 1 root root 156K 10月 20 13:56 movie-recommend-1.0.jar
[root@hadoop001 ml]# 

步驟四:提交運行上面的jar,編寫shell腳本

[root@hadoop001 ml]# vim etl.sh
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

$SPARK_HOME/bin/spark-submit
--class com.csylh.recommend.dataclearer.SourceDataETLApp
--master spark://hadoop001:7077
--name SourceDataETLApp
--driver-memory 10g
--executor-memory 5g
/root/data/ml/movie-recommend-1.0.jar

步驟五:sh etl.sh 即可

先把數據寫入到HDFS上
創建Hive表
load 數據到表

sh etl.sh之前:

[root@hadoop001 ml]# hadoop fs -ls /tmp
19/10/20 19:26:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive

[root@hadoop001 ml]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:27:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@hadoop001 ml]#

sh etl.sh之后:
這里的shell 是 ,spark on standalone,后面會spark on yarn。其實也沒差,都可以

[root@hadoop001 ~]# hadoop fs -ls /tmp
19/10/20 19:43:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
[root@hadoop001 ~]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/tags
[root@hadoop001 ~]# 

這樣我們就把數據etl到我們的數據倉庫里了,接下來,基於這份基礎數據做數據加工

有任何問題,歡迎留言一起交流~~
更多文章:基於Spark的電影推薦系統:https://blog.csdn.net/liuge36/column/info/29285


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM