Python Concurrency and the Web
The main forms of concurrency in Python are:
- threads
- processes
- coroutines
Because of the GIL, Python's concurrency cannot exploit multiple CPU cores, so its performance is relatively poor. Below we walk through Python concurrency with a few examples.
Threads
Let's observe Python threads through a simple web server. First, a small CPU-intensive function:
def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
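To see why fib makes a good CPU-bound workload, the function can be instrumented to count its recursive calls (a quick sketch for illustration, not part of the server code; the counter name is made up here):

```python
calls = 0

def fib(n):
    """Same fib as above, instrumented to count recursive calls."""
    global calls
    calls += 1
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

result = fib(20)
print(result, calls)   # 6765 13529 -- the call count is 2*fib(n) - 1, i.e. exponential in n
```

This exponential blow-up is exactly what makes `fib(40)` useful later for saturating a CPU core.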
Next, a fib web server. The program is simple enough to need no explanation:
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        fib_handler(client)  # handle this client fully before accepting the next one

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
Running this shell command shows the computed result:
nc localhost 25002
10
55
Because the server is single-threaded, a second connection will not get a result:
nc localhost 25002
10
To let our server handle multiple requests, we add threading support to the server code:
# server.py
# threaded server code
from socket import *
from fib import fib
from threading import Thread

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        # fib_handler(client)
        Thread(target=fib_handler, args=(client,), daemon=True).start()  # the daemon keyword requires Python 3

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
Running the shell command shows the computed result:
nc localhost 25002
10
55
Because the server is now multithreaded, a new connection also gets a result:
nc localhost 25002
10
55
Performance testing
Let's add a small benchmarking client:
# perf1.py
from socket import *
from threading import Thread
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25002))
n = 0

def monitor():
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0

Thread(target=monitor).start()

while True:
    start = time.time()
    sock.send(b'1')
    resp = sock.recv(100)
    end = time.time()
    n += 1
# The code is very simple: the global counter n tracks qps (requests per second).
Running perf1.py in a shell gives:
- 106025 reqs/sec
- 109382 reqs/sec
- 98211 reqs/sec
- 105391 reqs/sec
- 108875 reqs/sec
The server averages roughly 100k requests per second.
If we start a second copy of the benchmark in another process, the effect of Python's GIL on threads becomes visible:
python3 perf1.py
- 74677 reqs/sec
- 78284 reqs/sec
- 72029 reqs/sec
- 81719 reqs/sec
- 82392 reqs/sec
- 84261 reqs/sec
and the qps in the original shell drops to a similar level:
- 96488 reqs/sec
- 99380 reqs/sec
- 84918 reqs/sec
- 87485 reqs/sec
- 85118 reqs/sec
- 78211 reqs/sec
If we additionally run
nc localhost 25002
40
to saturate the server's CPU for a while, the qps in the benchmark windows quickly drops to:
- 99 reqs/sec
- 99 reqs/sec
This reflects a peculiarity of Python's GIL: it tends to favor the CPU-hungry task.
I don't know the exact reason; it would probably take reading the GIL's implementation to find out.
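The post leaves the cause open; one relevant mechanism (an aside, not from the original) is CPython's switch interval, which can be inspected from the standard library:

```python
import sys

# CPython releases the GIL every "switch interval" (default 5 ms).
# When a CPU-bound thread (e.g. computing fib(40)) competes with
# I/O-bound threads, it tends to reacquire the GIL immediately after
# releasing it, starving the short requests -- a plausible explanation
# for the qps collapse observed above (often called the convoy effect).
print(sys.getswitchinterval())
```

Tuning this value with `sys.setswitchinterval()` changes how often the GIL is contended, but it cannot make threads run on more than one core.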
Thread pools in web programming
Python has a library called cherrypy. I used it recently and skimmed its source code: its core is built on a Python thread pool.
cherrypy maintains the pool with Python's thread-safe queue. The implementation is roughly:
class ThreadPool(object):
    """A Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj), start()
    and stop(timeout) attributes.
    """

    def __init__(self, server, min=10, max=-1,
                 accepted_queue_size=-1, accepted_queue_timeout=10):
        self.server = server
        self.min = min
        self.max = max
        self._threads = []
        self._queue = queue.Queue(maxsize=accepted_queue_size)
        self._queue_put_timeout = accepted_queue_timeout
        self.get = self._queue.get

    def start(self):
        """Start the pool of threads."""
        for i in range(self.min):
            self._threads.append(WorkerThread(self.server))
        for worker in self._threads:
            worker.setName('CP Server ' + worker.getName())
            worker.start()
        for worker in self._threads:
            while not worker.ready:
                time.sleep(.1)
        ....

    def put(self, obj):
        self._queue.put(obj, block=True, timeout=self._queue_put_timeout)
        if obj is _SHUTDOWNREQUEST:
            return

    def grow(self, amount):
        """Spawn new worker threads (not above self.max)."""
        if self.max > 0:
            budget = max(self.max - len(self._threads), 0)
        else:
            # self.max <= 0 indicates no maximum
            budget = float('inf')
        n_new = min(amount, budget)
        workers = [self._spawn_worker() for i in range(n_new)]
        while not all(worker.ready for worker in workers):
            time.sleep(.1)
        self._threads.extend(workers)
        ....

    def shrink(self, amount):
        """Kill off worker threads (not below self.min)."""
        [...]

    def stop(self, timeout=5):
        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        [...]
As you can see, cherrypy initializes the pool to 10 threads. Each incoming HTTP connection is put on the task queue, and the WorkerThread instances continuously pull tasks off the queue and execute them: a textbook thread-pool model.
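The queue-based design can be captured in a self-contained miniature. This is a sketch of the same pattern, not CherryPy code; all names below (`MiniPool`, `square`) are illustrative:

```python
import queue
import threading

class MiniPool:
    """Workers block on a thread-safe Queue and run whatever callable
    arrives -- the same shape as cherrypy's ThreadPool/WorkerThread."""

    _SHUTDOWN = object()   # sentinel, playing the role of _SHUTDOWNREQUEST

    def __init__(self, size=10):
        self._queue = queue.Queue()
        self._threads = [threading.Thread(target=self._worker, daemon=True)
                         for _ in range(size)]
        for t in self._threads:
            t.start()

    def _worker(self):
        while True:
            task = self._queue.get()
            if task is self._SHUTDOWN:
                break
            func, args = task
            func(*args)

    def put(self, func, *args):
        self._queue.put((func, args))

    def stop(self):
        # One sentinel per worker; the FIFO queue guarantees all real
        # tasks are drained before any worker sees its sentinel.
        for _ in self._threads:
            self._queue.put(self._SHUTDOWN)
        for t in self._threads:
            t.join()

results = []
lock = threading.Lock()

def square(x):
    with lock:
        results.append(x * x)

pool = MiniPool(4)
for i in range(10):
    pool.put(square, i)
pool.stop()
print(sorted(results))   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The sentinel-based shutdown mirrors how cherrypy's `put()` special-cases `_SHUTDOWNREQUEST`.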
Processes
Since Python threads cannot use multiple cores, Python can use multiple processes to get real parallelism on a multi-core CPU. A Python process is comparatively expensive: think of each one as starting another Python interpreter.
# server_pool.py
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool  # Python 3; on Python 2 use the futures backport

pool = Pool(4)  # start a pool of 4 worker processes

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=fib_handler, args=(client,), daemon=True).start()

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
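The submit/result pattern used in fib_handler can be tried in isolation. A minimal sketch (the `__main__` guard is there so worker processes can safely re-import the module):

```python
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    # submit() pickles fib and n, ships them to a worker process, and
    # returns a Future immediately; result() blocks until the reply
    # arrives -- this round trip is part of the per-request overhead.
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fib, n) for n in (10, 20, 30)]
        print([f.result() for f in futures])   # [55, 6765, 832040]
```

Because each worker is a separate interpreter with its own GIL, the three computations genuinely run in parallel on a multi-core machine.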
Performance testing
The new server's qps:
- 4613 reqs/sec
- 4764 reqs/sec
- 4619 reqs/sec
- 4393 reqs/sec
- 4768 reqs/sec
- 4846 reqs/sec
This is far below the earlier ~100k qps. The main costs are slow process startup and the pool's internal machinery, which involves data transfer, queues, and so on.
In exchange, multiple processes keep each connection relatively independent, largely insulated from other requests.
Even issuing the following expensive request does not disturb the benchmark:
nc localhost 25002
40
Coroutines
A brief introduction
Coroutines are an old concept. They first appeared in early operating systems, predating even threads and processes.
Coroutines are also a relatively hard form of concurrency to understand and use, and coroutine code can be hard to read.
Python implements coroutine control with yield and next.
def count(n):
    while n > 0:
        yield n  # suspend here; calling count() builds a generator, which is then driven by next()
        n -= 1

for i in count(5):
    print(i)

# Output:
# 5
# 4
# 3
# 2
# 1
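The for loop above hides the driving mechanism. Stepping the generator by hand with next() makes the suspend/resume cycle explicit (a small sketch to accompany the example above):

```python
def count(n):
    while n > 0:
        yield n
        n -= 1

c = count(3)      # calling count() runs no body code; it just returns a generator
print(next(c))    # 3 -- executes up to the first yield, then suspends
print(next(c))    # 2 -- resumes right after the yield
print(next(c))    # 1
try:
    next(c)
except StopIteration:
    print('exhausted')   # the loop ended, so the generator is done
```

This suspend-at-yield, resume-on-next behavior is exactly what the scheduler below exploits.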
Now let's write a coroutine version, returning to the earlier server code. First, recall why we used threads at all: to avoid blocking.
The blocking here comes from two sources, socket I/O and CPU usage. Coroutines are likewise introduced to avoid blocking, so we start by marking the blocking points in the code.
# server.py
# server code, with the blocking points marked
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
The code above marks three socket I/O blocking points; CPU usage is ignored for now.
- First we insert a yield statement at each blocking point. The yield records where the code blocks and why, which is what lets a scheduler achieve non-blocking behavior: when a task would block at a yield, the scheduler can simply go serve another request instead of waiting there. We will implement such a simple non-blocking scheduler shortly.
# server.py
# server code, with yields at the blocking points
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
- The program above cannot run yet: there is no scheduler for the yields, so it would simply suspend at each marked point. That is also one of the attractions of coroutines: scheduling is under our explicit control, unlike threads, whose interleaving is arbitrary. Below is the code with the scheduler included.
from socket import *
from fib import fib
from collections import deque
from select import select

tasks = deque()
recv_wait = {}   # socket -> task waiting to read
send_wait = {}   # socket -> task waiting to write

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # no runnable task: wait until some parked socket becomes ready
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print('Closed')

tasks.append(fib_server(('', 25003)))
run()
- We drive the coroutines by polling with select(). The core is the tasks queue, which holds the currently runnable generators, while the recv_wait and send_wait dictionaries park each task on the socket it is waiting for and hand it back to the queue once select() reports that socket ready.
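The scheduling core can be seen without any sockets. This is a stripped-down version of the run() loop, round-robin only, with no select() (names like `task` and `order` are illustrative):

```python
from collections import deque

order = []

def task(name, steps):
    for i in range(steps):
        order.append((name, i))
        yield  # suspend; the scheduler decides who runs next

# Round-robin over a deque of generators, like tasks in run().
tasks = deque([task('A', 2), task('B', 2)])
while tasks:
    t = tasks.popleft()
    try:
        next(t)           # run the task up to its next yield
        tasks.append(t)   # still alive: requeue at the back
    except StopIteration:
        pass              # generator finished; drop it

print(order)   # [('A', 0), ('B', 0), ('A', 1), ('B', 1)]
```

The real run() only adds one refinement: instead of blindly requeueing, it parks each task in recv_wait or send_wait until select() says its socket is ready.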
Performance testing
The new server's qps:
- 82262 reqs/sec
- 82915 reqs/sec
- 82128 reqs/sec
- 82867 reqs/sec
- 82284 reqs/sec
- 82363 reqs/sec
- 82954 reqs/sec
This is close to the thread model. The coroutine version is asynchronous, but it still uses only one CPU:
when we ask the server to compute fib(40) and saturate the CPU, the qps quickly drops to 0.
tornado, a coroutine-based Python web framework
tornado is an asynchronous web framework from Facebook. Using coroutines in tornado is straightforward: the gen.coroutine decorator registers your asynchronous function with tornado's IOLoop. A typical tornado asynchronous method looks like:
@gen.coroutine
def post(self):
    resp = yield GetUser()
    self.write(resp)
How tornado's asynchrony works
def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True:
        [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise
        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
This is the core main loop of tornado's asynchronous scheduling. poll() returns key-value pairs of the form (fd: events), assigned to event_pairs; in the inner while loop, the entries are popped one at a time and the corresponding handler is invoked. tornado registers sockets with the poller through the function below. On Linux it defaults to epoll; on Windows it defaults to select (the only choice there).
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)
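The same register-then-dispatch pattern is available portably through the standard library's selectors module (which picks epoll on Linux and falls back to other mechanisms elsewhere). A self-contained sketch, not tornado code; the names `handler` and `received` are illustrative:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()   # a connected pair, standing in for server/client sockets
received = []

def handler(sock, events):
    received.append(sock.recv(100))

# Register a callback for readability, like add_handler(fd, handler, events).
sel.register(a, selectors.EVENT_READ, handler)
b.send(b'ping')

# One iteration of the event loop: poll, then dispatch each ready fd
# to its handler, like self._handlers[fd](fd, events) above.
for key, events in sel.select(timeout=1):
    key.data(key.fileobj, events)

print(received)   # [b'ping']
```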
Comparing cherrypy's thread pool with tornado's coroutines
We compare the simplest possible application running on a single machine.
The test command:
ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"
cherrypy's results:
- Completed 100 requests
- Completed 200 requests
- Completed 300 requests
- Completed 400 requests
- Completed 500 requests
- Completed 600 requests
- Completed 700 requests
- Completed 800 requests
- Completed 900 requests
- Completed 1000 requests
- Finished 1000 requests
Time taken for tests: 10.773 seconds
tornado's results:
- Completed 100 requests
- Completed 200 requests
- Completed 300 requests
- Completed 400 requests
- Completed 500 requests
- Completed 600 requests
- Completed 700 requests
- Completed 800 requests
- Completed 900 requests
- Completed 1000 requests
- Finished 1000 requests
Time taken for tests: 0.377 seconds
tornado's performance is clearly impressive; when an application involves asynchronous I/O, it is worth reaching for tornado.
Summary
This article introduced Python threads, processes, and coroutines with examples of each, and ran simple performance tests on the three models. Because of the GIL, neither threads nor coroutines in Python can use multiple cores.
- For CPU-bound web apps, the thread model and the coroutine model perform about the same; threads come out slightly ahead because the operating system handles their scheduling.
- For I/O-bound web apps, the coroutine model has a large advantage.