一. Gevent實例
import gevent import requests from gevent import monkey # socket發送請求以后就會進入等待狀態,gevent更改了這個機制 # socket.setblocking(False) -->發送請求后就不會等待服務器響應 monkey.patch_all() # 找到內置的socket並更改為gevent自己的東西 def fetch_async(method, url, req_kwargs): print(method, url, req_kwargs) response = requests.request(method=method, url=url, **req_kwargs) print(response.url, response.content) # ##### 發送請求 ##### gevent.joinall([ # 這里spawn是3個任務[實際是3個協程],每個任務都會執行fetch_async函數 gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}), ])
二. grequests實例
grequests實際上就是封裝了gevent里面的方法,然后配合requests實現異步的IO grequests = gevent + request grequests.map() 內部實現
import grequests # 實際上就是requests + gevent request_list = [ # 發送get請求 grequests.get('https://www.baidu.com/', timeout=10.001), grequests.get('https://www.taobao.com/'), grequests.get('https://hao.360.cn/') ] # ##### 執行並獲取響應列表 ##### response_list = grequests.map(request_list) # 實際上內部循環執行gevent內部的joinall()方法 print(response_list) # ##### 執行並獲取響應列表(處理異常) ##### # def exception_handler(request, exception): # print(request,exception) # print("Request failed") # response_list = grequests.map(request_list, exception_handler=exception_handler) # print(response_list)
三. 項目中的應用
def user_batch_import(self, token, file, data): v = self._user_role(token, {}) if "errcode" in v.keys(): return v if 'user[college_id]' not in v: return {"errcode": 2, "errmsg": str(v)} import xlrd show_url = Graphics.upload_image(file, data["context_type"], data["content_type"]) data_path = Graphics.format_nginx_data_url(show_url) book = xlrd.open_workbook(data_path) names = book.sheet_names() res = None for name in names: sheet = book.sheet_by_name(name) nrows = sheet.nrows # 行 if nrows == 0: print(nrows) continue else: q = queue.Queue(maxsize=nrows) threading.Thread(args=(q,)) first_line = sheet.row_values(0) if first_line != ['編號', '姓名', '郵箱', '密碼', '電話', '地址', '角色'] and len(first_line) != 7: return {"errcode": 400, "errmsg": u"不支持此格式順序的輸入;正確格式:['編號', '姓名', '郵箱', '密碼', '電話', '地址', '角色']"} for temp in range(1, nrows): rows = sheet.row_values(temp, start_colx=0, end_colx=None) if rows[6] == "老師": rows[6] = "TeacherEnrollment" elif rows[6] == "學生": rows[6] = "StudentEnrollment" elif rows[6] == "助教": rows[6] = "TaEnrollment" elif rows[6] == "設計者": rows[6] = "DesignerEnrollment" elif rows[6] == "觀察者": rows[6] = "ObserverEnrollment" else: return {"errcode": 400, "errmsg": u"所寫角色不存在,可選:【老師, 學生, 助教, 觀察者, 設計者】"} query = {"user[name]": rows[1], "pseudonym[unique_id]": rows[2], "pseudonym[password]": rows[3], "user[tel]": rows[4], "user[company]": rows[5], "user[time_zone]": "Beijing", "user[locale]": "zh-Hans", "user[terms_of_use]": True, "user[role_name]": rows[6], "user[college_id]": v["user[college_id]"]} q.put(query) res = self._user_batch_import(q) return {"errcode": 0, "errmsg": str(res)} def _user_batch_import(self, q): from app.api.apis import very_email, very_password, very_phone header = {"Authorization": "Bearer %s" % AccessToken} request_res = [] while q.qsize() > 0: query = q.get() if very_email(query["pseudonym[unique_id]"]) and very_phone(query["user[tel]"], "tel") and very_password( query["pseudonym[password]"], "password"): pass else: del query response = grequests.post(API_CANVAS_HOST + '/api/v1/accounts/{account_id}/users'.format(account_id=1), headers=header, data=query) request_res.append(response) res = grequests.map(request_res) return res
