gevent 使用踩坑


簡單介紹

  1. gevent 基本概念:
       調度器: hub
              上下文切換管理: switch
              主循環: loop
       協程: greenlet
  2. gevent 特性:
        優點:
                高效,實現簡單,易維護
        缺點:
                和go不一樣,並不是python原生支持的功能,所以使用起來難免會踩一些坑,但是由於並不是實現方式有問題,所以存在着點弊病並沒有什么問題。

開始使用

  1. 什么情況下可以使用:
        足夠了解自己服務所使用的io,  寫的代碼足夠規范,否則會出現出了問題都不知道那里有問題。
        需要並行化的代碼不能阻塞時間過長,否則沒有意義。
        存在大量的io, 前提是可以變成非阻塞的,否則不能滿足條件2。
  2. 由於發現api的代碼滿足以上三條,所以開始進行優化:
    第一版代碼如下:
           

    class  ParallelTask( object ):
         def  __init__( self , timeout = 5 ):
             # 覆蓋python原生的socket
             gevent.monkey.patch_socket()
             self .timeout  =  timeout
             self .task  =  []
     
         def  taskAppend( self , task,  * args):
             self .task.append(gevent.spawn(task,  * args))
     
         def  run( self ):
             errno, err_msg  =  get_error(AwemeStatus.SUCCESS)
             try :
                 gevent.joinall( self .task, timeout = self .timeout)
                 return  errno, err_msg
             except  Exception, ex:
                 logger.exception( "[ParallelTask] run task fail error=%s"  %  ex)
                 errno, err_msg  =  get_error(AwemeStatus.REE_PARALLEL_TASK)
                 return  errno, err_msg
    # handler
    def  __getAwemeList( self , category_id, req_type):
         pass
      
    # 創建一個並行處理的任務
    paralle_obj  =  ParallelTask()
    paralle_obj.taskAppend( self .__getAwemeList, category_id, req_type)
    errno, err_msg  =  paralle_obj.run()

    代碼看似沒有問題,自測也沒發現什么問題。

  3. 上線觀察:
    果不其然,出問題了。。。

    錯誤發生在 challengeDetail 接口, 也是奇怪,上的明明是category 接口。
    不過看錯誤知道應該是掉RPC沒有成功, 查看RPC的日志可以看到全是fail。
  4. 分析問題:
     首先RPC失敗肯定是由於使用gevent 引起的。但是我在這category使用gevent 為啥會影響challengeDetail 呢。
     觀察最上面的 ParallelTask 代碼,里面使用了monkey patch , 沒錯就是這個東西,重寫了python原生socket ,使其支持了非阻塞io。
     所以在沒有並行化的代碼里使用RPC調用,里面使用的socket變成了非阻塞的,而這些調用不受hup控制,所以肯定會出現調用失敗的情況。
  5. 解決:
    解決辦法很簡單,在用完monkey patch 之后恢復原生的socket不久好了。 gevent 沒有提供接口恢復,所以自己實現了下。

    def  repatching(item):
         try :
             # 需要重新寫回的module
             module  =  __import__ (item)
             # 需要重寫的屬性
             saved  =  gevent.monkey.saved
             mapper  =  saved.get(item, {})
             for  attr  in  mapper:
                 old_value  =  mapper.get(attr)
                 if  not  old_value:
                     continue
                 setattr (module, attr, old_value)
         except  Exception, ex:
             logger.exception( "[gevent] repatching fail error=%s"  %  ex)
     
    class  ParallelTask( object ):
         def  __init__( self , timeout = 5 ):
             # 覆蓋python原生的socket, 使用完記得repatching 不然會有未知錯誤
             gevent.monkey.patch_socket()
             self .timeout  =  timeout
             self .task  =  []
     
         def  taskAppend( self , task,  * args):
             self .task.append(gevent.spawn(task,  * args))
     
         def  run( self ):
             errno, err_msg  =  get_error(AwemeStatus.SUCCESS)
             try :
                 gevent.joinall( self .task, timeout = self .timeout)
                 return  errno, err_msg
             except  Exception, ex:
                 logger.exception( "[ParallelTask] run task fail error=%s"  %  ex)
                 errno, err_msg  =  get_error(AwemeStatus.REE_PARALLEL_TASK)
                 return  errno, err_msg
             finally :
                 # 使用完非阻塞的網絡io之后一定要改回來
                 repatching( 'socket' )
  6. 線上觀察:
    bug 修復之后線上沒有新的問題爆出來,至今穩定運行着, 本來打算用go優化的接口看樣子也不需要了, 並行化之后接口的時延由3s 降低到了300ms, 降低的幅度也符合預期 (30個rpc並行)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM