Notes on the PySpark RDD code


Notes on the code in PySpark's rdd.py file

The code version is Spark 2.2.0.

1. RDD and common operators

class RDD(object):  # a quick look at a few typical operators; the remaining operator code is worth reading on your own
    
    def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
        """
        _jrdd is a very important attribute; it gets passed along throughout the whole PySpark computation.
        The first RDD created in PySpark is usually a data-source RDD built through a JVM call,
        and _jrdd is the corresponding data-source RDD on the JVM side.
        Keep this in mind: when the job eventually runs, this JVM RDD is the one that gets executed and feeds the source data to the Python worker process.
        """
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer
        self._id = jrdd.id()
        self.partitioner = None

    # The most important and most basic action;
    # every other action is ultimately implemented by calling it.
    def collect(self):
        """
        Returns a list holding the results of all partitions.
        It calls the collectAndServe method on the corresponding PythonRDD object in Scala to trigger job execution.
        collect is the foundation and entry point of every other action; in other words, collectAndServe is the unified execution entry point.
        """
        with SCCallSiteSync(self.context) as css:
            # A single argument is passed in when the job is submitted: the rdd behind _jrdd,
            # which is either the original data-source RDD or a PythonRDD.
            # Keep this in mind; its role becomes clear once we move over to PythonRDD on the Scala side.
            port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        return list(_load_from_socket(port, self._jrdd_deserializer))

    #reduce action
    def reduce(self, f):
        """
        As you can see, this one also ends up calling collect().
        """
        def func(iterator):
            iterator = iter(iterator)
            try:
                initial = next(iterator)
            except StopIteration:
                return
            yield reduce(f, iterator, initial)

        vals = self.mapPartitions(func).collect()  # here
        if vals:
            return reduce(f, vals)
        raise ValueError("Can not reduce() empty RDD")
        
    
    # This function underlies several other actions, and it too is implemented via collect.
    def fold(self, zeroValue, op):
        """
        This function also ultimately calls collect() to submit the job.
        It is called by actions such as foreach, sum and count.
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(acc, obj)
            yield acc

        vals = self.mapPartitions(func).collect()  # here
        return reduce(op, vals, zeroValue)

    def union(self, other):
        """
        PySpark does little local processing for this operator; it directly uses the corresponding union on the JVM side.
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                      self.ctx.serializer)
        if (self.partitioner == other.partitioner and
                self.getNumPartitions() == rdd.getNumPartitions()):
            rdd.partitioner = self.partitioner
        return rdd

    # This function is just as important: if collect is the foundation of every action,
    # then this is the foundation of every transformation.
    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        This function is called by the other, higher-level transformation operators:
        map, flatMap, mapPartitions, reduceByKey, combineByKey and so on.
        PipelinedRDD is the second RDD type in PySpark; every transformation returns this type.
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        As you can see, this is also implemented by calling mapPartitionsWithIndex.
        The func defined here is the key piece: it wraps the user's function, and in PipelinedRDD these functions get nested and wrapped together.
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        As you can see, it is similar to the previous function.
        """
        def func(s, iterator):
            return chain.from_iterable(map(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def join(self, other, numPartitions=None):
        """
        join is implemented by calling python_join, which lives in PySpark's join.py.
        The implementation in join.py is described in another part of these notes.
        Briefly: under the hood it is built from union and groupByKey.
        """
        return python_join(self, other, numPartitions)

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Implemented by calling combineByKey.
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None, partitionFunc=portable_hash):
        """
        The logic of this function is:
        1. Use mapPartitions to combine values with the same key within each partition.
        2. Repartition with partitionBy so that identical keys land in the same partition.
        3. Run step 1 again, this time merging the per-partition combiners (see the standalone sketch after this class).
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        serializer = self.ctx.serializer
        memory = self._memory_limit()
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combineLocally(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def _mergeCombiners(iterator):
            merger = ExternalMerger(agg, memory, serializer)
            merger.mergeCombiners(iterator)
            return merger.items()

        return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
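
The three steps above can be illustrated with a small standalone sketch in plain Python (it is not part of rdd.py): dicts stand in for ExternalMerger, hash(key) % numPartitions stands in for partitionFunc, and lists of (key, value) pairs stand in for partitions. The helper names local_combine, repartition and merge_combiners are made up for the illustration.

from operator import add

def local_combine(partition, create_combiner, merge_value):
    # step 1: combine values that share a key within a single partition
    merged = {}
    for k, v in partition:
        merged[k] = merge_value(merged[k], v) if k in merged else create_combiner(v)
    return list(merged.items())

def repartition(items, num_partitions):
    # step 2: partitionBy -- identical keys land in the same target partition
    out = [[] for _ in range(num_partitions)]
    for k, c in items:
        out[hash(k) % num_partitions].append((k, c))
    return out

def merge_combiners(partition, merge):
    # step 3: merge the per-partition combiners that now share a partition
    merged = {}
    for k, c in partition:
        merged[k] = merge(merged[k], c) if k in merged else c
    return list(merged.items())

partitions = [[("a", 1), ("b", 1), ("a", 1)], [("b", 1), ("c", 1)]]
locally = [local_combine(p, lambda v: v, add) for p in partitions]   # mapPartitions(combineLocally)
shuffled = repartition([kv for p in locally for kv in p], 2)         # partitionBy(numPartitions, partitionFunc)
print([merge_combiners(p, add) for p in shuffled])                   # mapPartitions(_mergeCombiners)

With a word-count style aggregation like this, the output contains ("a", 2), ("b", 2) and ("c", 1), each key appearing in exactly one of the two final partitions.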

2. PipelinedRDD

class PipelinedRDD(RDD):

    """
    This is the RDD type returned by every transformation; it inherits from RDD.
    It overrides the _jrdd property: the jrdd it returns is a PythonRDD,
    and the parent RDD of that PythonRDD is the _jrdd of the originally created RDD.
    In other words, when users run PySpark code, the JVM-side execution always starts from PythonRDD.
    """

    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # If the previous rdd is not a pipelinable PipelinedRDD, pass the original rdd._jrdd down.
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func

            # This function nests the previous rdd's logic inside the current processing logic
            # (see the sketch after this class):
            # prev_func is the function given in the previous transformation,
            # func is the function given in this transformation.
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            # If the previous rdd is a PipelinedRDD, keep passing down the _jrdd obtained from the very first rdd.
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._id = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False
        self.partitioner = prev.partitioner if self.preservesPartitioning else None

    def getNumPartitions(self):
        return self._prev_jrdd.partitions().size()

    @property
    def _jrdd(self):
        """
        This is where the PythonRDD gets constructed.
        """
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            self._jrdd_deserializer = NoOpSerializer()

        if self.ctx.profiler_collector:
            profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
        else:
            profiler = None

        # Serialize the user's Python code.
        wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
                                      self._jrdd_deserializer, profiler)
        # Construct a new _jrdd of type PythonRDD; its parent rdd is the _jrdd of the original data source.
        # When an action is later called on this rdd, the _jrdd passed in is exactly what gets returned here.
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
                                             self.preservesPartitioning)
        self._jrdd_val = python_rdd.asJavaRDD()

        if profiler:
            self._id = self._jrdd_val.id()
            self.ctx.profiler_collector.add_profiler(self._id, profiler)
        return self._jrdd_val

    def id(self):
        if self._id is None:
            self._id = self._jrdd.id()
        return self._id

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)
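
A short standalone sketch in plain Python (not part of rdd.py) of how pipeline_func chains the functions of consecutive transformations. prev_func and func below are stand-ins for the (split, iterator) functions that mapPartitions-style operators produce; because each PipelinedRDD keeps the original _prev_jrdd and only composes these Python functions, a chain such as rdd.map(...).filter(...) still executes as a single PythonRDD on the JVM side.

def prev_func(split, iterator):            # e.g. the func produced by map(lambda x: x * 2)
    return (x * 2 for x in iterator)

def func(split, iterator):                 # e.g. the func produced by filter(lambda x: x > 4)
    return (x for x in iterator if x > 4)

def pipeline_func(split, iterator):        # exactly the composition PipelinedRDD.__init__ builds
    return func(split, prev_func(split, iterator))

print(list(pipeline_func(0, [1, 2, 3, 4])))    # [6, 8] -- both steps run in one pass over the partition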

3. Implementation of the join operator on RDDs

The join implementation (join.py) is recorded in a separate note; the sketch below previews the basic idea.
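
As a preview, here is a simplified standalone sketch in plain Python of the idea described earlier for join (tag both sides, union them, group by key, then pair up the values). It is not the actual join.py code; the helper name join_sketch is made up for the illustration.

from itertools import product

def join_sketch(left, right):
    # tag each value with the side it came from, then "union" the two lists
    tagged = [(k, (1, v)) for k, v in left] + [(k, (2, w)) for k, w in right]

    # groupByKey: gather all tagged values per key
    grouped = {}
    for k, tv in tagged:
        grouped.setdefault(k, []).append(tv)

    # for every key, emit the cross product of left-side and right-side values
    result = []
    for k, tvs in grouped.items():
        lefts = [v for tag, v in tvs if tag == 1]
        rights = [w for tag, w in tvs if tag == 2]
        result.extend((k, pair) for pair in product(lefts, rights))
    return result

print(join_sketch([("a", 1), ("b", 2)], [("a", "x"), ("a", "y")]))
# [('a', (1, 'x')), ('a', (1, 'y'))] -- 'b' has no match on the right and is dropped, as in an inner join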

