luigi學習1


一、luigi介紹

luigi是基於python語言的,可幫助建立復雜流式批處理任務管理系統。這些批處理作業典型的有hadoop job,數據庫數據的導入與導出,或者是機器學習算法等等。

luigi的github:https://github.com/spotify/luigi

目前已經有一些抽象層次較低的數據處理工具,比如hive,pig,cascading等。luigi並不是要取代他們,而是幫助你管理這些作業,luigi的task可以是一個hive查詢,java寫的hadoop作業,一個scala寫的spark作業或一個python程序等。luigi提供了互相依賴的大量作業的工作流程管理,所以程序員可以把他們的精力放到作業本身。

目前有一些相似的項目比如Oozie和Azkaban。一個重要的區別是luigi並不僅僅為hadoop作業,它可以很方便的擴展其他類型的任務。

二、luigi官網的hello world例子

2.1top Artists例子的目的

這個例子的目的想要集合一些生產數據的流,然后找到前10個artists,並把最終的結果保存到數據庫

2.2Aggregate Artist Streams

class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)

        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.iteritems():
                print >> out_file, artist, count

對於這個類的解釋:

requires方法:這個方法指定了本task需要的依賴,在這個例子中,AggregateArttists依賴一個Stream作業,Stream作業需要一個日期作為參數。

參數:每一個作業都可以定義一個或者多個參數,這些參數需要定義在類級別。比如上面這個類就有一個參數date_interval

output方法:定義了作業結果的保存地。

run方法:對於普通的task,你需要實現run方法。在run方法中可以是任何東西,可以創建子進程,進行長時間的算術運算等等。對於一些task的子類,你就不需要實現run方法了,比如JobTask要求你實現mapper和reducer方法。

LocalTarget:這是一個內置的類,可以幫助你很容易的讀取或者寫本地磁盤。並且保證對磁盤的操作是原子性的。

2.3Streams

class Streams(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as output:
            for _ in range(1000):
                output.write('{} {} {}\n'.format(
                    random.randint(0, 999),
                    random.randint(0, 999),
                    random.randint(0, 999)))

    def output(self):
        return luigi.LocalTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))

 

 這個類沒有依賴,最終產生的效果是在本地文件系統上產生一個結果文件。

2.4在本地執行

PYTHONPATH='' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06

 

執行完成之后,在當前目錄下產生了一個data目錄,data目錄下的內容如下:

(my_python_env)[root@hadoop26 data]# ls
artist_streams_2012-06.tsv    streams_2012_06_06_faked.tsv  streams_2012_06_12_faked.tsv  streams_2012_06_18_faked.tsv  streams_2012_06_24_faked.tsv  streams_2012_06_30_faked.tsv
streams_2012_06_01_faked.tsv  streams_2012_06_07_faked.tsv  streams_2012_06_13_faked.tsv  streams_2012_06_19_faked.tsv  streams_2012_06_25_faked.tsv
streams_2012_06_02_faked.tsv  streams_2012_06_08_faked.tsv  streams_2012_06_14_faked.tsv  streams_2012_06_20_faked.tsv  streams_2012_06_26_faked.tsv
streams_2012_06_03_faked.tsv  streams_2012_06_09_faked.tsv  streams_2012_06_15_faked.tsv  streams_2012_06_21_faked.tsv  streams_2012_06_27_faked.tsv
streams_2012_06_04_faked.tsv  streams_2012_06_10_faked.tsv  streams_2012_06_16_faked.tsv  streams_2012_06_22_faked.tsv  streams_2012_06_28_faked.tsv
streams_2012_06_05_faked.tsv  streams_2012_06_11_faked.tsv  streams_2012_06_17_faked.tsv  streams_2012_06_23_faked.tsv  streams_2012_06_29_faked.tsv

 

 streams_*:就是stream作業生成的。

artist_*:是AggregateArtists生成的,就一個文件而已

2.5擴展

再次運行上面的執行命令發現並沒有執行任何操作,因為所有任務的output已經存在。這意味着luigi的task都是冪等的,也就是說不管執行多少次,作業的輸出應該是不變的。

--local-scheduler告訴luigi不要去連接scheduler server。這是不推薦的運行方式,這種方式也就用在測試階段。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM