第一章 ADS 聚合層
ads層,主要是根據各種報表及可視化來生成統計數據。通常這些報表及可視化都是基於某些維度的匯總統計。
1 需求
熱門商品統計(作業)
熱門品類統計(作業)
熱門品牌統計
交易用戶性別對比(作業)
交易用戶年齡段對比(作業)
交易額省市分布(作業)
2 分析
以熱門商品統計為例
統計表分為三個部分
時間點、 維度 、 度量
時間點:即統計結果產生的時間,或者本批次數據中業務日期最早的時間。
維度:統計維度,比如地區、商品名稱、性別
度量:匯總的數據,比如金額、數量
每個批次進行一次聚合,根據數據的及時性要求,可以調整批次的時間長度。
聚合后的結果存放到數據庫中。
3 數據庫的選型與難點
聚合數據本身並不麻煩,利用reducebykey或者groupbykey都可以聚合。
但是麻煩的是實現精確性一次消費。
因為聚合數據不是明細,沒有確定的主鍵,所以沒有辦法實現冪等。
那么如果想實現精確一次消費,就要考慮利用關系型數據庫的事務處理。
用本地事務管理最大的問題是數據保存操作要放在driver端變成單線程操作。性能降低。 但是由於本業務保存的是聚合后的數據所以數據量並不大,即使單線程保存也是可以接受的。
因此數據庫和偏移量選用mysql進行保存。
4 代碼實現
4.1 工具類
pom.xml 增加
<dependency> <groupId>org.scalikejdbc</groupId> <artifactId>scalikejdbc_2.11</artifactId> <version>2.5.0</version> </dependency> <!-- scalikejdbc-config_2.11 --> <dependency> <groupId>org.scalikejdbc</groupId> <artifactId>scalikejdbc-config_2.11</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> |
MysqlUtil 用於查詢Mysql數據庫
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement} import com.alibaba.fastjson.JSONObject import scala.collection.mutable.ListBuffer
object MysqlUtil {
def main(args: Array[String]): Unit = { val list: List[ JSONObject] = queryList("select * from offset_2020") println(list) }
def queryList(sql:String):List[JSONObject]={ Class.forName("com.mysql.jdbc.Driver") val resultList: ListBuffer[JSONObject] = new ListBuffer[ JSONObject]() val conn: Connection = DriverManager.getConnection("jdbc:mysql://hadoop2:3306/gmall1122_rs?characterEncoding=utf-8&useSSL=false","root","123123") val stat: Statement = conn.createStatement println(sql) val rs: ResultSet = stat.executeQuery(sql ) val md: ResultSetMetaData = rs.getMetaData while ( rs.next ) { val rowData = new JSONObject(); for (i <-1 to md.getColumnCount ) { rowData.put(md.getColumnName(i), rs.getObject(i)) } resultList+=rowData }
stat.close() conn.close() resultList.toList }
} |
OffsetManagerM 用於查詢Mysql數據庫中的偏移量
import java.util
import com.alibaba.fastjson.JSONObject import org.apache.kafka.common.TopicPartition import redis.clients.jedis.Jedis
object OffsetManagerM {
/** * 從Mysql中讀取偏移量 * @param groupId * @param topic * @return */ def getOffset(groupId:String,topic:String):Map[TopicPartition,Long]={ var offsetMap=Map[TopicPartition,Long]()
val jedisClient: Jedis = RedisUtil.getJedisClient
val redisOffsetMap: util.Map[String, String] = jedisClient.hgetAll("offset:"+groupId+":"+topic)
val offsetJsonObjList: List[JSONObject] = MysqlUtil.queryList("SELECT group_id ,topic,partition_id , topic_offset FROM offset_2020 where group_id='"+groupId+"' and topic='"+topic+"'")
jedisClient.close() if(offsetJsonObjList!=null&&offsetJsonObjList.size==0){ null }else {
val kafkaOffsetList: List[(TopicPartition, Long)] = offsetJsonObjList.map { offsetJsonObj => (new TopicPartition(offsetJsonObj.getString("topic"),offsetJsonObj.getIntValue("partition_id")), offsetJsonObj.getLongValue("topic_offset")) } kafkaOffsetList.toMap } } } |
4.2 數據庫准備
創建專用保存數據結果的數據庫 create database gmall1122_rs 用於保存偏移量 CREATE TABLE `offset_1122` ( `group_id` varchar(200) NOT NULL, `topic` varchar(200) NOT NULL, `partition_id` int(11) NOT NULL, `topic_offset` bigint(20) DEFAULT NULL, PRIMARY KEY (`group_id`,`topic`,`partition_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 用戶保存商品聚合結果 CREATE TABLE `spu_order_final_detail_amount_stat` ( stat_time datetime ,spu_id varchar(20) ,spu_name varchar(200),amount decimal(16,2) , PRIMARY KEY (`stat_time`,`spu_id`,`spu_name`) )ENGINE=InnoDB DEFAULT CHARSET=utf8 |
4.3 實時計算代碼
import java.text.SimpleDateFormat import java.util.Date
import com.alibaba.fastjson.JSON import com.atguigu.gmall1122.realtime.bean.OrderDetailWide import com.atguigu.gmall1122.realtime.util.{MyKafkaUtil, OffsetManager, OffsetManagerM} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkConf, rdd} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange} import scalikejdbc.{DB, SQL} import scalikejdbc.config.DBs
object SpuAmountSumApp {
def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ads_spu_amount_sum_app")
val ssc = new StreamingContext(sparkConf, Seconds(5)) val topic = "DWS_ORDER_DETAIL_WIDE"; val groupId = "ads_spu_amount_sum_group"
///////////////////// 偏移量處理/////////////////////////// val offset: Map[TopicPartition, Long] = OffsetManagerM.getOffset(groupId, topic)
var inputDstream: InputDStream[ConsumerRecord[String, String]] = null // 判斷如果從redis中讀取當前最新偏移量 則用該偏移量加載kafka中的數據 否則直接用kafka讀出默認最新的數據 if (offset != null && offset.size > 0) { inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, offset, groupId) } else { inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId) }
//取得偏移量步長 var offsetRanges: Array[OffsetRange] = null val inputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }
val orderDstreamDetailWideDstream: DStream[OrderDetailWide] = inputGetOffsetDstream.map { record => val jsonStr: String = record.value() val orderDetailWide: OrderDetailWide = JSON.parseObject(jsonStr, classOf[OrderDetailWide]) orderDetailWide } val orderWideWithSpuDstream: DStream[(String, Double)] = orderDstreamDetailWideDstream.map(orderWide=>(orderWide.spu_id+":"+orderWide.spu_name,orderWide.final_detail_amount))
val spuAmountDstream: DStream[(String, Double)] = orderWideWithSpuDstream.reduceByKey(_+_)
spuAmountDstream.foreachRDD { rdd => val resultArr: Array[(String, Double)] = rdd.collect() if (resultArr != null && resultArr.size > 0) { DBs.setup() DB.localTx(implicit session => { val dateTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) for ((spu, amount) <- resultArr) { val spuArr: Array[String] = spu.split(":") val spuId: String = spuArr(0) val spuName: String = spuArr(1) SQL("INSERT INTO spu_order_final_detail_amount_stat(stat_time,spu_id, spu_name, amount) VALUES (?,?,?,?)").bind(dateTime, spuId, spuName, amount).update().apply() } throw new RuntimeException("測試異常!!") for (offset <- offsetRanges) { //主鍵相同替換 主鍵不同新增 SQL("replace INTO offset_2020(group_id,topic, partition_id, topic_offset) VALUES (?,?,?,?)").bind(groupId, topic, offset.partition, offset.untilOffset).update().apply() }
}
) } } ssc.start() ssc.awaitTermination() }
} |
4.4 關於本地事務保存MySql
此處引用了一個 scala的MySQL工具:scalikeJdbc
配置文件: 默認使用 application.conf
db.default.driver="com.mysql.jdbc.Driver" db.default.url="jdbc:mysql://hadoop2/gmall1122_rs?characterEncoding=utf-8&useSSL=false" db.default.user="root" db.default.password="123123" |
加載配置
本地事務提交數據
DB.localTx(implicit session => { SQL("INSERT INTO spu_order_final_detail_amount_stat(stat_time,spu_id, spu_name, amount) VALUES (?,?,?,?)").bind(dateTime, spuId, spuName, amount).update().apply() SQL("replace INTO offset_2020(group_id,topic, partition_id, topic_offset) VALUES (?,?,?,?)").bind(groupId, topic, offset.partition, offset.untilOffset).update().apply() } ) |
凡是在 DB.localTx(implicit session => { } )中的SQL全部被本地事務進行關聯,一條失敗全部回滾。
第二章 發布接口
發布接口的目的是為可視化工具提供數據服務。
發布接口的地址和參數都要根據可視化工具的要求進行設置。
后面的可視化工具選用了阿里雲服務的DataV,由於DataV對地址沒有要求(可以自行配置),只對返回數據格式有一定要求。最好可以提前了解一下數據格式的要求。
或者可以不考慮接口格式,先完成service的查詢,然后再controller針對不同的格式要求在進行調整。
1 配置文件
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>gmall2019_dw</artifactId> <groupId>com.atguigu.gmall2019.dw</groupId> <version>1.0-SNAPSHOT</version> </parent> <groupId>com.atguigu.gmall2019.dw.publisher</groupId> <artifactId>dw-publisher</artifactId> <version>0.0.1-SNAPSHOT</version> <name>dw-publisher</name> <description>Demo project for Spring Boot</description>
<properties> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/io.searchbox/jest -->
<dependency> <groupId>com.atguigu.gmall2019.dw</groupId> <artifactId>dw-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.4</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project> |
|
application.properties
server.port=8070 logging.level.root=error spring.datasource.driver-class-name= com.mysql.jdbc.Driver spring.datasource.url= jdbc:mysql://hadoop2/gmall1122_rs?characterEncoding=utf-8&useSSL=false spring.datasource.data-username=root spring.datasource.data-password=123123
# mybatis mybatis.mapperLocations=classpath:mapper/*.xml mybatis.configuration.map-underscore-to-camel-case=true
|
其中mybatis.mapperLocations的作用:能夠讓spring容器找到mapper.xml,用於和mapper接口進行配對。
5 代碼部分
控制層 |
PublisherController |
實現接口的web發布 |
服務層 |
MySQLService |
數據業務查詢interface |
MySQLServiceImpl |
業務查詢的實現類 |
數據層 |
TrademarkAmountSumMapper |
數據層查詢的interface |
TrademarkAmountSum.xml |
數據層查詢的實現配置,實質上是Mapper接口的“實現類”。 |
主程序 |
GmallPublisherApplication |
增加掃描包 |
5.1 GmallPublisherApplication 增加掃描包
作用:能夠讓spring容器找到mapper接口用於和mapper.xml進行配對
@SpringBootApplication @MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper") public class Gmall2019PublisherApplication{
public static void main(String[] args) { SpringApplication.run(Gmall2019PublisherApplication.class, args); }
} |
5.2 controller層
package com.atguigu.gmall1122.publisher.controller;
import com.alibaba.fastjson.JSON; import com.atguigu.gmall1122.publisher.service.MysqlService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.Map;
@RestController public class DataVController {
//路徑和參數隨便定 ,但是返回值要看datav的需要 @Autowired MysqlService mysqlService;
@GetMapping("trademark-sum") public String trademarkSum(@RequestParam("start_date") String startDate, @RequestParam("end_date") String endDate){ if(startDate.length()==0|| endDate.length()==0){ return "參數不能為空!"; } startDate = startDate.replace("_", " "); endDate = endDate.replace("_", " "); List<Map> trademardSum = mysqlService.getTrademardSum(startDate, endDate);
return JSON.toJSONString(trademardSum) ;
}
} |
5.3 service層
public interface MysqlService {
public List<Map> getTrademardSum(String startDate,String endDate); } |
5.4 service層實現類
@Service public class MysqlServiceImpl implements MysqlService {
@Autowired TrademarkAmountSumMapper trademarkAmountSumMapper;
@Override public List<Map> getTrademardSum(String startDate, String endDate) { return trademarkAmountSumMapper.selectTradeSum(startDate,endDate); } } |
5.5 數據層 mapper
public interface TrademarkAmountSumMapper {
public List<Map> selectTradeSum(@Param("start_Date") String startDate , @Param("end_Date")String endDate); } |
5.6 數據層 實現配置
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper SYSTEM "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.atguigu.gmall1122.publisher.mapper.TrademarkAmountSumMapper"> <select id="selectTradeSum" resultMap="trademarkSumMap"> select tm_name ,sum(amount) amount from `trademark_amount_sum_stat` where stat_time >= #{start_Date} and stat_time < #{end_Date} group by tm_id ,tm_name order by sum(amount) desc </select>
<resultMap id="trademarkSumMap" type="java.util.Map" autoMapping="true"> </resultMap>
</mapper> |
第三章 可視化
1 DataV
阿里雲網址: https://datav.aliyun.com/
官方幫助手冊: https://help.aliyun.com/document_detail/30360.html
阿里雲有兩大數據可視化服務,一個是QuickBI,一個就是DataV。
QuickBI定位BI工具定位由數據分析師使用,通過靈活配置各種多維分析、深度鑽取,生成各種報表和可交互的圖形化展示。
而DataV ,傾向於定制數據大屏,針對運營團隊使用的信息豐富炫酷的監控型可視化工具。
2 數據源
DataV的數據源主要是兩方面,阿里雲數據服務體系內的數據源和外部數據源。
阿里雲數據服務體系內的數據源,類型非常多,包括RDS服務,ADS服務,TableStore服務等等。
本文只介紹基於外部數據源的配置方式。 外部數據源就是要發布出可以外網訪問的地址,每一個可視化組件都要對應一個訪問地址。
3 配置步驟
3.1 首先來到首頁,在我的可視化標簽中,選擇【新建可視化】

2 選擇合適的大屏模板

3 選中你要配置的組件

4 選擇左側中間的標簽頁

5 觀察左側下方的靜態數據,實現對應的接口程序

6 根據數據結構調整web接口的響應數據
@GetMapping("trademark-sum") public String trademarkSum(@RequestParam("start_date") String startDate, @RequestParam("end_date") String endDate){ if(startDate.length()==0|| endDate.length()==0){ return "參數不能為空!"; } startDate = startDate.replace("_", " "); endDate = endDate.replace("_", " "); List<Map> trademarkSumList = mysqlService.getTrademarkSum(startDate, endDate);
//根據DataV圖形數據要求進行調整, x :品牌 ,y 金額, s 1 List<Map> datavList=new ArrayList<>(); for (Map trademardSumMap : trademarkSumList) { Map map = new HashMap<>(); map.put("x",trademardSumMap.get("tm_name")); map.put("y",trademardSumMap.get("amount")); map.put("s",1); datavList.add(map); }
return JSON.toJSONString(datavList) ;
} |
7 配置組件的數據源
點擊【配置數據源】

數據源的頁面選擇API

選擇API后,填寫URL,數據接口訪問路徑
(實現個人電腦發布服務需要內網穿透,請參考第二章)

調整自動更新速度

下方能看到數據發生變化

可視化效果

第四章 內網穿透
1 作用
通常個人電腦無論是連接WIFI上網還是用網線上網,都是屬於局域網里邊的,外網無法直接訪問到你的電腦,內網穿透可以讓你的局域網中的電腦實現被外網訪問功能。
2 工具
目前國內網穿透工具很多,常見的比如花生殼、Ngrok。
官網:
花生殼:https://hsk.oray.com
Ngrok: http://www.ngrok.cc
本文以介紹花生殼為主
3 准備工作
首先注冊、登錄
並且需要實名認證(要提供身份證正反面照片)

4 下載安裝電腦客戶端
5 在客戶端進行配置
在登錄后的界面

在右下角點擊加號
6 進行內網穿透的核心配置

7 發布
保存后就可以用下方圖中箭頭處使用開關來確認發布。
發布地址就如圖中網址

8 測試:
