Spark Project Application: E-commerce Big Data Analysis Summary


1. Data collection (requirement: crawl at least 3,000 records spanning more than one week); the collected data is saved to local files

   For the crawling details, see: crawling JD.com comments with Python

   Nearly 20,000 records across 156 product categories were crawled; it took more than two hours, with several interruptions along the way

  

   

2. Data preprocessing: use MapReduce or Kettle to preprocess the source data, cleaning the large set of JSON files into structured text files

    Part of the cleaning is done while parsing the JSON, including date format changes and data type conversions; deduplication and sorting are then handled in Kettle
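
A minimal sketch of this cleaning step in Python (field names such as content, creationTime, isMobile, userLevelName, days and score follow the JD comment JSON; the exact columns kept, the timestamp format and the output file name are assumptions):

import csv
import json
from datetime import datetime

# Flatten one crawled JSON file into one structured CSV row per comment
with open('data/shoes156.json', 'r', encoding='utf-8') as f:
    comments = json.load(f)

with open('pinglun.csv', 'w', encoding='utf-8', newline='') as out:
    writer = csv.writer(out)
    for c in comments:
        # Date format change: keep only the day part of creationTime
        day = datetime.strptime(c['creationTime'], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')
        writer.writerow([
            c['id'],
            c['content'].replace(',', '').replace('\n', ';'),  # strip delimiter characters from the text
            day,
            int(c['isMobile']),        # data type conversion: bool -> 0/1
            c['userLevelName'],
            int(c['days']),
            int(c['score']),
        ])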

 

3. Data statistics: generate the Hive user-comment data

(1) Create a table in Hive, named pinglun, to hold the cleaned data (CREATE TABLE SQL statement); screenshot of the table created and the data imported:

  Queries are run with Presto, which is several times faster than Hive
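
The write-up does not include the actual CREATE TABLE statement, so the following is a hedged sketch through Spark SQL with Hive support (column names and types are assumptions based on the cleaned fields above; the author may equally have used the Hive CLI):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("comment-stats") \
    .enableHiveSupport() \
    .getOrCreate()

# Assumed layout of the cleaned comment table
spark.sql("""
    CREATE TABLE IF NOT EXISTS pinglun (
        id            BIGINT,
        content       STRING,
        creationtime  STRING,
        ismobile      INT,
        userlevelname STRING,
        days          INT,
        score         INT
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
# Load the cleaned CSV produced in the preprocessing step
spark.sql("LOAD DATA LOCAL INPATH 'pinglun.csv' INTO TABLE pinglun")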

  

  Requirement 1: analyze whether users buy on the mobile client or on the PC, and the ratio of mobile to PC users; store the statistics in the ismobilehive table;
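
  A hedged sketch of the aggregation behind this requirement, reusing the spark session from the sketch above (the output schema is an assumption; the mobile/PC ratio can be derived from the two counts when the chart is drawn):

spark.sql("""
    CREATE TABLE IF NOT EXISTS ismobilehive AS
    SELECT ismobile, COUNT(*) AS cnt
    FROM pinglun
    GROUP BY ismobile
""")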

  

  Requirement 2: analyze the review cycle (how long after receiving the goods users usually post a review); store the statistics in the dayssql table;
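
  A similar sketch for the review-cycle distribution (days is assumed to be the number of days between delivery and the review, as in the JD comment JSON):

spark.sql("""
    CREATE TABLE IF NOT EXISTS dayssql AS
    SELECT days, COUNT(*) AS cnt
    FROM pinglun
    GROUP BY days
""")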

   

  Requirement 3: analyze member levels (the membership level of the users who bought this product); store the statistics in the userlevelname_out table;
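
  A similar sketch for the member-level statistics (schema assumed):

spark.sql("""
    CREATE TABLE IF NOT EXISTS userlevelname_out AS
    SELECT userlevelname, COUNT(*) AS cnt
    FROM pinglun
    GROUP BY userlevelname
""")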

  

   Requirement 4: analyze the number of reviews per day; store the statistics in the creationtime_out table;
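
  A similar sketch for the per-day review counts (assumes creationtime holds a date or timestamp string that to_date can parse):

spark.sql("""
    CREATE TABLE IF NOT EXISTS creationtime_out AS
    SELECT to_date(creationtime) AS comment_date, COUNT(*) AS cnt
    FROM pinglun
    GROUP BY to_date(creationtime)
""")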

  

4. Use Sqoop to migrate the data to the MySQL database:

   Screenshots of the four result tables imported into four MySQL tables
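
   For reference, a hedged sketch of one of the export commands (the MySQL host, database and credentials are reused from the streaming code later in this write-up; the warehouse path and field delimiter are assumptions based on Hive defaults, and the target MySQL table must already exist with a matching schema):

sqoop export \
  --connect "jdbc:mysql://node3:3306/edu?useUnicode=true&characterEncoding=utf-8" \
  --username root \
  --password 123456 \
  --table ismobilehive \
  --export-dir /user/hive/warehouse/ismobilehive \
  --input-fields-terminated-by '\001' \
  -m 1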

  

         

 

5. Data visualization: display the data as charts with JavaWeb + ECharts

  Screenshot of the visualization for Requirement 1

  

  Screenshot of the visualization for Requirement 2

  

  Screenshot of the visualization for Requirement 3

  

  Screenshot of the visualization for Requirement 4

  

6. User-review analysis with Chinese word segmentation

(1) This part analyzes the negative-review data in the product-comment table and extracts the points users complain about, so as to know the product's weak spots. (Screenshot of the filtered negative-review data set)

    Keyword extraction based on the TextRank algorithm

import json

import jieba
from jieba import analyse

# TextRank keyword-extraction interface provided by jieba
textrank = analyse.textrank

# Load the crawled comments and collect the review text
ll = []
with open('shoes283.json', 'r', encoding='utf-8') as f:
    data = json.load(f)
for item in data:
    ll.append(item['content'].replace(',', '').replace('\n', ';'))

# Extract the top keywords with the TextRank algorithm
keywords = textrank(' '.join(ll), topK=100)
# Print the extracted keywords
for keyword in keywords:
    print(keyword + "/")

 

(2) Use Python's jieba segmentation to perform Chinese word segmentation and word-frequency statistics on the user reviews; (screenshot after segmentation)

   The main work is removing punctuation, stop words, single characters and so on around the jieba segmentation

import csv
import json
import re

import jieba

ll = []

def regex_change(line):
    # Regex for a "12345::" style prefix
    username_regex = re.compile(r"^\d+::")
    # URLs; [a-zA-Z0-9] is used instead of \w so that Chinese text is not swallowed
    url_regex = re.compile(r"""
        (https?://)?
        ([a-zA-Z0-9]+)
        (\.[a-zA-Z0-9]+)
        (\.[a-zA-Z0-9]+)*
        (/[a-zA-Z0-9]+)*
    """, re.VERBOSE | re.IGNORECASE)
    # Strip date words (year/month/day and weekdays)
    data_regex = re.compile(u"""        # UTF-8 encoded
        年 |
        月 |
        日 |
        (周一) |
        (周二) |
        (周三) |
        (周四) |
        (周五) |
        (周六)
    """, re.VERBOSE)
    # Strip all numbers
    decimal_regex = re.compile(r"[^a-zA-Z]\d+")
    # Strip whitespace
    space_regex = re.compile(r"\s+")
    # Strip newlines, Chinese punctuation and a few high-frequency stop words (的/了/是);
    # add to this character class whatever else needs to be removed
    regEx = "[\n”“|,,;;''/?! 。的了是]"
    line = re.sub(regEx, "", line)
    line = username_regex.sub(r"", line)
    line = url_regex.sub(r"", line)
    line = data_regex.sub(r"", line)
    line = decimal_regex.sub(r"", line)
    line = space_regex.sub(r"", line)

    return line


with open('data/shoes156.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

for item in data:
    ll.append(item['content'].replace(',', '').replace('\n', ';'))

words = jieba.lcut(regex_change(' '.join(ll)))  # segment the cleaned text with jieba's accurate mode
counts = {}  # word -> number of occurrences

for word in words:
    if word in "|,,;;''/?! 。::" or len(word) == 1:  # skip punctuation and single characters
        continue
    else:
        counts[word] = counts.get(word, 0) + 1  # add 1 each time the word appears

items = list(counts.items())  # turn the dict into a list of (word, count) pairs
items.sort(key=lambda x: x[1], reverse=True)  # sort by count, descending

# Write the word-frequency list as CSV rows (newline='' avoids the \r\n vs \n difference across platforms)
with open("comments_jieba.txt", "a+", encoding='utf-8', newline='') as file:
    csv_file = csv.writer(file)
    for word, count in items:
        print([word, count])
        csv_file.writerow([word, count])

(3) Create a word-frequency table in Hive and load the segmentation results;
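
A minimal sketch of this step through Spark SQL with Hive support, reusing the spark session from section 3 (the table name wordcount and the local path are assumptions; comments_jieba.txt is the comma-separated output of the segmentation script above):

spark.sql("""
    CREATE TABLE IF NOT EXISTS wordcount (word STRING, cnt INT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("LOAD DATA LOCAL INPATH 'comments_jieba.txt' INTO TABLE wordcount")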

 

 ① Bar chart of the top ten categories of negative-review keywords

 ② Word cloud of the segmented negative-review words
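
 A possible sketch of the word cloud in Python with the wordcloud package (counts is the word-frequency dict built in the segmentation script above; the font path and output file name are assumptions):

from wordcloud import WordCloud

# A Chinese font must be supplied, otherwise the characters render as empty boxes
wc = WordCloud(font_path='simhei.ttf',      # assumed path to a Chinese font
               width=800, height=600,
               background_color='white')
wc.generate_from_frequencies(counts)
wc.to_file('bad_review_wordcloud.png')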

 

7. Real-time data analysis with Spark

This experiment targets JD.com product comments. The architecture is crawler + Flume + Kafka + Spark Streaming + MySQL, so the data is collected, analyzed and displayed dynamically in real time.

 

The overall workflow is shown in the figure below:

 

Screenshots of the operation steps

1. Start Flume

 Flume configuration file

pro.sources = s1
pro.channels = c1
pro.sinks = k1
 
pro.sources.s1.type = exec
# /opt/a.log is the log file that the crawler appends to
pro.sources.s1.command = tail -F /opt/a.log
 
pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100
 
pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# this Kafka topic must be created first
pro.sinks.k1.kafka.topic = ct
pro.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy
 
pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1

Start the agent:

 bin/flume-ng agent --conf-file flume-kafka.conf --name pro -Dflume.root.logger=INFO,LOGFILE

 

2. Start Kafka

# List topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

# Delete a topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic edu

# Create the topic used by the Flume sink
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic ct

# Console consumer to verify that messages arrive
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ct --from-beginning

3. Write a crawler that sends data to the log file

 Code:

# -*- coding: utf-8 -*-
import itertools
import json
import random
import time

import requests

headers = {
    "Cookie": "__jdu=1507876332; shshshfpa=2ea021ee-52dd-c54e-1be1-f5aa9e333af2-1640075639; areaId=5; PCSYCityID=CN_0_0_0; shshshfpb=n7UymiTWOsGPvQfCup%2B3J1g%3D%3D; ipLoc-djd=5-142-42547-54561; jwotest_product=99; pinId=S4TjgVP4kjjnul02leqp07V9-x-f3wj7; pin=jd_60a1ab2940be3; unick=jd_60a1ab2940be3; ceshi3.com=000; _tp=672TNfWmOtaDFuqCPQqYycXMpi6F%2BRiwrhIuumNmoJ4%3D; _pst=jd_60a1ab2940be3; __jdc=122270672; shshshfp=4e8d45f57897e469586da47a0016f20e; ip_cityCode=142; CCC_SE=ADC_rzqTR2%2bUDTtHDYjJdX25PEGvHsBpPY%2bC9pRDVdNK7pU%2fwikRihpN3XEXZ1cn4Jd4w5OWdpJuduhBFwUvdeB6X1VFb7eIZkqL0OJvBn9RB6AJYo4An%2fGTiU%2b8rvqQwYxBI4QCM8a9w9kYQczygSjPxPjn1pbQLtBgo%2fzKBhwfKhAWs563NfBjmnRlkGHPX6E7jy6%2fEdfEhtkNSTCQod238cEpUFpKiQ%2bWV%2bW8MiaL3ti7d7ozdlNbZ03ylqRbI1XrXylDiqzW%2b2uALhF5H1eHuk3yH3t4ojXZmRbDy3k2OoZFk%2bcmrXD0eWhcIaD5RnhHbToYLuX%2byx7otaPuemTVAG4Z7CSyEfmUBAj7QuGmHg647a7KuoaR3hoCvxj%2f3woXdd2H9b40oqmJ5PO958Z1g%2fr7Jbk8a5w2CU547IaXRzakehLhW9xzG57Ak0Jhv85Jlt9A5N6hl%2ft4DSAwh%2bGhwg%3d%3d; unpl=JF8EAJJnNSttDBxWAxxSEkUVQg4EW1QKTx9TazcCAV8KSFICE1FIF0N7XlVdXhRKFR9vYhRUW1NPVA4ZBysSEXteVV1YCE0TAGlnNWRtW0tkBCsCHxMWQltTXF8LeycDZ2M1VFxZSlYGHQEbEBBCbWRbXQlKFQBpYQVQbVl7VTVZbEJTDBkCBxNdDEoRCmlgB1ZeaEpkBg; JSESSIONID=347F847A6818E35675648739BD4BA9FF.s1; __jda=122270672.1507876332.1640075637.1647251498.1647261295.13; thor=8D225D1673AA75681B9D3811417B0D325568BB2DD7F2729798D3AECF0428F59F7C70EA7504347F8E059F895AEE7D6E2662F565665845F0D94F2D7D56739CF3BC2B15F5F6E2ADDB891DDA80A9E9F88B7BA0BA95147512F78D28D8095E52379AB78550E451558DB6595C2270A1D5CFA2E211FF20F22ADA1987C6AE9E864DA6A7364D5BFD3EE08DA597D2EF2B37444CFD7A47134EFFD71B3A70B0C8BD55D51F274F; token=397b2c7c58f4021bbe9a9bbe9eeda694,3,915145; __tk=46fbcc7e51f75824dcdc2e8820904365,3,915145; shshshsID=5c5095f0b5728a839c0397308d625da5_1_1647261360535; __jdb=122270672.2.1507876332|13.1647261295; __jdv=122270672|jd.idey.cn|t_2024175271_|tuiguang|ef376a8f48ba48359a5a6d3c2769bb4b|1647261360584; 3AB9D23F7A4B3C9B=24HI5ARAA3SK7RJERTWUDZKA2NYJIXX3ING24VG466VC3ANKUALJLLD7VBYLQ3QPRYUSO3R6QBJYLVTVXBDIGJLGBA",
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.39"
}
headers2 = {
    "accept": "*/*",
    "accept-encoding": "gzip, deflate, br",
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "cookie": "__jdu=1507876332; shshshfpa=2ea021ee-52dd-c54e-1be1-f5aa9e333af2-1640075639; areaId=5; PCSYCityID=CN_0_0_0; ipLoc-djd=5-142-42547-54561; pinId=S4TjgVP4kjjnul02leqp07V9-x-f3wj7; pin=jd_60a1ab2940be3; unick=jd_60a1ab2940be3; _tp=672TNfWmOtaDFuqCPQqYycXMpi6F%2BRiwrhIuumNmoJ4%3D; _pst=jd_60a1ab2940be3; user-key=a2aaf011-2c1e-4dea-bf76-3392d16b1fb1; __jdc=122270672; wlfstk_smdl=jlwwba2gmccq62touff9evvbp3fk8gbr; ceshi3.com=000; shshshfp=4e8d45f57897e469586da47a0016f20e; ip_cityCode=142; shshshfpb=n7UymiTWOsGPvQfCup%2B3J1g%3D%3D; joyya=1647305570.1647305909.27.0kop377; __jda=122270672.1507876332.1640075637.1647314039.1647318046.22; token=d5899471c4530886f6a9658cbea3ca94,3,915176; __tk=1570759a7dd1a720b0db2dec5df8d044,3,915176; CCC_SE=ADC_Wj0UWzuXioxsiUvbIxw9PbW9q011vNMASHkfjXFO%2fZlkeGDtZUHe5qgaEpWv8RDEkCruGSGmCItsvHjIZ3aHbh9heUjNIZh6WZl9ZDfDokk66kRX6I%2by%2bDsdf4JtPOQUuULSsWOA%2fcDyP7Bb91YuHOwNnciLtS97UIKO7XA5sAd34Rf4XDKijy6Fw1DFTx%2b7izzme6YALuLp9Y%2bByC6aUTDzU9te7g1BZXPXtfGGwqu52ZVkdVId2jpxPnhX24fFD9WI9aX1qgswZ1PPZSGYKswUkqXhIf2S9aLFkjXW2n61LVzw2ZeqJRQI8QIcmi%2fF7WHOHLbWScnKwG594WIk0SRiCa0n2aEJAhVlXmzEE%2f5%2f%2bXWsKhlneTLduVs52ST5m96zdx%2bLnNGgDERqznFNu3AT5zvLcN0PyVq08n4keSv2ngLLTZK4QQJslS4he9MT3XJoEUfe9L8beZNh1239eLHYF6w4KWMCWWTfwxdCUOY%3d; unpl=JF8EAJZnNSttDEhSAkwDE0dEGAoEWw8LSh9TbjRVXV5QHFIDGwMfGhd7XlVdXhRKFR9vYxRUXlNIUw4ZBysSEXteVV1YCE0TAGlnNWRtW0tkBCsCHxMWQltTXF8LeycDZ2M1VFxZSlYGGwcTEhhObWRbXQlKFQBpYQVQbVl7VTVNbBsTEUpcVVteDENaA2tmA11bX0lWBisDKxE; __jdv=122270672|jd.idey.cn|t_2024175271_|tuiguang|e276f09debfa4c209a0ba829f7710596|1647318395561; thor=8D225D1673AA75681B9D3811417B0D325568BB2DD7F2729798D3AECF0428F59F4C39726C44E930AA2DD868FC4BCA33EA0D52228F39A68FC9F5C1157433CAACF1110B20B6975502864453B70E6B21C0ED165B733359002643CD05BDBA37E4A673AF38CC827B6013BCB5961ADA022E57DB6811E99E10E9C4E6410D844CD129071F7646EC7CE120A0B3D2F768020B044A010452D9F8ABD67A59D41880DD1991935C; 3AB9D23F7A4B3C9B=24HI5ARAA3SK7RJERTWUDZKA2NYJIXX3ING24VG466VC3ANKUALJLLD7VBYLQ3QPRYUSO3R6QBJYLVTVXBDIGJLGBA; __jdb=122270672.5.1507876332|22.1647318046; shshshsID=d7a96097b296c895558adfd840546a72_5_1647318650562",
    "referer": "https://search.jd.com/"
}
def crawlProductComment(url):
    # Fetch the raw JSONP response and strip the fetchJSON_comment98(...) wrapper
    try:
        req = requests.get(url=url, headers=headers2).text
        reqs = req.replace("fetchJSON_comment98(", "").strip(');')
        print(reqs)
        jsondata = json.loads(reqs)
        # Return the list of comments on this page
        comments = jsondata['comments']
        return comments
    except Exception:
        print("Error: failed to parse the comment response")
        return []  # return an empty list so the caller's len() check still works



def getProduct(url):
    # Parse the JSONP response of the search-ad API and collect the sku_id of each product
    ids = []
    req = requests.get(url=url, headers=headers2).text
    reqs = req.replace("jQuery1544821(", "").strip(')')
    jsondata = json.loads(reqs)['291']
    for item in jsondata:
        ids.append(item['sku_id'])
    print(ids)
    return ids







import paramiko

# Server information: hostname (IP address), port, username and password
hostname = "node1"
port = 22
username = "root"
password = "123456"

# Open an SFTP session and append to the log file that Flume tails
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()
remote_file = sftp_client.open("/opt/a.log", 'a+')  # remote log file path
ids = []
for i in range(2, 3):
    # Collect sku_ids from the search-ad API, one result page per iteration
    product_id = getProduct(
        "https://search-x.jd.com/Search?callback=jQuery1544821&area=5&enc=utf-8&keyword=%E7%94%B7%E5%A3%AB%E8%BF%90%E5%8A%A8%E9%9E%8B&adType=7&page="+str(i)+"&ad_ids=291%3A33&xtest=new_search&_=1647325621019")
    time.sleep(random.randint(1, 3))
    ids.append(product_id)

data = []
count = 0
for k in list(set(itertools.chain.from_iterable(ids))):
    for i in range(0, 100):
        url = 'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId=' + str(
            k) + '&score=0&sortType=5&page=' \
              + str(i) + '&pageSize=10&isShadowSku=0&fold=1'
        comments = crawlProductComment(url)
        if len(comments) <= 0:
            break
        print(comments)
        # Append the number of comments in this batch to the remote log, one line per batch
        remote_file.writelines(str(len(comments)) + "\n")
        data.extend(comments)
        # Sleep between requests
        time.sleep(random.randint(1, 5))
        print('-------', i)

    print("Product category no. {} done".format(count))
    count += 1

# Release the SFTP handle and the SSH connection
remote_file.close()
client.close()

The Kafka console consumer receives the data and displays it

4. Spark Streaming consumes the Kafka data and writes it to MySQL continuously

 Code:

package cn.itcast.edu.analysis.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Date


/**
 * Author itcast
 * Desc Consume Kafka data using the Direct approach
 */
object Streaming {
  def main(args: Array[String]): Unit = {
    //TODO 0. Prepare the environment
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //the time interval at which streaming data will be divided into batches
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//one batch every 5 seconds
    ssc.checkpoint("./ckp")

    //TODO 1. Load data from Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:9092",//Kafka cluster address
      "key.deserializer" -> classOf[StringDeserializer],//key deserializer
      "value.deserializer" -> classOf[StringDeserializer],//value deserializer
      "group.id" -> "ct",//consumer group name
      //earliest: resume from the committed offset if there is one, otherwise start from the earliest message
      //latest: resume from the committed offset if there is one, otherwise start from the latest message
      //none: resume from the committed offset if there is one, otherwise throw an error
      "auto.offset.reset" -> "latest",
      "auto.commit.interval.ms"->"5000",//auto-commit interval
      "enable.auto.commit" -> (true: java.lang.Boolean)//whether to auto-commit offsets
    )
    val topics = Array("ct")//topics to subscribe to
    //Consume messages from Kafka with the KafkaUtils helper
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //location strategy recommended by the source code
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //consumer strategy recommended by the source code
    )

    //TODO 2. Process the messages
    val infoDS: DStream[Int] = kafkaDS.map(record => {
      val nowDate = new Date()
      val strDate: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(nowDate)
      //each Kafka record is one line written by the crawler: the number of comments fetched in a batch
      val value: String = record.value()
      Class.forName("com.mysql.jdbc.Driver")
      //Open a MySQL connection
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://node3:3306/edu?useUnicode=true&characterEncoding=utf-8", "root", "123456")
      //Write the record into MySQL
      try {
          val sql: String = "insert into spider(spider_time,number)values('" + strDate + "','" + value.toInt + "')"
          conn.prepareStatement(sql).executeUpdate()
      } finally {
        conn.close()
      }
      value.toInt
    })
    //TODO 3. Output the result
    infoDS.print()
    //TODO 4. Start the job and wait for termination
    ssc.start()
    ssc.awaitTermination()//note: a streaming application keeps running after start until it is stopped manually

    //TODO 5. Release resources
    ssc.stop(stopSparkContext = true, stopGracefully = true)//graceful shutdown

  }
}

pom.xml configuration

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.itcast</groupId>
  <artifactId>dataproject</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>dataproject Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <scala.version>2.12.11</scala.version>
    <spark.version>3.0.3</spark.version>
    <hadoop.version>3.1.4</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>aliyun</id>
      <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
      <id>apache</id>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Scala language dependency -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Spark Core dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- spark-streaming-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Kafka dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark SQL + Hive dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive-thriftserver_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Structured Streaming + Kafka dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark MLlib machine-learning module (includes the ALS recommendation algorithm) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.5</version>
    </dependency>

    <dependency>
      <groupId>com.hankcs</groupId>
      <artifactId>hanlp</artifactId>
      <version>portable-1.7.7</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.22</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.2</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <!-- Plugin for compiling Java -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
      </plugin>
      <!-- Plugin for compiling Scala -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass></mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

5. Visualization

 The number of crawled comments is displayed dynamically and the chart refreshes every 5 seconds; the screenshots are not shown here

 For details, see: implementing dynamic refresh with ECharts (redrawing every few seconds)

 

