Using a Custom Accumulator in Spark 2.1



  • Extend the AccumulatorV2 class and override all of its methods

    package spark
    
    import constant.Constant
    import org.apache.spark.util.AccumulatorV2
    import util.getFieldFromConcatString
    import util.setFieldFromConcatString
    
    
    open class SessionAccmulator : AccumulatorV2<String, String>() {
    
    
    
    
        private var result = Constant.SESSION_COUNT + "=0|"+
                Constant.TIME_PERIOD_1s_3s + "=0|"+
                Constant.TIME_PERIOD_4s_6s + "=0|"+
                Constant.TIME_PERIOD_7s_9s + "=0|"+
                Constant.TIME_PERIOD_10s_30s + "=0|"+
                Constant.TIME_PERIOD_30s_60s + "=0|"+
                Constant.TIME_PERIOD_1m_3m + "=0|"+
                Constant.TIME_PERIOD_3m_10m + "=0|"+
                Constant.TIME_PERIOD_10m_30m + "=0|"+
                Constant.TIME_PERIOD_30m + "=0|"+
                Constant.STEP_PERIOD_1_3 + "=0|"+
                Constant.STEP_PERIOD_4_6 + "=0|"+
                Constant.STEP_PERIOD_7_9 + "=0|"+
                Constant.STEP_PERIOD_10_30 + "=0|"+
                Constant.STEP_PERIOD_30_60 + "=0|"+
                Constant.STEP_PERIOD_60 + "=0"
    
        override fun value(): String {
            return this.result
        }
    
        /**
         * Merges another accumulator into this one by summing each field.
         */
        override fun merge(other: AccumulatorV2<String, String>?) {
            // Ignore null and accumulators of a different type.
            if (other !is SessionAccmulator) return
            val resultArray = arrayOf(Constant.SESSION_COUNT, Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
                    Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
                    Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
                    Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
                    Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
            // Start from the current result once. Re-assigning from `result` inside the loop
            // was the bug that cost 30 minutes of debugging: the accumulator itself was fine,
            // but the merge discarded the fields it had already merged.
            var newResult = result
            resultArray.forEach {
                val otherValue = other.result.getFieldFromConcatString("|", it)
                val thisValue = newResult.getFieldFromConcatString("|", it)
                if (otherValue.isNotEmpty() && thisValue.isNotEmpty()) {
                    // Sum both sides; adding 1 here would lose the other accumulator's count.
                    val newValue = thisValue.toInt() + otherValue.toInt()
                    newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())
                }
            }
            result = newResult
        }
    
        override fun copy(): AccumulatorV2<String, String> {
            val sessionAccmulator = SessionAccmulator()
    
            sessionAccmulator.result = this.result
            return sessionAccmulator
        }
    
        override fun add(p0: String?) {
            // Ignore null or empty keys.
            if (p0.isNullOrEmpty()) return
            val oldValue = result.getFieldFromConcatString("|", p0!!)
            // Update only when the key exists; an unknown key must not wipe the result.
            if (oldValue.isNotEmpty()) {
                val newValue = oldValue.toInt() + 1
                result = result.setFieldFromConcatString("|", p0, newValue.toString())
            }
        }
    
        override fun reset() {
            val newResult = Constant.SESSION_COUNT + "=0|"+
                    Constant.TIME_PERIOD_1s_3s + "=0|"+
                    Constant.TIME_PERIOD_4s_6s + "=0|"+
                    Constant.TIME_PERIOD_7s_9s + "=0|"+
                    Constant.TIME_PERIOD_10s_30s + "=0|"+
                    Constant.TIME_PERIOD_30s_60s + "=0|"+
                    Constant.TIME_PERIOD_1m_3m + "=0|"+
                    Constant.TIME_PERIOD_3m_10m + "=0|"+
                    Constant.TIME_PERIOD_10m_30m + "=0|"+
                    Constant.TIME_PERIOD_30m + "=0|"+
                    Constant.STEP_PERIOD_1_3 + "=0|"+
                    Constant.STEP_PERIOD_4_6 + "=0|"+
                    Constant.STEP_PERIOD_7_9 + "=0|"+
                    Constant.STEP_PERIOD_10_30 + "=0|"+
                    Constant.STEP_PERIOD_30_60 + "=0|"+
                    Constant.STEP_PERIOD_60 + "=0"
            result = newResult
        }
    
        override fun isZero(): Boolean {
            val newResult = Constant.SESSION_COUNT + "=0|"+
                    Constant.TIME_PERIOD_1s_3s + "=0|"+
                    Constant.TIME_PERIOD_4s_6s + "=0|"+
                    Constant.TIME_PERIOD_7s_9s + "=0|"+
                    Constant.TIME_PERIOD_10s_30s + "=0|"+
                    Constant.TIME_PERIOD_30s_60s + "=0|"+
                    Constant.TIME_PERIOD_1m_3m + "=0|"+
                    Constant.TIME_PERIOD_3m_10m + "=0|"+
                    Constant.TIME_PERIOD_10m_30m + "=0|"+
                    Constant.TIME_PERIOD_30m + "=0|"+
                    Constant.STEP_PERIOD_1_3 + "=0|"+
                    Constant.STEP_PERIOD_4_6 + "=0|"+
                    Constant.STEP_PERIOD_7_9 + "=0|"+
                    Constant.STEP_PERIOD_10_30 + "=0|"+
                    Constant.STEP_PERIOD_30_60 + "=0|"+
                    Constant.STEP_PERIOD_60 + "=0"
            return this.result == newResult
        }
    }
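
The class above relies on two helpers imported from util, getFieldFromConcatString and setFieldFromConcatString, whose source is not shown in this post. The sketch below is one possible implementation, assuming they are String extension functions over a "key=value|key=value" string and that a missing key yields an empty string (which is what the isNotEmpty() checks above suggest). The sample keys in main are made up for illustration.

```kotlin
// Hypothetical stand-ins for util.getFieldFromConcatString and
// util.setFieldFromConcatString; the article's real implementations may differ.

/** Returns the value stored under [field] in a "k1=v1|k2=v2" string, or "" if absent. */
fun String.getFieldFromConcatString(delimiter: String, field: String): String {
    for (pair in this.split(delimiter)) {
        val parts = pair.split("=", limit = 2)
        if (parts.size == 2 && parts[0] == field) return parts[1]
    }
    return ""
}

/** Returns a copy with [field] set to [newValue]; an unknown field leaves the string unchanged. */
fun String.setFieldFromConcatString(delimiter: String, field: String, newValue: String): String =
    this.split(delimiter).joinToString(delimiter) { pair ->
        val parts = pair.split("=", limit = 2)
        if (parts.size == 2 && parts[0] == field) "$field=$newValue" else pair
    }

fun main() {
    val counts = "session_count=0|1s_3s=4" // made-up sample keys
    println(counts.getFieldFromConcatString("|", "1s_3s"))              // 4
    println(counts.setFieldFromConcatString("|", "session_count", "1")) // session_count=1|1s_3s=4
}
```

Kotlin's String.split with a String argument matches the delimiter literally, so "|" needs no regex escaping here.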
    
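Method overview

  • value: returns the current value of the accumulator.
  • merge: merges the accumulators of the individual tasks into one. This method is especially important and must be written correctly (it is used in the execution flow described below).
  • isZero: checks whether the accumulator still holds its initial value.
  • reset: resets the accumulator to its initial value.
  • copy: creates a copy of the accumulator.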
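
Since merge is the method that is easiest to get wrong, it may help to see the expected behavior in isolation: merging should be a field-wise sum, merging with an all-zero value should change nothing, and the order of the two operands should not matter. The following is a minimal sketch over the same "key=value|key=value" format; parseCounts and mergeCounts are hypothetical helpers written for this illustration, not part of Spark or of the class above.

```kotlin
// Illustrative only: parseCounts and mergeCounts are hypothetical helpers.

/** Parses "k1=v1|k2=v2" into an insertion-ordered map of counters. */
fun parseCounts(s: String): Map<String, Int> =
    s.split("|").associate { pair ->
        val (key, value) = pair.split("=", limit = 2)
        key to value.toInt()
    }

/** Field-wise sum of two counter strings, keeping the key order of [a]. */
fun mergeCounts(a: String, b: String): String {
    val right = parseCounts(b)
    return parseCounts(a).entries.joinToString("|") { (key, value) ->
        "$key=${value + (right[key] ?: 0)}"
    }
}

fun main() {
    val zero = "session_count=0|1s_3s=0"  // made-up sample keys
    val task1 = "session_count=2|1s_3s=1"
    val task2 = "session_count=3|1s_3s=0"
    println(mergeCounts(mergeCounts(zero, task1), task2)) // session_count=5|1s_3s=1
}
```

A merge that adds 1 instead of the other side's count would print session_count=1 in this example, which is an easy way to spot that kind of mistake.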

 

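Accumulator execution flow in Spark:

For each task, the Spark engine calls copy to create a copy of the accumulator (these copies are not registered), and each task then accumulates into its own copy. Note that the originally registered accumulator does not change during this phase. When the tasks finish, merge is called to fold each task's accumulator into the registered one, which is still at its initial value at that point.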
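
The flow above can be imitated without Spark. The sketch below is a toy model only: ToyCounter is a hypothetical class that mirrors the method names of AccumulatorV2, and runTasks stands in for the engine by giving every "task" a zeroed copy. In Spark itself the task-side copy is produced through AccumulatorV2.copyAndReset when the accumulator is serialized, which is why copy, reset and isZero need to agree with each other.

```kotlin
// A toy model of the copy -> add -> merge flow; it does not use Spark.
// ToyCounter is hypothetical and only mirrors AccumulatorV2's method names.
class ToyCounter(private var count: Long = 0) {
    fun isZero(): Boolean = count == 0L
    fun copy(): ToyCounter = ToyCounter(count)
    fun reset() { count = 0 }
    fun add(n: Long) { count += n }
    fun merge(other: ToyCounter) { count += other.count }
    fun value(): Long = count
}

/** Runs each partition as a "task" on its own zeroed copy and returns those copies unmerged. */
fun runTasks(driver: ToyCounter, partitions: List<List<Long>>): List<ToyCounter> =
    partitions.map { partition ->
        val local = driver.copy().also { it.reset() } // each task starts from a zeroed copy
        partition.forEach { local.add(it) }           // accumulation touches the copy only
        local
    }

fun main() {
    val driver = ToyCounter()
    val taskCopies = runTasks(driver, listOf(listOf(1L, 2L), listOf(3L), listOf(4L, 5L)))
    println(driver.value())                  // 0: the "registered" accumulator is untouched so far
    taskCopies.forEach { driver.merge(it) }  // the merge step combines the per-task results
    println(driver.value())                  // 15
}
```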

