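0x1. Preface
In on-site forensics, analysing packet captures by hand is relatively rare: traffic-analysis appliances work by capturing and parsing all of the data, so much of what an analyst could do manually is already automated.
Tools such as Colasoft (科來) and Wireshark work well for PCAP analysis. While looking for code that parses pcap files, I found that many people have already written Python scripts to help with parsing, and there are also frameworks that extract pcap information and present it in a UI.
This article is a study summary of extracting protocol five-tuple information with Python's Scapy library. It was not used in real casework, because in practice reading, decoding and searching a PCAP this way turned out to be too slow.
0x2. Reference libraries
Common Python libraries for parsing pcap files include Scapy, dpkt and Pyshark. Source code and tools worth consulting: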
https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds
Websites that offer pcap parsing as a service: https://packettotal.com/, https://www.capanalysis.net/ca/, https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap, https://app.any.run
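0x3. Mail protocols
To extract the sender's mailbox, you first need to recognise the ports used by SMTP and the other mail protocols.
Port 25 is used by SMTP (Simple Mail Transfer Protocol) and is for sending mail.
Port 110 is used by POP3 (Post Office Protocol version 3). POP2 and POP3 are both mainly for receiving mail; POP3 is the version in common use today, while POP2 is essentially obsolete.
Port 143 is used by IMAP (Internet Message Access Protocol), which, like POP3, is a protocol for receiving mail.
Port 465 (SMTPS, SMTP over SSL/TLS): the session is encrypted from the very first byte, so the plaintext cannot be seen in a capture. SMTPS is a variant of SMTP layered on top of SSL/TLS and relies on the protection of that layer to keep mail from being disclosed.
Port 587 uses STARTTLS. It is also protected by TLS, but only the traffic sent after the STARTTLS command has been executed is encrypted; everything before it is in the clear.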
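As a rough illustration, the port list above can be turned into a lookup table for tagging TCP flows. This is only a sketch: `MAIL_PORTS` and `classify_mail_port` are hypothetical names, and a port number is a hint, not proof, of the protocol actually spoken on a connection.

```python
# Hypothetical helper: map the well-known mail ports listed above to a label.
MAIL_PORTS = {
    25: "SMTP",
    110: "POP3",
    143: "IMAP",
    465: "SMTPS (implicit TLS)",
    587: "SMTP submission (STARTTLS)",
}


def classify_mail_port(sport, dport):
    """Return a mail protocol label for a TCP port pair, or None.

    The destination port is checked first, because the server side of a
    client request is the one that sits on the well-known port.
    """
    for port in (dport, sport):
        if port in MAIL_PORTS:
            return MAIL_PORTS[port]
    return None


if __name__ == "__main__":
    print(classify_mail_port(51234, 25))
    print(classify_mail_port(110, 51234))
    print(classify_mail_port(51234, 443))
```

Only flows labelled SMTP, POP3 or IMAP can be expected to carry readable commands; on 465, and on 587 after STARTTLS, the payload is encrypted.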
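Common SMTP command syntax
In Wireshark, SMTP data appears after the TCP three-way handshake has completed. The common commands are listed below.
SMTP commands are case-insensitive, but their arguments may be case-sensitive; see RFC 821 (since superseded by RFC 5321) for details.
HELO <domain> <CRLF>. Identifies the client to the server. The sender can lie here, although in most cases the server can detect it.
MAIL FROM:<reverse-path> <CRLF>. <reverse-path> is the sender address. This command starts a mail transaction and resets all state and buffers.
RCPT TO:<forward-path> <CRLF>. <forward-path> identifies a recipient address. It follows MAIL FROM and may be repeated for multiple recipients.
DATA <CRLF>. Everything sent after it is treated as message data, terminated by <CRLF>.<CRLF>.
RSET <CRLF>. Resets the session; the current transaction is cancelled.
NOOP <CRLF>. Asks the server to return an OK reply; generally used for testing.
QUIT <CRLF>. Ends the session.
VRFY <string> <CRLF>. Verifies that the given mailbox exists. For security reasons most servers disable this command.
EXPN <string> <CRLF>. Verifies that the given mailing list exists. For security reasons most servers disable this command.
HELP <CRLF>. Asks which commands the server supports.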
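The MAIL FROM and RCPT TO commands above are where the envelope sender and recipients appear in a plaintext SMTP stream. The sketch below pulls them out with regular expressions; `extract_envelope` and the two pattern names are hypothetical, and the code assumes the client side of the stream has already been reassembled into a single bytes object.

```python
import re

# The command verb is case-insensitive, so match with re.IGNORECASE.
# re.MULTILINE lets ^ match at the start of every CRLF-terminated line.
MAIL_FROM_RE = re.compile(r"^MAIL FROM:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)
RCPT_TO_RE = re.compile(r"^RCPT TO:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)


def extract_envelope(payload):
    """Return (senders, recipients) found in a plaintext SMTP client stream."""
    text = payload.decode("ascii", errors="replace")
    return MAIL_FROM_RE.findall(text), RCPT_TO_RE.findall(text)


if __name__ == "__main__":
    sample = (
        b"HELO client.example\r\n"
        b"MAIL FROM:<alice@example.com>\r\n"
        b"RCPT TO:<bob@example.org>\r\n"
        b"rcpt to: <carol@example.org>\r\n"
        b"DATA\r\n"
    )
    print(extract_envelope(sample))
```

A message body that happens to contain a line starting with "MAIL FROM:" would also match, so a more careful parser would stop scanning at the DATA command.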
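Code
forensicPCAP parses a PCAP file and then uses the cmd module to run an interactive loop in which each command entered invokes the corresponding function. The core idea is:
Using Scapy, find the packets whose source or destination port is 110 or 143 and store them as mail data.
The key code is as follows:
self.pcap is loaded in advance in the constructor with self.pcap = rdpcap(namefile)
and is then walked with enumerate(). Packets whose destination or source port is 110 or 143 are selected. Note that this covers POP3 and IMAP only; SMTP on port 25 is not matched.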
def do_mail(self, arg, opts=None):
    """Print the number of mail's requests and store its
    Usage :
    - mail"""
    sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
    sys.stdout.flush()
    con = []
    mailpkts = []
    for i,packet in enumerate(self.pcap):
        # TCP packets only
        if TCP in packet:
            # Keep records whose source or destination port is 110 or 143
            if packet.getlayer('TCP').dport == 110 or packet.getlayer('TCP').sport == 110 or packet.getlayer('TCP').dport == 143 or packet.getlayer('TCP').sport == 143 :
                # flags == 2 means SYN only, i.e. the start of a new connection
                if packet.getlayer('TCP').flags == 2:
                    con.append(i)
                mailpkts.append(packet)
    sys.stdout.write("OK.\n")
    print "## Result : Mail's request : " + str(len(con))
    sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
    sys.stdout.flush()
    res = ""
    for packet in mailpkts:
        # flags == 24 means PSH+ACK, i.e. a segment that pushes payload data
        if packet.getlayer('TCP').flags == 24:
            res = res + packet.getlayer('Raw').load
            sys.stdout.write(".")
            sys.stdout.flush()
    sys.stdout.write("OK\n")
    sys.stdout.flush()
    self.cmd = "mail"
    self.last = res
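The two magic numbers compared against the TCP flags field in the listing above are bitmasks: 2 is SYN alone and 24 (0x18) is PSH combined with ACK. A small decoder makes that explicit; `TCP_FLAG_BITS` and `decode_tcp_flags` are hypothetical names used only for illustration.

```python
# Bit positions of the TCP flags in the low byte of the flags field.
TCP_FLAG_BITS = [
    (0x01, "FIN"), (0x02, "SYN"), (0x04, "RST"), (0x08, "PSH"),
    (0x10, "ACK"), (0x20, "URG"), (0x40, "ECE"), (0x80, "CWR"),
]


def decode_tcp_flags(value):
    """Return the names of the flags set in a numeric TCP flags value."""
    return [name for bit, name in TCP_FLAG_BITS if value & bit]


if __name__ == "__main__":
    for value in (2, 18, 24):
        print(value, decode_tcp_flags(value))
```

Because the listing tests for exact equality with 24, data segments that carry ACK without PSH (value 16) are skipped. Testing for a payload, or masking with `value & 0x08`, would be a looser alternative.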
0x4. DNS resolution
The DNS part of forensicPCAP is shown below. bcolors.TXT sets the colour of the printed text, and res = packet.getlayer('DNS').qd.qname reads the queried domain name from the DNS record.
############# do_dns() ###########
def do_dns(self, arg, opts=None):
    """Print all DNS requests in the PCAP file
    Usage :
    - dns"""
    sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
    sys.stdout.flush()
    dns = []
    dns.append([])
    # Walk through every packet in the PCAP
    for i,packet in enumerate(self.pcap):
        if DNS in packet:
            # Read the queried domain name
            res = packet.getlayer('DNS').qd.qname
            # Drop the trailing dot of a fully qualified name
            if res[len(res) - 1] == '.':
                res = res[:-1]
            # Store the packet index together with the domain name
            dns.append([i, res])
    sys.stdout.write("OK.\n")
    # Report the number of DNS requests (the first list entry is a placeholder)
    print bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC
    self.last = dns
    self.cmd = "dns"
The core code of pcap-analyzer borrows from forensicPCAP. When it displays the DNS requests it has parsed, it reports the ten most frequently queried domain names.
Code
def get_dns(file):
    dns = []
    # Open the PCAP file
    pcap = rdpcap(UPLOAD_FOLDER+file)
    for packet in pcap:
        if DNS in packet:
            # Core logic: read the queried name and strip the trailing dot
            res = packet.getlayer('DNS').qd.qname
            if res[len(res) - 1] == '.':
                res = res[:-1]
            dns.append(res)
    # Count the queries and keep the ten most frequent domain names
    dns = Counter(dns).most_common(10)
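The counting step in the listing above can be tried without a capture file. The sketch below normalises query names before counting them; `normalize_qname` and `top_domains` are hypothetical helpers. It also accepts bytes, on the assumption that Scapy under Python 3 typically returns qname as bytes, in which case the comparison with '.' in the original listings would never be true.

```python
from collections import Counter


def normalize_qname(qname):
    """Return a query name as lower-case text without the trailing dot."""
    if isinstance(qname, bytes):
        qname = qname.decode("ascii", errors="replace")
    return qname.rstrip(".").lower()


def top_domains(qnames, n=10):
    """Return the n most frequently queried names as (name, count) pairs."""
    return Counter(normalize_qname(q) for q in qnames).most_common(n)


if __name__ == "__main__":
    observed = [b"example.com.", "Example.com", b"test.org.", b"example.com."]
    print(top_domains(observed, 2))
```

Lower-casing is a design choice made here because DNS names compare case-insensitively; the original code counts names exactly as captured.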
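0x5. Extracting password information
net-creds is a Scapy-based script that pulls credentials out of a PCAP.
Code
The main logic lives in other_parser(). It splits out the HTTP (and other) content of each packet, then searches for authentication-related keywords to pick out account names and passwords.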
def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
    '''
    Pull out pertinent info from the parsed HTTP packet data
    '''
    user_passwd = None
    http_url_req = None
    method = None
    http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
    http_line, header_lines, body = parse_http_load(full_load, http_methods)
    headers = headers_to_dict(header_lines)
    if 'host' in headers:
        host = headers['host']
    else:
        host = ''
    if http_line != None:
        method, path = parse_http_line(http_line, http_methods)
        http_url_req = get_http_url(method, host, path, headers)
        if http_url_req != None:
            if verbose == False:
                if len(http_url_req) > 98:
                    http_url_req = http_url_req[:99] + '...'
            printer(src_ip_port, None, http_url_req)
        # Print search terms
        searched = get_http_searches(http_url_req, body, host)
        if searched:
            printer(src_ip_port, dst_ip_port, searched)
        # Print user/pwds
        if body != '':
            user_passwd = get_login_pass(body)
            if user_passwd != None:
                try:
                    http_user = user_passwd[0].decode('utf8')
                    http_pass = user_passwd[1].decode('utf8')
                    # Set a limit on how long they can be prevent false+
                    if len(http_user) > 75 or len(http_pass) > 75:
                        return
                    user_msg = 'HTTP username: %s' % http_user
                    printer(src_ip_port, dst_ip_port, user_msg)
                    pass_msg = 'HTTP password: %s' % http_pass
                    printer(src_ip_port, dst_ip_port, pass_msg)
                except UnicodeDecodeError:
                    pass
        # Print POST loads
        # ocsp is a common SSL post load that's never interesting
        if method == 'POST' and 'ocsp.' not in host:
            try:
                if verbose == False and len(body) > 99:
                    # If it can't decode to utf8 we're probably not interested in it
                    msg = 'POST load: %s...' % body[:99].encode('utf8')
                else:
                    msg = 'POST load: %s' % body.encode('utf8')
                printer(src_ip_port, None, msg)
            except UnicodeDecodeError:
                pass
    # Kerberos over TCP
    decoded = Decode_Ip_Packet(str(pkt)[14:])
    kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
    if kerb_hash:
        printer(src_ip_port, dst_ip_port, kerb_hash)
    # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
    NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
    NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
    if NTLMSSP2:
        parse_ntlm_chal(NTLMSSP2.group(), ack)
    if NTLMSSP3:
        ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
        if ntlm_resp_found != None:
            printer(src_ip_port, dst_ip_port, ntlm_resp_found)
    # Look for authentication headers
    if len(headers) == 0:
        authenticate_header = None
        authorization_header = None
    for header in headers:
        authenticate_header = re.match(authenticate_re, header)
        authorization_header = re.match(authorization_re, header)
        if authenticate_header or authorization_header:
            break
    if authorization_header or authenticate_header:
        # NETNTLM
        netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
        if netntlm_found != None:
            printer(src_ip_port, dst_ip_port, netntlm_found)
        # Basic Auth
        parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)
Keyword list:
# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'
irc_user_re = r'NICK (.+?)((\r)?\n|\s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(\d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 = '(\d+ )?login '
NTLMSSP2_re = 'NTLMSSP\x00\x02\x00\x00\x00.+'
NTLMSSP3_re = 'NTLMSSP\x00\x03\x00\x00\x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|\?q|search\?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'
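To see how such patterns behave on a payload, the sketch below applies the two FTP expressions from the list above to a made-up string. `find_ftp_creds` is a hypothetical helper and is not how net-creds itself is organised; it also assumes both commands sit in the same payload, which is not guaranteed in real traffic.

```python
import re

# Copied from the keyword list above.
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'


def find_ftp_creds(load):
    """Return (user, password) found in a plaintext FTP payload, or None for each."""
    user = re.search(ftp_user_re, load)
    password = re.search(ftp_pw_re, load)
    return (
        user.group(1) if user else None,
        password.group(1) if password else None,
    )


if __name__ == "__main__":
    payload = "USER anonymous\r\nPASS guest@example.com\r\n"
    print(find_ftp_creds(payload))
```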
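0x6. Elements to focus on when extracting data
- Source IP, destination IP, source port, destination port, protocol, packet size
Use these to correlate the victim with the stepping-stone hosts controlled by the attacker.
- Protocols: HTTP, FTP, mail protocols
- Look at HTTP and HTTPS traffic first, then at packet lengths.
- Extract the related IP information.
- Decide whether the traffic points to a remote-access trojan, account credentials or a suspicious IP.
- Account and password fields
- In a penetration test, analyse sniffed packets for account and password information.
- IPs performing brute-force attacks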
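The first item in the list above is the classic five-tuple. As a minimal sketch of where those values sit in a raw frame, the function below reads them with the standard library only. `five_tuple` is a hypothetical name, and the code assumes an untagged Ethernet II frame carrying IPv4; VLAN tags, IPv6 and IP fragments are not handled.

```python
import socket
import struct


def five_tuple(frame):
    """Return (src_ip, dst_ip, src_port, dst_port, proto) for an IPv4 frame.

    Ports are None for protocols other than TCP (6) and UDP (17).
    Returns None when the frame is not IPv4 or is too short.
    """
    if len(frame) < 34:  # 14-byte Ethernet header + 20-byte minimal IPv4 header
        return None
    ethertype = struct.unpack("!H", frame[12:14])[0]
    if ethertype != 0x0800:
        return None
    ihl = (frame[14] & 0x0F) * 4  # IPv4 header length in bytes
    proto = frame[23]
    src_ip = socket.inet_ntoa(frame[26:30])
    dst_ip = socket.inet_ntoa(frame[30:34])
    if proto not in (6, 17):
        return (src_ip, dst_ip, None, None, proto)
    l4 = 14 + ihl  # offset of the TCP/UDP header
    if len(frame) < l4 + 4:
        return None
    src_port, dst_port = struct.unpack("!HH", frame[l4:l4 + 4])
    return (src_ip, dst_ip, src_port, dst_port, proto)


if __name__ == "__main__":
    eth = b"\x00" * 12 + b"\x08\x00"
    ip = struct.pack("!BBHHHBBH4s4s", 0x45, 0, 40, 0, 0, 64, 6, 0,
                     socket.inet_aton("192.168.1.103"), socket.inet_aton("10.0.0.1"))
    tcp = struct.pack("!HH", 51234, 80) + b"\x00" * 16
    print(five_tuple(eth + ip + tcp))
```

Libraries such as Scapy and dpkt expose the same fields as attributes; the point of the sketch is only to show which bytes they come from.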
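0x7. Testing dpkt for pcap parsing
I originally wanted to use dpkt to parse mail, DNS and HTTP traffic as an aid to pcap analysis, but after reading up on it I found it less convenient than Scapy.
dpkt is a Python module for simple packet creation and parsing, with definitions for the basic TCP/IP protocols. It is fast.
dpkt manual
According to the official manual, dpkt reads each record in the pcap file, uses isinstance to check whether it carries an IP packet, and then determines which protocol it belongs to. Each supported protocol is wrapped in its own class, so when a packet matches one of them the corresponding field values can be printed.
Extending this code requires learning what the fields of each protocol mean.
API reference:
https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq
The manual also links to example code on GitHub for some of the APIs, which is a useful reference.
Manual examples
The code below comes from the manual's examples. I found that inet_pton could not be used directly, so I modified it following a fix found online.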
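The ctypes workaround in the listings targets Python 2 on Windows, where socket.inet_pton and socket.inet_ntop were not available. As an alternative sketch, assuming Python 3 is an option, the standard ipaddress module can convert packed addresses on any platform. The function reuses the name `inet_to_str` only to mirror the listings; it is not part of dpkt.

```python
import ipaddress


def inet_to_str(packed):
    """Convert a packed 4-byte (IPv4) or 16-byte (IPv6) address to text."""
    return str(ipaddress.ip_address(packed))


if __name__ == "__main__":
    print(inet_to_str(b"\xc0\xa8\x01\x67"))
    print(inet_to_str(b"\x00" * 15 + b"\x01"))
```

A packed value of any other length raises ValueError, which makes malformed input visible instead of silently producing a wrong address.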
Printing packets
Use dpkt to read a pcap file and print the contents of each packet: the fields of the Ethernet frame and of the IP packet.
Python 2 test code:
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os

def mac_addr(address):
    """Convert a MAC address to a readable/printable string
    Args:
        address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
    Returns:
        str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)

class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows

def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())
    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)
    raise socket.error('unknown address family')

def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))
    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')
    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())
    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_packets(pcap):
    """Print out information about each packet in a pcap
    Args:
        pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)
        # Print out the timestamp in UTC
        print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue
        # Now unpack the data within the Ethernet frame (the IP packet)
        # Pulling out src, dst, length, fragment info, TTL, and Protocol
        ip = eth.data
        # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
        do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
        more_fragments = bool(ip.off & dpkt.ip.IP_MF)
        fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
        # Print out the info
        print('IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)\n' % \
              (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))

def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_packets(pcap)

if __name__ == '__main__':
    test()
Output:
('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31 (len=52 ttl=64 DF=1 MF=0 offset=0)
('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12 (len=114 ttl=64 DF=0 MF=0 offset=0)
('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122 (len=52 ttl=64 DF=1 MF=0 offset=0)
Printing ICMP
Check for ICMP packets and display their contents.
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on ICMP packets
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os

# mac_addr, sockaddr, inet_pton, inet_ntop and inet_to_str are identical to
# the definitions in the first dpkt listing above and must be included here.

def print_icmp(pcap):
    """Print out information about each ICMP packet in a pcap
    Args:
        pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue
        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data
        # Now check if this is an ICMP packet
        if isinstance(ip.data, dpkt.icmp.ICMP):
            icmp = ip.data
            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)' % \
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('ICMP: type:%d code:%d checksum:%d data: %s\n' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data)))

def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_icmp(pcap)

if __name__ == '__main__':
    test()
Output:
('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103 (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='\xc0\xa8\x01g', dst='\xcal\x17q', opts='', data='n\xb1\x00P\x85)=]'))
Printing HTTP requests
#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on HTTP requests
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os

# mac_addr, sockaddr, inet_pton, inet_ntop and inet_to_str are identical to
# the definitions in the first dpkt listing above and must be included here.

def print_http_requests(pcap):
    """Print out information about each HTTP request in a pcap
    Args:
        pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num = r_num + 1
        print('packet num count :', r_num)
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue
        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data
        # Check for TCP in the transport layer
        if isinstance(ip.data, dpkt.tcp.TCP):
            # Set the TCP data
            tcp = ip.data
            # Now see if we can parse the contents as a HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue
            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('HTTP request: %s\n' % repr(request))
            # Check for a header spanning across TCP segments
            if not tcp.data.endswith(b'\r\n'):
                print('\nHEADER TRUNCATED! Reassemble TCP segments!\n')

def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_http_requests(pcap)

if __name__ == '__main__':
    test()
Output:
Timestamp: 2004-05-13 10:17:08.222534
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223 (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET')
Timestamp: 2004-05-13 10:17:10.295515
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99 (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET')
...
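To make the structure of the requests printed above more concrete, here is a small standard-library stand-in for the parsing step. It is a sketch, not dpkt's implementation: `parse_http_request` is a hypothetical function that splits the request line and headers out of a raw TCP payload and returns None when the header block is incomplete, which is the situation the "HEADER TRUNCATED" message in the listing warns about.

```python
def parse_http_request(data):
    """Parse the request line and headers from raw TCP payload bytes.

    Returns (method, uri, version, headers), or None when the payload does
    not contain a complete, well-formed header block.
    """
    head, sep, _body = data.partition(b"\r\n\r\n")
    if not sep:
        return None  # headers span several TCP segments; reassembly needed
    lines = head.decode("latin-1").split("\r\n")
    parts = lines[0].split(" ")
    if len(parts) != 3 or not parts[2].startswith("HTTP/"):
        return None
    headers = {}
    for line in lines[1:]:
        name, colon, value = line.partition(":")
        if colon:
            headers[name.strip().lower()] = value.strip()
    return parts[0], parts[1], parts[2], headers


if __name__ == "__main__":
    raw = b"GET /download.html HTTP/1.1\r\nHost: www.ethereal.com\r\n\r\n"
    print(parse_http_request(raw))
```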
Printing the IP addresses in Ethernet frames
Parsing a 594 MB pcap took 127 seconds.
# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime

# Measure how long dpkt takes to extract the IP addresses
# Use dpkt to get the timestamp, source IP and destination IP

# sockaddr, inet_pton, inet_ntop and inet_to_str are identical to the
# definitions in the first dpkt listing above and must be included here.

def getip(pcap):
    Num = 0
    for timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)
        # Skip frames that do not carry an IP packet
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue
        ip = eth.data
        ip_src = inet_to_str(ip.src)
        ip_dst = inet_to_str(ip.dst)
        # Print the packet number, timestamp and source -> destination
        Num = Num + 1
        print ('{0}\ttime:{1}\tsrc:{2}-->dst:{3} '.format(Num, timestamp, ip_src, ip_dst))
        if eth.data.__class__.__name__ == 'IP':
            ip = '%d.%d.%d.%d' % tuple(map(ord, list(eth.data.dst)))
            if eth.data.data.__class__.__name__ == 'TCP':
                if eth.data.data.dport == 80:
                    print eth.data.data.data  # data of the HTTP request

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    f = open('pcap222.pcap', 'rb')  # must be opened in 'rb' mode; 'r' raises an error
    pcap = dpkt.pcap.Reader(f)
    getip(pcap)
    endtime = datetime.datetime.now()
    print ('time : {0} seconds '.format((endtime - starttime).seconds))
Output:
1290064 time:1501562988.75 src:113.142.85.151-->dst:192.168.1.103
1290065 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290066 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290067 time:1501562988.75 src:113.142.85.151-->dst:192.168.1.103
1290068 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151
1290069 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290070 time:1501562988.76 src:122.228.91.14-->dst:192.168.1.103
1290071 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290072 time:1501562988.76 src:113.142.85.151-->dst:192.168.1.103
1290073 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
1290074 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn
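One detail of the timing code in the listing above: the seconds attribute of a timedelta holds only the seconds component, without the fractional part and without whole days, so total_seconds() or a monotonic clock gives a more faithful figure. The sketch below shows both; `timed` is a hypothetical helper, not part of dpkt.

```python
import time
from datetime import timedelta


def timed(func, *args):
    """Call func(*args) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    delta = timedelta(days=1, seconds=5, milliseconds=500)
    print(delta.seconds)          # seconds component only
    print(delta.total_seconds())  # full duration in seconds

    result, elapsed = timed(sum, range(1000000))
    print("sum = {0}, took {1:.3f} seconds".format(result, elapsed))
```

Wrapping the parsing call, for example `timed(getip, pcap)`, would report sub-second precision for the measurement quoted above.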
0x8. References
SMTP protocol analysis https://yq.aliyun.com/wenji/262429
Summary of parsing pcap files with scapy https://www.cnblogs.com/14061216chen/p/8093441.html
Python string formatting (format) https://www.cnblogs.com/benric/p/4965224.html