python根據socket模塊檢測端口及vip
#!/usr/bin/env python
# -*- coding: utf8 -*-
from .ping_helper import Pinger
import socket


def check_port(ip, port):
    """Check TCP connectivity to ``ip:port`` using a plain socket.

    Args:
        ip: Host name or IPv4 address to connect to.
        port: Port number (int or numeric string). A falsy value (``None``,
            ``0``, ``""``) means "nothing to check" and yields ``True``.

    Returns:
        ``True`` if the connection was established and shut down cleanly
        (or if no port was given), ``False`` on any failure.
    """
    if not port:
        return True

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(3)
    try:
        s.connect((ip, int(port)))
        s.shutdown(socket.SHUT_RDWR)
        return True
    except Exception:
        # Best-effort probe: any failure (refused, timeout, bad port value)
        # counts as "not reachable". ``Exception`` rather than a bare
        # ``except`` so Ctrl-C / SystemExit are not swallowed.
        return False
    finally:
        # Always release the descriptor; the original leaked one per call.
        s.close()


def check_vip(target_host):
    """Check whether a VIP answers a single ICMP echo request.

    Args:
        target_host: Host name or IP address handed to ``Pinger``.

    Returns:
        ``True`` if ``Pinger.ping_once()`` returned a delay, ``False`` if it
        returned ``None`` or the host name could not be resolved
        (``socket.gaierror``). Other exceptions raised by ``ping_once``
        propagate to the caller.
    """
    pinger = Pinger(target_host=target_host)
    try:
        delay = pinger.ping_once()
    except socket.gaierror:
        return False
    return delay is not None
#!/usr/bin/env python
# -*- coding: utf8 -*-
import errno
import os
import select
import socket
import struct
import time

ICMP_ECHO_REQUEST = 8  # Platform specific
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 4


class Pinger(object):
    """Pings to a host -- the Pythonic way."""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Store the target host, the number of pings and the per-ping timeout (seconds)."""
        self.target_host = target_host
        self.count = count
        self.timeout = timeout

    def do_checksum(self, source_string):
        """Compute the 16-bit checksum placed in the ICMP header.

        Args:
            source_string: Packet bytes (header with a zero checksum + data).

        Returns:
            The checksum as an int in ``0..0xffff``.
        """
        # bytearray yields ints on indexing regardless of the input bytes type.
        data = bytearray(source_string)
        # Integer division: the original used ``/`` which produces a float
        # on Python 3.
        even_length = (len(data) // 2) * 2

        total = 0
        for index in range(0, even_length, 2):
            # Words are accumulated low byte first.
            total = (total + data[index + 1] * 256 + data[index]) & 0xffffffff

        if even_length < len(data):
            # Odd-length input: add the trailing byte. The original called
            # ord() on an int here, which raises TypeError on Python 3.
            total = (total + data[-1]) & 0xffffffff

        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)
        answer = ~total & 0xffff
        # Swap the two bytes of the result.
        return answer >> 8 | (answer << 8 & 0xff00)

    def receive_pong(self, sock, ID, timeout):
        """Wait for the echo reply carrying ``ID``.

        Args:
            sock: Socket to read from.
            ID: Packet identifier that was sent with the request.
            timeout: Maximum total time to wait, in seconds.

        Returns:
            ``time_received - time_sent`` in seconds, where ``time_sent`` is
            the timestamp read back from the packet payload, or ``None`` if
            no matching packet arrived within ``timeout``.
        """
        time_remaining = timeout
        bytes_in_double = struct.calcsize("d")
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = time.time() - start_time
            if not readable[0]:  # Timeout
                return None

            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)

            # Bytes 20..28 are read as the ICMP header (presumably skipping a
            # 20-byte IP header without options -- TODO confirm). Datagrams
            # too short to hold header + timestamp are ignored instead of
            # raising struct.error.
            if len(recv_packet) >= 28 + bytes_in_double:
                icmp_type, code, checksum, packet_id, sequence = struct.unpack(
                    "bbHHh", recv_packet[20:28]
                )
                if packet_id == ID:
                    time_sent = struct.unpack(
                        "d", recv_packet[28:28 + bytes_in_double]
                    )[0]
                    return time_received - time_sent

            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Build an ICMP echo request tagged with ``ID`` and send it to the target host.

        Raises:
            socket.gaierror: If ``self.target_host`` cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)

        # Create a dummy header with a 0 checksum.
        my_checksum = 0
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)

        # Payload: the send timestamp (read back by receive_pong) padded
        # with "Q" characters up to 192 bytes.
        bytes_in_double = struct.calcsize("d")
        padding = (192 - bytes_in_double) * "Q"
        data = struct.pack("d", time.time()) + padding.encode('utf-8')

        # Get the checksum on the data and the dummy header, then rebuild
        # the header with the real checksum.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))

    def ping_once(self):
        """Send one echo request and wait for the reply.

        Returns:
            The delay in seconds, or ``None`` on timeout.

        Raises:
            socket.error: If the raw socket cannot be created. For EPERM the
                message explains that root privileges are required.
            socket.gaierror: If the target host cannot be resolved.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == errno.EPERM:
                # Not superuser, so operation not permitted. The original
                # read ``e.msg``, which does not exist on OSError.
                raise socket.error(
                    e.errno,
                    "%s - ICMP messages can only be sent from root user processes"
                    % e.strerror,
                )
            # Any other failure is re-raised; the original fell through and
            # crashed later on an undefined ``sock``.
            raise

        try:
            my_ID = os.getpid() & 0xFFFF
            self.send_ping(sock, my_ID)
            return self.receive_pong(sock, my_ID, self.timeout)
        finally:
            # Close the raw socket even when sending/receiving raises.
            sock.close()

    def ping(self):
        """Run the ping process: ``self.count`` pings, printing each result."""
        for _ in range(self.count):
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                # Exceptions are not subscriptable on Python 3; take the
                # message from ``args`` instead of ``e[1]``.
                print("Ping failed. (socket error: '%s')" % e.args[-1])
                break

            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay = delay * 1000
                print("Get pong in %0.4fms" % delay)
注意
"""
Demonstrates using shutdown() to close down a socket's channels.

SHUT_RDWR: close both directions, i.e. send/write/recv/read can no longer be used
SHUT_RD:   close the read side, i.e. recv/read can no longer be used
SHUT_WR:   close the write side, i.e. send/write can no longer be used
"""
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect(("localhost", 50000))
    # Sockets carry bytes: passing str raises TypeError on Python 3.
    s.sendall(b"this is shutdown test" + b"\r\n")
    s.send(b"this is shutdown test")
    s.shutdown(socket.SHUT_RDWR)
    print(socket.SHUT_RDWR)
    print(socket.SHUT_RD)
    print(socket.SHUT_WR)
finally:
    # Release the descriptor even if connect/send/shutdown fails.
    s.close()
close方法可以釋放一個連接的資源,但不一定會立即釋放;如果想立即關閉連接,那麼請在close之前先調用shutdown方法。
socket.SHUT_RD -------關閉接收消息通道
socket.SHUT_WR -------關閉發送消息通道
socket.SHUT_RDWR -----兩個通道都關閉
使用:在close()之前加上shutdown(num)即可 [socket.SHUT_RD、socket.SHUT_WR、socket.SHUT_RDWR 是常量而非函數,分別代表num 為 0、1、2]