最近用spark在集群上驗證一個算法的問題,數據量大概是一天P級的,使用hiveContext查詢之后再調用算法進行讀取效果很慢,大概需要二十多個小時,一個查詢將近半個小時,代碼大概如下:
try: sql = """ select ltescrsrq, mr_ltencrsrq1, mr_ltencrsrq2, mr_ltencrsrq3, ltescrsrp, mr_ltencrsrp1, mr_ltencrsrp2, mr_ltencrsrp3, mr_ltesctadv, mr_longitude, mr_latitude from noce.agg_mro_chr_relate_bak where x = %s and y = %s and day=20170511 and 6371000 * ACOS(SIN(x_latitude * PI() / 180) * SIN(y_latitude * PI() / 180) + COS(x_latitude * PI() / 180) * COS(y_latitude * PI() / 180) * COS(y_longitude * PI() / 180 - x_longitude * PI() / 180)) < 2000 """ % (a, b) sqlcontext.sql(sqlQuery="set hive.mapred.supports.subdirectories=true") sqlcontext.sql(sqlQuery="set mapred.input.dir.recursive=true") result = sqlcontext.sql(sqlQuery=sql).collect() except Exception as e: print(e.message) break
主要是where之后的hive查詢太過緩慢,於是試着直接spark用textFile讀取文件然后在進行map和filter操作:
data = sc.textFile("/DATA/PUBLIC/***/**/*/day=%s/*/*/*" % day) sc.setLogLevel("WARN") data = data.filter(lambda x: x.split('|')[41] != '' or x.split('|')[40] != '') data_filter = data.filter(lambda x: int(x.split('|')[1]) == int(*) and int(x.split('|')[2]) == int(*) and 6371000 * np.arccos(np.sin(float(x.split('|')[76]) * np.pi / 180) * np.sin(float(x.split('|')[41]) * np.pi / 180) + np.cos(float(x.split('|')[76]) * np.pi / 180) * np.cos(float(x.split('|')[41]) * np.pi / 180) * np.cos(float(x.split('|')[40]) * np.pi / 180 - float(x.split('|')[75]) * np.pi / 180)) < 2000) result = data_filter.map(lambda x: [x.split('|')[7], x.split('|')[26], x.split('|')[27], x.split('|')[28], x.split('|')[6], x.split('|')[21], x.split('|')[22], x.split('|')[23], x.split('|')[50], x.split('|')[75], x.split('|')[76]]).collect() result = [map(convert, result[i]) for i in range(len(result))]
驗證之后的結果是這樣大概總共才半個小時就可以全部跑完。效率何止提升了20倍!!!看來spark對hive的優化做的還不夠好,有些人說sparksql可以,但是看了下官網的文檔hivecontext是基於sparksql 的,所以感覺 效果還是不理想。
