一、ICMP協議分析
ICMP:Internet控制報文協議。由於IP協議並不是一個可靠的協議,它不保證數據被成功送達,那么,如何才能保證數據的可靠送達呢? 這里就需要使用到一個重要的協議模塊ICMP(網絡控制報文)協議。它傳遞差錯報文以及其他需要注意的信息,經常供IP層或更高層協議(TCP或UDP)使用。所以它經常被認為是IP層的一個組成部分。它在IP數據報文中的封裝如下:
ICMP的數據報文格式如下所示。所有報文的前4個字節都是一樣的,其他的因報文類型不同而不一樣。類型字段可以有15個不同的值,用以描述不同的ICMP報文。校驗和字段覆蓋整個ICMP報文,使用了和IP首部檢驗和一樣的算法,詳細請搜索TCP/IP檢驗和算法。
不同類型的報文是由類型字段和代碼字段來共同決定。下表是各種類型的ICMP報文。
根據上表可知,ICMP協議大致分為兩類,一種是查詢報文,一種是差錯報文。查詢報文是用一對請求和應答定義的,它通常有以下幾種用途:
- ping查詢
- 子網掩碼查詢(用於無盤工作站在初始化自身的時候初始化子網掩碼)
- 時間戳查詢(可以用來同步時間)
而差錯報文通常包含了引起錯誤的IP數據報的第一個分片的IP首部(和選項),加上該分片數據部分的前8個字節。RFC 792規范中定義的這8個字節中包含了該分組運輸層首部的所有分用信息,這樣運輸層協議就可以向正確的進程提交ICMP差錯報文。
當傳送IP數據包發生錯誤時,比如主機不可達,端口不可達等,ICMP協議就會把錯誤信息封包,然后傳送回給主機。給主機一個處理錯誤的機會,這也就是為什么說建立在IP層以上的協議是可能做到安全的原因。由上面可知,ICMP數據包由8bit的錯誤類型和8bit的代碼和16bit的校驗和組成,而前 16bit就組成了ICMP所要傳遞的信息。由數據鏈路層所能發送的最大數據幀,即MTU(Maximum Transmission Unit)為1500,計算易知ICMP協議在實際傳輸中數據包為:20字節IP首部 + 8字節ICMP首部+ 1472字節(數據大小)。
盡管在大多數情況下,錯誤的包傳送應該給出ICMP報文,但是在特殊情況下,是不產生ICMP錯誤報文的。如下
- ICMP差錯報文不會產生ICMP差錯報文(出IMCP查詢報文)(防止IMCP的無限產生和傳送)
- 目的地址是廣播地址或多播地址的IP數據報。
- 作為鏈路層廣播的數據報。
- 不是IP分片的第一片。
- 源地址不是單個主機的數據報。這就是說,源地址不能為零地址、環回地址、廣播地 址或多播地址。
二、ping程序原理分析
ping程序是由Mike Muuss編寫,目的是為了測試另一 台主機是否可達,現在已經成為一個常用的網絡狀態檢查工具。該程序發送一份 ICMP回顯請求報文給遠程主機,並等待返回 ICMP回顯應答。利用ping這種原理,已經出現了許多基於ping的網絡掃描器,比如nmap、arping、fping、hping3等。所以隨着Internet安全意識的增強,現在有些提供訪問控制策略的路由器和防火牆已經可以設置過濾特定ICMP報文請求。因此並不能通過簡單的ping命令判斷遠程主機是否在線。
ping 使用的是ICMP協議,它發送icmp回送請求消息給目的主機。ICMP協議規定:目的主機必須返回ICMP回送應答消息給源主機。如果源主機在一定時間內收到應答,則認為主機可達。大多數的 TCP/IP 實現都在內核中直接支持Ping服務器,ICMP回顯請求和回顯應答報文如下圖所示。
ping的原理是用類型碼為0的ICMP發請 求,受到請求的主機則用類型碼為8的ICMP回應。通過計算ICMP應答報文數量和與接受與發送報文之間的時間差,判斷當前的網絡狀態。這個往返時間的計算方法是:ping命令在發送ICMP報文時將當前的時間值存儲在ICMP報文中發出,當應答報文返回時,使用當前時間值減去存放在ICMP報文數據中存放發送請求的時間值來計算往返時間。ping返回接受到的數據報文字節大小、TTL值以及往返時間。
Unix系統在實現ping程序時是把ICMP報文中的標識符字段置成發送進程的 ID號。這樣 即使在同一台主機上同時運行了多個 ping程序實例,ping程序也可以識別出返回的信息。
三、ICMP 的應用--Traceroute
Traceroute 是用來偵測主機到目的主機之間所經路由情況的重要工具,也是最便利的工具。前面說到,盡管 ping 工具也可以進行 偵測,但是,因為 ip 頭的限制,ping 不能完全的記錄下所經過的路由器。所以 Traceroute 正好就填補了這個缺憾。
Traceroute 的原理是非常非常的有意思,它受到目的主機的 IP 后,首先給目的主機發送一個 TTL=1(還記得 TTL 是什么嗎?)的 UDP(后面就 知道 UDP 是什么了)數據包,而經過的第一個路由器收到這個數據包以后,就自動把 TTL 減1,而 TTL 變為0以后,路由 器就把這個包給拋棄了,並同時產生 一個主機不可達的 ICMP 數據報給主機。主機收到這個數據報以后再發一個 TTL=2的 UDP 數據報給目的主機,然后刺激第二個路由器給主機發 ICMP 數據 報。如此往復直到到達目的主機。這樣,traceroute 就拿到了所有的路由器 ip。從而避開了 ip 頭只能記錄有限路由 IP 的問題。
有人要問,我怎么知道 UDP 到沒到達目的主機呢?這就涉及一個技巧的問題,TCP 和 UDP 協議有一個端口號定義,而普通的網 絡程序只監控少數的幾個號碼較 小的端口,比如說80,比如說23,等等。而 traceroute 發送的是端口號>30000(真變態)的 UDP 報,所以到 達目的主機的時候,目的 主機只能發送一個端口不可達的 ICMP 數據報給主機。主機接到這個報告以后就知道,主機到了,所以,說 Traceroute 是一個騙子一點也不為過。
Traceroute 程序里面提供了一些很有用的選項,甚至包含了 IP 選路的選項,請察看 man 文檔來了解這些,這里就不贅述了。
四、python實現ping程序
方法一、使用python腳本調用系統中的ping命令簡單實現
import subprocess import shlex cmd = "ping -c 1 www.baidu.com" args = shlex.split(cmd) try: subprocess.check_call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print "baidu server is up!" except subprocess.CalledProcessError: print "Failed to get ping."
但是,很多情況下,系統中的ping可執行文件是不可用,或者無法訪問。這時,就需要使用一個純python的檢查腳本了。下面是ICMP ping的python實現腳本。這個腳本中定義了一個Pinger類,使用的一個校驗檢驗和的do_checksum()方法,一個發送ping數據報文的send_ping()方法,接受ping數據報文的receive_ping()方法和一個執行這個類的ping()主方法。下面是具體的代碼
# 定義Pinger類,初始化創建實例 import os import argparse import socket import struct import select import time ICMP_ECHO_REQUEST = 8 # Platform specific DEFAULT_TIMEOUT = 2 DEFAULT_COUNT = 4 class Pinger(object): """ Pings to a host -- the Pythonic way""" def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT): self.target_host = target_host self.count = count self.timeout = timeout
下面定義了do_checksum()方法,進行檢驗和的校驗,校驗方法如下:
- 把校驗和字段置為0
- 將icmp包(包括header和data)以16bit(2個字節)為一組,並將所有組相加(二進制求和)
- 若高16bit不為0,則將高16bit與低16bit反復相加,直到高16bit的值為0,從而獲得一個只有16bit長度的值
- 將此16bit值進行按位求反操作,將所得值替換到校驗和字段
def do_checksum(self, source_string): """ Verify the packet integritity """ sum = 0 max_count = (len(source_string)/2)*2 count = 0 while count < max_count: # 分割數據每兩比特(16bit)為一組 val = ord(source_string[count + 1])*256 + ord(source_string[count]) sum = sum + val sum = sum & 0xffffffff count = count + 2 if max_count<len(source_string): <span class="hljs-comment"># 如果數據長度為基數,則將最后一位單獨相加</span> sum = sum + ord(source_string[len(source_string) - 1]) sum = sum & 0xffffffff sum = (sum >> 16) + (sum & 0xffff) # 將高16位與低16位相加直到高16位為0 sum = sum + (sum >> 16) answer = ~sum answer = answer & 0xffff answer = answer >> 8 | (answer << 8 & 0xff00) return answer # 返回的是十進制整數
下面是接受ICMP類型碼為8的ICMP回應報文的方法。在未到達超時時間之前socket處於阻塞狀態一直等待響應,當有數據傳回時就接受響應,然后提取包含標識符ID的ICMP報文首部和包含發送時間值的ICMP內容部分,計算請求-響應的延時間隔。
def receive_ping(self, sock, ID, timeout): """ Receive ping from the socket. """ time_remaining = timeout while True: start_time = time.time() readable = select.select([sock], [], [], time_remaining) time_spent = (time.time() - start_time) if readable[0] == []: # Timeout return time_received = time.time() recv_packet, addr = sock.recvfrom(1024) icmp_header = recv_packet[20:28] type, code, checksum, packet_ID, sequence = struct.unpack( "bbHHh", icmp_header ) if packet_ID == ID: bytes_In_double = struct.calcsize("d") time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0] return time_received - time_sent time_remaining = time_remaining - time_spent if time_remaining <= 0: return
下面定義的send_ping()方法,獲取遠程主機的DNS主機名,然后使用struct模塊創建一個ICMP_ECHO_REQUEST數據包,將查驗請求的數據發送到目標主機。在此發送前也需要進行do_checksum()方法的校驗。
def send_ping(self, sock, ID): """ Send ping to the target host """ target_addr = socket.gethostbyname(self.target_host) my_checksum = 0 # Create a dummy heder with a 0 checksum. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) bytes_In_double = struct.calcsize("d") data = (192 - bytes_In_double) * "Q" data = struct.pack("d", time.time()) + data # Get the checksum on the data and the dummy header. my_checksum = self.do_checksum(header + data) header = struct.pack( "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1 ) packet = header + data sock.sendto(packet, (target_addr, 1))
下面定義了一個ping_once()方法,向遠程主機發送一次查驗:將ICMP協議傳給socket()方法,創建一個原始的ICMP套接字。由於ping程序需要使用SOCK_RAW來構建數據包,所以需要root權限才能運行這個程序。因此,本程序需要使用root權限運行,下面的異常處理部分就是來負責未使用root運行時拋出的異常。
def ping_once(self): """ Returns the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try: sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error, (errno, msg): if errno == 1: # Not superuser, so operation not permitted msg += "ICMP messages can only be sent from root user processes" raise socket.error(msg) except Exception, e: print "Exception: %s" %(e) my_ID = os.getpid() & 0xFFFF self.send_ping(sock, my_ID) delay = self.receive_ping(sock, my_ID, self.timeout) sock.close() return delay
下面這個ping()是執行這個類的主要方法。在for循環中調用ping_once()方法,發送ping數據報文,並返回結果。
def ping(self): """ Run the ping process """ for i in xrange(self.count): print "Ping to %s..." % self.target_host, try: delay = self.ping_once() except socket.gaierror, e: print "Ping failed. (socket error: '%s')" % e[1] break if delay == None: print "Ping failed. (timeout within %ssec.)" % self.timeout else: delay = delay * 1000 print "Get ping in %0.4fms" % delay if __name__ == '__main__': parser = argparse.ArgumentParser(description='Python ping') parser.add_argument('--target-host', action="store", dest="target_host", required=True) given_args = parser.parse_args() target_host = given_args.target_host pinger = Pinger(target_host=target_host) pinger.ping()