luigi框架--關於python運行spark程序


首先,目標是寫個python腳本,跑spark程序來統計hdfs中的一些數據。參考了別人的代碼,故用了luigi框架。

至於luigi的原理 底層的一些東西Google就好。本文主要就是聚焦快速使用,知其然不知其所以然。

python寫Spark或mapreduce還有其他的方法,google上很多,這里用luigi只是剛好有參考的代碼,而且理解起來還是簡單,就用了。

上代碼:

import luigi, sys
from datetime import datetime, timedelta
from luigi.contrib.spark import PySparkTask

class luigiBase(PySparkTask):
date = luigi.DateParameter(default=datetime.now())
def main(self, sc, *args):
log_rdd = sc.textFile(self.input()[0].path)
#要做的spark操作
  log_rdd.repartition(1).saveAsTextFile(self.output().path)
@property
  def name(self):
return "luigi_test_{}_username".format(format_date(self.date))
def requires(self):
return [HdfsFiles(date=self.date)]
def output(self):
return luigi.hdfs.HdfsTarget(Files().path,format=luigi.hdfs.PlainDir)

class luigiStats(luigi.Task):
now = datetime.now()
date = luigi.DateParameter(default=datetime(now.year, now.month, now.day) )
def requires(self):
return luigiBase(self.date)

if __name__ == '__main__':
luigi.run(main_task_cls=luigiStats)

1.對於普通的luigi任務,關鍵是要按需實現requires、output和run三個函數;對於luigi封裝好的spark任務,關鍵是要按需實現requires、output和main三個函數

2.base類繼承PySparkTask類,該類還有很多參數可以設置,但作為最最簡單的luigi例子,就都剔除了,只要在意requires、output和main三個函數就好。可以把requires理解成輸入,output輸出,main是要實現的邏輯。name函數之所以也寫出來,是因為在將代碼pushonline的時候,每個Job都要取名字,而公司對job的名字是有規定的,如果name結尾不是你的用戶名,Spark程序是會報錯的,就是不讓你跑的意思。

3.代碼有兩個類,base和stats類,執行邏輯是這樣的:主函數調用stats,然后發現stats類requires(依賴於)base類,就看看這個依賴的輸出存不存在,如果存在就作為自己的輸入,然后執行自己類中的代碼。如果不存在就執行base類。上面代碼中我的stats類中不需要執行上面,就沒寫main,只是用來檢查下base執行了沒,沒執行就執行base去。

3.該base類中requires和ouput都是hdfs文件,邏輯和stats類一樣。base類需要繼承PySparkTask類,而luigi.run()的參數需要時繼承了luigi.Task的類,所以才分開寫成兩個類了,我自己是這樣理解的。

4.requires函數的返回值不能是個target對象,這里具體的理解就是不能是一個直接讀取的hdfs文件,可以封裝到一個類中去,這個類可以有個屬性是path,是用來返回一個hdfs文件的地址的。依賴不僅限一個,可以是多個,生成一個列表返回。

5.如果不是在自己的電腦上安裝的Spark,要注意:由於PySparkTask調用的spark集群不在本地,好像不支持對本地文件的一些操作,開始的時候想把結果寫在本地,一直找不到輸出結果。

6.一般公司都有相對應得網頁可以查看spark和hadoop程序的運行的情況,可以查看日志什么的。

7.base類中可以設置下queue 參數,選擇你程序的運行隊列,有時候默認的隊列好像特別慢,可以設置個其他的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM