spark-紅酒-白酒評估


Storm
------------------
    實時計算,延遲很低。
    吞吐量小。
    tuple()

Spark Streaming
------------------
    DStream,離散流計算。
    相當於一序列RDD。
    按照時間片划分RDD。
    DStream分區 = RDD的分區。
    動態數據。
    StreamingContext( , Seconds(2))
    windows話操作,batch的擴展。
    吞吐量大。
    socketTextStream()            //Socket
                                //分區200ms

    kafka流                        //kafka分區 == rdd一個分區。

LocationStrategy
------------------
    位置策略,控制主題分區在哪個節點消費。
    PreferBroker                //首選kafka服務器
    PreferConsistent            //首選均衡處理
    PreferFixed                    //首選固定位置


ConsumerStrategy
-----------------
    控制消費者對kafka消息的消費范圍界定。
    Assign                        //指定,控制到主題下的分區.
    Subscribe                    //訂閱主題集合,控制不到主題下的某個分區。
    SubscribePattern            //正則消費,對Subscribe的增強,支持正則表達式.

消費語義模型
----------------
    1.at most once
        submitOffset()
        consumeMessage() ;

    2.at least once
        consumeMessage()
        commitOffset()

    3.exact once 
        依托於外部事務性資源(例如數據庫)產品的事務管理特性。
        將offset存儲到事務性資源庫中。


KafkaRDD分區計算
------------------
    通過kafkaRDD的consumer.assignmeng()方法來得到。
    而消費者對象是通過consumerStrategy.onStart獲得.
    因此KafkaRDD的分區數區域於消費者策略,原則上每
    個主題分區對應一個rdd分區,有些情況需要考量,比如
    分區上有限速的。


java版編程實現spark streaming kafka消息消費
---------------------------------------------
    package com.oldboy.spark.java;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.*;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Created by Administrator on 2018/5/22.
     */
    public class SparkStreamingKafkaJavaDemo {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            conf.setAppName("stream") ;
            conf.setMaster("local[*]") ;

            //創建流上下文
            JavaStreamingContext ssc = new JavaStreamingContext(conf , Durations.seconds(5)) ;

            //位置策略
            LocationStrategy loc = LocationStrategies.PreferConsistent();

            //消費參數
            /**
             * "bootstrap.servers" -> "s102:9092,s103:9092",
             "key.deserializer" -> classOf[StringDeserializer],
             "value.deserializer" -> classOf[StringDeserializer]
             "group.id" -> "g1",
             "auto.offset.reset" -> "latest",
             "enable.auto.commit" -> (false: java.lang.Boolean)
             *
             */
            Map<String,Object> kafkaParams = new HashMap<String,Object>() ;
            kafkaParams.put("bootstrap.servers" ,"s102:9092" ) ;
            kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
            kafkaParams.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
            kafkaParams.put("group.id" , "g1") ;
            kafkaParams.put("auto.offset.reset" , "latest") ;
            kafkaParams.put("enable.auto.commit" , false) ;

            //消費主題
            List<String> topics = new ArrayList<String>() ;
            topics.add("t1") ;

            //消費者策略
            ConsumerStrategy con = ConsumerStrategies.Subscribe(topics , kafkaParams) ;
            JavaDStream<ConsumerRecord<String,String>> ds1 = KafkaUtils.<String,String>createDirectStream(ssc , loc , con) ;
            JavaDStream<String> ds2 = ds1.map(new Function<ConsumerRecord<String,String>, String>() {
                public String call(ConsumerRecord<String, String> v1) throws Exception {
                    return v1.value();
                }
            }) ;
            ds2.print();

            ssc.start();

            ssc.awaitTermination();

        }
    }


機器學習
----------------------
    算法。
    machine learning.


數學基礎
---------------
    1.kmean
        k個均值
    2.median
        中位數
            
    3.mode
        眾數
    4.range
        極差 , max - min
    5.variance
        方差
        差平方和平均值
        (x1-x)^2 + (x2-x)^2 + ...
        ----------------------------
                    n
    6.standard deviation
        標准差!
        方差的平方根。
        sqrt(variance)

    7.skewness
        偏度。
        對稱分布 : mean = median = mod
        左偏分布 : mean < median < mod
        右偏分布 : mean > median > mod
    8.kertosis
        峰度
        正態 :kertosis = 3
        較凸 :kertosis > 3
        平滑 :kertosis < 3

