Python3網絡學習案例一:Ping詳解


1. 使用Ping做什么

ping用於確定本地主機是否能與另一台主機成功交換(發送與接收)數據包,再根據返回的信息,就可以推斷TCP/IP參數是否設置正確,以及運行是否正常、網絡是否通暢等。

2. 效果

CMD命令:

Python程序:

3. 在驗證兩台主機是否能正常聯通時做了什么

驗證的過程就像打電話一樣,我們如果要知道自己能否給另一個人交流就給他打一個電話,如果你聽到了對方的聲音那就是打通了,否則就是失敗。但是無論成功與否我們都有一個這樣的過程:

通過電話線(或是無線,即手機)給對方發送一個請求通話的消息,無論對方是否接通都會返回一條應答的消息。

這里我們發送或接收的消息就是報文(IP報文),其中,發送的請求通話消息我們稱為“請求回顯”(Ping請求),而接收到的應答消息稱為“回顯應答”(Ping應答)。

我們通過“回顯應答”類型報文中的內容來判斷是否能夠正常連接(交換數據包)。

而在Ping過程中需要進行四次(並不固定,一般默認四次)請求,這個可以看做是連續打四次電話。

4. IP報文中有什么

由於IP協議並不是一個可靠的協議,它不保證數據被成功送達,那么,如何才能保證數據的可靠送達呢? 這里就需要使用到一個重要的協議模塊ICMP(網絡控制報文)協議。它傳遞差錯報文以及其他需要注意的信息,經常供IP層或更高層協議(TCP或UDP)使用。所以它經常被認為是IP層的一個組成部分。它在IP數據報文中的封裝如下:

 

 

 在這里我們不關心IP首部內容(只需要記住它有20bytes即可),而ICMP報文的類型正如上述所說有好多種,我們根據報文的type和code來判斷ICMP報文的類型(當然這里我們也只關系回顯請求和回顯應答):

ICMP報文 = ICMP報頭+Data(這個消息可以自定義,當然每個不同類型的報文的data都會有所不同)

(ICMP_Packet = ICMPHeader + Data)

 

5. ICMP報文中的各個數據是什么,有什么用

(1)type和code

type和code決定了ICMP報文是什么類型(當然這里我們也只關系回顯請求和回顯應答)。

以下是不同type(ICMP類型)和code(編碼)對應的不同ICMP報文類型,不用全部記住,在這里我們只要知道:

type  = 8, code = 0 表示 回顯請求

type = 0, code = 0 表示 回顯應答

(2)Checksum(校驗和)

用於檢驗數據包是否正確。

(3)ID(標識符)

每一個不同的進程都有一個ID作為標識,即每次運行這個Ping程序都會獲得一個進程ID,我們通過這個比較發送進程ID和回顯應答中的ID,如果一樣則說明是同一個Ping程序

(4)sequence(序列號)

每發送一次消息,就有一個序列號,第一個就是1,第二次就是2等等(也可以從“0”開始,可以自己定義),按照收發四次來講就是0、1、2、3

(5)Data(數據)

你可以將你自己想要的其他信息放到這里,比如發送消息的時間,這樣就可以計算延遲了

 6. 程序框架及具體代碼

首先,我們知道要做到兩台host通信需要借助socket,以下內容也是在此基礎上展開的

(1)全局變量

設置了幾個全局變量,包括:回顯請求(8),回顯應答(0),Ping標識符(ID),Ping序列號(sequence)

ICMP_ECHO_REQUEST = 8  # ICMP type code for echo request messages
ICMP_ECHO_REPLY = 0  # ICMP type code for echo reply messages
ID = 0  # ID of icmp_header
SEQUENCE = 0  # sequence of ping_request_msg

(2)main()

首先我們要有這個程序的入口,俗稱“主函數”

在這里我們調用Ping(host, timeout = 1)【自定義】函數正式開始這個過程

ping("baidu.com")

(3)Ping(host, timeout = 1)

有兩個參數(host,timeout):

host是要嘗試連接的主機的域名或者直接寫IP地址也沒問題

timeout是設置的超時時間,當等待時間超過1s時我們認為連接超時,即連接失敗(表示無法正常交流)

內容簡介:

定義變量---在Ping函數中我們設置了幾個變量用於表示發送、成功接收、丟失包的數量 和 最長時延、最短時延、總時延

調用doOnePing【自定義】函數---這個函數包括對發送和接收報文消息的函數,返回值就是時延

此外---當然,我們對doOnePing函數調用了四次

def ping(host, timeout=1):
    send = 0
    lost = 0
    receive = 0
    maxTime = 0
    minTime = 1000
    sumTime = 0
    # 1. Look up hostname, resolving it to an IP address
    desIp = socket.gethostbyname(host)
    global ID
    ID = os.getpid()
    for i in range(0, 4):
        global SEQUENCE
        SEQUENCE = i
        # 2. Call doOnePing function, approximately every second
        delay = doOnePing(desIp, timeout) * 1000
        send += 1
        if delay > 0:
            receive += 1
            if maxTime < delay:
                maxTime = delay
            if minTime > delay:
                minTime = delay
            sumTime += delay
            # 3. Print out the returned delay
            print("Receive from: " + str(desIp) + ", delay = " + str(int(delay)) + "ms")
        else:
            lost += 1
            print("Fail to connect.")
        time.sleep(1)
    # 4. Continue this process until stopped
    avgTime = sumTime / receive
    recvRate = receive / send * 100.0
    print("\nSend: " + str(send) + ", success: " + str(receive) + ", lost: " + str(lost) +
          ", rate of success: " + str(recvRate) + "%.")
    print(
        "MaxTime = " + str(int(maxTime)) + "ms, MinTime = " + str(int(minTime)) + "ms, AvgTime = " + str(int(avgTime)))

(4) doOnePing(destination, timeout)

有兩個參數(destination,timeout):

destination就是獲得的目標主機的IP地址

timeout就是超時時間(自定義的)

def doOnePing(destinationAddress, timeout):
    # 1. Create ICMP socket
    icmpName = socket.getprotobyname('icmp')
    icmp_Socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmpName)
    # 2. Call sendOnePing function
    sendOnePing(icmp_Socket, destinationAddress, ID)
    # 3. Call receiveOnePing function
    totalDelay = receiveOnePing(icmp_Socket, destinationAddress, ID, timeout)
    # 4. Close ICMP socket
    icmp_Socket.close()
    # 5. Return total network delay
    return totalDelay

內容簡介:

創建socket---正如上述所說,我們通過socket來做這個通信,由於是使用ICMP協議,我們需要提前獲取這個協議(根據協議名)

調用sendOnePing---發送一條回顯請求給目標主機

調用receiveOnePing---接收回顯應答函數,返回值為時延

關閉socket

返回在調用receiveOnePing函數后獲得的時延

(5)sendOnePing(icmpSocket, destination, ID)

參數有三個,用於做什么很顯然了,這里不再贅述

def sendOnePing(icmpSocket, destinationAddress, ID):
    icmp_checksum = 0
    # 1. Build ICMP header
    icmp_header = struct.pack('!bbHHh', ICMP_ECHO_REQUEST, 0, icmp_checksum, ID, SEQUENCE)
    time_send = struct.pack('!d', time.time())
    # 2. Checksum ICMP packet using given function
    icmp_checksum = checksum(icmp_header + time_send)
    # 3. Insert checksum into packet
    icmp_header = struct.pack('!bbHHh', ICMP_ECHO_REQUEST, 0, icmp_checksum, ID, SEQUENCE)
    # 4. Send packet using socket
    icmp_packet = icmp_header + time_send
    icmpSocket.sendto(icmp_packet, (destinationAddress, 80))
    # 5. Record time of sending

內容簡介:

這個函數的目的就在於向目標主機發送一條回顯請求報文,而這個報文中應該包括:ICMP_Header+Data(這里是發送的時間,用於計算時延)

其中用到的struct.pack函數請自行百度,如果不想知道細節的話只需要記住pack的第一個參數規定了其編碼格式(有各種字母符號的映射,可以自己編排),而之后的參數都是根據這個編碼格式進行打包的數據

在這個函數中調用了校驗和程序,這個原理也自行百度(很容易找到),不做深究的話也可以直接拿來用

函數編寫步驟:

* 將ICMP_Header的那幾個打包成為數據包(其中的校驗和只是為了Header的完整性而隨意設置的,占位用)

* 用校驗和程序檢測數據包的完整性

* 加入真正的校驗和(這里“真正”似乎不太正確,但暫時想不出有什么其他的詞匯形容了)重新打包

* 將發送的時間(time.time())編碼打包,作為ICMP的Data

* ICMP_Packet = ICMP-Header + Data

* 借助socket發送報文 

* 此外,sendto(msg, (IP, Port))中的Port為80是因為Web默認使用端口號80(還有許多網絡應用有特定的端口號——方便訪問,一般靠前的port都是分配好的,如果是自己構建的程序則可以申請使用空閑的port)

(6)receiveOnePing(icmpSocket, destination, ID, timeout)

這里參數不再介紹

def receiveOnePing(icmpSocket, destinationAddress, ID, timeout):
    # 1. Wait for the socket to receive a reply
    timeBeginReceive = time.time()
    whatReady = select.select([icmpSocket], [], [], timeout)
    timeInRecev = time.time() - timeBeginReceive
    if not whatReady[0]:
        print("none")
        return -1
    # 2. Once received, record time of receipt, otherwise, handle a timeout
    recPacket, addr = icmpSocket.recvfrom(1024)
    timeReceived = time.time()
    # 3. Compare the time of receipt to time of sending, producing the total network delay
    byte_in_double = struct.calcsize("!d")
    timeSent = struct.unpack("!d", recPacket[28: 28 + byte_in_double])[0]
    totalDelay = timeReceived - timeSent
    # 4. Unpack the packet header for useful information, including the ID
    rec_header = recPacket[20:28]
    replyType, replyCode, replyCkecksum, replyId, replySequence = struct.unpack('!bbHHh', rec_header)
    # 5. Check that the ID matches between the request and reply
    if ID == replyId and replyType == ICMP_ECHO_REPLY:
        # 6. Return total network delay
        return totalDelay
    if timeInRecev > timeout:
        print('overtime')
        return -1    

內容簡介:

在這里我們統一規定如果成功接收了就返回delay(時延),如果沒有就返回-1(也可以是其他的自定義數值,只要能區分就可以)

select.select函數自行了解:whatReady[0] 用於判斷是否接收到回顯應答,返回true/false,應答不為空就是成功(繼續計算時延),否則就是失敗(直接返回-1跳出)

其中的timeInRecev用於判斷超時

我們在一開始就說了,IPHeader為20bytes,所以ICMPHeader應該從20開始截取(str[20:28]為ICMP_Header部分)

獲取發送時間的那一部分中涉及到了struct.calcsize()函數,可以看這個鏈接中的內容做基本了解:https://blog.51cto.com/firefish/112690:因為unpack函數解碼的內容至少要是8bytes,這個函數可以考慮為占位用的

7. 完整代碼

 

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import struct
import time
import select
import socket

ICMP_ECHO_REQUEST = 8  # ICMP type code for echo request messages
ICMP_ECHO_REPLY = 0  # ICMP type code for echo reply messages
ICMP_Type_Unreachable = 11  # unacceptable host
ICMP_Type_Overtime = 3  # request overtime
ID = 0  # ID of icmp_header
SEQUENCE = 0  # sequence of ping_request_msg


def checksum(strings):
    csum = 0
    countTo = (len(strings) / 2) * 2
    count = 0
    while count < countTo:
        thisVal = strings[count + 1] * 256 + strings[count]
        csum = csum + thisVal
        csum = csum & 0xffffffff
        count = count + 2
    if countTo < len(strings):
        csum = csum + strings[len(strings) - 1]
        csum = csum & 0xffffffff
    csum = (csum >> 16) + (csum & 0xffff)
    csum = csum + (csum >> 16)
    answer = ~csum
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer


def receiveOnePing(icmpSocket, ID, timeout):
    # 1. Wait for the socket to receive a reply
    timeBeginReceive = time.time()
    whatReady = select.select([icmpSocket], [], [], timeout)
    timeInRecev = time.time() - timeBeginReceive
    if not whatReady[0]:
        return -1
    timeReceived = time.time()
    # 2. Once received, record time of receipt, otherwise, handle a timeout
    recPacket, addr = icmpSocket.recvfrom(1024)
    # 3. Compare the time of receipt to time of sending, producing the total network delay
    byte_in_double = struct.calcsize("!d")
    timeSent = struct.unpack("!d", recPacket[28: 28 + byte_in_double])[0]
    totalDelay = timeReceived - timeSent
    # 4. Unpack the packet header for useful information, including the ID
    rec_header = recPacket[20:28]
    replyType, replyCode, replyCkecksum, replyId, replySequence = struct.unpack('!bbHHh', rec_header)
    # 5. Check that the ID matches between the request and reply
    if ID == replyId and replyType == ICMP_ECHO_REPLY:
        # 6. Return total network delay
        return totalDelay
    elif timeInRecev > timeout or replyType == ICMP_Type_Overtime:
        return -3  # ttl overtime/timeout
    elif replyType == ICMP_Type_Unreachable:
        return -11  # unreachable
    else:
        print("request over time")
        return -1


def sendOnePing(icmpSocket, destinationAddress, ID):
    icmp_checksum = 0
    # 1. Build ICMP header
    icmp_header = struct.pack('!bbHHh', ICMP_ECHO_REQUEST, 0, icmp_checksum, ID, SEQUENCE)
    time_send = struct.pack('!d', time.time())
    # 2. Checksum ICMP packet using given function
    icmp_checksum = checksum(icmp_header + time_send)
    # 3. Insert checksum into packet
    icmp_header = struct.pack('!bbHHh', ICMP_ECHO_REQUEST, 0, icmp_checksum, ID, SEQUENCE)
    # 4. Send packet using socket
    icmp_packet = icmp_header + time_send
    icmpSocket.sendto(icmp_packet, (destinationAddress, 80))


def doOnePing(destinationAddress, timeout):
    # 1. Create ICMP socket
    icmpName = socket.getprotobyname('icmp')
    icmp_Socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmpName)
    # 2. Call sendOnePing function
    sendOnePing(icmp_Socket, destinationAddress, ID)
    # 3. Call receiveOnePing function
    totalDelay = receiveOnePing(icmp_Socket, ID, timeout)
    # 4. Close ICMP socket
    icmp_Socket.close()
    # 5. Return total network delay
    return totalDelay


def ping(host, count, timeout):
    send = 0
    lost = 0
    receive = 0
    maxTime = 0
    minTime = 1000
    sumTime = 0
    # 1. Look up hostname, resolving it to an IP address
    desIp = socket.gethostbyname(host)
    global ID
    ID = os.getpid()
    for i in range(0, count):
        global SEQUENCE
        SEQUENCE = i
        # 2. Call doOnePing function, approximately every second
        delay = doOnePing(desIp, timeout) * 1000
        send += 1
        if delay > 0:
            receive += 1
            if maxTime < delay:
                maxTime = delay
            if minTime > delay:
                minTime = delay
            sumTime += delay
            # 3. Print out the returned delay
            print("Receive from: " + str(desIp) + ", delay = " + str(int(delay)) + "ms")
        else:
            lost += 1
            print("Fail to connect. ", end="")
            if delay == -11:
                # type = 11, target unreachable
                print("Target net/host/port/protocol is unreachable.")
            elif delay == -3:
                # type = 3, ttl overtime
                print("Request overtime.")
            else:
                # otherwise, overtime
                print("Request overtime.")
        time.sleep(1)
    # 4. Continue this process until stopped
    if receive != 0:
        avgTime = sumTime / receive
        recvRate = receive / send * 100.0
        print(
            "\nSend: {0}, success: {1}, lost: {2}, rate of success: {3}%.".format(send, receive, lost, recvRate))
        print(
            "MaxTime = {0}ms, MinTime = {1}ms, AvgTime = {2}ms".format(int(maxTime), int(minTime), int(avgTime)))
    else:
        print("\nSend: {0}, success: {1}, lost: {2}, rate of success: 0.0%".format(send, receive, lost))


if __name__ == '__main__':
    while True:
        try:
            hostName = input("Input ip/name of the host you want: ")
            count = int(input("How many times you want to detect: "))
            timeout = int(input("Input timeout: "))
            ping(hostName, count, timeout)
            break
        except Exception as e:
            print(e)
            continue

 

 

8. 寫在最后

寫的時候翻了不少文章,但是都找不到了,只有這個離寫這篇文章的時間最近,就貼上來

參考:https://www.cnblogs.com/JetpropelledSnake/p/9177770.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM