大數據實時項目(ads層)


 

第一章  ADS 聚合層

   ads層,主要是根據各種報表及可視化來生成統計數據。通常這些報表及可視化都是基於某些維度的匯總統計。

 

1  需求

熱門商品統計(作業)

熱門品類統計(作業)

熱門品牌統計 

交易用戶性別對比(作業)

交易用戶年齡段對比(作業)

交易額省市分布(作業)

2  分析

以熱門商品統計為例

 

統計表分為三個部分

時間點、 維度 、 度量

 

時間點:即統計結果產生的時間,或者本批次數據中業務日期最早的時間。

維度:統計維度,比如地區、商品名稱、性別

度量:匯總的數據,比如金額、數量

 

每個批次進行一次聚合,根據數據的及時性要求,可以調整批次的時間長度。

聚合后的結果存放到數據庫中。

 

 

3 數據庫的選型與難點

 

聚合數據本身並不麻煩,利用reducebykey或者groupbykey都可以聚合。

但是麻煩的是實現精確性一次消費。

 

因為聚合數據不是明細,沒有確定的主鍵,所以沒有辦法實現冪等。

 

那么如果想實現精確一次消費,就要考慮利用關系型數據庫的事務處理。

 

用本地事務管理最大的問題是數據保存操作要放在driver端變成單線程操作。性能降低。 但是由於本業務保存的是聚合后的數據所以數據量並不大,即使單線程保存也是可以接受的。

 

因此數據庫和偏移量選用mysql進行保存。

 

 

 

4 代碼實現

4.1 工具類

pom.xml 增加

<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc_2.11</artifactId>
    <version>2.5.0</version>
</dependency>
<!-- scalikejdbc-config_2.11 -->
<dependency>

    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc-config_2.11</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>

 

 

 

MysqlUtil  用於查詢Mysql數據庫


import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}
import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

object  MysqlUtil {

  def main(args: Array[String]): Unit = {
      val list:  List[ JSONObject] = queryList("select * from offset_2020")
      println(list)
  }

  def   queryList(sql:String):List[JSONObject]={
         Class.forName("com.mysql.jdbc.Driver")
        val resultList: ListBuffer[JSONObject] = new  ListBuffer[ JSONObject]()
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://hadoop2:3306/gmall1122_rs?characterEncoding=utf-8&useSSL=false","root","123123")
        val stat: Statement = conn.createStatement
        println(sql)
        val rs: ResultSet = stat.executeQuery(sql )
        val md: ResultSetMetaData = rs.getMetaData
        while (  rs.next ) {
           val rowData = new JSONObject();
          for (i  <-1 to md.getColumnCount  ) {
              rowData.put(md.getColumnName(i), rs.getObject(i))
          }
          resultList+=rowData
        }

        stat.close()
        conn.close()
        resultList.toList
  }

}

 

 

 

 

 

 

 

 

 

 

 

OffsetManagerM 用於查詢Mysql數據庫中的偏移量

import java.util

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.common.TopicPartition
import redis.clients.jedis.Jedis

object OffsetManagerM {


  /**
    * 從Mysql中讀取偏移量
    * @param groupId
    * @param topic
    * @return
    */
  def getOffset(groupId:String,topic:String):Map[TopicPartition,Long]={

      var offsetMap=Map[TopicPartition,Long]()

      val jedisClient: Jedis = RedisUtil.getJedisClient

      val redisOffsetMap: util.Map[String, String] = jedisClient.hgetAll("offset:"+groupId+":"+topic)


      val offsetJsonObjList: List[JSONObject] = MysqlUtil.queryList("SELECT  group_id ,topic,partition_id  , topic_offset  FROM offset_2020 where group_id='"+groupId+"' and topic='"+topic+"'")

      jedisClient.close()
      if(offsetJsonObjList!=null&&offsetJsonObjList.size==0){
            null
      }else {


            val kafkaOffsetList: List[(TopicPartition, Long)] = offsetJsonObjList.map { offsetJsonObj  =>
             (new TopicPartition(offsetJsonObj.getString("topic"),offsetJsonObj.getIntValue("partition_id")), offsetJsonObj.getLongValue("topic_offset"))
           }
           kafkaOffsetList.toMap
      }
   }
 
}

 

 

 

 

 

 

4.2  數據庫准備

 

創建專用保存數據結果的數據庫

create database gmall1122_rs

 

用於保存偏移量

CREATE TABLE `offset_1122` (

  `group_id` varchar(200) NOT NULL,

  `topic` varchar(200) NOT NULL,

  `partition_id` int(11) NOT NULL,

  `topic_offset` bigint(20) DEFAULT NULL,

  PRIMARY KEY (`group_id`,`topic`,`partition_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

 

 

用戶保存商品聚合結果

CREATE TABLE `spu_order_final_detail_amount_stat` ( stat_time datetime  ,spu_id varchar(20) ,spu_name  varchar(200),amount decimal(16,2) ,

  PRIMARY KEY (`stat_time`,`spu_id`,`spu_name`)

  )ENGINE=InnoDB  DEFAULT CHARSET=utf8

 

 

 

 

 

 

 

 

4.3  實時計算代碼

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall1122.realtime.bean.OrderDetailWide
import com.atguigu.gmall1122.realtime.util.{MyKafkaUtil, OffsetManager, OffsetManagerM}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs


object SpuAmountSumApp {


  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ads_spu_amount_sum_app")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topic = "DWS_ORDER_DETAIL_WIDE";
    val groupId = "ads_spu_amount_sum_group"


    /////////////////////  偏移量處理///////////////////////////
    val offset: Map[TopicPartition, Long] = OffsetManagerM.getOffset(groupId, topic)


    var inputDstream: InputDStream[ConsumerRecord[String, String]] = null
    // 判斷如果從redis中讀取當前最新偏移量 則用該偏移量加載kafka中的數據  否則直接用kafka讀出默認最新的數據
    if (offset != null && offset.size > 0) {

      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId)
    } else {
      inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
    }

    //取得偏移量步長
    var offsetRanges: Array[OffsetRange] = null
    val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>

      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    val orderDstreamDetailWideDstream: DStream[OrderDetailWide] = inputGetOffsetDstream.map { record =>
      val jsonStr: String = record.value()
      val orderDetailWide: OrderDetailWide = JSON.parseObject(jsonStr, classOf[OrderDetailWide])
      orderDetailWide
    }
    val orderWideWithSpuDstream: DStream[(String, Double)] = orderDstreamDetailWideDstream.map(orderWide=>(orderWide.spu_id+":"+orderWide.spu_name,orderWide.final_detail_amount))

    val spuAmountDstream: DStream[(String, Double)] = orderWideWithSpuDstream.reduceByKey(_+_)

    spuAmountDstream.foreachRDD { rdd =>
      val resultArr: Array[(String, Double)] = rdd.collect()
      if (resultArr != null && resultArr.size > 0) {
        DBs.setup()
        DB.localTx(implicit session => {
          val dateTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
          for ((spu, amount) <- resultArr) {
            val spuArr: Array[String] = spu.split(":")
            val spuId: String = spuArr(0)
            val spuName: String = spuArr(1)
            SQL("INSERT INTO spu_order_final_detail_amount_stat(stat_time,spu_id, spu_name, amount) VALUES (?,?,?,?)").bind(dateTime, spuId, spuName, amount).update().apply()
          }
          throw new RuntimeException("測試異常!!")
          for (offset <- offsetRanges) {
            //主鍵相同替換 主鍵不同新增
            SQL("replace INTO offset_2020(group_id,topic, partition_id, topic_offset) VALUES (?,?,?,?)").bind(groupId, topic, offset.partition, offset.untilOffset).update().apply()

          }

        }

        )
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

 

 

 

 

4.4 關於本地事務保存MySql

此處引用了一個 scala的MySQL工具:scalikeJdbc

配置文件: 默認使用 application.conf

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop2/gmall1122_rs?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="123123"

 

 

加載配置

DBs.setup()

 

本地事務提交數據

DB.localTx(implicit session => {
         SQL("INSERT INTO spu_order_final_detail_amount_stat(stat_time,spu_id, spu_name, amount) VALUES (?,?,?,?)").bind(dateTime, spuId, spuName, amount).update().apply()
        
      SQL("replace INTO offset_2020(group_id,topic, partition_id, topic_offset) VALUES (?,?,?,?)").bind(groupId, topic, offset.partition, offset.untilOffset).update().apply()
           
    }
)

 

凡是在 DB.localTx(implicit session => { } )中的SQL全部被本地事務進行關聯,一條失敗全部回滾。

 

 

第二章  發布接口

發布接口的目的是為可視化工具提供數據服務。

發布接口的地址和參數都要根據可視化工具的要求進行設置。

后面的可視化工具選用了阿里雲服務的DataV,由於DataV對地址沒有要求(可以自行配置),只對返回數據格式有一定要求。最好可以提前了解一下數據格式的要求。

或者可以不考慮接口格式,先完成service的查詢,然后再controller針對不同的格式要求在進行調整。

 

 

 1  配置文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>gmall2019_dw</artifactId>
    <groupId>com.atguigu.gmall2019.dw</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <groupId>com.atguigu.gmall2019.dw.publisher</groupId>
  <artifactId>dw-publisher</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>dw-publisher</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
 
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.searchbox/jest -->


    <dependency>

       <groupId>com.atguigu.gmall2019.dw</groupId>
       <artifactId>dw-common</artifactId>
       <version>1.0-SNAPSHOT</version>
    </dependency>

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
    </dependency>

 
 

 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

 

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>

</dependency>

 


<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.4</version>
</dependency>

 

 


  </dependencies>

  <build>
    <plugins>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
    </plugins>
  </build>

</project>
 

 

 

 

application.properties

server.port=8070

 

logging.level.root=error

 

spring.datasource.driver-class-name= com.mysql.jdbc.Driver


spring.datasource.url= jdbc:mysql://hadoop2/gmall1122_rs?characterEncoding=utf-8&useSSL=false


spring.datasource.data-username=root
spring.datasource.data-password=123123

# mybatis
mybatis.mapperLocations=classpath:mapper/*.xml

mybatis.configuration.map-underscore-to-camel-case=true

 

其中mybatis.mapperLocations作用能夠讓spring容器找到mapper.xml用於和mapper接口進行配對。

 

5 代碼部分

控制層

PublisherController

實現接口的web發布

服務層

MySQLService

數據業務查詢interface

MySQLServiceImpl

業務查詢的實現類

數據層

TrademarkAmountSumMapper

數據層查詢的interface

TrademarkAmountSum.xml

數據層查詢的實現配置,實質上是Mapper接口的“實現類”。

主程序

GmallPublisherApplication

增加掃描包

 

 

 

 

 

 

5.1   GmallPublisherApplication 增加掃描包

作用:能夠讓spring容器找到mapper接口用於和mapper.xml進行配對

@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication{

  public static void main(String[] args) {
     SpringApplication.run(Gmall2019PublisherApplication.class, args);
  }

}

 

 

5.2  controller

package com.atguigu.gmall1122.publisher.controller;


import com.alibaba.fastjson.JSON;
import com.atguigu.gmall1122.publisher.service.MysqlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

@RestController
public class DataVController {

    //路徑和參數隨便定 ,但是返回值要看datav的需要
    @Autowired

    MysqlService mysqlService;

    @GetMapping("trademark-sum")
    public String trademarkSum(@RequestParam("start_date") String startDate, @RequestParam("end_date") String endDate){
         if(startDate.length()==0||  endDate.length()==0){
                return "參數不能為空!";
         }
        startDate = startDate.replace("_", " ");
        endDate = endDate.replace("_", " ");
        List<Map> trademardSum = mysqlService.getTrademardSum(startDate, endDate);

        return JSON.toJSONString(trademardSum) ;


    }

 
}

 

 

5.3  service

public interface MysqlService {


    public List<Map> getTrademardSum(String startDate,String endDate);
}

 

 

 

5.4  service層實現類

@Service
public class MysqlServiceImpl implements MysqlService {

    @Autowired
    TrademarkAmountSumMapper trademarkAmountSumMapper;

    @Override
    public List<Map> getTrademardSum(String startDate, String endDate) {
        return trademarkAmountSumMapper.selectTradeSum(startDate,endDate);
    }
}

 

 

 

 

5.5 數據層 mapper

public interface TrademarkAmountSumMapper {

    public List<Map> selectTradeSum(@Param("start_Date") String startDate , @Param("end_Date")String endDate);
}

 

 

5.6 數據層 實現配置

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper SYSTEM "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.atguigu.gmall1122.publisher.mapper.TrademarkAmountSumMapper">

    <select id="selectTradeSum"  resultMap="trademarkSumMap">
         select tm_name ,sum(amount) amount from `trademark_amount_sum_stat`
        where stat_time >= #{start_Date}  and stat_time < #{end_Date}
        group by tm_id ,tm_name
        order by sum(amount) desc
    </select>

    <resultMap id="trademarkSumMap" type="java.util.Map" autoMapping="true">
    </resultMap>

</mapper>

 

 

 

 

 

第三章 可視化

 1  DataV

阿里雲網址https://datav.aliyun.com/

 

  官方幫助手冊: https://help.aliyun.com/document_detail/30360.html

 

    阿里雲有兩大數據可視化服務,一個是QuickBI,一個就是DataV。

QuickBI定位BI工具定位由數據分析師使用,通過靈活配置各種多維分析、深度鑽取,生成各種報表和可交互的圖形化展示。

而DataV ,傾向於定制數據大屏,針對運營團隊使用的信息豐富炫酷的監控型可視化工具。

 

 

 

 

2   數據源

    DataV的數據源主要是兩方面,阿里雲數據服務體系內的數據源和外部數據源。

阿里雲數據服務體系內的數據源,類型非常多,包括RDS服務,ADS服務,TableStore服務等等。

本文只介紹基於外部數據源的配置方式。 外部數據源就是要發布出可以外網訪問的地址,每一個可視化組件都要對應一個訪問地址。

 

3  配置步驟

 

3.1 首先來到首頁,在我的可視化標簽中,選擇【新建可視化】

 

 

 

 

2 選擇合適的大屏模板

 

 

 

 

3 選中你要配置的組件

 

 

 

 

 

4 選擇左側中間的標簽頁

 

 

 

 

 

5 觀察左側下方的靜態數據,實現對應的接口程序

 

 

 

 

6 根據數據結構調整web接口的響應數據

@GetMapping("trademark-sum")
public String trademarkSum(@RequestParam("start_date") String startDate, @RequestParam("end_date") String endDate){
     if(startDate.length()==0||  endDate.length()==0){
            return "參數不能為空!";
     }
    startDate = startDate.replace("_", " ");
    endDate = endDate.replace("_", " ");
    List<Map> trademarkSumList = mysqlService.getTrademarkSum(startDate, endDate);

    //根據DataV圖形數據要求進行調整,  x :品牌 ,y 金額, s 1
    List<Map> datavList=new ArrayList<>();

    for (Map trademardSumMap : trademarkSumList) {
        Map  map = new HashMap<>();
        map.put("x",trademardSumMap.get("tm_name"));
        map.put("y",trademardSumMap.get("amount"));
        map.put("s",1);
        datavList.add(map);
    }

    return JSON.toJSONString(datavList) ;

}

 

 

7 配置組件的數據源

點擊【配置數據源】

 

 

 

 

數據源的頁面選擇API

 

 

 

 

 

 

選擇API后,填寫URL,數據接口訪問路徑

(實現個人電腦發布服務需要內網穿透,請參考第二章)

 

 

 

 

 

 

調整自動更新速度

 

 

 

 

下方能看到數據發生變化

 

 

 

 

可視化效果

 

 

 

第四章 內網穿透

1 作用

通常個人電腦無論是連接WIFI上網還是用網線上網,都是屬於局域網里邊的,外網無法直接訪問到你的電腦,內網穿透可以讓你的局域網中的電腦實現被外網訪問功能。

 

 

2  工具

  目前國內網穿透工具很多,常見的比如花生殼、Ngrok。

  官網:

花生殼:https://hsk.oray.com

 

Ngrok:  http://www.ngrok.cc

 

   本文以介紹花生殼為主

 

3  准備工作

  

  首先注冊、登錄

  並且需要實名認證(要提供身份證正反面照片)

 

 

 

 

 

 

 

 

4  下載安裝電腦客戶端

 

5 在客戶端進行配置

在登錄后的界面

 

 

 

 

在右下角點擊加號

 

6 進行內網穿透的核心配置

 

 

 

 

7 發布

保存后就可以用下方圖中箭頭處使用開關來確認發布。

發布地址就如圖中網址

 

 

 

 

 

 

 

 

 

 

8 測試:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM