應用場景:不斷消費一個容器里面的數據,使用同一個線程池,實現高可用性並減少系統性能開銷;(這里拿redis作為容器來做示范),線程池的使用請查看https://www.cnblogs.com/hoojjack/p/10846010.html。
需求:程序開始前創建一個線程池,然后一直用這個線程池來運行程序,不銷毀這個線程池,盡量高效的使用這個線程池;實現高可用性和節約內存的作用;
線程池開辟一個任務池:在線程池開始接受任務后,它會將所有的任務都放在內存中,然后開啟多個線程來輪詢消費任務,如果不停的往線程池里面塞任務,並且任務減少的速度小於任務增加的速度,那么就會導致任務池里面的任務越來越多,進而導致主機內存占用率越來越多,最終系統將該任務kill掉,所以不能無限制的給線程池添加任務。解決方法:將添加到任務池里面的任務放在一個列表里面,然后循環遍歷[-線程數:],如果這里面有任務顯示done,那么表示這個任務池到了快要消費完了,這個時候給任務池添加任務就可以。
線程池消費原理:將任務都放在一個任務池A里面,然后等線程池里面的線程空閑了就分發給空閑的線程,然后線程的任務執行完畢后繼續接任務;這就要控制一下接受任務的速度,如果無限循環的將任務寫入任務池A里面,那么就會導致系統內存爆炸,例如迭代版本二實例;
主體程序:
import redis, time from concurrent.futures import ThreadPoolExecutor class Redis_demo(object): def __init__(self): self.redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0, decode_responses=True) def consumer(self): redis_client = redis.Redis(connection_pool=self.redis_pool) data = redis_client.rpop("test_qly") return data def List_all(self): lists = [] while True: data = self.consumer() lists.append(data) if len(lists) > 20: return lists def parse(self, data): time.sleep(5) return data + "," def Future(self, future): response = future.result() print(response) def main(self): while True: datas = self.List_all() obj_list = [] threadPool = ThreadPoolExecutor(max_workers=4) for data in datas: future = threadPool.submit(self.parse, data) obj_list.append(future) future.add_done_callback(self.Future) threadPool.shutdown(wait=True)
迭代版本一:
if __name__ == '__main__': cla = Redis_demo() while True: datas = cla.List_all() obj_list = [] threadPool = ThreadPoolExecutor(max_workers=4) for data in datas: future = threadPool.submit(cla.parse, data) obj_list.append(future) future.add_done_callback(cla.Future) threadPool.shutdown(wait=True)
缺點:while循環每次執行的時候都要新創建一個線程池,把任務執行完畢之后再銷毀;下一次的循環再創建一個線程池,沒有實現線程的高可用性;
迭代版本二:
if __name__ == '__main__': cla = Redis_demo() threadPool = ThreadPoolExecutor(max_workers=4) while True: datas = cla.List_all() obj_list = [] for data in datas: future = threadPool.submit(cla.parse, data) obj_list.append(future) future.add_done_callback(cla.Future) # threadPool.shutdown(wait=True) # 在這個例子里面必須要關閉這個,因為執行這個后,線程池就會銷毀掉,那么下一次循環調用線程池的話就會報錯,因為線程池已經銷毀掉了
缺點:這個其實也叫錯誤,在開頭已經說了,再詳細說一遍吧;while循環會一直把任務存進任務池A里面供線程池來消費,但是存任務的速度遠遠比線程池里面消費的速度快,這就導致任務池A里面存放的任務一直在變多,然后系統內存就會爆炸;程序執行一會后系統內存就會占用100%,然后該任務會被系統kill掉。
迭代版本三:
def __init__(self):
self.thread_time = 0
self.times = 0
self.redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0, decode_responses=True)
def Future(self, future):
response = future.result()
print(response)
status = future.done()
if status:
self.times += 1
# 重寫上面兩個方法
if __name__ == '__main__': cla = Redis_demo() threadPool = ThreadPoolExecutor(max_workers=4) while True: datas = cla.List_all() obj_list = [] for data in datas: future = threadPool.submit(cla.parse, data) obj_list.append(future) future.add_done_callback(cla.Future) # 防止線程池里面堆積太多的數據引起內存爆炸 while True: if cla.times > cla.thread_time-1: cla.times = 0 break else: time.sleep(2)
實現原理:用future.done()來判斷所有線程執行的任務數,用全局變量times來累計,如果全局變量times與任務數相同了,代表該任務池A里面所有的任務都執行完了,線程池里面的線程也空閑了,然后再把任務寫入任務池A中,如果全局變量times小於任務數,則進入等待階段,知道所有的任務全部執行完畢;
好處:實現了線程池部分高可用性(一個線程池從頭用到尾)、節約了內存開銷;
缺點:該線程池中如果有一個線程反應慢了,那么其他空閑線程就會等待這個響應慢的線程結束后才會重新拉取任務;嚴重一點,如果一個線程堵塞了,那么就會卡住,因為全局變量times總是小於任務數;還有就是必須要用到(future.add_done_callback(cla.Future)),如果沒有回調函數就沒法統計執行完畢的任務數。
迭代版本四(終極版):
def parse(self, data):
print(threading.current_thread().name) # 打印線程名字,監控是否有線程一直在堵塞狀態
time.sleep(5) return data + ","
# 重寫上面的方法,打印線程號
if __name__ == '__main__': cla = Redis_demo() threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="threadName") # 線程序號前面添加默認名稱,例如:線程名字是_1,添加后就成為threadName_1
while True: datas = cla.List_all() obj_list = [] for data in datas: future = threadPool.submit(cla.parse, data) obj_list.append(future) future.add_done_callback(cla.Future) # 檢測線程池是否空閑了,如果有空閑的就執行新任務,如果所有線程都在執行任務,那么就進入等待階段 while True: status = False for future in obj_list[-4:]: # 這里的4與線程池里面的線程數保持一致,每次的任務數盡量是線程池數量的整數倍 status = future.done() # 查看任務執行狀態,True表示該任務已經執行完畢 print(status, "打印執行狀態") if status: break if status: break else: time.sleep(1) # threadPool.shutdown(wait=True) # 這行代碼其實沒什么實質性作用,因為線程池從程序運行開始就不會停,直到程序停止
實現原理:將所有任務放到一個列表obj_list中,然后不斷的循環驗證左右的任務是否有執行完畢的,去最后4個任務查看一下狀態,如果最后4個里面也有執行完畢的,那么就寫入新的任務到任務池A;
好處:需求里面要求的點基本上全部都實現了;
缺點:嚴格來說,如果有堵塞的線程,那么就會減少工作的線程,所以就要求對程序要有嚴格的測試,保證程序不會出現嚴重堵塞的問題;其次,最好還是打印一下線程名稱,這樣就能更好的監控是否有堵塞的線程;
代碼解決辦法:1、建一個元祖,將線程號存入元祖,然后每隔一段時間檢測一下元祖中元素的數量,如果小於線程數就發出報警信息;2、建一個字典,將每個線程出現的次數計入字典中,如果線程間數量相差太多的話發出報警信息。