Socket
一、概述
socket通常也稱作"套接字",用於描述IP地址和端口,是一個通信鏈的句柄,應用程序通常通過"套接字"向網絡發出請求或者應答網絡請求。
socket起源於Unix,而Unix/Linux基本哲學之一就是“一切皆文件”,對於文件用【打開】【讀寫】【關閉】模式來操作。socket就是該模式的一個實現,socket即是一種特殊的文件,一些socket函數就是對其進行的操作(讀/寫IO、打開、關閉)
socket和file的區別:
- file模塊是針對某個指定文件進行【打開】【讀寫】【關閉】
- socket模塊是針對 服務器端 和 客戶端Socket 進行【打開】【讀寫】【關閉】

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.bind(ip_port) sk.listen(5) while True: print 'server waiting...' conn,addr = sk.accept() client_data = conn.recv(1024) print client_data conn.sendall('不要回答,不要回答,不要回答') conn.close() socket server

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.connect(ip_port) sk.sendall('請求占領地球') server_reply = sk.recv(1024) print server_reply sk.close() socket client
WEB服務應用:

#!/usr/bin/env python #coding:utf-8 import socket def handle_request(client): buf = client.recv(1024) client.send("HTTP/1.1 200 OK\r\n\r\n") client.send("Hello, World") def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost',8080)) sock.listen(5) while True: connection, address = sock.accept() handle_request(connection) connection.close() if __name__ == '__main__': main()
二、解釋
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
參數一:地址簇 socket.AF_INET IPv4(默認) socket.AF_INET6 IPv6 socket.AF_UNIX 只能夠用於單一的Unix系統進程間通信 參數二:類型 socket.SOCK_STREAM 流式socket , for TCP (默認) socket.SOCK_DGRAM 數據報式socket , for UDP socket.SOCK_RAW 原始套接字,普通的套接字無法處理ICMP、IGMP等網絡報文,而SOCK_RAW可以;其次,SOCK_RAW也可以處理特殊的IPv4報文;此外,利用原始套接字,可以通過IP_HDRINCL套接字選項由用戶構造IP頭。 socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在需要執行某些特殊操作時使用,如發送ICMP報文。SOCK_RAM通常僅限於高級用戶或管理員運行的程序使用。 socket.SOCK_SEQPACKET 可靠的連續數據包服務 參數三:協議 0 (默認)與特定的地址家族相關的協議,如果是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議

import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) sk.bind(ip_port) while True: data = sk.recv(1024) print data import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) while True: inp = raw_input('數據:').strip() if inp == 'exit': break sk.sendto(inp,ip_port) sk.close()
sk.bind(address)
s.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。
sk.listen(backlog)
開始監聽傳入連接。backlog指定在拒絕連接之前,可以掛起的最大連接數量。
backlog等於5,表示內核已經接到了連接請求,但服務器還沒有調用accept進行處理的連接個數最大為5
這個值不能無限大,因為要在內核中維護連接隊列
sk.setblocking(bool)
是否阻塞(默認True),如果設置False,那么accept和recv時一旦無數據,則報錯。
sk.accept()
接受連接並返回(conn,address),其中conn是新的套接字對象,可以用來接收和發送數據。address是連接客戶端的地址。
接收TCP 客戶的連接(阻塞式)等待連接的到來
sk.connect(address)
連接到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。
sk.connect_ex(address)
同上,只不過會有返回值,連接成功時返回 0 ,連接失敗時候返回編碼,例如:10061
sk.close()
關閉套接字
sk.recv(bufsize[,flag])
接受套接字的數據。數據以字符串形式返回,bufsize指定最多可以接收的數量。flag提供有關消息的其他信息,通常可以忽略。
sk.recvfrom(bufsize[.flag])
與recv()類似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。
sk.send(string[,flag])
將string中的數據發送到連接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。
sk.sendall(string[,flag])
將string中的數據發送到連接的套接字,但在返回之前會嘗試發送所有數據。成功返回None,失敗則拋出異常。
sk.sendto(string[,flag],address)
將數據發送到套接字,address是形式為(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。
sk.settimeout(timeout)
設置套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛創建套接字時設置,因為它們可能用於連接的操作(如 client 連接最多等待5s )
sk.getpeername()
返回連接套接字的遠程地址。返回值通常是元組(ipaddr,port)。
sk.getsockname()
返回套接字自己的地址。通常是一個元組(ipaddr,port)
sk.fileno()
套接字的文件描述符

import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) sk.bind(ip_port) while True: data = sk.recv(1024) print data import socket ip_port = ('127.0.0.1',9999) sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0) while True: inp = raw_input('數據:').strip() if inp == 'exit': break sk.sendto(inp,ip_port) sk.close() UDP Demo
三、實例
智能機器人

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) while True: conn,address = sk.accept() conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('通過可能會被錄音.balabala一大推') else: conn.sendall('請重新輸入.') conn.close() 服務端

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8005) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客戶端
SocketServer模塊
一、使用以源碼剖析
對於默認Socket服務端處理客戶端請求時,按照阻塞方式依次處理請求,SocketServer實現同事處理多個請求。

#!/usr/bin/env python # -*- coding:utf-8 -*- import SocketServer class MyServer(SocketServer.BaseRequestHandler): def handle(self): # print self.request,self.client_address,self.server conn = self.request conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('通過可能會被錄音.balabala一大推') else: conn.sendall('請重新輸入.') if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer) server.serve_forever()

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客戶端
從剖析上述源碼執行流程,對源碼精簡如下:
import socket import threading import select def process(request, client_address): print request,client_address conn = request conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') flag = True while flag: data = conn.recv(1024) if data == 'exit': flag = False elif data == '0': conn.sendall('通過可能會被錄音.balabala一大推') else: conn.sendall('請重新輸入.') sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sk.bind(('127.0.0.1',8002)) sk.listen(5) while True: r, w, e = select.select([sk,],[],[],1) print 'looping' if sk in r: print 'get request' request, client_address = sk.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() sk.close()
如精簡代碼可以看出,SocketServer之所以可以同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端為每一個客戶端創建一個線程,當前線程用來處理對應客戶端的請求,所以,可以支持同時n個客戶端鏈接(長連接)。

#!/usr/bin/env python #coding:utf-8 import SocketServer import os class MyServer(SocketServer.BaseRequestHandler): def handle(self): base_path = 'G:/temp' conn = self.request print 'connected...' while True: pre_data = conn.recv(1024) #獲取請求方法、文件名、文件大小 cmd,file_name,file_size = pre_data.split('|') # 防止粘包,給客戶端發送一個信號。 conn.sendall('nothing') #已經接收文件的大小 recv_size = 0 #上傳文件路徑拼接 file_dir = os.path.join(base_path,file_name) f = file(file_dir,'wb') Flag = True while Flag: #未上傳完畢, if int(file_size)>recv_size: #最多接收1024,可能接收的小於1024 data = conn.recv(1024) recv_size+=len(data) #寫入文件 f.write(data) #上傳完畢,則退出循環 else: recv_size = 0 Flag = False print 'upload successed.' f.close() instance = SocketServer.ThreadingTCPServer(('127.0.0.1',9999),MyServer) instance.serve_forever() FTP上傳文件(服務端)

#!/usr/bin/env python #coding:utf-8 import socket import sys import os ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.connect(ip_port) container = {'key':'','data':''} while True: # 客戶端輸入要上傳文件的路徑 input = raw_input('path:') # 根據路徑獲取文件名 file_name = os.path.basename(path) # 獲取文件大小 file_size=os.stat(path).st_size # 發送文件名 和 文件大小 sk.send(file_name+'|'+str(file_size)) # 為了防止粘包,將文件名和大小發送過去之后,等待服務端收到,直到從服務端接受一個信號(說明服務端已經收到) sk.recv(1024) send_size = 0 f= file(path,'rb') Flag = True while Flag: if send_size + 1024 >file_size: data = f.read(file_size-send_size) Flag = False else: data = f.read(1024) send_size+=1024 sk.send(data) f.close() sk.close() FTP上傳文件(客戶端)
對於大文件處理:
send只會向緩沖區寫一次,傳入的內容不一定能發完,所以,返回值是實際發送的大小。
例如:
1023M = send(1g數據) 那么實際是發送了 1023M,其他 1M 就是漏發了
sendall,內部調用send會一直向緩沖區寫,直到文件全部寫完。
例如:
sendall(1g數據)
第一次:
send(1023M)
第二次:
send(1M)
==========
發送大文件時候,不可能全部讀1G內存,需要open文件時,一點一點讀,然后再發。
# 大文件大小
file_size=os.stat(文件路徑).st_size
# 打開大文件
f = file(文件路徑,'rb')
# 已經發送的數據
send_size = 0
while Flag: # 大文件只剩下 不到 1024 字節,其他已經被發送。 if send_size + 1024 > file_size: # 從大文件中讀取小於 1024字節,可能是 10字節... data = f.read(file_size-send_size) Flag = False else: # 從大文件中讀取 1024 字節 data = f.read(1024) # 記錄已經發送了多少字節 send_size += 1024 # 將大文件中的數據,分批發送到緩沖區,每次最多發 1024 字節 sk.sendall(data)
二、select
Linux中的 select,poll,epoll 都是IO多路復用的機制。
I/O多路復用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。
select select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。 select目前幾乎在所有的平台上支持,其良好跨平台支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。 select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。 另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。 poll poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。 poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增加而線性增大。 另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。 epoll 直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。 epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。 epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。 另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
Python select 用於監聽多個文件描述符:

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket import threading import select def process(request, client_address): print request,client_address conn = request conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') flag = True while flag: data = conn.recv(1024) if data == 'exit': flag = False elif data == '0': conn.sendall('通過可能會被錄音.balabala一大推') else: conn.sendall('請重新輸入.') s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s1.bind(('127.0.0.1',8020)) s1.listen(5) s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s2.bind(('127.0.0.1',8021)) s2.listen(5) while True: r, w, e = select.select([s1,s2,],[],[],1) print 'looping' for s in r: print 'get request' request, client_address = s.accept() t = threading.Thread(target=process, args=(request, client_address)) t.daemon = False t.start() s1.close() s2.close() 服務端

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8020) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客戶端:8020

#!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ('127.0.0.1',8021) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input:') sk.sendall(inp) if inp == 'exit': break sk.close() 客戶端:8021
三、threading
問答:
- 應用程序、進程、線程關系?
- 為什么要使用多個CPU ?
- 為什么要使用多線程?
- 為什么要使用多進程?
- java和C#中的多線程和python多線程的區別?
- Python GIL?
- 線程和進程的選擇:計算密集型和IO密集型程序。(IO操作不占用CPU)
1、Python線程
Threading用於提供線程相關的操作,線程是應用程序中工作的最小單元。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def show(arg): time.sleep(1) print 'thread'+str(arg) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。
更多方法:
- start 線程准備就緒,等待CPU調度
- setName 為線程設置名稱
- getName 獲取線程名稱
- setDaemon 設置為后台線程或前台線程(默認)
如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止
如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止 - join 逐個執行每個線程,執行完畢后繼續往下執行...
- run 線程被cpu調度后執行此方法
2、線程鎖
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,CPU接着執行其他線程。所以,可能出現如下問題:

#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time gl_num = 0 def show(arg): global gl_num time.sleep(1) gl_num +=1 print gl_num for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop' 未使用線程鎖
#!/usr/bin/env python #coding:utf-8 import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()
擴展:進程
1、創建多進程程序
from multiprocessing import Process import threading import time def foo(i): print 'say hi',i for i in range(10): p = Process(target=foo,args=(i,)) p.start()
注意:由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。
2、進程共享數據
進程各自持有一份數據,默認無法共享數據
#!/usr/bin/env python #coding:utf-8 from multiprocessing import Process from multiprocessing import Manager import time li = [] def foo(i): li.append(i) print 'say hi',li for i in range(10): p = Process(target=foo,args=(i,)) p.start() print 'ending',li

#方法一,Array from multiprocessing import Process,Array temp = Array('i', [11,22,33,44]) def Foo(i): temp[i] = 100+i for item in temp: print i,'----->',item for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join() #方法二:manage.dict()共享數據 from multiprocessing import Process,Manager manage = Manager() dic = manage.dict() def Foo(i): dic[i] = 100+i print dic.values() for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join() 進程間共享數據
3、進程池
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print arg pool = Pool(5) #print pool.apply(Foo,(1,)) #print pool.apply_async(func =Foo, args=(1,)).get() for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) print 'end' pool.close() pool.join()
更多請看:http://www.cnblogs.com/wupeiqi/