SparkHiveContext和直接Spark讀取hdfs上文件然后再分析效果區別


最近用spark在集群上驗證一個算法的問題,數據量大概是一天P級的,使用hiveContext查詢之后再調用算法進行讀取效果很慢,大概需要二十多個小時,一個查詢將近半個小時,代碼大概如下:

        try:
            sql = """
                select ltescrsrq, mr_ltencrsrq1, mr_ltencrsrq2, mr_ltencrsrq3, ltescrsrp, mr_ltencrsrp1,
                mr_ltencrsrp2, mr_ltencrsrp3, mr_ltesctadv, mr_longitude, mr_latitude
                from noce.agg_mro_chr_relate_bak  where x = %s
                and y = %s
                and day=20170511
                and 6371000 * ACOS(SIN(x_latitude * PI() / 180) * SIN(y_latitude * PI() / 180) +
                COS(x_latitude * PI() / 180) * COS(y_latitude * PI() / 180) * COS(y_longitude * PI() / 180 -
                x_longitude * PI() / 180)) < 2000
                """ % (a, b)
            sqlcontext.sql(sqlQuery="set hive.mapred.supports.subdirectories=true")
            sqlcontext.sql(sqlQuery="set mapred.input.dir.recursive=true")
            result = sqlcontext.sql(sqlQuery=sql).collect()

        except Exception as e:
            print(e.message)
            break

主要是where之后的hive查詢太過緩慢,於是試着直接spark用textFile讀取文件然后在進行map和filter操作:

data = sc.textFile("/DATA/PUBLIC/***/**/*/day=%s/*/*/*" % day)
sc.setLogLevel("WARN")
data = data.filter(lambda x: x.split('|')[41] != '' or x.split('|')[40] != '')
data_filter = data.filter(lambda x: int(x.split('|')[1]) == int(*) and int(x.split('|')[2]) == int(*) and 6371000 *
                    np.arccos(np.sin(float(x.split('|')[76]) * np.pi / 180) * np.sin(float(x.split('|')[41]) * np.pi / 180) +
                    np.cos(float(x.split('|')[76]) * np.pi / 180) * np.cos(float(x.split('|')[41]) * np.pi / 180) *
                    np.cos(float(x.split('|')[40]) * np.pi / 180 - float(x.split('|')[75]) * np.pi / 180)) < 2000)
result = data_filter.map(lambda x: [x.split('|')[7], x.split('|')[26], x.split('|')[27], x.split('|')[28],
                                            x.split('|')[6], x.split('|')[21], x.split('|')[22], x.split('|')[23],
                                            x.split('|')[50], x.split('|')[75], x.split('|')[76]]).collect()
result = [map(convert, result[i]) for i in range(len(result))]

驗證之后的結果是這樣大概總共才半個小時就可以全部跑完。效率何止提升了20倍!!!看來spark對hive的優化做的還不夠好,有些人說sparksql可以,但是看了下官網的文檔hivecontext是基於sparksql 的,所以感覺 效果還是不理想。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM