圖計算實現ID_Mapping、Oneid打通數據孤島


 

ID_Mapping與Oneid的作用

大神告訴我們Oneid能用來做什么

在這里插入圖片描述

輸入數據源格式樣例

樣例數據圖1
在這里插入圖片描述
整理后數據圖2
在這里插入圖片描述

實現原理

聯通圖
在這里插入圖片描述
生成最大聯通圖
在這里插入圖片描述
留下耀總的數據給大家練習了

當日代碼生成


import java.util.UUID import cn.scfl.ebt.util.UtilTool import org.apache.spark.SparkContext import org.apache.spark.graphx._ import org.apache.spark.sql.SparkSession import org.spark_project.jetty.util.StringUtil /** * @Author: baierfa * @version: v1.0 * @description: id_mapping 單天實現暫時不加入多天滾動計算 多天計算需要看另一文件YeAndTodayGraphx * @Date: 2020-07-05 10:24 */ object TodayGraphx { def main(args: Array[String]): Unit = { //聲明環境變量 val spark = SparkSession .builder .appName(s"${this.getClass.getName}") .master("local[*]") .getOrCreate() val sc = spark.sparkContext val todayPath = "D:\\TESTPATH\\inputpath\\today\\dt=202-07-13" val outPutPath="D:\\TESTPATH\\outtpath\\today\\dt=202-07-13" val edgeoutPutPath="D:\\TESTPATH\\edgepath\\today\\dt=202-07-13" todayIdMapping(spark,sc,todayPath,outPutPath,edgeoutPutPath) spark.close() } /** * 功能描述: <輸入今天數據路徑 按照文件形式輸出到指定路徑中 並推出今日圖計算點與邊集合總個數> * 〈使用今日輸入數據轉換成唯一數字值 圖計算之后再將數值轉換回明文 生成唯一uuid〉 * @Param: [spark, sc, todayPath, outPutPath, edgeoutPutPath] * @Return: void * @Author: baierfa * @Date: 2020-08-05 10:18 */ def todayIdMapping(spark:SparkSession,sc: SparkContext,todayPath: String,outPutPath:String ,edgeoutPutPath:String )={ // 一、數據加載 // 今天數據加載 val todaydf = spark.read.textFile(todayPath) // 二、處理數據為生成圖做准備 // 生成今日點集合 val to_veritx = todaydf.rdd.flatMap(line => { // 將數據源進行分割 val field = line.split("\t") //把數據轉換成(long,值)要想long值不重復 可以使用hashcode //本文用於生產環境 使用了md5加密 詳細文件請看其他篇章 for (ele <- field if StringUtil.isNotBlank(ele)&&(!"\\N".equals(ele))) yield (UtilTool.getMD5(ele), ele) }) // 生成今日邊集合 val to_edges = todaydf.rdd.flatMap(line => { // 將數據源進行分割 val field = line.split("\t") //將數據轉換 將值轉換成邊 用於連線 連線值這邊用""想更換看個人意願 for (i <- 0 to field.length - 2 if StringUtil.isNotBlank(field(i))&&(!"\\N".equals(field(i))) ;j <- i + 1 to field.length - 1 if StringUtil.isNotBlank(field(j))&&(!"\\N".equals(field(j)))) yield Edge(UtilTool.getMD5(field(i)), UtilTool.getMD5(field(j)), "") }) // 在數據不做多次etl數據操作下可以使用共同出現次數來判定是否歸並為同一個用戶 // 例如 合並起來用戶 mobile 與 device_id 同時出現兩次以上才被記入同一個 // .map(edge => (edge, 1)) // .reduceByKey(_ + _) // .filter(tp => tp._2 > 2) // .map(tp => tp._1) // 三、匯總各個節點使用圖計算生成圖 // 單將數據重新賦值適用於以后多數據源合並 val veritx = to_veritx val edges = to_edges // 開始使用點集合與邊集合進行圖計算訓練 val graph = Graph(veritx, edges) // 四、生成最大連通圖 val graph2 = graph.connectedComponents() val vertices = graph2.vertices // 五、將最小圖計算值替換成uuid val uidRdd = vertices.map(tp => (tp._2, tp._1)) .groupByKey() .map(tp => (StringUtil.replace(UUID.randomUUID().toString, "-", ""), tp._2)) // 對點與邊進行統計作為記錄輸出 可以用於后期統計檢查生成報表警報數據是否異常 val uu = veritx.map(lin=>("vertices",1)).union(edges.map(lin=>("edges",1))).reduceByKey(_ + _) .map(tp=>tp._1+"\t"+tp._2) // 將現有的數據轉換成銘文識別后展示 // 將各個點的數據匯總到driver端 val idmpMap = veritx.collectAsMap() // 按照map方式廣播出去做轉換 val bc = sc.broadcast(idmpMap) // 將數據的id轉換成明文 val ss = uidRdd.mapPartitions(itemap => { val vert_id_map = bc.value itemap.map(tp => { //從廣播變量中獲取id值的信息並轉換 val t2 = for (ele <- tp._2) yield vert_id_map.get(ele).get //按照將要輸出的數據格式進行排版 (uuid mobile1,mobile2,mobile3,device_id1,device_2) tp._1+"\t"+t2.mkString(",") }) }) // 數據輸出 ss.saveAsTextFile(outPutPath) uu.saveAsTextFile(edgeoutPutPath) } }

引用jar包

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.4.0</spark.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <dependency>
            <groupId>com.thoughtworks.paranamer</groupId>
            <artifactId>paranamer</artifactId>
            <version>2.8</version>
        </dependency>

    </dependencies>

啟動命令
spark-submit \ --class IdMapping \ --master yarn \ --deploy-mode cluster \ --num-executors 40 \ --driver-memory 4g \ --executor-memory 6g \ --executor-cores 3 \ --conf spark.default.parallelism=400 \ --conf spark.shuffle.memoryFraction=0.3 \ ID_Mapping_Spark.jar \ hdfs://user/hive/oneid_data/data_origindata_di/dt=2020-07-13 \ hdfs://user/hive/oneid_data/data_sink_id_mapping/dt=2020-07-14 \ hdfs://user/hive/oneid_data/data_sink_edge_vertex/dt=2020-07-14

辛苦碼字如有轉載請標明出處謝謝!——拜耳法

都看到這里了非常感謝!
本片章暫未完結 有疑問請+vx :baierfa
在這里插入圖片描述

PS:我要在下一章在我心中不完美的你打一個淋漓盡致的標簽

將大神掛在那片白雲下: oneid與用戶標簽之間的相互打通 實現用戶標簽


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM