pyspark RDD中join算子實現代碼分析
代碼版本為 spark 2.2.0
1.join.py
這個代碼單獨作為一個文件在pyspark項目代碼中,只有一個功能即實現join相關的幾個方法
# 這個方法是所有join動作的基礎方法,實現了join的基本需求
# 通過mapValues的方法把所有的rdd value都變成一個(rddNumber,rdd.value)的元組
# 通過轉換之后所有的value都變成了相同的格式,然后再用union聯合起來
# 之后再通過groupByKey的方式把所有key分組到相同的分區,然后再把value聯合起來
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.mapValues(lambda v: (1, v)) # 轉換成相同的格式
ws = other.mapValues(lambda v: (2, v)) # 轉換成相同的格式
# union聯合起來,在groupByKey分組到一起
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))
# 如果有join的需求可以使用類似這種方式實現,性能應該會好一點
# 這是個人添加的示例代碼,並不在源代碼文件中
def _do_self_join(rdd, other, numPartitions, dispatch):
V=namedtuple("V",["v1","v2"])
vs = rdd.mapValues(lambda v: V(v,None))
ws = other.mapValues(lambda v: V(None,v))
agg = lambda v1,v2: V(v1.v1,v2.v2) if v1.v1 else V(v2.v1,v1.v2)
return vs.union(ws).reduceByKey(agg)
# 這個方法被RDD.join算子調用
def python_join(rdd, other, numPartitions):
# 這里構造了一個把value放在同一個元組中的函數
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
return ((v, w) for v in vbuf for w in wbuf)
# 調用了上面的函數
return _do_python_join(rdd, other, numPartitions, dispatch)
# 右join方法,跟join方法實現的方式相同,只是聚合value的方法略有區別
def python_right_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not vbuf:
vbuf.append(None)
return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
def python_left_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not wbuf:
wbuf.append(None)
return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
def python_full_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not vbuf:
vbuf.append(None)
if not wbuf:
wbuf.append(None)
return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)
def python_cogroup(rdds, numPartitions):
def make_mapper(i):
return lambda v: (i, v)
vrdds = [rdd.mapValues(make_mapper(i)) for i, rdd in enumerate(rdds)]
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
rdd_len = len(vrdds)
def dispatch(seq):
bufs = [[] for _ in range(rdd_len)]
for n, v in seq:
bufs[n].append(v)
return tuple(ResultIterable(vs) for vs in bufs)
return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)