BI
-------------
    商業智能。


監督和非監督
--------------
    1.監督
        使用的數據都是打了標簽的。
        垃圾郵件分類。
        神經網絡
        SVM
        朴素貝葉斯

    2.非監督
        沒有標簽。
        Kmean



貝葉斯
----------------
    A事件發生時,B事件發生的概率。

                P(A|B) * P(B)
    P(B | A) = -----------------
                    P(A)



TF-IDF
----------
    1.TF
        term frequence,詞頻,針對單個文檔。
        word count.
        衡量單詞描述該文檔主題的相關性。

        //j : 第j篇文章
        //i : 第i個單詞
                    N(ij)
        TF(ij) = --------------------
                    Sum(N(j))
                
        
    2.IDF
        inverse document frequence,逆文檔頻率,針對文檔集合(語料庫)
        計算單詞對整個文檔集合的區分能力。

                          |D| 1000
        idf(i) = log10 -----------------------
                        出現單詞i的文檔個數 + 1

    3.TF-IDF
        tf衡量某個單詞在文章的重要性。
        idf衡量單詞用來區分整個語料庫的重要性。
        1000


最小二乘法
----------------
    平方和最小值.

線性回歸
---------------
    regress,
    呈現直線方式變換.
    回歸結果是變化的值。

邏輯回歸
------------------------
    計算的結果是固定值。
    線性回歸對結果進行二元判斷,就是邏輯回歸。

    
向量
------------------------
    (1,2,3,4)
     1
     2
     3
     4)

     (0,3,8,0,0,9,0,2)


松散向量:
-----------
    sparse vector,
    占用內存少。
    (1000,1:3,2:8,5:9,..)


密度向量:
-----------
    dense vector,
    (0,1,2,0,0,5,0,6)



hello world , how are you, thank you!!

1/7
you  = 2/ 7

1.
----------
    hello tom1

2.
---------------
    hello tom2
3.
---------------
    hello tom3
                3
hello = ----------- = 1
                3
y = ax1 + bx2 + ... nx11 + C


使用線性回歸實現酒質量預測
--------------------------
    1.引入maven依賴
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    2.編程
        /**
          * 使用spark的線性回歸預測紅酒質量
          */

        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.ml.regression.LinearRegression
        import org.apache.spark.ml.param.ParamMap
        import org.apache.spark.ml.linalg.{Vector, Vectors}
        import org.apache.spark.sql.{Row, SparkSession}

        object SparkMLLibLinearRegress {


            def main(args: Array[String]): Unit = {

                val conf = new SparkConf()
                conf.setAppName("ml_linearRegress")
                conf.setMaster("local[*]")

                val spark = SparkSession.builder().config(conf).getOrCreate()
                //1.定義樣例類
                case class Wine(FixedAcidity: Double, VolatileAcidity: Double, CitricAcid: Double,
                                ResidualSugar: Double, Chlorides: Double, FreeSulfurDioxide: Double,
                                TotalSulfurDioxide: Double, Density: Double, PH: Double,
                                Sulphates: Double, Alcohol: Double,
                                Quality: Double)

                //2.加載csv紅酒文件,變換形成rdd
                val file = "file:///D:\\ml\\data\\red.csv" ;
                val wineDataRDD = spark.sparkContext.textFile(file)
                    .map(_.split(";"))
                    .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                        w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                        w(9).toDouble, w(10).toDouble,
                        w(11).toDouble))

                //導入sparksession的隱式轉換對象的所有成員,才能將rdd轉換成Dataframe
                import spark.implicits._
                val trainingDF = wineDataRDD.map(w => (w.Quality, Vectors.dense(w.FixedAcidity, w.VolatileAcidity,
                    w.CitricAcid, w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                    w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
                trainingDF.show(100,false)

                //3.創建線性回歸對象
                val lr = new LinearRegression()        //val lr = new LinearRegression()

                //4.設置回歸對象參數
                lr.setMaxIter(2)                     //lr.setMaxIter(2)

                //5.擬合模型
                val model = lr.fit(trainingDF)             //val model = lr.fit(trainingDF)

                //6.構造測試數據集
                val testDF = spark.createDataFrame(Seq((5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
                    (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
                    (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))))
                    .toDF("label", "features")

                //7.對測試數據集注冊臨時表
                testDF.createOrReplaceTempView("test")        //testDF.createOrReolaceTempView("test")

                //8.使用訓練的模型對測試數據進行預測,並提取查看項
                val tested = model.transform(testDF)
                tested.show(100,false)

                //
                val tested2 = tested.select("features", "label", "prediction")
                tested2.show(100,false)

                //9.展示預測結果
                tested.show()

                //10.通過測試數據集只抽取features,作為預測數據。
                val predictDF = spark.sql("select features from test")
                predictDF.show(100,false)
                model.transform(predictDF).show(1000,false)
            }
        }


模型持久化
------------------
    //保存模型
    val model = lr.fit(trainingDF)
    model.save("file:///d:/mr/model/linreg")

    //加載模型
    val model = LinearRegressionModel.load("file:///d:/mr/model/linreg")


使用邏輯回歸實現白酒的好壞評估
-------------------------------
    /**
      * 邏輯回歸
      */

    import org.apache.spark.SparkConf
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.LinearRegressionModel
    import org.apache.spark.sql.SparkSession

    object WineLogisticRegressDemo {
        def main(args: Array[String]): Unit = {

            val conf = new SparkConf()
            conf.setAppName("logisticRegress")
            conf.setMaster("local[*]")

            val spark = SparkSession.builder().config(conf).getOrCreate()    //val spark=SparkSession.builder().config(conf).getOrCreate()
            //1.定義樣例類
            case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                            CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                            FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
                            PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)

            //2.加載csv紅酒文件,變換形成rdd
            val file = "file:///D:\\ml\\data\\white.csv";
            val wineDataRDD = spark.sparkContext.textFile(file)         //val wineDataRDD = spark.sparkContext.textFile(file)
                .map(_.split(";"))
                .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                    w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                    w(9).toDouble, w(10).toDouble,
                    w(11).toDouble))

            //導入sparksession的隱式轉換對象的所有成員,才能將rdd轉換成Dataframe
            import spark.implicits._
            val trainingDF = wineDataRDD.map(w => (if (w.Quality < 7) 0D else
                1D, Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
                w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")

            //3.創建邏輯回歸對象
            val lr = new LogisticRegression()
            //設置回歸對象參數
            lr.setMaxIter(10).setRegParam(0.01)

            //4.擬合訓練數據,生成模型
            val model = lr.fit(trainingDF)

            //5.構造測試數據
            val testDF = spark.createDataFrame(Seq(
                (1.0, Vectors.dense(6.1, 0.32, 0.24, 1.5, 0.036, 43, 140, 0.9894, 3.36, 0.64, 10.7)),
                (0.0, Vectors.dense(5.2, 0.44, 0.04, 1.4, 0.036, 38, 124, 0.9898, 3.29, 0.42, 12.4)),
                (0.0, Vectors.dense(7.2, 0.32, 0.47, 5.1, 0.044, 19, 65, 0.9951, 3.38, 0.36, 9)),
                (0.0, Vectors.dense(6.4, 0.595, 0.14, 5.2, 0.058, 15, 97, 0.991, 3.03, 0.41, 12.6)))
            ).toDF("label", "features")

            testDF.createOrReplaceTempView("test")

            //預測測試數據
            val tested = model.transform(testDF).select("features", "label", "prediction")

            //
            val realData = spark.sql("select features from test")

            model.transform(realData).select("features", "prediction").show(100, false)
        }
    }



+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|label|sentence                           |words                                     |rawFeatures                              |
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+

+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|label|sentence                           |words                                     |rawFeatures                              |features                                                                                                              |
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+


def aggregate[B](z: =>B)
                (seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)

1:
----
comop(comop(comop(comop(seqop(seqop(seqop(zero,10),11),12) ,seqop(seqop(seqop(seqop(zero,13),14),15),16)) , 
            seqop(seqop(seqop(zero,7),8),9)) , 
            
            seqop(seqop(seqop(zero,1),2),3)) , 
            
            seqop(seqop(seqop(zero,4),5),6))

RDD.treeAggregation和depth :
---------------------------
    rdd樹形聚合,操作結果類似於aggregateByKey(zeroU)(seqop,comop ),
    計算方式也是分區內按照seqop聚合,分區間按照comop聚合,
    樹形聚合可以指定深度depth,depth參數對結果沒有影響,影響的是
    性能,將一次聚合的過程分成多次聚合。



Spark DF
-------------
    文檔頻率的計算公式:


                           D + 1
       DF =  log(e)--------------------------
                     出現的單詞的文檔個數 + 1

parts: 10000

depth : 4

//規模 , 樹形聚合的分區數標准,分區數大於該值,
scale = max( pow(10000 , 1/4), 2) = 10

while(10000 > 10 + ceil(10000 / 10)){
    currParts = 10000 / 10  = 1000
}

while(1000 > 10 + ceil(1000 / 10)){
    currParts = 10000 / 10  = 100
}

while(100 > 10 + ceil(100 / 10)){
    currParts = 100 / 10  = 10
}

while(100 > 10 + ceil(100 / 10)){
    currParts = 100 / 10  = 10
}

size  indices    values
100 , [1,3,5] , [100,200,300]
       0,1,2


df : 

k = 0 
while(k < indices.length){
    if(values(k) > 0){
        df(indices(k)) += 1L
    }
}

//所有的單詞的文檔頻率
df : vector

doc1: hello tom1 -> (200 , [3,5] , [1,1])
doc1: hello tom2
doc1: hello tom3
doc1: hello tom4
doc1: hello tom5
        3 + 1
hello ------------
        3 + 1


20
4

0
    3 + 1
log------
    0 + 1

0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 1
0 130:64 131:253 132:255 133:63 157:96 158:205 159:251 1
0 155:53 156:255 157:253 158:253 159:253 160:124 183:180
0 128:73 129:253 130:227 131:73 132:21 156:73 157:251 15
0 154:46 155:105 156:254 157:254 158:254 159:254 160:255
0 152:56 153:105 154:220 155:254 156:63 178:18 179:166 1
0 155:21 156:176 157:253 158:253 159:124 182:105 183:176

0:

1 100:166 101:222 102:55 128:197 129:254 130:218 131:5 1
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 
1 125:145 126:255 127:211 128:31 152:32 153:237 154:253 
1 153:5 154:63 155:197 181:20 182:254 183:230 184:24 209
1 152:1 153:168 154:242 155:28 180:10 181:228 182:254 18
1 159:121 160:254 161:136 186:13 187:230 188:253 189:248
1 155:178 156:255 157:105 182:6 183:188 184:253 185:216 
1 130:7 131:176 132:254 133:224 158:51 159:253 160:253 1

0 1:2 3:1 5:4
0 0:6 2:1 4:4
0 2:2 3:6 4:4
--------------
  0 1 2 3 4 5 6 7
0 6 2 3 7 8 4 0 0

1 1:3 3:2 5:1
1 1:1 3:2 5:3
  0 1 2 3 4 5 6 7
                            詞頻總數  features
0=>(3 , (6 2 3 7 8 4 0 0)) = 30            8
1=>(2 , (0 4 0 4 0 4 0 0)) = 12         8

//計算標簽數 : 0,1,2
val numLabels = aggregated.length
//計算文檔總數: 每個標簽數的累加和
val numDocuments = aggregated.map(_._2._1).sum


//標簽數組
val labelArray = new Array[Double](numLabels)
//
val piArray = new Array[Double](numLabels)
val thetaArray = new Array[Double](numLabels * numFeatures)

//                 log(1000 + 2 * 1) = log(1002)
val pilogDamon = log(docs + labels * λ)

val piArray    = log[(每個標簽個數 + lambda) - pilogDamon]

//
docs: 5
0    : 3 
1    : 2

pilogDamon = log(5 + 2 * 1) = log(7)
pi(0)      = log(3 + 1) - log(7) = log(4) - log(7) = 1.386 - 1.945 = -0.559



aggregateByKey[(Double, DenseVector)]
    ((0.0, Vectors.zeros(numFeatures).toDense))
    (
      seqOp = {
         case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
           requireValues(features)
                      a       x            y       => y = 1 * x + y
           BLAS.axpy(weight, features, featureSum)
           (weightSum + weight, featureSum)
      },
      combOp = {
         case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
           BLAS.axpy(1.0, featureSum2, featureSum1)
           (weightSum1 + weightSum2, featureSum1)
      })

U        : (0.0, Vectors.zeros(numFeatures).toDense)
seqop    :{
             case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
               requireValues(features)
               BLAS.axpy(weight, features, featureSum)
               (weightSum + weight, featureSum)
            }    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM