簡單介紹
- gevent 基本概念:
調度器: hub
上下文切換管理: switch
主循環: loop
協程: greenlet - gevent 特性:
優點:
高效,實現簡單,易維護
缺點:
和go不一樣,並不是python原生支持的功能,所以使用起來難免會踩一些坑,但是由於並不是實現方式有問題,所以存在着點弊病並沒有什么問題。
開始使用
- 什么情況下可以使用:
足夠了解自己服務所使用的io, 寫的代碼足夠規范,否則會出現出了問題都不知道那里有問題。
需要並行化的代碼不能阻塞時間過長,否則沒有意義。
存在大量的io, 前提是可以變成非阻塞的,否則不能滿足條件2。 由於發現api的代碼滿足以上三條,所以開始進行優化:
第一版代碼如下:
class
ParallelTask(
object
):
def
__init__(
self
, timeout
=
5
):
# 覆蓋python原生的socket
gevent.monkey.patch_socket()
self
.timeout
=
timeout
self
.task
=
[]
def
taskAppend(
self
, task,
*
args):
self
.task.append(gevent.spawn(task,
*
args))
def
run(
self
):
errno, err_msg
=
get_error(AwemeStatus.SUCCESS)
try
:
gevent.joinall(
self
.task, timeout
=
self
.timeout)
return
errno, err_msg
except
Exception, ex:
logger.exception(
"[ParallelTask] run task fail error=%s"
%
ex)
errno, err_msg
=
get_error(AwemeStatus.REE_PARALLEL_TASK)
return
errno, err_msg
# handler
def
__getAwemeList(
self
, category_id, req_type):
pass
# 創建一個並行處理的任務
paralle_obj
=
ParallelTask()
paralle_obj.taskAppend(
self
.__getAwemeList, category_id, req_type)
errno, err_msg
=
paralle_obj.run()
代碼看似沒有問題,自測也沒發現什么問題。
- 上線觀察:
果不其然,出問題了。。。
錯誤發生在 challengeDetail 接口, 也是奇怪,上的明明是category 接口。
不過看錯誤知道應該是掉RPC沒有成功, 查看RPC的日志可以看到全是fail。 - 分析問題:
首先RPC失敗肯定是由於使用gevent 引起的。但是我在這category使用gevent 為啥會影響challengeDetail 呢。
觀察最上面的 ParallelTask 代碼,里面使用了monkey patch , 沒錯就是這個東西,重寫了python原生socket ,使其支持了非阻塞io。
所以在沒有並行化的代碼里使用RPC調用,里面使用的socket變成了非阻塞的,而這些調用不受hup控制,所以肯定會出現調用失敗的情況。 解決:
解決辦法很簡單,在用完monkey patch 之后恢復原生的socket不久好了。 gevent 沒有提供接口恢復,所以自己實現了下。def
repatching(item):
try
:
# 需要重新寫回的module
module
=
__import__
(item)
# 需要重寫的屬性
saved
=
gevent.monkey.saved
mapper
=
saved.get(item, {})
for
attr
in
mapper:
old_value
=
mapper.get(attr)
if
not
old_value:
continue
setattr
(module, attr, old_value)
except
Exception, ex:
logger.exception(
"[gevent] repatching fail error=%s"
%
ex)
class
ParallelTask(
object
):
def
__init__(
self
, timeout
=
5
):
# 覆蓋python原生的socket, 使用完記得repatching 不然會有未知錯誤
gevent.monkey.patch_socket()
self
.timeout
=
timeout
self
.task
=
[]
def
taskAppend(
self
, task,
*
args):
self
.task.append(gevent.spawn(task,
*
args))
def
run(
self
):
errno, err_msg
=
get_error(AwemeStatus.SUCCESS)
try
:
gevent.joinall(
self
.task, timeout
=
self
.timeout)
return
errno, err_msg
except
Exception, ex:
logger.exception(
"[ParallelTask] run task fail error=%s"
%
ex)
errno, err_msg
=
get_error(AwemeStatus.REE_PARALLEL_TASK)
return
errno, err_msg
finally
:
# 使用完非阻塞的網絡io之后一定要改回來
repatching(
'socket'
)
- 線上觀察:
bug 修復之后線上沒有新的問題爆出來,至今穩定運行着, 本來打算用go優化的接口看樣子也不需要了, 並行化之后接口的時延由3s 降低到了300ms, 降低的幅度也符合預期 (30個rpc並行)