背景
線上有一個相關百科的服務,返回一個query中提及的百科詞條。該服務是用python實現的,以前通過thrift接口訪問,現要將其改為通過HTTP訪問。之前沒有搭建HTTPServer的經驗,因此想用python的web Framework來做這件事,於是有了下面的工作。第一部分是框架選擇,這一部分沒有太仔細考慮,只是大概看了一些文章。第二部分是根據所需要的功能,學習及測試在框架上應該如何實現。第三部分是實際的代碼。第四部分是下一步的學習。
框架選擇
python有很多開源的web framework。從知乎上找了幾篇綜述型的簡介,大體包括:Django、Bottle、Flask、web2py、Tornado。看中了介紹中提及Tornado的速度與並發量,於是打算用tornado來實現。所以按目前的了解,或許Tornado並非實現本工作的最佳方案,只是一個可行方案。
學習與測試
用tornado開發web服務的基本流程
tornado具有web framework的功能,因此用它開發web服務非常方便:
- 實現處理請求的Handler,該類繼承自
tornado.web.RequestHandler
,實現用於處理請求的對應方法如:get、post等。返回內容用self.write
方法輸出。- 實例化一個Application。構造函數的參數是一個Handlers列表,通過正則表達式,將請求與Handler對應起來。通過dict將Handler需要的其他對象以參數的方式傳遞給Handler的initialize方法。
- 初始化一個
tornado.httpserver.HTTPServer
對象,構造函數的參數是上一步的Application對象。- 為HTTPServer對象綁定一個端口。
- 開始IOLoop。
原服務的特點
原服務是一個內存占用大,IO密集,計算量適中的服務。
- 內存占用大。需要加載一個比較大的詞表,其中每個詞對應一個id列表,這一部分是C++實現的,通過boost.python封裝為python可調用的so。原服務單進程占用內存超過5G。
- IO密集。計算過程中大量訪問redis讀取term及baikeid的屬性信息,用於過濾及rank計算。也訪問在線分詞服務,獲取各term的NLP分析。
- 計算量適中。划詞匹配、rank計算有一定計算量,但是總體來看計算量不是特別大。python單進程每天500多萬的訪問量,單CPU利用率也就40%-50%之間。
關於服務的分析:
- 內存占用大。內存占用大,但絕大部分是只讀的。不適合獨立啟動多個進程,適合多線程或用子進程。
- IO密集。適合將IO操作都變為異步請求,或者用多線程模型。
- 計算量適中。由於python解釋器使用GIL,多線程只能提高IO的並發能力,不能提高計算的並發能力。因此可以考慮通過子進程的方式,適當增加提供服務的進程數,提高整個系統服務能力的上限。
需要用到的特性
由於tornado的亮點是異步請求,所以這里首先想到的是將所有請求都改造為異步的。但是這里遇到一個問題,就是異步函數內一定不能有阻塞調用出現,否則整個IOLoop都會被卡住。這就要求徹底地去改造服務,將所有IO或是用時較長的請求都改造為異步函數。這個工程量是非常大的,需要去修改已有的代碼。因此,我們考慮用線程池的方式去實現。當一個線程阻塞在某個請求或IO時,其他線程或IOLoop會繼續執行。
另外一個瓶頸就是GIL限制了CPU的並發數量,因此考慮用子進程的方式增加進程數,提高服務能力上限。
綜合上面的分析,大致用以下方案:
- 通過子進程的方式復制多個進程,使子進程中的只讀頁指向同一個物理頁。
- 線程池。回避異步改造的工作量,增加IO的並發量。
測試代碼
首先測試線程池,測試用例為:
對sleep頁面同時發出兩個請求:
- 在線程池中運行的函數(這里是
self.block_task
)能夠同時執行。表現為在控制台交替打印出數字。- 兩個get請求幾乎同時返回,在瀏覽器上顯示返回的內容。
線程池的測試代碼如下:
import os
import sys
import time
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, options
class HasBlockTaskHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(20) #起線程池,由當前RequestHandler持有
@tornado.gen.coroutine
def get(self):
strTime = time.strftime("%Y-%m-%d %H:%M:%S")
print "in get before block_task %s" % strTime
result = yield self.block_task(strTime)
print "in get after block_task"
self.write("%s" % (result))
@run_on_executor
def block_task(self, strTime):
print "in block_task %s" % strTime
for i in range(1, 16):
time.sleep(1)
print "step %d : %s" % (i, strTime)
return "Finish %s" % strTime
if __name__ == "__main__":
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
http_server = tornado.httpserver.HTTPServer(app)
http_server.bind(8888)
tornado.ioloop.IOLoop.instance().start()
整個代碼里有幾個位置值得關注:
executor = ThreadPoolExecutor(20)
。這是給Handler類初始化了一個線程池。其中concurrent.futures
不屬於tornado,是python的一個獨立模塊,在python3中是內置模塊,python2.7需要自己安裝。- 修飾符
@run_on_executor
。這個修飾符將同步函數改造為在executor(這里是線程池)上運行的異步函數,內部實現是將被修飾的函數submit到executor,返回一個Future對象。- 修飾符
@tornado.gen.coroutine
。被這個修飾符修飾的函數,是一個以同步函數方式編寫的異步函數。原本通過callback方式編寫的異步代碼,有了這個修飾符,可以通過yield一個Future的方式來寫。被修飾的函數在yield了一個Future對象后將會被掛起,Future對象的結果返回后繼續執行。
運行代碼后,在兩個不同瀏覽器上訪問sleep頁面,得到了想要的效果。這里有一個小插曲,就是如果在同一瀏覽器的兩個tab上進行測試,是無法看到想要的效果。第二個get請求會被block,直到第一個get請求返回,服務端才開始處理第二個get請求。這讓我一度覺得多線程沒有生效,用了半天時間查了很多資料,才看到是瀏覽器把相同的第二個請求block了,具體鏈接參考這里。
由於tornado很方便地支持多進程模型,多進程的使用要簡單很多,在以上例子中,只需要對啟動部分稍作改動即可。具體代碼如下所示:
if __name__ == "__main__":
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
http_server = tornado.httpserver.HTTPServer(app)
http_server.bind(8888)
print tornado.ioloop.IOLoop.initialized()
http_server.start(5)
tornado.ioloop.IOLoop.instance().start()
需要注意的地方有兩點:
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
,在生成Application對象時,要將autoreload和debug兩個參數至為False。也就是需要保證在fork子進程之前IOLoop是未被初始化的。這個可以通過tornado.ioloop.IOLoop.initialized()
函數來跟。http_server.start(5)
在啟動IOLoop之前通過start函數設置進程數量,如果設置為0表示每個CPU都啟動一個進程。
最后的效果是可以看到n+1個進程在運行,且公用同一個端口。
實際代碼
大部分邏輯代碼是封裝好的,服務的代碼如下:
import os
import sys
import json
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.httpclient
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, options
import rela_baike_server
from rela_baike_server import RelaBaikeRequest, RelaBaikeResult, RelaBaikeServer
import logging
from logging.handlers import TimedRotatingFileHandler
logging.basicConfig()
import pdb
g_log_prefix = '../log/rela_baike_tornado.'
def getLogger(strPrefixBase):
strPrefix = "%s%d" % (strPrefixBase, os.getpid())
logger = logging.getLogger("RELA_BAIKE")
logger.propagate = False
handler = TimedRotatingFileHandler(strPrefix, 'H', 1)
handler.suffix = "%Y%m%d_%H%M%S.log"
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def makeResponseBody(retCode, errReason, dicSummary):
dicRes = {}
dicRes['retCode'] = retCode
if retCode != 0:
dicRes['error'] = errReason
else:
dicRes['data'] = dicSummary
return json.dumps(dicRes)
class RelaBaikeHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(50)
def initialize(self, relaServer, logger):
self.__serverRelaBaike = relaServer
self.__logger = logger
@tornado.gen.coroutine
def get(self):
lstSummary = []
retCode = 0
errReason = ""
try:
utfQuery = self.get_argument('query').encode('utf8').strip()
except:
errorReason = 'Query encoding not utf-8.'
strRes = makeResponseBody(-1, errorReason, lstSummary)
self.write(strRes)
return
if utfQuery == "":
strRes = makeResponseBody(0, '', lstSummary)
self.write(strRes)
return
error, errReason, lstSummary = yield self.getRelaBaike(utfQuery)
strRes = makeResponseBody(error, errReason, lstSummary)
self.write(strRes)
def __logResponse(self, utfQuery, relaResult):
succ = relaResult.isSuccess()
if succ:
self.__logger.info("%s\tSucc\t%s" % (utfQuery, "|".join([str(item[0]) for item in relaResult])))
else:
self.__logger.info("%s\tError:%d" % (utfQuery, relaResult.getError()))
@run_on_executor
def getRelaBaike(self, utfQuery):
error = 0
lstSummary = []
relaBaikeRequest = RelaBaikeRequest(content=utfQuery)
relaBaikeResult = self.__serverRelaBaike.getRelaBaike(relaBaikeRequest)
self.__logResponse(utfQuery, relaBaikeResult)
if relaBaikeResult.isSuccess():
for item in relaBaikeResult:
baikeid = item[0]
try:
dicSummary = json.loads(item[1])
except:
return -2, 'summary format error' ,lstSummary
lstSummary.append(dicSummary)
else:
return relaBaikeResult.getError(), rela_baike_server.g_dic_error.get(relaBaikeResult.getError(), 'other error') ,lstSumm
ary
return 0, 'success',lstSummary
def start():
port = int(sys.argv[1])
serverRelaBaike = rela_baike_server.getRelaBaikeServer()
logger = getLogger(g_log_prefix)
app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])
http_server = tornado.httpserver.HTTPServer(app)
http_server.bind(port)
http_server.start(2)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
start()
代碼所涉及的特性基本上不超過前面的測試例子,除了下兩幾點:
- 在*Handler類里增加了一個
def initialize(self, relaServer, logger)
函數。這是為了把一些初始化好的對象傳到Handler類里。app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])
。前面handler的initialize函數參數,對應於Application初始化時,每個handler對應的dict。
小結
至此,已經完成將服務通過HTTP服務封裝的工作。有很多可以去進一步了解的內容:
- 進一步了解tornado的其他特性。
- python的修飾符如何使用。
- WSGI是什么。