tornado 異步調用系統命令和非阻塞線程池


項目中異步調用 ping 和 nmap 實現對目標 ip 和所在網關的探測

Subprocess.STREAM 不用擔心進程返回數據過大造成的死鎖, Subprocess.PIPE 會有這個問題.

import tornado.gen
from tornado.process import Subprocess


@tornado.gen.coroutine
def run_command(command):
    """run command"""
    process = Subprocess(
        [command],
        stdout=Subprocess.STREAM,
        stderr=Subprocess.STREAM,
        shell=True
    )
    out, err = yield [process.stdout.read_until_close(), process.stderr.read_until_close()]
    raise tornado.gen.Return((out, err))


class NmapHandler(tornado.web.RequestHandler):
    """handle nmap check request"""
    @tornado.gen.coroutine
    def get(self):
        ip = self.get_argument("ip", None)
        if not ip:
            self.write(json.dumps({}))
            raise tornado.gen.Return(None)

        nmap_resp, _ = yield run_command(nmap % ip)

        self.write(json.dumps(
            {
                "ip": ip,
                "nmap_resp": nmap_resp
            }
        ))

 

前一陣想到一個問題, run_command 如何進行異常處理. 原則上, 異常除了本地存儲, 還應該上報調用者. 

子進程執行的命令是固定的, 出現異常只會有兩種情況, 第一, 創建子進程失敗, 觸發 OSError, 第二, 子進程中執行的 shell 命令失敗, 報錯信息重定向到stderr.

所以, 暫時的處理是捕捉 OSError.

 

使用非阻塞線程池, 調用 paramiko 來分發檢測任務.

from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor


class FailureHandler(tornado.web.RequestHandler):
    """handle server check request"""
    executor = ThreadPoolExecutor(100)

    @run_on_executor
    def get(self):
        ip = self.get_argument("ip", None)
        if not ip:
            self.write(json.dumps({}))
            raise tornado.gen.Return(None)

        resp = distributer(ip)
        if resp:
            resp = 0
        else:
            resp = 1

        self.write(json.dumps(
            {
                "ip": ip,
                "failure_rslt": resp
            }
        ))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM