paramiko 源碼修改
paramiko主要用來實現ssh客戶端、服務端鏈接,上一節我們說到了堡壘機,堡壘機內有一個需求是“用戶行為審計”,在這里我們就可以通過修改paramiko內文件的源碼來實現相關要求。
paramiko 源碼包安裝測試
- 下載地址:https://github.com/ 下載源碼包paramiko
- 解壓源碼包 paramiko-master.zip
- 使用目錄:paramiko-master\paramiko-master\demos
- python3啟動:python paramiko-master\paramiko-master\demos\demos.py
注:paramiko demos.py啟動 不支持python3 ,由於python3默認是bays類型,需要修改源碼。
\demos\interactive.py", line 84, in writeall sys.stdout.write(data) TypeError: write() argument must be str, not bytes
修改源代碼(demos.py 添加 python3 支持)
注:interactive.py文件內(77~85 行)
def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break # 修改為編譯為UTF-8 sys.stdout.write(data.decode()) sys.stdout.flush()

C:\Users\Administrator\demos>python3 demo.py Hostname: 192.168.1.100 *** Unable to open host keys file *** WARNING: Unknown host key! Username [Administrator]: root Auth by (p)assword, (r)sa key, or (d)ss key? [p] p Password for root@192.168.1.100: *** Here we go! Line-buffered terminal emulation. Press F6 or ^Z to send EOF. Last login: Mon Jan 22 11:24:35 2018 from 192.168.1.150 [root@localhost ~]# 注:登陸成功
修改源碼(實現抓取用戶操作指令)
注:paramiko-master\paramiko-master\demos\demos.py 文件源碼、簡要說明

import base64 from binascii import hexlify import getpass import os import select import socket import sys import time import traceback from paramiko.py3compat import input import paramiko try: import interactive except ImportError: from . import interactive def agent_auth(transport, username): """ Attempt to authenticate to the given transport using any of the private keys available from an SSH agent. """ agent = paramiko.Agent() agent_keys = agent.get_keys() if len(agent_keys) == 0: return for key in agent_keys: # 嘗試將本地key驗證,驗證不了就用密碼 print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint())) try: transport.auth_publickey(username, key) print('... success!') return except paramiko.SSHException: print('... nope.') def manual_auth(username, hostname): default_auth = 'p' auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) if len(auth) == 0: auth = default_auth # r非對稱秘鑰 if auth == 'r': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') path = input('RSA key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('RSA key password: ') key = paramiko.RSAKey.from_private_key_file(path, password) t.auth_publickey(username, key) # 使用DSA 秘鑰 elif auth == 'd': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa') path = input('DSS key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.DSSKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('DSS key password: ') key = paramiko.DSSKey.from_private_key_file(path, password) t.auth_publickey(username, key) else: pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) t.auth_password(username, pw) # setup logging paramiko.util.log_to_file('demo.log') username = '' if len(sys.argv) > 1: hostname = sys.argv[1] if hostname.find('@') >= 0: username, hostname = hostname.split('@') else: # 輸入IP hostname = input('Hostname: ') if len(hostname) == 0: # 不輸入打印錯誤 print('*** Hostname required.') sys.exit(1) # 端口號22 port = 22 if hostname.find(':') >= 0: hostname, portstr = hostname.split(':') port = int(portstr) # now connect try: # 創建socket對象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 鏈接IP端口號 sock.connect((hostname, port)) except Exception as e: # 登陸錯誤就報錯 print('*** Connect failed: ' + str(e)) # 打印錯誤行 traceback.print_exc() sys.exit(1) try: # ssh實例化傳入socket t = paramiko.Transport(sock) try: t.start_client() except paramiko.SSHException: print('*** SSH negotiation failed.') sys.exit(1) try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) except IOError: try: # 生成rsa 防止證書校驗 keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts')) except IOError: print('*** Unable to open host keys file') keys = {} # check server's host key -- this is important. key = t.get_remote_server_key() if hostname not in keys: print('*** WARNING: Unknown host key!') elif key.get_name() not in keys[hostname]: print('*** WARNING: Unknown host key!') elif keys[hostname][key.get_name()] != key: print('*** WARNING: Host key has changed!!!') sys.exit(1) else: print('*** Host key OK.') # get username if username == '': # getuser取出當前用戶名 default_username = getpass.getuser() username = input('Username [%s]: ' % default_username) if len(username) == 0: username = default_username agent_auth(t, username) # key驗證 沒通過繼續往下走 if not t.is_authenticated(): manual_auth(username, hostname) if not t.is_authenticated(): print('*** Authentication failed. :(') # 認證成功就關閉 t.close() sys.exit(1) # ssh_session交互 chan = t.open_session() # get終端 vt100 終端模式 chan.get_pty() chan.invoke_shell() print('*** Here we go!\n') # 交互 # chan是雙方打開的一個通道 # 調用interactive文件 interactive.interactive_shell(chan) chan.close() t.close() # 表村錯誤數據關閉鏈接 except Exception as e: print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e)) traceback.print_exc() try: t.close() except: pass sys.exit(1)
注:paramiko-master\paramiko-master\demos\interactive.py 文件源碼修改,實現用戶審計
import socket import sys from paramiko.py3compat import u # windows does not have termios... try: # 不出錯代表這是一個linux系統 import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: # linux鏈接 posix_shell(chan) else: windows_shell(chan) # chan是傳入的通道 def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) #-----------------1.源碼修改---------------------#
# 創建列表用來存放客戶端輸入命令 cmd = []
#----------------------------------------------# while True: # stdin標准輸入,stdout標准輸出,標准錯誤 r, w, e = select.select([chan, sys.stdin], [], []) # 如果chan在r里面就代表有數據過來了 if chan in r: try: # 數據過來就收數據 x = u(chan.recv(1024)) # 如果數據等於0,代表斷開鏈接 if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break # 發送數據 sys.stdout.write(x) # 刷新數據 sys.stdout.flush() except socket.timeout: pass # 沒進行一次動作就select就執行返回一次 if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break
#-----------------2.源碼修改---------------------#
# 判斷用戶是否使用回車鍵 if x == "\r": # 將列表字符拼接 cmd_str="".join(cmd) # 獲取出用戶執行命令 print("-->",cmd_str) # 清空列表 cmd = [] else: # 沒有回車就將字符添加列表內 cmd.append(x)
#----------------------------------------------# chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) # thanks to Mike Looijmans for this code def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break sys.stdout.write(data.decode()) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass