pyspark的join.py代碼記錄


pyspark RDD中join算子實現代碼分析

代碼版本為 spark 2.2.0

1.join.py

這個代碼單獨作為一個文件在pyspark項目代碼中,只有一個功能即實現join相關的幾個方法

# 這個方法是所有join動作的基礎方法,實現了join的基本需求
# 通過mapValues的方法把所有的rdd value都變成一個(rddNumber,rdd.value)的元組
# 通過轉換之后所有的value都變成了相同的格式,然后再用union聯合起來
# 之后再通過groupByKey的方式把所有key分組到相同的分區,然后再把value聯合起來
def _do_python_join(rdd, other, numPartitions, dispatch):
    vs = rdd.mapValues(lambda v: (1, v))   # 轉換成相同的格式
    ws = other.mapValues(lambda v: (2, v)) # 轉換成相同的格式
    # union聯合起來,在groupByKey分組到一起
    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))

# 如果有join的需求可以使用類似這種方式實現,性能應該會好一點
# 這是個人添加的示例代碼,並不在源代碼文件中
def _do_self_join(rdd, other, numPartitions, dispatch):
    V=namedtuple("V",["v1","v2"])
    vs = rdd.mapValues(lambda v: V(v,None))
    ws = other.mapValues(lambda v: V(None,v))
    agg = lambda v1,v2: V(v1.v1,v2.v2) if v1.v1 else V(v2.v1,v1.v2)
    return vs.union(ws).reduceByKey(agg)

# 這個方法被RDD.join算子調用
def python_join(rdd, other, numPartitions):
    # 這里構造了一個把value放在同一個元組中的函數
    def dispatch(seq):
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        return ((v, w) for v in vbuf for w in wbuf)
    # 調用了上面的函數
    return _do_python_join(rdd, other, numPartitions, dispatch)


# 右join方法,跟join方法實現的方式相同,只是聚合value的方法略有區別
def python_right_outer_join(rdd, other, numPartitions):
    def dispatch(seq):
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        if not vbuf:
            vbuf.append(None)
        return ((v, w) for v in vbuf for w in wbuf)
    return _do_python_join(rdd, other, numPartitions, dispatch)


def python_left_outer_join(rdd, other, numPartitions):
    def dispatch(seq):
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        if not wbuf:
            wbuf.append(None)
        return ((v, w) for v in vbuf for w in wbuf)
    return _do_python_join(rdd, other, numPartitions, dispatch)


def python_full_outer_join(rdd, other, numPartitions):
    def dispatch(seq):
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        if not vbuf:
            vbuf.append(None)
        if not wbuf:
            wbuf.append(None)
        return ((v, w) for v in vbuf for w in wbuf)
    return _do_python_join(rdd, other, numPartitions, dispatch)


def python_cogroup(rdds, numPartitions):
    def make_mapper(i):
        return lambda v: (i, v)
    vrdds = [rdd.mapValues(make_mapper(i)) for i, rdd in enumerate(rdds)]
    union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
    rdd_len = len(vrdds)

    def dispatch(seq):
        bufs = [[] for _ in range(rdd_len)]
        for n, v in seq:
            bufs[n].append(v)
        return tuple(ResultIterable(vs) for vs in bufs)

    return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM