關於gevent的一些理解(二)


3 實際應用

1 zeromq和gevent:

zeromq的介紹請參看:http://www.infoq.com/cn/news/2010/09/introduction-zero-mq

假設你已經安裝了zeromq,gevent_zeromq(https://github.com/traviscline/gevent-zeromq.git)和pyzmq

一個很基礎的例子:

 

import gevent
from gevent_zeromq import zmq

# Global Context
context = zmq.Context() #它是GreenContext的一個簡寫,確保greenlet化socket

def server():
    server_socket = context.socket(zmq.REQ) #創建一個socket,使用mq類型模式REQ/REP(請求/回復,服務器是請求),還有PUB/SUB(發布/訂閱),push/pull等
    server_socket.bind("tcp://127.0.0.1:5000") #綁定socket

    for request in range(1,10):
        server_socket.send("Hello")
        print('Switched to Server for ', request)
        server_socket.recv()  #這里發生上下文切換

def client():
    client_socket = context.socket(zmq.REP)  (客戶端是回復)
    client_socket.connect("tcp://127.0.0.1:5000")  #連接server的socket端口

    for request in range(1,10):

        client_socket.recv()
        print('Switched to Client for ', request)
        client_socket.send("World")

publisher = gevent.spawn(server)
client    = gevent.spawn(client)

gevent.joinall([publisher, client])

執行結果:

[root@248_STAT ~]# python test.py
(‘Switched to Server for ‘, 1)
(‘Switched to Client for ‘, 1)
(‘Switched to Server for ‘, 2)
(‘Switched to Client for ‘, 2)
(‘Switched to Server for ‘, 3)
(‘Switched to Client for ‘, 3)
(‘Switched to Server for ‘, 4)
(‘Switched to Client for ‘, 4)
(‘Switched to Server for ‘, 5)
(‘Switched to Client for ‘, 5)
(‘Switched to Server for ‘, 6)
(‘Switched to Client for ‘, 6)
(‘Switched to Server for ‘, 7)
(‘Switched to Client for ‘, 7)
(‘Switched to Server for ‘, 8)
(‘Switched to Client for ‘, 8)
(‘Switched to Server for ‘, 9)
(‘Switched to Client for ‘, 9)

2 telnet 服務器

from gevent.server import StreamServer #StreamServer是一個通用的TCP服務器

def handle(socket, address):
    socket.send("Hello from a telnet!n")
    for i in range(5):
        socket.send(str(i) + 'n') #給socket客戶端發送數據
    socket.close() #關閉客戶端連接

server = StreamServer(('127.0.0.1', 5000), handle) #當出現連接調用定義的方法handle
server.serve_forever()
 

執行結果:

dongwm@localhost ~ $ nc 127.0.0.1 5000
Hello from a telnet!
0
1
2
3
4
dongwm@localhost ~ $ telnet 127.0.0.1 5000
Trying 127.0.0.1…
Connected to 127.0.0.1.
Escape character is ‘^]’.
Hello from a telnet!
0
1
2
3
4
Connection closed by foreign host.
3 wsgi服務器

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK' #頁面狀態指定為200 ok
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever() #啟動一個占用8000端口的wsgi服務器

from gevent.pywsgi import WSGIServer #使用pywsgi可以我們自己定義產生結果的處理引擎
def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hello" #yield出數據
    yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()

我們看一個用ab(Apache Benchmark)的性能測試(更多信息請查看http://nichol.as/benchmark-of-python-web-servers),我這里只

對比了gevent和paste的性能比(沒做系統優化,只是在同樣條件下看性能差距):

paste的wsgi程序:

from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

#WSGIServer(('', 8000), application).serve_forever()
from paste import httpserver
httpserver.serve(application, '0.0.0.0', request_queue_size=500)
 

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8000/ #gevent的性能,條件是:並發100,請求1W
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:
Server Hostname: 127.0.0.1
Server Port: 8000

Document Path: /
Document Length: 18 bytes

Concurrency Level: 100
Time taken for tests: 2.805 seconds
Complete requests: 10000
Failed requests: 0
Write errors: 0
Total transferred: 1380000 bytes
HTML transferred: 180000 bytes
Requests per second: 3564.90 [#/sec] (mean)
Time per request: 28.051 [ms] (mean)
Time per request: 0.281 [ms] (mean, across all concurrent requests)
Transfer rate: 480.43 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 28 15.1 27 69
Waiting: 1 28 15.1 27 69
Total: 2 28 15.1 27 69

Percentage of the requests served within a certain time (ms)
50% 27
66% 35
75% 40
80% 42
90% 48
95% 54
98% 59
99% 62
100% 69 (longest request)

dongwm@localhost ~ $ /usr/sbin/ab2 -n 10000 -c 100 http://127.0.0.1:8080/ #paste的性能
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software: PasteWSGIServer/0.5
Server Hostname: 127.0.0.1
Server Port: 8080

Document Path: /
Document Length: 18 bytes

Concurrency Level: 100
Time taken for tests: 4.119 seconds
Complete requests: 10000
Failed requests: 0
Write errors: 0
Total transferred: 1600000 bytes
HTML transferred: 180000 bytes
Requests per second: 2427.52 [#/sec] (mean)
Time per request: 41.194 [ms] (mean)
Time per request: 0.412 [ms] (mean, across all concurrent requests)
Transfer rate: 379.30 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 41 5.4 41 107
Waiting: 1 41 5.2 40 97
Total: 4 41 5.3 41 107

Percentage of the requests served within a certain time (ms)
50% 41
66% 41
75% 42
80% 43
90% 46
95% 50
98% 56
99% 59
100% 107 (longest request)

很不好理解吧,那我把數據直接整理下:

1 測試用時:

Time taken for tests: 2.805 seconds #gevent

Time taken for tests: 4.119 seconds #paste 花費時間更長
2 每秒請求數:

Requests per second: 3564.90 [#/sec] (mean) #gevent的嘛,每秒請求數大的多
Requests per second: 2427.52 [#/sec] (mean) #paste

3 每請求數耗時:

Time per request: 28.051 [ms] (mean) #gevent耗時少
Time per request: 0.281 [ms] (mean, across all concurrent requests) #gevent並發請求時耗時少
Time per request: 41.194 [ms] (mean) #paste
Time per request: 0.412 [ms] (mean, across all concurrent requests) #paste

4 傳輸效率:

Transfer rate: 448.26 [Kbytes/sec] received #gevent的效率更高
Transfer rate: 379.30 [Kbytes/sec] received #paste

5 連接消耗的時間的分解:

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 28 15.1 27 69
Waiting: 1 28 15.1 27 69
Total: 2 28 15.1 27 69

Connection Times (ms) #paste
min mean[+/-sd] median max
Connect: 0 0 0.2 0 2
Processing: 2 41 5.4 41 107
Waiting: 1 41 5.2 40 97
Total: 4 41 5.3 41 107 #明顯其中最大用時107/97都大於gevent的69ms,最小用時gevent略強

6 整個場景中所有請求的響應情況。在場景中每個請求都有一個響應時間

Percentage of the requests served within a certain time (ms) #gevent
50% 29
66% 31
75% 34
80% 34
90% 36
95% 38
98% 42
99% 44
100% 71 (longest request)

可以這樣理解:50%用戶效應小於29ms,60%用戶響應小於31ms,最長的訪問響應為71ms
Percentage of the requests served within a certain time (ms) #paste
50% 41
66% 41
75% 42
80% 43
90% 46
95% 50
98% 56
99% 59
100% 107 (longest request) #很明顯,無論那個區間,paste性能都略差

4 長輪詢

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import json

data_source = Queue()

def producer():
    while True:
        data_source.put_nowait('Hello World') #往隊列非阻塞的放入數據
        gevent.sleep(1)

def ajax_endpoint(environ, start_response):
    status = '200 OK'
    headers = [
        ('Content-Type', 'application/json') #設定網絡文件的類型是json
    ]
    try:
        datum = data_source.get(timeout=5)
    except Empty:
        datum = [] #假如gevent.sleep的時間設置的長一些(比如5s),在不停刷新過程中會獲得空列表

    start_response(status, headers)
    return json.dumps(datum) #返回數據,打印出來的數據是一個帶引號的字符串

gevent.spawn(producer)

WSGIServer(('', 8000), ajax_endpoint).serve_forever()
 

4 聊天室(源碼在這里https://github.com/sdiehl/minichat.git):

 

from gevent import monkey

monkey.patch_all() #給模塊打包
from flask import Flask, render_template, request, json #作者在這里使用了flask框架,當然你也可以用其它比如django.tornado,bottle等

from gevent import queue
from gevent.pywsgi import WSGIServer

app = Flask(__name__)
app.debug = True

class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print user
            user.queue.put_nowait(message)
        self.messages.append(message)

class User(object):

    def __init__(self):
        self.queue = queue.Queue()

rooms = {
    'python': Room(),
    'django': Room(),
}

users = {}

@app.route('/') #flask指定url的處理使用路由的方式,訪問頁面地址根目錄就會執行choose_name
def choose_name():
    return render_template('choose.html') #然后調用模板choose.html,這個html文件最后使用了GET方法提交了一個uid頁面(/<uid>)

@app.route('/<uid>') #請求被轉到了這里
def main(uid):
    return render_template('main.html', #調用模板提供幾個room的連接
        uid=uid,
        rooms=rooms.keys() #格局選擇的連接,通過GET方法轉到那個相應url:/<room>/<uid>
    )

@app.route('/<room>/<uid>') #請求被轉到了這里
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print 'subscribe', active_room, user

    messages = active_room.backlog()

    return render_template('room.html', #room.html包含一個POST提交方式,把你的聊天數據提交,並且更新頁面(通過jquery的ajax調用url/poll/<uid>)
        room=room, uid=uid, messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"]) #通過這個url
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''

@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg) #返回隊列中包含的聊天記錄

if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()

來一個更復雜帶有前台后端的模型(例子來自http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html):

源碼在:http://dl.dropbox.com/u/24086834/blog/20110723/zmq_websocket.tar.gz

其中需要修改graph.js第二行:

var ws = new WebSocket(“ws://localhost:9999/test”);

為:

var ws = new MozWebSocket(“ws://localhost:9999/test”); #因為我的火狐用的websocket不同

這個demo.py,我來解析下:

 

import os

import time
import math
import json
import webbrowser

import paste.urlparser #paste是一個WSGI工具包,在WSGI的基礎上包裝了幾層,讓應用管理和實現變得方便

import gevent
from gevent_zeromq import zmq
from geventwebsocket.handler import WebSocketHandler #基於gevent的pywsgi的WebSocket的處理程序

def main(): #主方法
    context = zmq.Context()
    gevent.spawn(zmq_server, context) #上個例子使用joinall,這個例子是spawn+start,context是參數,也就是實例化的GreenContext
    ws_server = gevent.pywsgi.WSGIServer(
        ('', 9999), WebSocketApp(context),
        handler_class=WebSocketHandler)
    http_server = gevent.pywsgi.WSGIServer(
        ('', 8000),
        paste.urlparser.StaticURLParser(os.path.dirname(__file__))) # paste.urlparser用來處理url和靜態文件
    http_server.start()  #啟動grennlet實例
    ws_server.start()
    webbrowser.open('http://localhost:8000/graph.html') #啟動瀏覽器看這個頁面,當正常啟動后js會畫圖
    zmq_producer(context)

def zmq_server(context):
    sock_incoming = context.socket(zmq.SUB)
    sock_outgoing = context.socket(zmq.PUB)
    sock_incoming.bind('tcp://*:5000') #發布綁定
    sock_outgoing.bind('inproc://queue') #訂閱綁定,本地(通過內存)進程(線程間)通信傳輸
    sock_incoming.setsockopt(zmq.SUBSCRIBE, "") #這里表示對發布的所有信息都訂閱
    while True:
        msg = sock_incoming.recv()
        sock_outgoing.send(msg)

class WebSocketApp(object):

    def __init__(self, context):
        self.context = context

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        sock = self.context.socket(zmq.SUB)
        sock.setsockopt(zmq.SUBSCRIBE, "") #訂閱所有信息
        sock.connect('inproc://queue') #websocket連接到訂閱的地址
        while True:
            msg = sock.recv()
            ws.send(msg)

def zmq_producer(context):  #發布的方法
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5000') #綁定到發布的socket

    while True:
        x = time.time() * 1000
        y = 2.5 * (1 + math.sin(x / 500))
        socket.send(json.dumps(dict(x=x, y=y))) #往發布socket發送數據,這樣,數據會被inproc://queue訂閱,而被websocket獲取,根據數據展示
        gevent.sleep(0.05)

if __name__ == '__main__':
    main()

from:http://www.dongwm.com/archives/guanyugeventdeyixielijieer/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM