項目中異步調用 ping 和 nmap 實現對目標 ip 和所在網關的探測
Subprocess.STREAM 不用擔心進程返回數據過大造成的死鎖, Subprocess.PIPE 會有這個問題.
import tornado.gen from tornado.process import Subprocess @tornado.gen.coroutine def run_command(command): """run command""" process = Subprocess( [command], stdout=Subprocess.STREAM, stderr=Subprocess.STREAM, shell=True ) out, err = yield [process.stdout.read_until_close(), process.stderr.read_until_close()] raise tornado.gen.Return((out, err)) class NmapHandler(tornado.web.RequestHandler): """handle nmap check request""" @tornado.gen.coroutine def get(self): ip = self.get_argument("ip", None) if not ip: self.write(json.dumps({})) raise tornado.gen.Return(None) nmap_resp, _ = yield run_command(nmap % ip) self.write(json.dumps( { "ip": ip, "nmap_resp": nmap_resp } ))
前一陣想到一個問題, run_command 如何進行異常處理. 原則上, 異常除了本地存儲, 還應該上報調用者.
子進程執行的命令是固定的, 出現異常只會有兩種情況, 第一, 創建子進程失敗, 觸發 OSError, 第二, 子進程中執行的 shell 命令失敗, 報錯信息重定向到stderr.
所以, 暫時的處理是捕捉 OSError.
使用非阻塞線程池, 調用 paramiko 來分發檢測任務.
from concurrent.futures import ThreadPoolExecutor from tornado.concurrent import run_on_executor class FailureHandler(tornado.web.RequestHandler): """handle server check request""" executor = ThreadPoolExecutor(100) @run_on_executor def get(self): ip = self.get_argument("ip", None) if not ip: self.write(json.dumps({})) raise tornado.gen.Return(None) resp = distributer(ip) if resp: resp = 0 else: resp = 1 self.write(json.dumps( { "ip": ip, "failure_rslt": resp } ))