paramiko模块执行交互式命令(实现用户切换)


invoke_shell()介绍 (可用来登陆后切换账户)
  在实际工作中,对于一些服务器,需要通过root账户权限来实现一些功能,但有的服务器是进行加固处理的,即没法直接用root账户连接SSH,所以就需要先通过一般用户 如 ossadm用户登录上后,再使用 su root 指令来切换成root账户
 而在python的paramiko模块中,直接通过ssh.exec_command(“su root”)是没办法直接更改账户的,执行进程时将被永久阻塞,所以我们采用另一种执行命令的方法 ssh.invoke_shell()
  invoke_shell()方法,用于创建一个子shell进程,这样所有的操作都可以在该子shell中进行,su切换用户不受影响,但该方法没有exec_command那种方便的ChannelFile对象,所有的标准输出和标准错误内容都通过invoke_shell返回对象的recv方法来获取,每一次调用recv只会从上一次返回的地方开始返回,也没有直接获取命令退出状态的方法,除非自己编写相应的程序,在此不赘述。

 

#!/usr/bin/env python
# coding:utf-8

from src.utils.config import Config
import paramiko
import time

class SSH:
    def __init__(self):
        pass

    def connect(self, ip, user='admin', password='Qax_skyeye-2021'):

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(hostname=ip, port=22, username=user, password=password)
            self.channel = ssh.invoke_shell()
            time.sleep(0.1)
        except paramiko.ssh_exception.AuthenticationException:
            print('Failed to login. ip username or password not correct.')
            exit(-1)

    def execute(self,command,end_with="# "):
        """
        command:要执行的命令
        end_with:输出结束的标志,例如当指令执行结束后,Linux窗口会显示#
        """
        command = command + "\n"
        print("command is: %s" % command)
        self.channel.send(command)
        buff = ''
        while not buff.endswith(end_with):
            resp = self.channel.recv(9999)
            buff += resp.decode('utf-8')
        print("output is: %s" % buff)
        return buff

    def use_dia(self,dia_password=r"xxxxxxxxxx@1"):
        """
        使用admin用户登录后,是一个受限shell。
        """
        try:
            self.execute("dia",end_with="Password: ")
            self.execute(dia_password)
        except:
            print("Failed to switch to root by dia")
            exit(-1)

    def close(self):
        self.ssh.close()

def get_ssh():
    conf = Config()
    username = conf.sensor_username
    password = conf.sensor_password
    ip = conf.sensor_ip

    my_ssh = SSH()
    my_ssh.connect(ip,username,password)
    return my_ssh
if __name__ == "__main__":
    my_ssh = get_ssh()
    my_ssh.use_dia()
    my_ssh.execute("whoami")
    print("ok")

 

 

 

 

 


参考链接:https://blog.csdn.net/weixin_42252770/article/details/99697415


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM