python異常機制、多進程與PyQt5中的QTimer、多線程


1.異常處理機制

def test(x): try: y = 10 / x print(y) #except Exception as e: #print(e) #可以打印出異常的類型 except ZeroDivisionError: #拋出異常,執行下面的程序,如果是界面軟件可以彈出一個窗口,提示用戶輸入錯誤 print(x) else: #如果程序不存在異常,則執行該程序 print(100 / x) finally: print('the program end') #不管程序是否錯誤,始終都會執行 test(0) # 0 # the program end test(1) # 10.0 # 100.0 # the program end 
def test2(): try: assert False, 'error' except: print('another') # 輸出 another def myLevel(level): if level < 1: raise ValueError('Invalid level', level) #自定義異常 try: myLevel(0) except ValueError: print('level < 1') #輸出 :level < 1 print('test 2--') test2()

2.多進程

引入多進程的庫文件: import multiprocessing 

多進程測試, 需要在主函數 main 中 進行測試
 創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
 方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。
 屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置

daemon 是守護進程 :daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置。#主線程執行完畢之后,不管子線程是否執行完畢都隨着主線程一起結束。

def worker_1(): print('this is worker 1') time.sleep(3) print('worker 1 end') # n = 3 # while n > 0: # print("The time is {0}".format(time.ctime())) # time.sleep(interval) # n -= 1 def worker_2(): print('this is worker 2') time.sleep(3) print('worker 2 end') def worker_3(): print('this is worker 3') time.sleep(3) print('worker 3 end') if __name__ == "__main__": p1 = multiprocessing.Process(target = worker_1, args = ()) p2 = multiprocessing.Process(target = worker_2, args = ()) p3 = multiprocessing.Process(target = worker_3, args = ()) # p1.daemon = True # 設置子程序為 守護進程, 主程序結束后,它就隨之結束(即如果沒有join函數,將不執行worker1函數), 需要放在start 前面 p1.start() #進程調用 start 的時候,將自動調用 run 函數 p2.start() p3.start() #運行3 個worker 程序花費的時間為3 s, 使用了3 個進程進行處理 p1.join() p2.join() p3.join() #添加 join 函數, 先執行 worker 函數, 再執行主進程函數 for p in multiprocessing.active_children(): #這是主進程函數 print('name is :' + p.name + 'pid is :' + str(p.pid) + 'is _alive: '+ str(p.is_alive())) print('電腦的核數'+ str(multiprocessing.cpu_count())) # 電腦的核數4 print('end----')

輸出:

#---沒有 join 函數---
# name is :Process-1pid is :6488is _alive: True
# name is :Process-2pid is :5660is _alive: True
# name is :Process-3pid is :7776is _alive: True
# 電腦的核數4
# end----
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end    
 
#---有 join 函數---   等待所有子進程結束,再執行主進程
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
# 電腦的核數4    #此時執行完線程, active_children() 為空
# end----  

使用多個進程訪問共享資源的時候,需要使用Lock 來避免訪問的沖突

def worker_first(lock, f): with lock: with open(f, 'a+') as t: #with 自動關閉文件 n = 3 while n > 0: t.write('first write ---') n -= 1 def worker_second(lock, f): lock.acquire() #獲取鎖 try: with open(f, 'a+') as t: n = 3 while n > 0: t.write('second write ---') n -= 1 finally: lock.release() #使用異常機制,保證最后釋放鎖 def myProcess(): lock = multiprocessing.Lock() f = r'D:\myfile1.txt' p1 = multiprocessing.Process(target=worker_first, args=(lock, f, )) p2 = multiprocessing.Process(target=worker_second, args=(lock, f, )) p1.start() p2.start() print('end -----------') if __name__ == '__main__': myProcess()

輸出:

first write ---first write ---first write ---second write ---second write ---second write ---

進程池

進程池 Pool 
 在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,
 此時可以發揮進程池的功效。Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,
 那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
 
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的

非阻塞和阻塞進程池

單個函數使用線程池:

def func(msg): print('msg is :' + msg) time.sleep(2) print(msg + ' end') return msg + ' re' if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) #創建進程池 result = [] for i in range(4): msg = '%d'%(i) # result.append(pool.apply_async(func, (msg, ))) pool.apply_async(func, (msg, )) #非阻塞進程池, 維持總的進程數為 processes=3, 當一個進程結束后,添加另一個進程進去 # pool.apply(func, (msg, )) #阻塞進程池 print('start------') pool.close() # close 函數需要在 join 函數之前, 關閉進程池后,沒有進程加入到pool中 pool.join() # join函數等待所有子進程結束 print('all program done') # print([re.get() for re in result]) #獲取結果

輸出:

#---非阻塞進程池 輸出------
# start------
# msg is :0
# msg is :1
# msg is :2
# 0 end
# msg is :3
# 1 end
# 2 end
# 3 end
# all program done    
#---阻塞進程池 輸出------ 順序執行
# msg is :0
# 0 end
# msg is :1
# 1 end
# msg is :2
# 2 end
# msg is :3
# 3 end
# start------
# all program done

多個函數使用線程池

def f1(): result = 0 print('this is one') for i in range(100000): result += i return result def f2(): result = 1 print('this is two') for i in range(2, 100000): result *= i return result if __name__ == '__main__': start = time.time() myPool = multiprocessing.Pool(processes=2) result = [] print('start----') myFunc = [f1, f2] for f in myFunc: result.append(myPool.apply_async(f)) #將輸出結果保存到列表中 myPool.close() myPool.join() print([re.get() for re in result]) end = time.time() print('花費的時間: ' + str(end-start)) #huafei de 時間: 8.397480010986328 print('end------')

使用get() 函數,同樣可以獲取pandas 的數據結構;

def f1(): result = 0 print('this is one') result = pd.DataFrame(np.arange(16).reshape(-1, 4)) return result # for i in range(100000): # result += i # return result def f2(): result = 1 print('this is two') result = pd.DataFrame(np.arange(9).reshape(-1, 3)) return result # for i in range(2, 100000): # result *= i # return result if __name__ == '__main__': start = time.time() myPool = multiprocessing.Pool(processes=2) result = [] print('start----') myFunc = [f1, f2] for f in myFunc: result.append(myPool.apply_async(f)) #將輸出結果保存到列表中 myPool.close() myPool.join() for re in result: print(re.get())

 輸出結果:

start----
this is one
this is two
    0   1   2   3
0   0   1   2   3
1   4   5   6   7
2   8   9  10  11
3  12  13  14  15
   0  1  2
0  0  1  2
1  3  4  5
2  6  7  8
花費的時間: 0.9370532035827637
end------

 

3.PyQt5 中的QTimer 模塊

pyqt5中的多線程的應用,多線程技術涉及3種方法,1.使用計時器QTimer, 2.使用多線程模塊QThread ,3.使用事件處理功能
1.使用QTimer模塊,創建QTimer實例,將timeout信號連接到槽函數,並調用timeout 信號。 self.timer.start(1000)時間為毫秒
引入模塊
from PyQt5.QtCore import QTimer, QDateTime

def timerTest(): class WinForm(QWidget): def __init__(self,parent=None): super(WinForm,self).__init__(parent) self.setWindowTitle("QTimer demo") self.listFile= QListWidget() self.label = QLabel('顯示當前時間') self.startBtn = QPushButton('開始') self.endBtn = QPushButton('結束') layout = QGridLayout(self) # 初始化一個定時器 self.timer = QTimer(self) # showTime()方法 self.timer.timeout.connect(self.showTime) # layout.addWidget(self.label,0,0,1,2) layout.addWidget(self.startBtn,1,0) layout.addWidget(self.endBtn,1,1) self.startBtn.clicked.connect( self.startTimer) #首先按下start開始計時,到了2s后,觸發timeout 信號 self.endBtn.clicked.connect( self.endTimer) self.setLayout(layout) QTimer.singleShot(5000, self.showmsg) #給定的時間間隔后,調用一個槽函數來發射信號 def showmsg(self): QMessageBox.information(self, '提示信息', '你好') def showTime(self): # 獲取系統現在的時間 time = QDateTime.currentDateTime() #獲取當前時間 # 設置系統時間顯示格式 timeDisplay = time.toString("yyyy-MM-dd hh:mm:ss dddd") # 在標簽上顯示時間 self.label.setText( timeDisplay ) def startTimer(self): # 設置計時間隔並啟動 self.timer.start(1000) #時間為 1 s self.startBtn.setEnabled(False) self.endBtn.setEnabled(True) def endTimer(self): self.timer.stop() self.startBtn.setEnabled(True) #依次循環 self.endBtn.setEnabled(False) if __name__ == "__main__": app = QApplication(sys.argv) form = WinForm() form.show() # QTimer.singleShot(5000, app.quit) #界面運行5秒后,自動關閉 sys.exit(app.exec_()) timerTest()

QThread模塊,使用該模塊開始一個線程,可以創建它的一個子類,然后覆蓋QThread.run() 的方法。調用自定義的線程時,調用start()方法,會自動調用run的方法,
QThread 還有 started , finished 信號,可以見信號連接到槽函數中

def threadTest(): class MainWidget(QWidget): def __init__(self,parent=None): super(MainWidget,self).__init__(parent) self.setWindowTitle("QThread 例子") self.thread = Worker() #創建線程 self.listFile = QListWidget() self.btnStart = QPushButton('開始') self.btnStop = QPushButton('結束') layout = QGridLayout(self) layout.addWidget(self.listFile,0,0,1,2) layout.addWidget(self.btnStart,1,1) layout.addWidget(self.btnStop,2,1) self.btnStart.clicked.connect( self.slotStart ) #通過按鈕狀態實現 線程的開啟 self.btnStop.clicked.connect(self.slotStop) self.thread.sinOut.connect(self.slotAdd) def slotStop(self): self.btnStart.setEnabled(True) self.thread.wait(2000) def slotAdd(self,file_inf): self.listFile.addItem(file_inf) #增加子項 def slotStart(self): self.btnStart.setEnabled(False) self.thread.start() #線程開始 #創建一個線程 class Worker(QThread): sinOut = pyqtSignal(str) #自定義一個信號 def __init__(self, parent=None): super(Worker, self).__init__(parent) self.work = True self.num = 0 def run(self): while self.work == True: listStr = 'this is index file {0}'.format(self.num) self.sinOut.emit(listStr) #發射一個信號 self.num += 1 self.sleep(2) #休眠2 秒 def __del__(self): self.work = False self.wait() if __name__ == "__main__": app = QApplication(sys.argv) demo = MainWidget() demo.show() sys.exit(app.exec_()) # threadTest()

demo 2:

def threadTest2(): global sec sec=0 class WorkThread(QThread): #自定義一個線程 trigger = pyqtSignal() def __int__(self): super(WorkThread,self).__init__() def run(self): for i in range(2000000000): print('haha ' + str(i)) #此處放運行 量較大的程序 # 循環完畢后發出信號 self.trigger.emit() def countTime(): global sec sec += 1 # LED顯示數字+1 lcdNumber.display(sec) def work(): # 計時器每秒計數 timer.start(1000) #每秒運行完后,是lcd增加數字 # 計時開始 workThread.start() # 當獲得循環完畢的信號時,停止計數 workThread.trigger.connect(timeStop) def timeStop(): timer.stop() print("運行結束用時",lcdNumber.value()) global sec sec=0 if __name__ == "__main__": app = QApplication(sys.argv) top = QWidget() top.resize(300,120) # 垂直布局類QVBoxLayout layout = QVBoxLayout(top) # 加個顯示屏 lcdNumber = QLCDNumber() layout.addWidget(lcdNumber) button = QPushButton("測試") layout.addWidget(button) timer = QTimer() workThread = WorkThread() button.clicked.connect(work) # 每次計時結束,觸發 countTime timer.timeout.connect(countTime) top.show() sys.exit(app.exec_()) threadTest2()

 

 

 

參考:

多進程https://www.cnblogs.com/kaituorensheng/p/4445418.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM