luigi學習5-task詳解


task是代碼執行的地方。task通過target互相依賴。

下面是一個典型的task的大綱視圖。

一、Task.requires

requires方法用來指定本task的依賴的其他task對象,依賴的task對象甚至可以是同一個class的對象,下面是一個例子:

def requires(self):
    return OtherTask(self.date), DailyReport(self.date - datetime.timedelta(1))

 

上述的DailyReport task依賴兩個task,其中一個是同類型的。從這里也可以看出requires方法可以返回多個依賴的task對象,這些對象可以封裝在一個dict、list或者tuple中。

二、requiring another task

注意requires不能返回一個target對象,如果你的task依賴一個簡單的target對象,那么你要為這個target對象生成一個task class。例如下面這個例子:

class LogFiles(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/log')

 

這樣做也方面你使用參數來控制:

class LogFiles(luigi.ExternalTask):
    date = luigi.DateParameter()
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/log/%Y-%m-%d'))

 

三、Task.output

output方法可以返回一個或者多個target對象。和requires方法一樣,你可以使用容器來隨意包裝。

但是我們非常希望每一個task只會返回一個target對象在output方法中。如果對個對象被返回,那么你的task就必須保證每一個target都是原子被創建的。

當然如果不關注原子性,那么返回多個target對象也是安全的。

例子:

class DailyReport(luigi.Task):
    date = luigi.DateParameter()
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/reports/%Y-%m-%d'))
    # ...

 

 

四、Task.run

run方法是包含實際運行的代碼。當你同時使用了Task.requires和Task.run那么luigi會把這個分成兩個階段。

首先luigi需要計算出task之間的依賴關系,然后依次執行。input方法是一個很好的輔助方法,他對應着依賴對象的output方法。

下面是一個例子:

class GenerateWords(luigi.Task):

    def output(self):
        return luigi.LocalTarget('words.txt')

    def run(self):

        # write a dummy list of words to output file
        words = [
                'apple',
                'banana',
                'grapefruit'
                ]

        with self.output().open('w') as f:
            for word in words:
                f.write('{word}\n'.format(word=word))


class CountLetters(luigi.Task):

    def requires(self):
        return GenerateWords()

    def output(self):
        return luigi.LocalTarget('letter_counts.txt')

    def run(self):

        # read in file as list
        with self.input().open('r') as infile:
            words = infile.read().splitlines()

        # write each word to output file with its corresponding letter count
        with self.output().open('w') as outfile:
            for word in words:
                outfile.write(
                        '{word} | {letter_count}\n'.format(
                            word=word,
                            letter_count=len(word)
                            )
                        )

 

五、task.input

input方法保證了task.requires返回的對應的target對象。task.requires返回的任何東西都會被轉換,包括list,dict等等。這是非常有用的,當你task有多個依賴的時候。下面是一個例子:

class TaskWithManyInputs(luigi.Task):
    def requires(self):
        return {'a': TaskA(), 'b': [TaskB(i) for i in xrange(100)]}

    def run(self):
        f = self.input()['a'].open('r')
        g = [y.open('r') for y in self.input()['b']]

 

六、Dynamic dependencies

有時可能會發生這樣的情況,在運行之前你不能確切的知道本task依賴於哪一個task對象。在這種情況下,luigi提供了一種機制來指定動態依賴。

如果你在task.run方法中yield了另一個task對象,那么當前的task會被掛起並且這個被yield的task會運行。你也可以yield一系列的task。

例子:

class MyTask(luigi.Task):
    def run(self):
        other_target = yield OtherTask()

        # dynamic dependencies resolve into targets
        f = other_target.open('r')

 

這種機制和task.requires只能二中選一。但是這也帶了很多的限制,你必須保證你的task.run方法是冪等的。

七、task status tracking

對於長時間運行的作業,你可以通過命令行或者日志或者中央調度器的GUI界面來看到任務的進度信息。

你可以再task.run方法中指定一個額外的監控系統。你可以如下這么設置:

class MyTask(luigi.Task):
    def run(self):
        # set a tracking url
        self.set_tracking_url("http://...")

        # set status messages during the workload
        for i in range(100):
            # do some hard work here
            if i % 10 == 0:
                self.set_status_message("Progress: %d / 100" % i)

 

八、events and callbacks

luigi有一個內置的event系統允許你注冊回調函數給event。

你可以同時使用預定義的event和你自定義的event。

每一個event handle都是與一個task class相關的,它也只能被這個class或者其subclass來觸發。

這允許你輕松的訂閱event從一個特殊的類,比如hadoop jobs

@luigi.Task.event_handler(luigi.Event.SUCCESS)
def celebrate_success(task):
    """Will be called directly after a successful execution
       of `run` on any Task subclass (i.e. all luigi Tasks)
    """
    ...

@luigi.contrib.hadoop.JobTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
       of `run` on any JobTask subclass
    """
    ...

luigi.run()

 

九、運行hadoop job

你可以這么直接運行一個hadoop job,而不是用luigi

MyJobTask('abc', 123).run()

 

你也可以直接使用HdfsTarget class

t = luigi.contrib.hdfs.target.HdfsTarget('/tmp/test.gz', format=format.Gzip)
f = t.open('w')
# ...
f.close() # needed

 

十、task priority

luigi調度下一個作業運行時根據優先級的。默認情況下是隨意選擇執行的,這個適合大多數的場景。

如果你想人為的控制執行順序,那么可以設置task的priority:

# A static priority value as a class constant:
class MyTask(luigi.Task):
    priority = 100
    # ...

# A dynamic priority value with a "@property" decorated method:
class OtherTask(luigi.Task):
    @property
    def priority(self):
        if self.date > some_threshold:
            return 80
        else:
            return 40
    # ...

 

優先級的值越高越優先執行。優先級沒有一個確切的范文,你可以隨意指定一個int或者float的值作為優先級。默認值是0。

注意:優先級是需要考慮依賴的,依賴沒有執行,優先級最高也沒什么用。

十一、instance caching

luigi提供了一個元類邏輯,如果

DailyReport(datetime.date(2012, 5, 10))

被實例化了兩次,其實在luigi中是同一個對象。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM