Python開發【項目】:RPC異步執行命令(RabbitMQ雙向通信)


RPC異步執行命令
需求:
  • 利用RibbitMQ進行數據交互
  • 可以對多台服務器進行操作
  • 執行命令后不等待命令的執行結果,而是直接讓輸入下一條命令,結果出來后自動打印
  • 實現異步操作

不懂rpc的請移步http://www.cnblogs.com/lianzhilei/p/5977545.html(最下邊)

本節涉及最多的還是rabbitmq通信原理知識,要求安裝rabbitmq服務

 

程序用廣播topic模式做更好 

 

程序目錄結構:

程序簡介:

# 異步rpc程序

# 博客地址
[第11天博客地址](http://www.cnblogs.com/lianzhilei/p/5970176.html)
               (http://www.cnblogs.com/lianzhilei/p/5970176.html)

## 1、需求
- [ ] 利用RibbitMQ進行數據交互
- [ ] 可以對多台服務器進行操作
- [ ] 執行命令后不等待命令的執行結果,而是直接讓輸入下一條命令,結果出來后自動打印
- [ ] 實現異步操作

## 備注

- [ ] RabbitMQ隊列名:
                    ①執行命令時,隊列名為服務器端的IP
                    ②查詢數據時,用的是回調時隨機生成的callback_queue名
- [ ] threading多線程:
                    實現命令執行后不等待執行結果,依然可以輸入新的指令

- [ ] 執行命令格式:
                 -->>run "dir" host 192.168.20.22 192.168.20.23
                        dir     server端要執行的命令
                        host    host后可跟一個或多個可以通過rabbitMQ的服務器地址

- [ ] 查看后台所有的TASK_ID信息:
                 -->>check_all
     顯示結果樣式:TASK_ID【76786】    HOST【192.168.20.22】    COMMAND【dir】
                  TASK_ID【10307】    HOST【192.168.20.23】    COMMAND【dir】

- [ ] 查看TASK_ID對應的執行結果:
                 -->>check_task 10307
                         10307 為check_all查到的TASK_ID
README.md

程序流程圖:

服務器端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

# !/usr/bin/env python
# -*- coding:utf-8 -*-
# -Author-Lian

import pika
import os

class Server(object):
    def __init__(self,rabbitmq,queue_name):
        self.queue_name = queue_name
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=rabbitmq))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

    def handle(self,command):
        command = command.decode()
        print(command,type(command))
        message = os.popen(command).read()
        if not message:
            message = "Wrong Command"
        return message

    def on_request(self,ch, method, props, body):
        response = self.handle(body)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,  # 回信息隊列名
                         properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        self.channel.basic_consume(self.on_request,
                                   queue=self.queue_name)

        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()


if __name__ == "__main__":
    rabbitmq = "localhost"      #rabbitmq服務器地址
    queue_name = "192.168.20.22"    #queue_name為本地ip地址
    server = Server(rabbitmq,queue_name)
    server.start()
server.py

 

客戶端:

bin目錄:

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import os
import platform


#添加BASE_DIR,添加頂級目錄到路徑中,方便調用其他目錄模塊
if platform.system() == 'Windows':
    print(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
    BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
else:
    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])


#加載環境變量
sys.path.append(BASE_DIR)
from conf import settings
from core import main

if __name__ == '__main__':
    obj = main.Handler()
    obj.start()
start.py

conf目錄:

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luotianshuai'

import os
import sys
import platform


if platform.system() == 'Windows':
    BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
    school_dbpaths = os.path.join(BASE_DIR,'school_db')

else:
    BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
    school_dbpaths =os.path.join(BASE_DIR, 'school_db')


#rabbitmq服務地址ip
RabbitMQ_IP = 'localhost'
settings.py

core目錄

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

from conf import settings
from modules.client import Client
import random,time
import threading

class Handler(object):
    def __init__(self):
        self.information = {}   # 后台進程信息

    def check_all(self,*args):
        '''查看所有task_id信息'''
        time.sleep(2)
        for key in self.information:
            print("TASK_ID【%s】\tHOST【%s】\tCOMMAND【%s】"%(key,self.information[key][0],
                                                                    self.information[key][1]))

    def check_task(self,user_cmd):
        '''查看task_id執行結果'''
        time.sleep(2)
        try:
            task_id = user_cmd.split()[1]
            task_id = int(task_id)
            callback_queue=self.information[task_id][2]
            callback_id=self.information[task_id][3]
            client = Client()
            response = client.get_response(callback_queue, callback_id)
            print(response.decode())
            del self.information[task_id]

        except KeyError  as e :
            print("\33[31;0mWrong id[%s]\33[0m"%e)
        except IndexError as e:
            print("\33[31;0mWrong id[%s]\33[0m"%e)

    def run(self,user_cmd):
        '''執行命令'''
        try:
            time.sleep(2)
            #print("--->>",user_cmd)
            command = user_cmd.split("\"")[1]
            hosts = user_cmd.split()[3:]
            for host in hosts:
                task_id = random.randint(10000, 99999)
                client = Client()
                response = client.call(host, command)
                # print(response)
                self.information[task_id] = [host, command, response[0],response[1]]
        except IndexError as e:
            print("\33[31;0mError:%s\33[0m"%e)

    def reflect(self,str,user_cmd):
        '''反射'''
        if hasattr(self, str):
            getattr(self, str)(user_cmd)
        # else:
        #     setattr(self, str, self.foo)
        #     getattr(self, str)()

    def start(self):
        while True:
            user_cmd = input("->>").strip()
            if not user_cmd:continue
            str = user_cmd.split()[0]
            t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多線程
            t1.start()
main.py

modules目錄

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# -Author-Lian

import pika
import uuid
from conf import settings

class Client(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=settings.RabbitMQ_IP))
        self.channel = self.connection.channel()

    def on_response(self, ch, method, props, body):
        '''獲取命令執行結果的回調函數'''
        # print("驗證碼核對",self.callback_id,props.correlation_id)
        if self.callback_id == props.correlation_id:  # 驗證碼核對
            self.response = body
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def get_response(self,callback_queue,callback_id):
        '''取隊列里的值,獲取callback_queued的執行結果'''
        self.callback_id = callback_id
        self.response = None
        self.channel.basic_consume(self.on_response,  # 只要收到消息就執行on_response
                                   queue=callback_queue)
        while self.response is None:
            self.connection.process_data_events()  # 非阻塞版的start_consuming
        return self.response

    def call(self,queue_name,command):
        '''隊列里發送數據'''
        result = self.channel.queue_declare(exclusive=False) #exclusive=False 必須這樣寫
        self.callback_queue = result.method.queue
        self.corr_id = str(uuid.uuid4())
        # print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key=queue_name,
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # 發送返回信息的隊列name
                                       correlation_id=self.corr_id,  # 發送uuid 相當於驗證碼
                                   ),
                                   body=command)

        return self.callback_queue,self.corr_id
client.py

 

 

運行示例圖

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM