Python 之網絡式編程


一 客戶端/服務器架構

即C/S架構,包括

1、硬件C/S架構(打印機)

2、軟件B/S架構(web服務)

C/S架構與Socket的關系:

我們學習Socket就是為了完成C/S的開發

二 OSI七層

引子:  

  計算機組成原理:硬件、操作系統、應用軟件三者組成。

  具備以上條件后,計算機就可以工作,如果你要和別人一起玩,那你就需要上網了。互聯網的核心就是由一堆協議組成,協議就是標准。

 

為什么學習Socket之前要先了解互聯網協議?

  1、C/S架構的軟件(應用軟件屬於應用層)是基於網絡進行通信的

  2、網絡的核心即一堆協議,協議即標准,想開發一款基於網絡通信的軟件,就必須遵循這些標准

OSI七層:

三 Socket層

四 Socket是什么 

  Socket是應用層與TCP/IP協議族通信的中間軟件抽象層,它是一組接口,在設計模式中,Socket其實就是一個門面模式,它把負責的TCP/IP協議族隱藏在Socket接口后面,對用戶來說,一組簡單的接口就是全部,讓Socket去組織數據,以符合指定的協議。

  所以,我們無需深入學習理解TCP/UDP協議,Socket已經為我們封裝好了,我們只需要遵循Socket的規定去編程,寫出的程序自然就是遵循TCP/UDP標准的。

五 套接字發展史及分類

  套接字起源於20世紀70年代加利福尼亞大學伯克利分校版本的Unix,即人們所說的BSD Unix。因此,有時人們也把套接字成為“伯克利套接字”或“BSD套接字”。一開始,套接字被設計用在一台主機上多個應用程序之間的通信,這也被稱作進程間通許或IPC。套接字有兩種(或者稱為兩個種族),分別是基於文件型和就網絡型。

基於文件類型的套接字家族

套接字家族的名字:AF_UNIX

  UNIX一切皆文件,基於文件的套接字調用的就是底層的文件系統來取數據,兩個套接字進程運行在同一機器上,可以通過訪問同一文件系統間接完成通信。

基於網絡類型的套接字家族

套接字家族的名字:AF_INET

  還有AF_INET6被用於ipv6,還有一些其他的地址家族,不過,他們要么是只用於某個平台,要么就是已經被廢棄,或者是很少被使用,或者是根本沒有實現,所有地址家族中,AF_INET是使用最廣泛的一個,Python支持很多地址家族,但是由於我們只關心網絡編程,所以大部分時候我們只使用AF_INET(AF:Address Family;INET:Internet)

六 套接字工作流程

  生活中,你要打電話給一個朋友,先撥號,朋友聽到電話鈴聲響后接打電話,這時你和你的朋友就建立起了連接,就可以講話了,等交流結束,掛斷電話結束此次通話。

 利用Socket模擬生活中打電話:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import socket
 4 
 5 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 買手機;socket.AF_INET:基於網絡協議;socket.SOCK_STREAM:基於流的TCP協議
 6 phone.bind(('127.0.0.1', 8080))  # 綁定手機卡;元祖形式,ip地址+端口
 7 # 注:服務器的ip地址寫本機的ip地址
 8 phone.listen(5)  # 開機
 9 conn, addr = phone.accept()  # 等電話
10 msg = conn.recv(1024)  # 收消息
11 print('客戶端發來的消息是:', msg)
12 conn.send(msg.upper())  # 發消息
13 conn.close()
14 phone.close()
服務端
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 import socket
4 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 買手機
5 phone.connect(('127.0.0.1', 8080))  # 撥電話
6 # 注:客戶端的ip地址,寫服務器端的ip地址
7 phone.send('hello'.encode('utf-8'))  # 發消息
8 data = phone.recv(1024)  # 收消息
9 print('收到服務端發來的消息', data)
客戶端

服務器和客戶端無限循環發送消息:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 # import socket
 4 from socket import *
 5 import time
 6 ip_port = ('127.0.0.1', 8080)
 7 back_log = 5
 8 buffer_size = 1024
 9 tcp_server = socket(AF_INET, SOCK_STREAM)
10 tcp_server.bind(ip_port)
11 tcp_server.listen(back_log)
12 print('服務端開始運行')
13 conn, addr = tcp_server.accept()  # 服務器阻塞
14 print('雙向鏈接', conn)
15 print('客戶端地址', addr)
16 while True:
17     time.sleep(1)
18     print('[%s]' % time.time())
19     data = conn.recv(buffer_size)
20     print('客戶端發來的消息是', data.decode('utf-8'))
21     conn.send(data.upper())
22 conn.close()
23 tcp_server.close()
服務器端
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 import time
 5 ip_port = ('127.0.0.1', 8080)
 6 back_log = 5
 7 buffer_size = 1024
 8 tcp_client = socket(AF_INET, SOCK_STREAM)
 9 tcp_client.connect(ip_port)
10 while True:
11     time.sleep(2)
12     print('[%s]' % time.time())
13     msg = input('>>:').strip()
14     tcp_client.send(msg.encode('utf-8'))
15     print('客戶端已經發送消息')
16     data = tcp_client.recv(buffer_size)
17     print('收到服務端發來消息', data.decode('utf-8'))
18 tcp_client.close()
客戶端

Socket收發消息原理圖:

若重啟服務端時,可能會遇到:Address already in use;這個是由於服務端扔然存在四次揮手的time_wait狀態占用地址

解決方案:

1 # 加入一條socket配置,重用ip和端口
2 tcp_server = socket(AF_INET, SOCK_STREAM)
3 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # <---就是這條,在bind前加
4 tcp_server.bind(ip_port)
方法一
 1 發現系統存在大量TIME_WAIT狀態的連接,通過調整linux內核參數解決,
 2 vi /etc/sysctl.conf
 3 
 4 編輯文件,加入以下內容:
 5 net.ipv4.tcp_syncookies = 1
 6 net.ipv4.tcp_tw_reuse = 1
 7 net.ipv4.tcp_tw_recycle = 1
 8 net.ipv4.tcp_fin_timeout = 30
 9  
10 然后執行 /sbin/sysctl -p 讓參數生效。
11  
12 net.ipv4.tcp_syncookies = 1 表示開啟SYN Cookies。當出現SYN等待隊列溢出時,啟用cookies來處理,可防范少量SYN攻擊,默認為0,表示關閉;
13 
14 net.ipv4.tcp_tw_reuse = 1 表示開啟重用。允許將TIME-WAIT sockets重新用於新的TCP連接,默認為0,表示關閉;
15 
16 net.ipv4.tcp_tw_recycle = 1 表示開啟TCP連接中TIME-WAIT sockets的快速回收,默認為0,表示關閉。
17 
18 net.ipv4.tcp_fin_timeout 修改系統默認的 TIMEOUT 時間
方法二

七 基於UDP的套接字

udp服務端

1 ss = socket()  # 創建一個服務器的套接字
2 ss.bind()  # 綁定服務器套接字
3 while True:  # 服務器無限循環
4     cs = ss.recvfrom()/ss.sendto()  # 對話(接收與發送)
5 ss.close()  #  關閉服務器套接字

udp客戶端

1 cs = socket()  # 創建客戶套接字
2 while True:
3     cs.sendto()/cs.recvfrom()  # 對話(發送/接收)
4 cs.close()  # 關閉客戶套接字

基於UDP的套接字:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 ip_port = ('127.0.0.1', 8080)
 5 buffer_size = 1024
 6 udp_server = socket(AF_INET, SOCK_DGRAM)  # SOCK_DGRAM:數據報式套接字
 7 udp_server.bind(ip_port)
 8 while True:
 9     # data = udp_server.recvfrom(buffer_size)
10     # print(data)  # (b'hello', ('127.0.0.1', 65047))
11     data, addr = udp_server.recvfrom(buffer_size)
12     print(data)
13     print(addr)
14     udp_server.sendto(data.upper(), addr)
udp服務端
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 ip_port = ('127.0.0.1', 8080)
 5 buffer_size = 1024
 6 udp_client = socket(AF_INET, SOCK_DGRAM)  # SOCK_DGRAM:數據報式套接字
 7 while True:
 8     msg = input('>>').strip()
 9     udp_client.sendto(msg.encode('utf-8'), ip_port)
10     data, addr = udp_client.recvfrom(buffer_size)
11     print(data.decode('utf-8'))
12     print(addr)
udp客戶端

 八 什么是粘包?

注:只有TCP有粘包現象,UDP永遠不會粘包

一個socket收發消息的原理圖:

  發送端可以是一K一K地發送數據,而接收端的應用程序可以兩K兩K地提走數據,當然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據,也就是說,應用程序所看到的數據是一個整體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,因此TCP協議是面向流的協議,這也是容易出現粘包問題的原因。而UDP是面向消息的協議,每個UDP段都是一條消息,應用程序必須以消息為單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不同的。怎樣定義消息呢?可以認為對方一次性write/send的數據為一個消息,需要明白的是當對方send一條信息的時候,無論底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成后才呈現在內核緩沖區。

  例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束

  所謂粘包問題主要還是因為接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所造成的。

  此外,發送方引起的粘包是由TCP協議本身造成的,TCP為提高傳輸效率,發送方往往要收集到足夠多的數據后才發送一個TCP段。若連續幾次需要send的數據都很少,通常TCP會根據優化算法把這些數據合成一個TCP段后一次發送出去,這樣接收方就收到了粘包數據。

  1. TCP(transport control protocol,傳輸控制協議)是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有一一成對的socket,因此,發送端為了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通信是無消息保護邊界的。
  2. UDP(user datagram protocol,用戶數據報協議)是無連接的,面向消息的,提供高效率服務。不會使用塊的合並優化算法,, 由於UDP支持的是一對多的模式,所以接收端的skbuff(套接字緩沖區)采用了鏈式結構來記錄每一個到達的UDP包,在每個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來說,就容易進行區分處理了。 即面向消息的通信是有消息保護邊界的。
  3. tcp是基於數據流的,於是收發的消息不能為空,這就需要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即便是你輸入的是空內容(直接回車),那也不是空消息,udp協議會幫你封裝上消息頭

  udp的recvfrom是阻塞的,一個recvfrom(x)必須對唯一一個sendinto(y),收完了x個字節的數據就算完成,若是y>x數據就丟失,這意味着udp根本不會粘包,但是會丟數據,不可靠

  tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端總是在收到ack時才會清除緩沖區內容。數據是可靠的,但是會粘包。

粘包解決方案:

方法一、

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 import subprocess
 5 
 6 ip_port = ('127.0.0.1', 8080)
 7 back_log = 5
 8 buffer_size = 1024
 9 tcp_server = socket(AF_INET, SOCK_STREAM)
10 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
11 tcp_server.bind(ip_port)
12 tcp_server.listen(back_log)
13 while True:
14     conn, addr = tcp_server.accept()
15     while True:
16         try:
17             cmd = conn.recv(buffer_size)
18             # if not cmd:break
19             # 執行命令,得到命令的運行結果cmd_res
20             res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
21                                    stderr=subprocess.PIPE,
22                                    stdout=subprocess.PIPE,
23                                    stdin=subprocess.PIPE
24                                    )
25             err = res.stderr.read()
26             if err:
27                 cmd_res = err
28             else:
29                 cmd_res = res.stdout.read()
30             if not cmd_res:
31                 cmd_res = 'excute success'.encode('utf-8')
32             length = len(cmd_res)
33             conn.send(str(length).encode('utf-8'))
34             client_ready = conn.recv(buffer_size)
35             if client_ready == b'ready':
36                 conn.send(cmd_res)
37 
38         except Exception as EX:
39             print(EX)
40             break
41     conn.close()
基於TCP服務端
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 
 5 ip_port = ('127.0.0.1', 8080)
 6 back_log = 5
 7 buffer_size = 1024
 8 tcp_client = socket(AF_INET, SOCK_STREAM)
 9 tcp_client.connect(ip_port)
10 while True:
11     cmd = input('>>>').strip()
12     if not cmd:continue
13     if cmd == 'quit':break
14     tcp_client.send(cmd.encode('utf-8'))
15     length = tcp_client.recv(buffer_size)
16     tcp_client.send(b'ready')
17     length = int(length.decode('utf-8'))
18     recv_size = 0
19     recv_msg = b''
20     while recv_size < length:
21         # 第一種寫法
22         # r_m = tcp_client.recv(buffer_size)
23         # recv_msg += r_m
24         # recv_size += len(r_m)
25 
26         # 第二種寫法
27         recv_msg += tcp_client.recv(buffer_size)
28         recv_size = len(recv_msg)
29     print(recv_msg.decode('gbk'))
30 tcp_client.close()
基於TCP客戶端

方法二、

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 import subprocess
 5 import struct
 6 
 7 ip_port = ('127.0.0.1', 8080)
 8 back_log = 5
 9 buffer_size = 1024
10 tcp_server = socket(AF_INET, SOCK_STREAM)
11 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
12 tcp_server.bind(ip_port)
13 tcp_server.listen(back_log)
14 while True:
15     conn, addr = tcp_server.accept()
16     while True:
17         try:
18             cmd = conn.recv(buffer_size)
19             # if not cmd:break
20             # 執行命令,得到命令的運行結果cmd_res
21             res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
22                                    stderr=subprocess.PIPE,
23                                    stdout=subprocess.PIPE,
24                                    stdin=subprocess.PIPE
25                                    )
26             err = res.stderr.read()
27             if err:
28                 cmd_res = err
29             else:
30                 cmd_res = res.stdout.read()
31             if not cmd_res:
32                 cmd_res = 'excute success'.encode('utf-8')
33             length = len(cmd_res)
34             data_length = struct.pack('i', length)
35             conn.send(data_length)
36             conn.send(cmd_res)
37 
38         except Exception as EX:
39             print(EX)
40             break
41     conn.close()
基於TCP服務端
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 import struct
 5 
 6 ip_port = ('127.0.0.1', 8080)
 7 back_log = 5
 8 buffer_size = 1024
 9 tcp_client = socket(AF_INET, SOCK_STREAM)
10 tcp_client.connect(ip_port)
11 while True:
12     cmd = input('>>>').strip()
13     if not cmd:continue
14     if cmd == 'quit':break
15     tcp_client.send(cmd.encode('utf-8'))
16     length_data = tcp_client.recv(4)
17     length = struct.unpack('i', length_data)[0]
18     recv_size = 0
19     recv_msg = b''
20     while recv_size < length:
21         # 第一種寫法
22         # r_m = tcp_client.recv(buffer_size)
23         # recv_msg += r_m
24         # recv_size += len(r_m)
25 
26         # 第二種寫法
27         recv_msg += tcp_client.recv(buffer_size)
28         recv_size = len(recv_msg)
29     print(recv_msg.decode('gbk'))
30 tcp_client.close()
基於TCP客戶端

 九 利用socketserver實現並發

基於TCP服務端:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import socketserver
 4 
 5 
 6 class MyServer(socketserver.BaseRequestHandler):
 7     def handle(self):
 8         print('conn is:', self.request)  # <==>conn
 9         print('addr is:', self.client_address)  # <==> addr
10         while True:
11             try:
12                 # 收消息
13                 data = self.request.recv(1024)
14                 if not data:break
15                 print('收到客戶端的消息是', data)
16                 # 發消息
17                 self.request.sendall(data.upper())
18             except Exception as EX:
19                 print('錯誤提示:',EX)
20                 break
21 
22 
23 if __name__ == '__main__':
24     s = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), MyServer)  # 多線程;第一個參數,地址+端口;第二個參數,類
25   # s = socketserver.ForkingTCPServer(('127.0.0.1', 8080), MyServer) # 多進程;多進程的開銷大於多線程
26 s.serve_forever()

基於TCP客戶端:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 ip_port = ('127.0.0.1', 8080)
 5 back_log = 5
 6 buffer_size = 1024
 7 tcp_client = socket(AF_INET, SOCK_STREAM)
 8 tcp_client.connect(ip_port)
 9 while True:
10     msg = input('>>').strip()
11     if not msg:continue
12     if msg == 'quit':break
13     tcp_client.send(msg.encode('utf-8'))
14     data = tcp_client.recv(buffer_size)
15     print('收到服務端發來的消息:', data.decode('utf-8'))
16 tcp_client.close()

基於UDP服務端 

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import socketserver
 4 
 5 
 6 class MyServer(socketserver.BaseRequestHandler):
 7     def handle(self):
 8         print(self.request)
 9         print('收到客戶端的消息是:', self.request[0].upper())
10         self.request[1].sendto(self.request[0].upper(), self.client_address)
11 
12 
13 if __name__ == '__main__':
14     s = socketserver.ThreadingUDPServer(('127.0.0.1', 8080), MyServer)  # 第一個參數,地址+端口;第二個參數,類
15     s.serve_forever()

基於UDP客戶端

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from socket import *
 4 ip_port = ('127.0.0.1', 8080)
 5 buffer_size = 1024
 6 udp_client = socket(AF_INET, SOCK_DGRAM)  # SOCK_DGRAM:數據報式套接字
 7 while True:
 8     msg = input('>>').strip()
 9     udp_client.sendto(msg.encode('utf-8'), ip_port)
10     data, addr = udp_client.recvfrom(buffer_size)
11     print(data.decode('utf-8'))
12     print(addr)

 十 認證客戶端的合法性

  如果想在分布式系統中實現一個簡單的客戶端鏈接認證功能,又不像SSL那么復雜,那么可以利用hmac+加鹽的方法來實現。

 1 # _*_coding:utf-8_*_
 2 __author__ = 'Linhaifeng'
 3 from socket import *
 4 import hmac, os
 5 
 6 secret_key = b'alex bang bang bang'
 7 
 8 
 9 def conn_auth(conn):
10     '''
11     認證客戶端鏈接
12     :param conn:
13     :return:
14     '''
15     print('開始驗證新鏈接的合法性')
16     msg = os.urandom(32)  # 隨機生成的;b'\xa3\x9d\xaa\x94\x9e\x89\xe9\xc9\xc3r\xf9E\xe0w\x82=\xaac-\x04\xd8:\xea\x07\xad\x1dx\x1er\xe0\x7f\x02'
17     conn.sendall(msg)
18     h = hmac.new(secret_key, msg)  # <hmac.HMAC object at 0x000000D5DA4824E0>
19     digest = h.digest()  # 隨機生成的;b'\x17!*\xae6\x81\xfe|)\x138\xfa2o%\x1a'
20     respone = conn.recv(len(digest))
21     return hmac.compare_digest(respone, digest)  # 比較第一個參數和第二個參數;相同,返回True,反之也成立
22 
23 
24 def data_handler(conn, bufsize=1024):
25     if not conn_auth(conn):
26         print('該鏈接不合法,關閉')
27         conn.close()
28         return
29     print('鏈接合法,開始通信')
30     while True:
31         data = conn.recv(bufsize)
32         if not data: break
33         conn.sendall(data.upper())
34 
35 
36 def server_handler(ip_port, bufsize, backlog=5):
37     '''
38     只處理鏈接
39     :param ip_port:
40     :return:
41     '''
42     tcp_socket_server = socket(AF_INET, SOCK_STREAM)
43     tcp_socket_server.bind(ip_port)
44     tcp_socket_server.listen(backlog)
45     while True:
46         conn, addr = tcp_socket_server.accept()
47         print('新連接[%s:%s]' % (addr[0], addr[1]))
48         data_handler(conn, bufsize)
49 
50 
51 if __name__ == '__main__':
52     ip_port = ('127.0.0.1', 9999)
53     bufsize = 1024
54     server_handler(ip_port, bufsize)
服務端
 1 # _*_coding:utf-8_*_
 2 __author__ = 'Linhaifeng'
 3 from socket import *
 4 import hmac, os
 5 
 6 secret_key = b'alex bang bang bang'
 7 
 8 
 9 def conn_auth(conn):
10     '''
11     認證客戶端鏈接
12     :param conn:
13     :return:
14     '''
15     print('開始驗證新鏈接的合法性')
16     msg = os.urandom(32)
17     conn.sendall(msg)
18     h = hmac.new(secret_key, msg)
19     digest = h.digest()
20     respone = conn.recv(len(digest))
21     return hmac.compare_digest(respone, digest)
22 
23 
24 def data_handler(conn, bufsize=1024):
25     if not conn_auth(conn):
26         print('該鏈接不合法,關閉')
27         conn.close()
28         return
29     print('鏈接合法,開始通信')
30     while True:
31         data = conn.recv(bufsize)
32         if not data: break
33         conn.sendall(data.upper())
34 
35 
36 def server_handler(ip_port, bufsize, backlog=5):
37     '''
38     只處理鏈接
39     :param ip_port:
40     :return:
41     '''
42     tcp_socket_server = socket(AF_INET, SOCK_STREAM)
43     tcp_socket_server.bind(ip_port)
44     tcp_socket_server.listen(backlog)
45     while True:
46         conn, addr = tcp_socket_server.accept()
47         print('新連接[%s:%s]' % (addr[0], addr[1]))
48         data_handler(conn, bufsize)
49 
50 
51 if __name__ == '__main__':
52     ip_port = ('127.0.0.1', 9999)
53     bufsize = 1024
54     server_handler(ip_port, bufsize)
客戶端(合法,其他均為非法)

 十一 FTP服務器

實現如下功能:

  1、用戶加密認證

  2、每個用戶都有自己的家目錄,且只能訪問自己的家目錄

  3、允許用戶在ftp server上隨意切換目錄(cd)

  4、允許用戶查看當前目錄下的所有文件(ls)

  5、允許上傳和下載文件

  6、文件傳輸過程中顯示進度條

  7、支持文件的斷點續傳

ftp server

目錄結構:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import os,sys
 4 PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 5 sys.path.append(PATH)
 6 
 7 
 8 from core import main
 9 
10 
11 if __name__ == '__main__':
12     main.ArgvHandler()
ftp_server.py
1 [DEFAULT]
2 
3 [alex]
4 Password =123
5 Quotation = 100
6 
7 [root]
8 Password = root
9 Quotation = 100
accounts.cfg
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 import os
4 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
5 
6 ip = '127.0.0.1'
7 port = 8080
8 
9 ACCOUNT_PATH = os.path.join(BASE_DIR, 'conf', 'accounts.cfg')
settings.py
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import socketserver
 4 import optparse
 5 import socketserver
 6 from conf import settings
 7 from core import server
 8 
 9 
10 class ArgvHandler():
11     def __init__(self):
12         self.op = optparse.OptionParser()
13         # self.op.add_option('-s', '--server', dest='server')
14         # self.op.add_option('-P', '--port', dest='port')
15         options, args = self.op.parse_args()
16         # print(options)  # {'server': '127.0.0.1', 'port': '8080'}
17         # print(type(options))  # <class 'optparse.Values'>
18         # print(options.server)  # 127.0.0.1
19         # print(args)
20 
21         options, args = self.op.parse_args()
22         self.verify_args(options, args)
23 
24     def verify_args(self, options, args):
25         cmd = args[0]
26         # 第一種方法
27         # if cmd == 'start':
28         #     pass
29         # else:
30         #     pass
31         # 第二種方法
32         if hasattr(self, cmd):
33             func = getattr(self, cmd)
34             func()
35 
36     def start(self):
37         print('ths server is working...')
38         s = socketserver.ThreadingTCPServer((settings.ip, settings.port), server.ServerHandler)
39         s.serve_forever()
40 
41     def help(self):
42         pass
main.py
  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 import socketserver
  4 import json
  5 import configparser
  6 from conf import settings
  7 import os
  8 
  9 STATUS_CODE = {
 10     250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}",
 11     251: "Invalid cmd",
 12     252: "Invalid auth data",
 13     253: "Wrong username or password",
 14     254: "Passed authentication",
 15     255: "Filename doesn't provided",
 16     256: "File doesn't exist on server",
 17     257: "ready to send file",
 18     258: "md5 verification",
 19     800: "the file exist,but not enough,is continue?",
 20     801: "the file exist!",
 21     802: "ready to receive datas",
 22     900: "md5 valdate success"
 23 }
 24 
 25 
 26 class ServerHandler(socketserver.BaseRequestHandler):
 27     def handle(self):
 28         while True:
 29             data = self.request.recv(1024).strip()  # self.request=conn
 30             data = json.loads(data.decode('utf8'))
 31             if data.get('action'):
 32                 if hasattr(self, data.get('action')):
 33                     func = getattr(self, data.get('action'))
 34                     func(**data)
 35                 else:
 36                     print('Invalid cmd')
 37             else:
 38                 print('Invalid cmd')
 39 
 40     def send_response(self, status_code):
 41         response = {'status_code': status_code}
 42         self.request.sendall(json.dumps(response).encode('utf8'))
 43 
 44     def auth(self, **data):
 45         username = data['username']
 46         password = data['password']
 47         username = self.authenticate(username, password)
 48         if username:
 49             self.send_response(254)
 50         else:
 51             self.send_response(253)
 52 
 53     def authenticate(self, username, password):
 54         cfg = configparser.ConfigParser()
 55         cfg.read(settings.ACCOUNT_PATH)
 56         if username in cfg.sections():
 57             if cfg[username]['Password'] == password:
 58                 self.username = username
 59                 self.mainPath = os.path.join(settings.BASE_DIR, 'home', self.username)
 60                 print('passed authenticate')
 61                 return username
 62 
 63     def put(self, **data):
 64         print('data', data)
 65         file_name = data.get('file_name')
 66         file_size = data.get('file_size')
 67         target_path = data.get('target_path')
 68         abs_path = os.path.join(self.mainPath, target_path, file_name)
 69         has_received = 0
 70         if os.path.exists(abs_path):
 71             file_has_size = os.stat(abs_path).st_size
 72             if file_has_size < file_size:
 73                 # 斷點續傳
 74                 self.request.sendall('800'.encode('utf8'))
 75                 choice = self.request.recv(1024).decode('utf8')
 76                 if choice == 'Y':
 77                     self.request.sendall(str(file_has_size).encode('utf8'))
 78                     has_received = file_has_size
 79                     f = open(abs_path, 'ab')
 80                 else:
 81                     f = open(abs_path, 'wb')
 82             else:
 83                 # 文件完全存在
 84                 self.request.sendall('801'.encode('utf8'))
 85                 return
 86         else:
 87             self.request.sendall('802'.encode('utf8'))
 88             f = open(abs_path, 'wb')
 89 
 90         while has_received < file_size:
 91             try:
 92                 data = self.request.recv(1024)
 93             except Exception as EX:
 94                 break
 95             f.write(data)
 96             has_received += len(data)
 97         f.close()
 98 
 99     def ls(self, **data):
100         file_list = os.listdir(self.mainPath)
101         file_str = '\n'.join(file_list)
102         if not len(file_list):
103             file_str = '<empty dir>'
104         self.request.sendall(file_str.encode('utf8'))
105 
106     def cd(self, **data):
107         dirname = data.get('dirname')
108         if dirname == '..':
109             self.mainPath = os.path.dirname(self.mainPath)
110         else:
111             self.mainPath = os.path.join(self.mainPath, dirname)
112         self.request.sendall(self.mainPath.encode('utf8'))
113 
114     def mkdir(self, **data):
115         dirname = data.get('dirname')
116         path = os.path.join(self.mainPath, dirname)
117         if not os.path.exists(path):
118             if '/' in path:
119                 os.makedirs(path)
120             else:
121                 os.mkdir(path)
122             self.request.sendall('create success'.encode('utf8'))
123         else:
124             self.request.sendall('dirname exist'.encode('utf8'))
server.py

ftp client

目錄結構

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 # import socket
  4 #
  5 # sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  6 # sk.connect(('127.0.0.1', 8080))
  7 import optparse
  8 import socket
  9 import json
 10 import os, sys
 11 STATUS_CODE = {
 12     250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}",
 13     251: "Invalid cmd",
 14     252: "Invalid auth data",
 15     253: "Wrong username or password",
 16     254: "Passed authentication",
 17     255: "Filename doesn't provided",
 18     256: "File doesn't exist on server",
 19     257: "ready to send file",
 20     258: "md5 verification",
 21     800: "the file exist,but not enough,is continue?",
 22     801: "the file exist!",
 23     802: "ready to receive datas",
 24     900: "md5 valdate success"
 25 }
 26 
 27 
 28 class ClientHandler(object):
 29     def __init__(self):
 30         self.op = optparse.OptionParser()
 31         self.op.add_option('-s', '--server', dest='server')
 32         self.op.add_option('-P', '--port', dest='port')
 33         self.op.add_option('-u', '--username', dest='username')
 34         self.op.add_option('-p', '--password', dest='password')
 35 
 36         self.options, self.args = self.op.parse_args()
 37 
 38         self.verify_args(self.options, self.args)
 39         self.make_connection()
 40         self.mainPath = os.path.dirname(os.path.abspath(__file__))
 41         self.last = 0
 42 
 43     def verify_args(self, options, args):
 44         server = options.server
 45         port = options.port
 46         username = options.username
 47         password = options.password
 48         if int(port) > 0 and int(port) < 65535:
 49             return True
 50         else:
 51             exit('the port is in 0-65535')
 52 
 53     def make_connection(self):
 54         self.sock = socket.socket()
 55         self.sock.connect((self.options.server, int(self.options.port)))
 56 
 57     def interactive(self):
 58         print('begin to interactive...')
 59         # self.authenticate()
 60         if self.authenticate():
 61             while 1:
 62                 cmd_info = input('[%s]' % self.current_dir).strip()
 63                 cmd_list = cmd_info.split()
 64                 if hasattr(self, cmd_list[0]):
 65                     func = getattr(self, cmd_list[0])
 66                     func(*cmd_list)
 67 
 68     def put(self, *cmd_list):
 69         # put 12.png images
 70         actions, local_path, target_path = cmd_list
 71         local_path = os.path.join(self.mainPath, local_path)
 72         file_name = os.path.basename(local_path)
 73         file_size = os.stat(local_path).st_size
 74         data = {
 75             'action': 'put',
 76             'file_name': file_name,
 77             'file_size': file_size,
 78             'target_path': target_path
 79         }
 80         self.sock.send(json.dumps(data).encode('utf8'))
 81         is_exist = self.sock.recv(1024).decode('utf8')
 82         has_send = 0
 83         if is_exist == '800':
 84             # 文件不完整
 85             choice = input('the file exist,but not enough,is continue?[Y/N]').strip()
 86             if choice.upper() == 'Y':
 87                 self.sock.sendall('Y'.encode('utf8'))
 88                 continue_position = self.sock.recv(1024).decode('utf8')
 89                 has_send += int(continue_position)
 90             else:
 91                 self.sock.sendall('N'.encode('utf8'))
 92         elif is_exist == '801':
 93             # 文件完全存在
 94             print('the file exist')
 95             return
 96         else:
 97             pass
 98         f = open(local_path, 'rb')
 99         f.seek(has_send)
100         while has_send < file_size:
101             data = f.read(1024)
102             self.sock.sendall(data)
103             has_send += len(data)
104             self.show_progress(has_send, file_size)
105         f.close()
106         print('successfully upload!')
107 
108     def show_progress(self, has, total):
109         rate = float(has)/float(total)
110         rate_num = int(rate*100)
111         if self.last != rate_num:
112             sys.stdout.write('%s%% %s\r' % (rate_num, '#'*rate_num))
113         self.last = rate_num
114 
115     def ls(self, *cmd_list):
116         data = {
117             'action': 'ls'
118         }
119         self.sock.sendall(json.dumps(data).encode('utf8'))
120         data = self.sock.recv(1024).decode('utf8')
121         print(data)
122 
123     def cd(self, *cmd_list):
124         data = {
125             'action': 'cd',
126             'dirname': cmd_list[1]
127         }
128         self.sock.sendall(json.dumps(data).encode('utf8'))
129         data = self.sock.recv(1024).decode('utf8')
130         self.current_dir = os.path.basename(data)
131 
132     def mkdir(self, *cmd_list):
133         data = {
134             'action': 'mkdir',
135             'dirname': cmd_list[1]
136         }
137         self.sock.sendall(json.dumps(data).encode('utf8'))
138         data = self.sock.recv(1024).decode('utf8')
139 
140     def authenticate(self):
141         if self.options.username is None or self.options.password is None:
142             username = input('username:')
143             password = input('password:')
144             return self.get_auth_result(username, password)
145         return self.get_auth_result(self.options.username, self.options.password)
146 
147     def response(self):
148         data = self.sock.recv(1024).decode('utf8')
149         data = json.loads(data)
150         return data
151 
152     def get_auth_result(self, username, password):
153         data = {
154             'action': "auth",
155             'username': username,
156             'password': password
157         }
158         self.sock.send(json.dumps(data).encode('utf8'))
159         response = self.response()
160         print('response:', response['status_code'])
161         if response['status_code'] == 254:
162             self.username = username
163             self.current_dir = username
164             print(STATUS_CODE[response['status_code']])
165             return True
166         else:
167             print(STATUS_CODE[response['status_code']])
168 
169 
170 ch = ClientHandler()
171 ch.interactive()
ftp_client.py

 十二 進程與線程

1、為什么要有操作系統?

  現代計算機系統是由一個或者多個處理器、內存、硬盤、打印機、鍵盤、鼠標和顯示器等組成的。網絡接口以及各種其他輸入/輸出設備組成的復雜系統,每位程序員不可能掌握所有系統實現的細節,並且管理優化這些部件是一件具有挑戰性極強的工作。所以,我們需要為計算機安裝一層軟件,成為操作系統,任務就是用戶程序性提供一個簡單清晰的計算機模型,並管理以上設備。

  定義:操作系統是一個用來協調、管理和控制計算機硬件和軟件資源的系統程序,它位於硬件和應用程序之間。程序是運行在系統上的具有某種功能的軟件,比如:瀏覽器,音樂播放器等。

  操作系統內部的定義:操作系統的內核是一個管理和控制程序,負責管理計算機的所有物理資源,其中包括:文件系統、內存管理、設備管理、進程管理。

2、什么是進程?

  假如有兩個程序A和B,程序A在執行到一半的過程中,需要讀取大量的數據輸入(I/O操作),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。是不是在程序A讀取數據的過程中,讓程序B去執行,當程序A讀取完數據之后,讓程序B暫停,然后讓程序A繼續執行?當然沒問題,但這里有一個關鍵詞:切換;既然是切換,那么這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所需要的系統資源(內存,硬盤,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程序A和程序B分別需要什么資源,怎樣去識別程序A和程序B等等,所以就有了一個叫進程的抽象。

定義:

  進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據庫、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成。數據集則是程序在執行過程中所需要使用的資源。進程控制塊用來記錄的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標志。

  本質上就是一段程序的運行過程(抽象的概念)

3、什么是線程?

  線程的出現是為了降低上下文切換的消耗,提高系統的並發性,並突破一個進程只能干一樣事的缺陷,讓進程內並發成為可能。

4、進程與線程區別

1、一個程序至少有一個進程,一個進程至少有一個線程(進程可以理解成線程的容器)

2、進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率

3、線程在執行過程中與進程還是有區別的,每個獨立的線程有一個程序運行的入口,順序執行序列和程序的出口。但是線程不能夠獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。

4、進程是具有一定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位;線程是進程的一個實體,是CPU調度和分源的基本單位,它是比進程更小的能獨立運行的基本單位,線程自己基本上不擁有系統資源,只擁有一點運行中必不可少的資源(如程序計數器,一組寄存器和錢),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源。一個進程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以並發執行。

5、線程:最小的執行單元(實例);進程:最小的資源單位

5、Python的GIL(全局解釋鎖;Global Interpreter Lock)

  In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思:無論你啟多少個線程,你有多少個CPU,Python在執行的時候會淡定的在同一時刻只允許一個線程運行。

6、 線程的兩種調用方式

  threading 模塊建立在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread進行二次封裝,提供了更方便的api來處理線程。

調用方式:

方式一、

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 def music():
 8     print('begin to listen % s' % time.ctime())
 9     time.sleep(3)
10     print('stop to listen %s' % time.ctime())
11 
12 
13 def game():
14     print('begin to play game % s \r\n' % time.ctime())
15     time.sleep(5)
16     print('stop to play game %s \r\n' % time.ctime())
17 
18 
19 threads = []
20 t1 = threading.Thread(target=music)
21 t2 = threading.Thread(target=game)
22 threads.append(t1)
23 threads.append(t2)
24 if __name__ == '__main__':
25     # join()功能:在子線程完成運行之前,這個子線程的父線程講一直被阻塞
26     # t1 = threading.Thread(target=music)
27     # t2 = threading.Thread(target=game)
28     # t1.start()
29     # t2.start()
30     #
31     # t1.join()
32     # t2.join()
33     # print('end')
34     
35     # setDaemon():將線程生命為守護線程
36     t2.setDaemon(True)  # 注:一定要在start之前設置
37     for t in threads:
38         t.start()
39     print('end')
threading

方式二、

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 class MyThread(threading.Thread):
 8     def __init__(self, num):
 9         threading.Thread.__init__(self)
10         self.num = num
11 
12     def run(self):  # 定義每個線程要運行的函數
13 
14         print("running on number:%s" % self.num)
15 
16         time.sleep(3)
17 
18 
19 if __name__ == '__main__':
20     t1 = MyThread(1)
21     t2 = MyThread(2)
22     t1.start()
23     t2.start()
24 
25     print("ending......")
繼承方式調用線程

join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。

setDaemon(True):

  將線程生命為守護線程,必須在start()方法調用之前設置,如果不設置為守護線程,程序會被無限掛起。這個方法基本和join是相反的。當我們在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程就分兵兩路,分別運行,那么當主線程完成想退出時,會驗證子線程是否完成。如果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是,只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以用setDaemon方法了。

 其他方法:

 1 # run():用於表示線程活動的方法
 2 # start():啟動線程活動
 3 # isAlive():返回線程是否活動的,返回布爾值,True/False
 4 # getName():返回線程名字
 5 # setName():設置線程名字
 6 
 7 threading模塊提供的一些方法:
 8 # threading.currentThread():返回當前的線程變量
 9 # threading.enumerate():返回一個包含正在運行的線程的list。正在運行指線程啟動后-結束前,不包括啟動前和終止后的線程
10 # threading.activeCount():返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 def music():
 8     print('begin to listen % s' % time.ctime())
 9     time.sleep(3)
10     print('stop to listen %s' % time.ctime())
11 
12 
13 def game():
14     print('begin to play game % s \r\n' % time.ctime())
15     time.sleep(5)
16     print('stop to play game %s \r\n' % time.ctime())
17 
18 
19 threads = []
20 t1 = threading.Thread(target=music)
21 t2 = threading.Thread(target=game)
22 threads.append(t1)
23 threads.append(t2)
24 if __name__ == '__main__':
25     # join()功能:在子線程完成運行之前,這個子線程的父線程講一直被阻塞
26     # t1 = threading.Thread(target=music)
27     # t2 = threading.Thread(target=game)
28     # t1.start()
29     # t2.start()
30     #
31     # t1.join()
32     # t2.join()
33     # print('end')
34 
35     # setDaemon():將線程生命為守護線程
36     t2.setDaemon(True)  # 注:一定要在start之前設置
37     for t in threads:
38         t.start()
39         print(t.getName())
40         print('count:', threading.activeCount())
41     while threading.activeCount() == 3:
42         print('end')
其他方法演示

7、同步鎖(lock)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 num = 100
 6 
 7 
 8 def sub():
 9     global num
10     print('ok')
11     lock.acquire()  # 加鎖
12     temp = num
13     time.sleep(0.001)
14     num = temp-1
15     lock.release()  # 釋放鎖
16 
17 
18 li = []
19 lock = threading.Lock() 20 for i in range(100):
21     t1 = threading.Thread(target=sub)
22     t1.start()
23     li.append(t1)
24 for l in li:
25     l.join()
26 print(num)

 注:多個線程都在同時操作同一個共享資源,所以造成了資源破壞(join會造成串行,失去線程的意義),可以通過同步鎖來解決這種問題。

8、遞歸鎖

  在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都在使用,所有這兩個線程在無外力作用下將一直等待下去。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 class MyThread(threading.Thread):
 8 
 9     def actionA(self):
10         r_lock.acquire()
11         print(self.name, 'gotA', time.ctime())  # 重寫線程后的self.name --->線程的名字
12         time.sleep(2)
13         r_lock.acquire()
14         print(self.name, 'gotB', time.ctime())
15         time.sleep(1)
16         r_lock.release()
17         r_lock.release()
18 
19     def actionB(self):
20         r_lock.acquire()
21         print(self.name, 'gotB', time.ctime())  # 重寫線程后的self.name --->線程的名字
22         time.sleep(2)
23         r_lock.acquire()
24         print(self.name, 'gotA', time.ctime())
25         time.sleep(1)
26         r_lock.release()
27         r_lock.release()
28 
29     def run(self):
30         self.actionA()
31         self.actionB()
32 
33 
34 if __name__ == '__main__':
35     r_lock = threading.RLock() 36     li = []
37     for t in range(3):
38         t = MyThread()
39         t.start()
40         li.append(t)
41 
42     for i in li:
43         i.join()
44 
45     print('end')

  為了支持在同一線程中多次請求同一資源,Python提供了“可重入鎖”:threading.Rlock。Rlock內部維護着一個Lock和counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

9、同步對象(Event)

An event is a simple synchronization object;the event represents an internal flag,

and threads can wait for the flag to be set, or set or clear the flag themselves.


event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()


If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 class Boss(threading.Thread):
 8     def run(self):
 9         print('Boss:今天加班到22:00!\r\n')
10         print(event.isSet())  # False
11         event.set()
12         time.sleep(6)
13         print('Boss:可以下班了,明天放假!\r\n')
14         print(event.isSet())
15         event.set()
16 
17 
18 class Worker(threading.Thread):
19     def run(self):
20         event.wait()  # 一旦event被設定,等同於pass
21         print('Worker:唉···命真苦!\r\n')
22         time.sleep(1)
23         event.clear()
24         event.wait()
25         print('Worker:OhYeah!\r\n')
26 
27 
28 if __name__ == '__main__':
29     event = threading.Event() 30     threads = []
31     for i in range(5):
32         threads.append(Worker())
33     threads.append(Boss())
34     for t in threads:
35         t.start()
36     for t in threads:
37         t.join()
38     print('end')

10、信號量

  信號量用來控制線程並發數的,BoundedSemaphore或Semaphore管理一個內置的計數器,每當調用acquire()時-1,調用release()時+1。計數器不能小於0,當計數器為0時,acquire()將阻塞線程至同步鎖狀態,直到其他線程調用release()。(類似於停車位的概念)BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 
 7 class MyThread(threading.Thread):
 8     def run(self):
 9         if semaphore.acquire():
10             print(self.name, '\r')
11             time.sleep(5)
12             semaphore.release()
13 
14 
15 if __name__ == '__main__':
16     semaphore = threading.Semaphore(5)
17     threads = []
18     for i in range(100):
19         threads.append(MyThread())
20     for t in threads:
21         t.start()

 

11、隊列(queue)

  列表是不安全的數據結構:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import threading
 4 import time
 5 
 6 li = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 7 
 8 
 9 def Foo():
10     while li:
11         try:
12             last_li = li[-1]
13             print(last_li)
14             li.remove(last_li)
15             time.sleep(1)
16         except Exception as EX:
17             print('錯誤提示:', last_li, EX)
18 
19 
20 t1 = threading.Thread(target=Foo)
21 t1.start()
22 t2 = threading.Thread(target=Foo)
23 t2.start()
不安全的列表

  queue隊列類的方法:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import queue  # 線程隊列
 4 q = queue.Queue()  # 創建q對象,同步實現的。隊列長度可為無限或者有限。可通過Queque的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限
 5 q.put(12)  # 將一個值放到隊列,調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為1.如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空處一個數據單元。如果block為0,put方法將引發Full異常
 6 q.put('alex')
 7 q.put({"age": 15})
 8 print(q.get())  # 將一個值從隊列中取出,調用隊列對象的get()方法,從對頭刪除並返回一個實例。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常
 9 print(q.qsize())  # 返回隊列的大小
10 print(q.empty())  # 判斷隊列是否為空,返回布爾值,True/False
11 print(q.full())  # 判斷隊列是否已經滿了,返回布爾值,True/False
12 q.join()  # 實際上意味着等到隊列為空,再執行別的操作
13 
14 '''
15 Queue模塊的三種隊列及構造函數
16 1、Python Queue模塊的FIFO隊列,先進先出 class queue.Queue(maxsize)
17 2、LIFO 類似於堆,即先進后處。  class queue.LifoQueue(maxsize)
18 3、優先級隊列,級別越低月先出來。 class queue.PriorityQueue(maxsize)
19 '''

生產者消費者模型

為什么要使用生產者和消費者模式?

  在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式?

  生產者和消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import time
 4 import random
 5 import queue
 6 import threading
 7 q = queue.Queue()
 8 
 9 
10 def Producer(name):
11     count = 0
12     while count < 10:
13         print('making..')
14         time.sleep(random.randrange(3))
15         q.put(count)
16         print('Producer [%s] has produced %s meat bun。\r' % (name, count))
17         count += 1
18         print('ok\r')
19 
20 
21 def Consumer(name):
22     count = 0
23     while count < 10:
24         time.sleep(random.randrange(4))
25         if not q.empty():
26             data = q.get()
27             print('lm Consumer [%s] has eat %s meat bun。\r' % (name, data))
28         else:
29             print('---no meat bun anymore----\r')
30         count += 1
31 
32 
33 p1 = threading.Thread(target=Producer, args=('alex',))
34 c1 = threading.Thread(target=Consumer, args=('B',))
35 p1.start()
36 c1.start()
生產者和消費者舉例

12、並發&並行

  並發:指系統具有處理多個任務(動作)的能力

  並行:指系統具有同時處理多個任務(動作)的能力

13、同步&異步

  同步:當進程執行到一個IO(等待外部數據)的時候你等

  異步:當進程執行到一個IO(等待外部數據)的時候你不等;一直等到數據接收完成,在回來處理

14、任務類型

  IO密集型:Python的多線程是有意義的

  計算密集型:Python的多線程就不推薦,可以采用多進程+協程

 16、多進程模塊( multiprocessing)

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

  由於GIL的存在,Python中的多線程其實並不是真正的多線程,如果想充分地使用多核CPU的資源,在Python中大部分情況下需要使用多進程。multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法,也有start(),run(),join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類(這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部分與threading使用同一套API,只不過換到了多進程的情景。

 調用方式一:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process
 4 import time
 5 
 6 
 7 def Foo(name):
 8     time.sleep(1)
 9     print('hello', name, time.ctime())
10 
11 
12 if __name__ == '__main__':
13     p_list = []
14     for i in range(200):
15         p = Process(target=Foo, args=('alex',))
16         p_list.append(p)
17         p.start()
18     for i in p_list:
19         p.join()
20     print('end')
方式一

調用方式二:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process
 4 import time
 5 
 6 
 7 class MyProcess(Process):
 8     def __init__(self):
 9         super(MyProcess, self).__init__()
10 
11     def run(self):
12         time.sleep(1)
13         print('hello', self.name, time.ctime())
14 
15 
16 if __name__ == '__main__':
17     p_list = []
18     for i in range(3):
19         p = MyProcess()
20         p.start()
21         p_list.append(p)
22     for p in p_list:
23         p.join()
24     print('end')
方式二

Process類 

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group:線程組,目前還沒有實現,庫引用中提示必須是None

  target:要執行的方法

  name:進程名

  args/kwargs:要傳入方法的參數

實例方法:

  is_alive():返回進程是否在運行

  join([timeout]):阻塞當前上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)

  start():進程准備就緒,等待CPU調度

  run():start()調用run方法,如果實例進程時未指定傳入target,這start執行t默認run()方法

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:和線程的setDeamon功能一樣

  name:進程名字

  pid:進程號

17、進程的通信

進程隊列Queue

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import queue
 4 import multiprocessing
 5 
 6 
 7 def Foo(q):
 8     q.put(123)
 9     q.put(456)
10 
11 
12 if __name__ == '__main__':
13     q = multiprocessing.Queue()  # 注意:此處需用進程隊列,不能用線程隊列,即q=queue.Queue()
14     p = multiprocessing.Process(target=Foo, args=(q,))
15     p.start()
16     print(q.get())
17     print(q.get())

 管道

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process, Pipe
 4 
 5 
 6 def Foo(child_conn):
 7     child_conn.send([12, {'name': 'alice'}, 'hello'])
 8     response = child_conn.recv()
 9     print('response:', response)
10     child_conn.close()
11     print('q_id2:', id(child_conn))
12 
13 
14 if __name__ == '__main__':
15     parent_conn, child_conn = Pipe()
16     print('q_ID1', id(child_conn))
17     p = Process(target=Foo, args=(child_conn,))
18     p.start()
19     print(parent_conn.recv())
20     parent_conn.send('早上好!')
21     p.join()
View Code

Managers

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process, Manager
 4 
 5 
 6 def Foo(dic, li, i):
 7     dic[i] = '1'
 8     li.append(i)
 9 
10 
11 if __name__ == '__main__':
12     with Manager() as manager:
13         dic = manager.dict()
14         li = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=Foo, args=(dic, li, i))
18             p.start()
19             p_list.append(p)
20         for p in p_list:
21             p.join()
22         print(dic)
23         print(li)
View Code

 進程同步

Without using the lock output from the different processes is liable to get all mixed up.

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process, Lock
 4 
 5 
 6 def Foo(lk, i):
 7     with lk:  # 默認情況下,已經lk.acquire()
 8         print('hello world %s' % i)
 9 
10 
11 if __name__ == '__main__':
12     lock = Lock()
13     for num in range(10):
14         Process(target=Foo, args=(lock, num)).start()
View Code

進程池

  進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可進程為止。

進程池中兩個方法:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from multiprocessing import Process, Pool
 4 import time, os
 5 
 6 
 7 def Foo(i):
 8     time.sleep(1)
 9     print('i = \r', i)
10 
11 
12 def Bar(arg):  # 此處arg=Foo()函數的返回值
13     print('pgid-->%s\r' % os.getpid())
14     print('ppid-->%s\r' % os.getppid())
15     print('logger:%s\r' % arg)
16 
17 
18 if __name__ == '__main__':
19     pool = Pool(5)
20     Bar(1)
21     print('------------\r')
22     for i in range(10):
23         # pool.apply(func=Foo, args=(i,))  # 同步接口
24         # pool.apply_async(func=Foo, args=(i,))
25         pool.apply_async(func=Foo, args=(i,), callback=Bar)  # callback-->回調函數:就是某個動作或者函數執行成功后再去執行的函數
26  pool.close() 27 pool.join() # join和close位置不能反 28 print('end\r')

 十三 協程

  協程:又稱微線程,英文名:Coroutine,本質上是一個線程

  優點1:協程具有極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制。因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

  優點2:不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

  因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法就是多進程+協程,即充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import time
 4 
 5 
 6 def consumer(name):
 7     print('---->ready to eat meat bun')
 8     while True:
 9         new_meat_bun = yield
10         print('[%s] is eating meat bun %s' % (name, new_meat_bun))
11 
12 
13 def producer():
14     con1.__next__()
15     con2.__next__()
16     n = 0
17     while 1:
18         time.sleep(1)
19         print('\033[32;1m[producer]\033[0m is making meat bun %s and %s' % (n, n + 1))
20         con1.send(n)
21         con2.send(n + 1)
22         n += 2
23 
24 
25 if __name__ == '__main__':
26     con1 = consumer('alex')
27     con2 = consumer('alice')
28     producer()
yield簡單的實現--->協程

 Greenlet

  greenlet是一個用C實現的協程模塊,相比與Python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator。(注:需要用pip安裝包;pip install gevent)

 1 from greenlet import greenlet
 2 import time
 3 
 4 
 5 def A():
 6     while 1:
 7         print('-------A-------')
 8         time.sleep(0.5)
 9         g2.switch()
10 
11 
12 def B():
13     while 1:
14         print('-------B-------')
15         time.sleep(0.5)
16         g1.switch()
17 
18 
19 g1 = greenlet(A)  # 創建協程g1
20 g2 = greenlet(B)
21 
22 g1.switch()  # 跳轉至協程g1
greenlet

 gevent

 1 import gevent
 2 import requests, time  # 此處requests需安裝模塊;pip --timeout=100 install requests
 3 
 4 start = time.time()
 5 
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = requests.get(url)  # 爬網頁的標簽
10     data = resp.text
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 
14 gevent.joinall([
15     gevent.spawn(f, 'https://www.qq.com/'),
16     gevent.spawn(f, 'https://www.baidu.com/'),
17     gevent.spawn(f, 'https://www.taobao.com/'),
18 ])
19 
20 print("cost time:", time.time() - start)
gevent

 十四 緩存I/O

  緩存I/O又被稱作標准I/O,大多數文件系統的默認I/O操作都是緩存I/O。在Linux的緩存I/O機制中,操作系統會將I/O的數據緩存在文件系統的頁緩存(page cache)中,也就是說,數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內的緩沖區拷貝到應用程序的地址空間。用戶空間沒法直接訪問內核空間的,內核態到用戶態的數據拷貝。

  緩存I/O的缺點:數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的CPU以及內存開銷是非常大的。

I/O發生時設計的對象和步驟:

  對於一個network IO(以read舉例),他會涉及到兩個系統對象,一個是調用這個IO的process(or thread),另一個就是系統內核(kernel)。當一個read操作發生時,它會經歷兩個階段:

    1、等待數據准備(Waiting for the data to be read)

    2、將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

注:這兩點很重要,因為這些IO Mode的區別就是在這兩個階段上各有不同的情況。

blocking IO(阻塞IO,Linux下)

  在Linux中,默認情況下所有的socket都是blocking,一個典型的讀操作大概流程圖:

  當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一階段:准備數據。對於network IO來說,很多時候數據在一開始還沒到達(如:還沒收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據准備好了,它就將數據從kernel中拷貝到用戶內存,然后kernel。所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

non-blocking IO(非阻塞IO,Linux下)

  在Linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行時大概的流程:

從上圖可以看出,當用戶進程發出read時,如果kernel中的數據還沒准備好,那么它並不會block用戶進程,而是立即返回一個error。從用戶進程角度講來講,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了。所以,用戶進程其實是需要不斷的主動詢問kernel數據好了沒有。

 1 import time
 2 import socket
 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 4 sk.bind(('127.0.0.1',8080))
 5 sk.listen(5)
 6 sk.setblocking(False)  # 設置是否阻塞,默認為True,非阻塞
 7 while True:
 8     try:
 9         print('waiting client connection...')
10         conn,addr = sk.accept()  # 進程主動輪詢
11         print('+++',addr)
12         data = sk.recv(1024)
13         print(str(data,'utf8'))
14         conn.close()
15     except Exception as EX:
16         print('錯誤提示:',EX)
17         time.sleep(2)
非阻塞-socket-server
1 import time
2 import socket
3 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
4 while True:
5     sk.connect(('127.0.0.1',8080))
6     print('hello')
7     sk.sendall(bytes('hello','utf8'))
8     time.sleep(1)
9     break
非阻塞-socket-client

 IO multiplexing(IO多路復用)

  有些地方也稱為這種IO方式為event driven IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程,大概流程圖:

  當用戶進程調用了select,那么真個進程會被block。而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這時用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。(如果處理的連接數不是很多的話,使用select/epoll的web server不一定比使用multi-threading+blocking IO的web server性能更好,可能延遲更大;select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接)

注:

  1、select函數返回結果中如果有文件可讀了,那么進程就可以同故宮調用accept()或recv()來讓kernel將位於內核中准備到的數據copy到用戶區。

  2、select的優勢在於可以處理多個連接,不適用於單個連接。

 1 import socket
 2 import select
 3 
 4 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 5 sk.bind(('127.0.0.1', 8080))
 6 sk.listen(5)
 7 inputs = [sk, ]
 8 while True:
 9     r, w, e = select.select(inputs, [], [], 3)
10     for obj in r:
11         print('obj:', obj)
12         if obj == sk:
13             conn, addr = obj.accept()
14             print('已連接:', conn)
15             inputs.append(conn)
16         else:
17             data_byte = obj.recv(1024)
18             print(str(data_byte, 'utf8'))
19             inp = input('回答[%s]號客戶端>>:' % inputs.index(obj))
20             obj.sendall(bytes(inp, 'utf8'))
21     print('>>>', r)
多路復用-select-server
 1 import socket
 2 
 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 4 sk.connect(('127.0.0.1', 8080))
 5 
 6 while True:
 7     inp = input(">>>>")
 8     sk.sendall(bytes(inp, "utf8"))
 9     data = sk.recv(1024)
10     print(str(data, 'utf8'))
多路復用-select-client

 Asynchronous I/O(異步IO)

  流程圖:

  從圖中可以看出,用戶進程發起read操作之后,立刻就開始去做其它的事。另一方面,從kernel的角度,當他受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

selectors模塊:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import selectors
 4 import socket
 5 
 6 sel = selectors.DefaultSelector()  # 根據系統,默認選擇最優IO多用戶模式
 7 
 8 
 9 def accept(sock, mask):
10     conn, addr = sock.accept()
11     print('accepted', conn, 'from', addr)
12     conn.setblocking(False)
13     sel.register(conn, selectors.EVENT_READ, read)
14 
15 
16 def read(conn, mask):
17     try:
18         data = conn.recv(1000)
19         if not data:
20             raise Exception
21         print('收到:', data.decode('utf8'))
22         conn.send(data.upper())  # Hope it won't block
23     except Exception as EX:
24         print('closing:', conn)
25         sel.unregister(conn)  # 解除綁定
26         conn.close()
27 
28 
29 sock = socket.socket()
30 sock.bind(('localhost', 8080))
31 sock.listen(100)
32 sock.setblocking(False)
33 sel.register(sock, selectors.EVENT_READ, accept)  # sock與accept綁定
34 while True:
35     events = sel.select()  # 監聽[sock,conn1,conn2....]
36     for key, mask in events:
37         callback = key.data
38         print('>>callback:', callback)
39         callback(key.fileobj, mask)
selectors模塊-server
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import socket
 4 sk = socket.socket()
 5 sk.connect(('127.0.0.1', 8080))
 6 while True:
 7     inp = input('>>>')
 8     sk.send(inp.encode('utf8'))
 9     data = sk.recv(1024)
10     print(data.decode('utf8'))
客戶端

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM