3 實際應用
1 zeromq和gevent:
zeromq的介紹請參看:http://www.infoq.com/cn/news/2010/09/introduction-zero-mq
假設你已經安裝了zeromq,gevent_zeromq(https://github.com/traviscline/gevent-zeromq.git)和pyzmq
一個很基礎的例子:
import gevent from gevent_zeromq import zmq # Global Context context = zmq.Context() #它是GreenContext的一個簡寫,確保greenlet化socket def server(): server_socket = context.socket(zmq.REQ) #創建一個socket,使用mq類型模式REQ/REP(請求/回復,服務器是請求),還有PUB/SUB(發布/訂閱),push/pull等 server_socket.bind("tcp://127.0.0.1:5000") #綁定socket for request in range(1,10): server_socket.send("Hello") print('Switched to Server for ', request) server_socket.recv() #這里發生上下文切換 def client(): client_socket = context.socket(zmq.REP) (客戶端是回復) client_socket.connect("tcp://127.0.0.1:5000") #連接server的socket端口 for request in range(1,10): client_socket.recv() print('Switched to Client for ', request) client_socket.send("World") publisher = gevent.spawn(server) client = gevent.spawn(client) gevent.joinall([publisher, client])
執行結果:
[root@248_STAT ~]# python test.py
(‘Switched to Server for ‘, 1)
(‘Switched to Client for ‘, 1)
(‘Switched to Server for ‘, 2)
(‘Switched to Client for ‘, 2)
(‘Switched to Server for ‘, 3)
(‘Switched to Client for ‘, 3)
(‘Switched to Server for ‘, 4)
(‘Switched to Client for ‘, 4)
(‘Switched to Server for ‘, 5)
(‘Switched to Client for ‘, 5)
(‘Switched to Server for ‘, 6)
(‘Switched to Client for ‘, 6)
(‘Switched to Server for ‘, 7)
(‘Switched to Client for ‘, 7)
(‘Switched to Server for ‘, 8)
(‘Switched to Client for ‘, 8)
(‘Switched to Server for ‘, 9)
(‘Switched to Client for ‘, 9)
2 telnet 服務器
from gevent.server import StreamServer #StreamServer是一個通用的TCP服務器 def handle(socket, address): socket.send("Hello from a telnet!n") for i in range(5): socket.send(str(i) + 'n') #給socket客戶端發送數據 socket.close() #關閉客戶端連接 server = StreamServer(('127.0.0.1', 5000), handle) #當出現連接調用定義的方法handle server.serve_forever()
執行結果:
dongwm@localhost ~ $ nc 127.0.0.1 5000
Hello from a telnet!
0
1
2
3
4
dongwm@localhost ~ $ telnet 127.0.0.1 5000
Trying 127.0.0.1…
Connected to 127.0.0.1.
Escape character is ‘^]’.
Hello from a telnet!
0
1
2
3
4
Connection closed by foreign host.
3 wsgi服務器
from gevent.wsgi import WSGIServer def application(environ, start_response): status = '200 OK' #頁面狀態指定為200 ok body = '<p>Hello World</p>' headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) return [body] WSGIServer(('', 8000), application).serve_forever() #啟動一個占用8000端口的wsgi服務器 from gevent.pywsgi import WSGIServer #使用pywsgi可以我們自己定義產生結果的處理引擎
def application(environ, start_response): status = '200 OK' headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) yield "<p>Hello" #yield出數據 yield "World</p>" WSGIServer(('', 8000), application).serve_forever()
我們看一個用ab(Apache Benchmark)的性能測試(更多信息請查看http://nichol.as/benchmark-of-python-web-servers),我這里只
對比了gevent和paste的性能比(沒做系統優化,只是在同樣條件下看性能差距):
paste的wsgi程序:
from gevent.wsgi import WSGIServer def application(environ, start_response): status = '200 OK' body = '<p>Hello World</p>' headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) return [body] #WSGIServer(('', 8000), application).serve_forever() from paste import httpserver httpserver.serve(application, '0.0.0.0', request_queue_size=500)
dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8000/ #gevent的性能,條件是:並發100,請求1W
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests
Server Software:
Server Hostname: 127.0.0.1
Server Port: 8000
Document Path: /
Document Length: 18 bytes
Concurrency Level: 100
Time taken for tests: 2.805 seconds
Complete requests: 10000
Failed requests: 0
Write errors: 0
Total transferred: 1380000 bytes
HTML transferred: 180000 bytes
Requests per second: 3564.90 [#/sec] (mean)
Time per request: 28.051 [ms] (mean)
Time per request: 0.281 [ms] (mean, across all concurrent requests)
Transfer rate: 480.43 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 28 15.1 27 69
Waiting: 1 28 15.1 27 69
Total: 2 28 15.1 27 69
Percentage of the requests served within a certain time (ms)
50% 27
66% 35
75% 40
80% 42
90% 48
95% 54
98% 59
99% 62
100% 69 (longest request)
dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8080/ #paste的性能
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests
Server Software: PasteWSGIServer/0.5
Server Hostname: 127.0.0.1
Server Port: 8080
Document Path: /
Document Length: 18 bytes
Concurrency Level: 100
Time taken for tests: 4.119 seconds
Complete requests: 10000
Failed requests: 0
Write errors: 0
Total transferred: 1600000 bytes
HTML transferred: 180000 bytes
Requests per second: 2427.52 [#/sec] (mean)
Time per request: 41.194 [ms] (mean)
Time per request: 0.412 [ms] (mean, across all concurrent requests)
Transfer rate: 379.30 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 41 5.4 41 107
Waiting: 1 41 5.2 40 97
Total: 4 41 5.3 41 107
Percentage of the requests served within a certain time (ms)
50% 41
66% 41
75% 42
80% 43
90% 46
95% 50
98% 56
99% 59
100% 107 (longest request)
很不好理解吧,那我把數據直接整理下:
1 測試用時:
Time taken for tests: 2.805 seconds #gevent
Time taken for tests: 4.119 seconds #paste 花費時間更長
2 每秒請求數:
Requests per second: 3564.90 [#/sec] (mean) #gevent的嘛,每秒請求數大的多
Requests per second: 2427.52 [#/sec] (mean) #paste
3 每請求數耗時:
Time per request: 28.051 [ms] (mean) #gevent耗時少
Time per request: 0.281 [ms] (mean, across all concurrent requests) #gevent並發請求時耗時少
Time per request: 41.194 [ms] (mean) #paste
Time per request: 0.412 [ms] (mean, across all concurrent requests) #paste
4 傳輸效率:
Transfer rate: 448.26 [Kbytes/sec] received #gevent的效率更高
Transfer rate: 379.30 [Kbytes/sec] received #paste
5 連接消耗的時間的分解:
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 28 15.1 27 69
Waiting: 1 28 15.1 27 69
Total: 2 28 15.1 27 69
Connection Times (ms) #paste
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 41 5.4 41 107
Waiting: 1 41 5.2 40 97
Total: 4 41 5.3 41 107 #明顯其中最大用時107/97都大於gevent的69ms,最小用時gevent略強
6 整個場景中所有請求的響應情況。在場景中每個請求都有一個響應時間
Percentage of the requests served within a certain time (ms) #gevent
50% 29
66% 31
75% 34
80% 34
90% 36
95% 38
98% 42
99% 44
100% 71 (longest request)
可以這樣理解:50%用戶效應小於29ms,60%用戶響應小於31ms,最長的訪問響應為71ms
Percentage of the requests served within a certain time (ms) #paste
50% 41
66% 41
75% 42
80% 43
90% 46
95% 50
98% 56
99% 59
100% 107 (longest request) #很明顯,無論那個區間,paste性能都略差
4 長輪詢
import gevent from gevent.queue import Queue, Empty from gevent.pywsgi import WSGIServer import json data_source = Queue() def producer(): while True: data_source.put_nowait('Hello World') #往隊列非阻塞的放入數據 gevent.sleep(1) def ajax_endpoint(environ, start_response): status = '200 OK' headers = [ ('Content-Type', 'application/json') #設定網絡文件的類型是json ] try: datum = data_source.get(timeout=5) except Empty: datum = [] #假如gevent.sleep的時間設置的長一些(比如5s),在不停刷新過程中會獲得空列表 start_response(status, headers) return json.dumps(datum) #返回數據,打印出來的數據是一個帶引號的字符串 gevent.spawn(producer) WSGIServer(('', 8000), ajax_endpoint).serve_forever()
4 聊天室(源碼在這里https://github.com/sdiehl/minichat.git):
from gevent import monkey
monkey.patch_all() #給模塊打包 from flask import Flask, render_template, request, json #作者在這里使用了flask框架,當然你也可以用其它比如django.tornado,bottle等 from gevent import queue from gevent.pywsgi import WSGIServer app = Flask(__name__) app.debug = True class Room(object): def __init__(self): self.users = set() self.messages = [] def backlog(self, size=25): return self.messages[-size:] def subscribe(self, user): self.users.add(user) def add(self, message): for user in self.users: print user user.queue.put_nowait(message) self.messages.append(message) class User(object): def __init__(self): self.queue = queue.Queue() rooms = { 'python': Room(), 'django': Room(), } users = {} @app.route('/') #flask指定url的處理使用路由的方式,訪問頁面地址根目錄就會執行choose_name def choose_name(): return render_template('choose.html') #然后調用模板choose.html,這個html文件最后使用了GET方法提交了一個uid頁面(/<uid>) @app.route('/<uid>') #請求被轉到了這里 def main(uid): return render_template('main.html', #調用模板提供幾個room的連接 uid=uid, rooms=rooms.keys() #格局選擇的連接,通過GET方法轉到那個相應url:/<room>/<uid> ) @app.route('/<room>/<uid>') #請求被轉到了這里 def join(room, uid): user = users.get(uid, None) if not user: users[uid] = user = User() active_room = rooms[room] active_room.subscribe(user) print 'subscribe', active_room, user messages = active_room.backlog() return render_template('room.html', #room.html包含一個POST提交方式,把你的聊天數據提交,並且更新頁面(通過jquery的ajax調用url/poll/<uid>) room=room, uid=uid, messages=messages) @app.route("/put/<room>/<uid>", methods=["POST"]) #通過這個url def put(room, uid): user = users[uid] room = rooms[room] message = request.form['message'] room.add(':'.join([uid, message])) return '' @app.route("/poll/<uid>", methods=["POST"]) def poll(uid): try: msg = users[uid].queue.get(timeout=10) except queue.Empty: msg = [] return json.dumps(msg) #返回隊列中包含的聊天記錄 if __name__ == "__main__": http = WSGIServer(('', 5000), app) http.serve_forever()
來一個更復雜帶有前台后端的模型(例子來自http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html):
源碼在:http://dl.dropbox.com/u/24086834/blog/20110723/zmq_websocket.tar.gz
其中需要修改graph.js第二行:
var ws = new WebSocket(“ws://localhost:9999/test”);
為:
var ws = new MozWebSocket(“ws://localhost:9999/test”); #因為我的火狐用的websocket不同
這個demo.py,我來解析下:
import os
import time import math import json import webbrowser import paste.urlparser #paste是一個WSGI工具包,在WSGI的基礎上包裝了幾層,讓應用管理和實現變得方便 import gevent from gevent_zeromq import zmq from geventwebsocket.handler import WebSocketHandler #基於gevent的pywsgi的WebSocket的處理程序 def main(): #主方法 context = zmq.Context() gevent.spawn(zmq_server, context) #上個例子使用joinall,這個例子是spawn+start,context是參數,也就是實例化的GreenContext ws_server = gevent.pywsgi.WSGIServer( ('', 9999), WebSocketApp(context), handler_class=WebSocketHandler) http_server = gevent.pywsgi.WSGIServer( ('', 8000), paste.urlparser.StaticURLParser(os.path.dirname(__file__))) # paste.urlparser用來處理url和靜態文件 http_server.start() #啟動grennlet實例 ws_server.start() webbrowser.open('http://localhost:8000/graph.html') #啟動瀏覽器看這個頁面,當正常啟動后js會畫圖 zmq_producer(context) def zmq_server(context): sock_incoming = context.socket(zmq.SUB) sock_outgoing = context.socket(zmq.PUB) sock_incoming.bind('tcp://*:5000') #發布綁定 sock_outgoing.bind('inproc://queue') #訂閱綁定,本地(通過內存)進程(線程間)通信傳輸 sock_incoming.setsockopt(zmq.SUBSCRIBE, "") #這里表示對發布的所有信息都訂閱 while True: msg = sock_incoming.recv() sock_outgoing.send(msg) class WebSocketApp(object): def __init__(self, context): self.context = context def __call__(self, environ, start_response): ws = environ['wsgi.websocket'] sock = self.context.socket(zmq.SUB) sock.setsockopt(zmq.SUBSCRIBE, "") #訂閱所有信息 sock.connect('inproc://queue') #websocket連接到訂閱的地址 while True: msg = sock.recv() ws.send(msg) def zmq_producer(context): #發布的方法 socket = context.socket(zmq.PUB) socket.connect('tcp://127.0.0.1:5000') #綁定到發布的socket while True: x = time.time() * 1000 y = 2.5 * (1 + math.sin(x / 500)) socket.send(json.dumps(dict(x=x, y=y))) #往發布socket發送數據,這樣,數據會被inproc://queue訂閱,而被websocket獲取,根據數據展示 gevent.sleep(0.05) if __name__ == '__main__': main() from:http://www.dongwm.com/archives/guanyugeventdeyixielijieer/