python包之drmaa:集群任務管理



搭建流程時,我們把各個模塊腳本都寫好了,現在通過編寫主程序將模塊串起來,那么怎么樣依次(或者並行)將任務自動投遞到集群呢?就是說這一步運行完之后,下一步自動運行。我們當然可以在腳本中設一個標志,反復檢查這一個標志是否出現來決定是否運行下一步,但這種方法太原始,太多弊端了,耗內存,無法並行,且不可預料的出錯。那么,有沒有相應的工具來管理集群任務投遞?有,python的drmaa包可以實現。

1. drmaa簡介

Distributed Resource Management Application API (DRMAA),即分布式資源管理應用程序API,是一種高級 開放網格論壇(Open_Grid_Forum)應用程序接口規范,用於向分布式資源管理(DRM)系統(例如集群或網格計算提交和控制作業)。API的范圍涵蓋了應用程序提交,控制和監視DRM系統中執行資源上的作業所需的所有高級功能。DRMAA API已在Sun的Grid Engine(SGE)和Condor等作業管理調度系統中實現。關於SGE可參考我的推文:集群SGE作業調度系統

C、C++、Perl、Python等程序語言都開發有相應的drmaa包來實現SGE集群的任務管理。這里記錄下drmaa-python:
Github:drmaa-python
PyPi:https://pypi.org/project/drmaa/

2. 安裝和配置

要求:Python2.7+;與DRMAA兼容的集群,如SGE。

#安裝
pip install drmaa

#設置路徑
export SGE_ROOT=/path/to/gridengine  #SGE安裝的路徑
export SGE_CELL=default

#設置庫
export DRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0 
#libdrmaa.so.1.0 C動態庫,是libdrmaa-dev包的一部分

3. 示例

3.1 開始和終止會話

Session

#!/usr/bin/env python

import drmaa

def main():
   """Create a drmaa session and exit"""
   with drmaa.Session() as s:  #自動初始化,組織工作提交
      print('A session was started successfully')
#with結束自動exit(),大部分函數都要在exit()前執行,如runJob/wait,getContact可在exit()后。
if __name__=='__main__':
   main()

使用可重新連接的會話,可以將DRMAA庫初始化為上一個會話,從而允許該庫訪問該會話的作業列表.

#!/usr/bin/env python

import drmaa

def main():
    """
    Create a session, show that each session has an ID, use session ID to
    disconnect, then reconnect. Finally, exit.
    """
    s = drmaa.Session()
    s.initialize()
    print('A session was started successfully')
    response = s.contact
    print('session contact returns: %s' % response)
    s.exit()
    print('Exited from session')

    s.initialize(response) #初始化上個session
    print('Session was restarted successfullly')
    s.exit()


if __name__=='__main__':
    main()

3.2 運行工作

假設已知當前目錄有一個sleeper.sh腳本,后接兩個參數:

#!/bin/bash
echo "Hello world, the answer is $1"
sleep 3s
echo "$2 Bye world!"

drmaa將sleeper.sh提交到SGE:

#!/usr/bin/env python

import drmaa
import os

def main():
   """
   Submit a job.
   Note, need file called sleeper.sh in current directory.
   """
   with drmaa.Session() as s:
       print('Creating job template')
       jt = s.createJobTemplate()  #分配工作模板(存儲提交作業的信息結構)
       jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh') #設置remoteCommand屬性,找到要運行的程序。
        #路徑默認為用戶的主目錄,相對路徑用workingDirectory屬性
       jt.args = ['42', 'Simon says:']  #執行文件的參數
       jt.joinFiles=True

       jobid = s.runJob(jt)  #將分配給作業的ID放入我們傳遞給的字符數組中runJob()
       print('Your job has been submitted with ID %s' % jobid)

 #    jobid = s.runBulkJobs(jt, 1, 30, 2) #提交一個數組作業
 #    print('Your jobs have been submitted with IDs %s' % jobid)

       print('Cleaning up')
       s.deleteJobTemplate(jt) #刪除作業模板,釋放作業模板保留的DRMAA內存,但對提交的作業沒有影響

if __name__=='__main__':
   main()

3.3 等待工作

即等待任務完成

#!/usr/bin/env python

import drmaa
import os

def main():
    """
    Submit a job and wait for it to finish.
    Note, need file called sleeper.sh in home directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles = True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)

        retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER) #調用wait()等待作業結束
        print('Job: {0} finished with status {1}'.format(retval.jobId, retval.hasExited))

#以下是提交多個作業的等待處理,synchronize替代wait
#joblist = s.runBulkJobs(jt, 1, 30, 2)
#print('Your jobs have been submitted with IDs %s' % joblist)
#s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, True)

        print('Cleaning up')
        s.deleteJobTemplate(jt)

if __name__=='__main__':
    main()

wait()返回一個JobInfo元組,其具有下面的屬性: jobId,hasExited,hasSignal,terminatedSignal,hasCoreDump, wasAborted,exitStatus,resourceUsage

synchronize()的第3個參數是該synchronize()的調用是否在工作后清除。工作完成后,它會留下一些統計信息,如退出狀態和用途,直到wait() 或synchronize()的處理狀態變為True。確保每一項任務對這兩個函數之一調用是很有必要的,否則可能引起內存泄漏。如果想要每一項任務恢復統計信息,可將synchronize()設置False。如下:

joblist = s.runBulkJobs(jt, 1, 30, 2)
print('Your jobs have been submitted with IDs %s' % joblist)

s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False) #False,每一項工作等待一次
for curjob in joblist:
    print('Collecting job ' + curjob)
    retval = s.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
    print('Job: {0} finished with status {1}'.format(retval.jobId,retval.hasExited))

3.4 控制工作

#!/usr/bin/env python

import drmaa
import os

def main():
    """Submit a job, then kill it.
    Note, need file called sleeper.sh in home directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles = True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)
        # options are: SUSPEND, RESUME, HOLD, RELEASE, TERMINATE
        s.control(jobid, drmaa.JobControlAction.TERMINATE) #刪除剛提交的作業

        print('Cleaning up')
        s.deleteJobTemplate(jt)

if __name__=='__main__':
    main()

還可以用control()來暫停,恢復,保留或釋放工作。control()還可用於控制未通過DRMAA提交的作業,可以將任何有效的SGE作業ID傳遞control()為要刪除的作業ID。

3.5 查詢工作狀態

#!/usr/bin/env python

import drmaa
import time
import os

def main():
    """
    Submit a job, and check its progress.
    Note, need file called sleeper.sh in home directory.
    """
    with drmaa.Session() as s:
        print('Creating job template')
        jt = s.createJobTemplate()
        jt.remoteCommand = os.path.join(os.getcwd(), 'sleeper.sh')
        jt.args = ['42', 'Simon says:']
        jt.joinFiles=True

        jobid = s.runJob(jt)
        print('Your job has been submitted with ID %s' % jobid)

        # Who needs a case statement when you have dictionaries?
        decodestatus = {drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
                        drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
                        drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
                        drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
                        drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
                        drmaa.JobState.RUNNING: 'job is running',
                        drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
                        drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
                        drmaa.JobState.DONE: 'job finished normally',
                        drmaa.JobState.FAILED: 'job finished, but failed'}

        for ix in range(10):
            print('Checking %s of 10 times' % ix)
            print decodestatus(s.jobStatus(jobid)) #jobStatus()獲取作業的狀態
            time.sleep(5)

        print('Cleaning up')
        s.deleteJobTemplate(jt)

if __name__=='__main__':
    main()   #確定工作狀態並報告

其他更多關於JobInfo,JobTemplate,Session等方法的屬性可參考:https://drmaa-python.readthedocs.io/en/latest/drmaa.html

4. 應用

4.1 寫一個簡單應用

#!/usr/bin/env python

import drmaa
import os

class SGE():
    def __init__(self):
        self.__sgeProject="Test"
        self.__sgeQueue="test.q"
        self.__maxvmen="1G"
        self.__proc="1"
        self.__script=""
        self.__workdir=""
        self.__session=""
    def setSgeProject(self, p):
        self.__sgeProject=p
    def getSgeProject(self):
        return self.__sgeProject
    def setSgeQueue(self, q):
        self.__sgeQueue=q
    def getSgeQueue(self):
        return self.__sgeQueue
    def setMaxvmem(self, m):
        self.__maxvmem=m
    def setNumproc(self, proc):
        self.__proc=proc
    def getMaxvmem(self):
        return self.__maxvmem
    def setScript(self, s):
        self.__script=s
    def getScript(self):
        return self.__script
    def setWorkDir(self, w):
        self.__workdir=w
    def getWorkDir(self):
        return self.__workdir
    def setSession(self, ss):
        self.__session=ss
    def getSession(self):
        return self.__session

    def submit(self):
        st=os.stat(self.__script)    #系統 stat 的調用,返回stat結構
        os.chmod(self.__script, st.st_mode | stat.S_IEXEC | stat.S_IXGRP)  #S_IEXEC是S_IXUSR同義詞,所有者具有執行權限;S_IXGRP,組具有執行權限
        jt = self.__session.createJobTemplate() ##分配工作模板
        jt.remoteCommand = self.__script  #remoteCommand屬性找到要執行的腳本
        jt.workingDirectory = self.__workdir  #設定當前工作目錄
        par4qsub="".join(["-binding linear:",self.__proc," -P ",self.__sgeProject," -q ",self.__sgeQueue," -cwd -l ","vf=",self.__maxvmem," -l p=",self.__proc])
        print('qsub {0} {1}'.format(par4qsub,self.__script))
        jt.nativeSpecification = par4qsub #傳遞給jt的指令
        jobid =self.__session.runJob(jt) #將分配給作業的ID傳遞給的字符數組
        self.__session.deleteJobTemplate(jt)
        return jobid

def main():
    with drmaa.Session() as s:
        sgeObj = SGE()
        sgeObj.setSession(session)
        sgeObj.setSgeProject("SGEProject")
        sgeObj.setSgeQueue("SGEQueue")
        dict_qsub_id={}
        joblist=[]
        cwdir=os.path.join(getcwd())
        sgeObj.setWorkDir(cwdir)
        sgeObj.setScript(os.path.join(cwdir,"test.sh"))
        sgeObj.setMaxvmem("Memory")
        sgeObj.setNumproc("1")
        jobid=sgeObj.submit()
        dict_qsub_id[jobid]=os.path.join(cwdir,"test.sh")
        joblist.append(jobid)

        s.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False) #設為false
        for curjob in joblist:
            retval = session.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
            print('Job: {0} finished with status {1}'.format(retval.jobId,retval.hasExited))

if __name__=="__main__":
    main()

4.2 應用示例2

說明:用MEGAN做微生物物種注釋時,blast nr得到的結果太多,一次性注釋太久,因此將其拆分開來。Linux環境中使用MEGAN注釋需要調用xvfb-run(相當於一個wrapper, 給應用程序提供虛擬的 X server),但xvfb不能並行,當我同時運行多個注釋時,MEGAN生成的臨時文件rma會發生沖突,因而無法同時得到注釋結果。不能並行就只能串行,但我拆分了上百份文件,不可能手動一個個投遞,如何一個個任務依次運行呢?可以用drmaa寫個循環。

#繼承上面的SGE類
def check_status(retval,running_log,path,email):
    if(retval.exitStatus != 0):  #出錯的要發郵件通知
        running_log.write('{0}\nError job: {1}\n  exitStatus: {2}\n  wasAborted: {3}\n  maxvmem: {4}Gb\n  Qsub_id: {5}\n\n'.format("="*40, path, retval.exitStatus, retval.wasAborted, str(float(retval.resourceUsage['maxvmem'])/1000000000), retval.jobId )) 
        running_log.close()
        emailObj = Email()
        emailObj.setReceiver(email)
        emailObj.sendMail('<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"></head><body><p>Error job: {0}</p><p>  exitStatus: {1}</p><p>wasAborted: {2}</p><p>maxvmem: {3}</p></body></html>'.format(path, retval.exitStatus, retval.wasAborted, str(float(retval.resourceUsage['maxvmem'])/1000000000) ))
        os._exit(0)
    elif(retval.wasAborted == True):  #手工終止的不需要發郵件通知
        running_log.write('{0}\nAborted job: {1}\n  exitStatus: {2}\n  wasAborted: {3}\n  maxvmem: {4}Gb\n  Qsub_id: {5}\n\n'.format("="*40, path, retval.exitStatus, retval.wasAborted, str(float(retval.resourceUsage['maxvmem'])/1000000000) , retval.jobId))
        running_log.close()
        os._exit(0)
    else:
        running_log.write('{0}\nFinished job: {1}\n  exitStatus: {2}\n  wasAborted: {3}\n  maxvmem: {4}Gb\n  Qsub_id: {5}\n\n'.format("="*40, path, retval.exitStatus, retval.wasAborted, str(float(retval.resourceUsage['maxvmem'])/1000000000), retval.jobId ))
        running_log.flush()   #立即寫到文件中。

def main():
    project_dir=os.getcwd()
    running_log=open("running.log", "w")
    qsub_id_log=open("qsub_id.log", "w")
    dict_qsub_id={}
    with drmaa.Session() as session:
        sgeObj = Sge()
        sgeObj.setSession(session)
        for i in range(1,101):  #拆分的一百份任務
            joblist=[]
            subdir="tax_"+str(i)
            cwdir=os.path.join(project_dir,subdir)
            sgeObj.setWorkDir(cwdir)
            shell="run_tax_"+str(i)+".sh"
            sgeObj.setScript(os.path.join(cwdir,shell))
            sgeObj.setMaxvmem("1G")
            sgeObj.setNumproc("1")
            jobid=sgeObj.submit()
            qsub_id_log.write(jobid+"\n")
            qsub_id_log.flush() 
            dict_qsub_id[jobid]=os.path.join(cwdir,shell)
            joblist.append(jobid)
            session.synchronize(joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False)
            for curjob in joblist:
                retval = session.wait(curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER)
                check_status(retval,running_log,dict_qsub_id[retval.jobId],"123456@qq.com")
        emailObj = Email()
        emailObj.setReceiver("123456@qq.com")
        emailObj.sendMail('<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"></head><body><p>Finished job: {0}</p></body></html>'.format(project_dir))
        print ("{0} All work is done! {0}>".format("="*30))
        running_log.close()
        qsub_id_log.close()

if __name__=="__main__":
    main()

Ref:https://drmaa-python.readthedocs.io/en/latest/tutorials.html#starting-and-stopping-a-session


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM