PySpark MLlib pitfall: model predict + RDD map/zip — be especially careful with zip!!!


Update: broadcast the model and use mapPartitions + flatMap, see:

from pyspark import SparkContext
import numpy as np
from sklearn import ensemble


def batch(xs):
    # Materialize an entire partition as one list so the model can
    # predict on the whole batch in a single call.
    yield list(xs)


N = 1000
train_x = np.random.randn(N, 10)
train_y = np.random.binomial(1, 0.5, N)

# Train a scikit-learn model on the driver.
model = ensemble.RandomForestClassifier(10).fit(train_x, train_y)

test_x = np.random.randn(N * 100, 10)

sc = SparkContext()

n_partitions = 10
# Attach a stable row index to every element up front.
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()

# Broadcast one copy of the model to each executor instead of
# pickling it into every task closure.
b_model = sc.broadcast(model)

# Batch each partition, split it into (features, indices), predict on
# the whole batch, and emit (index, prediction) pairs.
result = rdd.mapPartitions(batch) \
    .map(lambda xs: ([x[0] for x in xs], [x[1] for x in xs])) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))

print(result.take(100))

See: https://gist.github.com/lucidfrontier45/591be3eb78557d1844ca
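A follow-up that is not part of the gist: since each output element is an (index, prediction) pair, the original row order can be restored with an explicit sort on the index rather than relying on zip alignment. A minimal sketch, reusing result from the snippet above:

# Recover the original row order from the carried-along indices.
ordered_predictions = result.sortByKey().values().collect()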

----------------------


It all started because there is no way to call model predict directly inside a map in PySpark, even though Scala can do it! Like this:

When using the Scala API, the recommended way of getting predictions for an RDD[LabeledPoint] from a DecisionTreeModel is to simply map over the RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Unfortunately, a similar approach in PySpark doesn't work so well:

labelsAndPredictions = testData.map(lambda lp: (lp.label, model.predict(lp.features)))
labelsAndPredictions.first()

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Instead, the official documentation recommends something like this:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)


And this is the root of all evil, because in some situations zip does not give you the result you expect; that is, the order after zipping comes out scrambled!!! I ran into exactly this in my project!!!


This appears to imply that even the trivial a.map(f).zip(a) is not guaranteed to be equivalent to a.map(x => (f(x),x)). What is the situation when zip() results are reproducible?

See: https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
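A minimal sketch of how to probe this yourself, reusing the sc from the snippet at the top and assuming a lineage that contains a shuffle (repartition); whether the pairs actually misalign depends on scheduling, so treat this as a probe rather than a deterministic repro:

# `a` has a shuffle in its lineage; its per-partition element order is
# not stable across re-computations.
a = sc.parallelize(range(1000), 10).repartition(10)

def f(v):
    return v * 2

# zip computes `a` once per side; after a shuffle the two computations
# may disagree on order, so fx == f(x) can fail for some pairs.
zipped = a.map(f).zip(a).collect()
print(all(fx == f(x) for fx, x in zipped))  # may print False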

The reason:

zip is generally speaking a tricky operation. It requires both RDDs not only to have the same number of partitions but also the same number of elements per partition.

Excluding some special cases, this is guaranteed only if both RDDs have the same ancestor and there are no shuffles or operations that could change the number of elements (filter, flatMap) between the common ancestor and the current state. Typically that means only map (1-to-1) transformations.

See: https://stackoverflow.com/questions/32084368/can-only-zip-with-rdd-which-has-the-same-number-of-partitions-error
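A quick sketch of those two requirements, again reusing sc from the top; the RDDs are toy examples and the error wording is paraphrased from PySpark's checks:

x = sc.parallelize(range(10), 2)
y = sc.parallelize(range(10), 4)

# Different partition counts: zip refuses up front with an error like
# "Can only zip with RDD which has the same number of partitions".
# x.zip(y)

# Same partition count, but filter changed the number of elements per
# partition, so this fails only at runtime when partitions are walked.
z = x.filter(lambda v: v % 2 == 0)
# x.zip(z).collect()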


The root cause was that my ancestor RDD had gone through shuffle and filter operations! Calling zip on its child RDDs then went wrong (the data came out in scrambled order)!!! So frustrating; I wrestled with this problem for a whole day. Thank God it is finally solved! Amen!


In the end, my workarounds were:

1. Apply a union with an empty RDD, rdd = rdd.union(sc.parallelize([])), and then map and zip produce the expected result! (See the sketch after this list.)

2. Or simply collect the RDD to be predicted onto the driver machine and run model predict there; a rather ugly approach!
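A minimal sketch of both workarounds, assuming the testData and model names from the documentation snippet above and an MLlib model whose predict also accepts a single feature vector on the driver; the union trick reflects my observation here, not a documented guarantee:

# Workaround 1: union with an empty RDD, then the documented map/zip pattern.
testData = testData.union(sc.parallelize([]))
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

# Workaround 2: collect everything to the driver and predict point by
# point. Only viable when the data fits in driver memory.
local_points = testData.collect()
labels_and_preds = [(lp.label, model.predict(lp.features)) for lp in local_points]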