爬蟲與多線程


多線程和多進程爬蟲

 

一.線程

1.什么是線程。

線程是操作系統能夠進行運算調度的最小單位。它被包含在進程中,是進城中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個線程可以並發多個線程,每條線程執行不同的任務。

 

2.線程常用的方法

方法

說明

start()

線程准備就緒,等待CPU調度

setName()

為線程設置名稱

getName()

獲取線程名稱

setDaemon()

設置為守護線程

Join()

逐個執行每個線程,執行完畢后繼續往下執行

run()

線程被調度后會執行該方法,如果想自定義線程類,需要重寫run()方法

 

3.Threading類

3.1 線程的普通創建方式

Threadding用於提供線程相關的操作,線程是應用程序中工作的最小單元。

import threading
import time
def show(arg):
    time.sleep(1)
    print('thread' + str(arg))
for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()
print('主線程結束')

結果:
主線程結束
thread0
thread1
thread2
thread5
thread4
thread3
thread7
thread6
thread8
thread9

 上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。

 

3.2 自定義線程類

繼承自threading.Thread類來自定義線程類,但是其本質卻是重構thread類中的run()方法。

 

import threading
class
myThread(threading.Thread): def __init__(self, sum): threading.Thread.__init__(self) self.sum = sum
def run(self): print('對象數是:',self.sum) if __name__ == '__main__':
t1
= myThread(1) t2 = myThread(2) t1.start() t2.start()

 

3.3 計算子線程執行的時間 

PS:sleep的時候是不會占用CPU的,操作系統會把線程掛起。

import threading
import time

def show(n):
    time.sleep(1)
    print('thread' + str(n))

start_time = time.time()
obj_list = []
for i in range(5): t = threading.Thread(target=show,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('花費的時間為:',time.time() - start_time)

 

3.4 守護線程 

線程的setDaemon(True)將線程變成主線程的守護線程,意思是當主進程結束后,子線程也會隨之退出。意味着當主線程結束后,程序就結束了。

 1 import threading
 2 import time
 3 
 4 def show(n):
 5     time.sleep(1)
 6     print('thread' + str(n))
 7 
 8 start_time = time.time()
 9 obj_list = []
10 
11 for i in range(5):
12     t = threading.Thread(target=show,args=(i,))
13     t.setDaemon(True)
14     t.start()
15     obj_list.append(t)
16 
17 print('花費的時間為:',time.time() - start_time)

 

3.5 GIL(全局解釋器鎖) 

在Python的運行環境中,無論電腦是單核還是雙核,操作系統同時只會執行一個線程。究其原因,是因為GIL(全局解釋器鎖)。

在Python中,一個線程要想要執行,必須要先拿到GIL。可以吧GIL想象成一個“通行證”,並且在一個進程中,GIL只有一個。沒有通行證的線程就不會被執行。

Python多線程的工作過程:

  •  拿到公共數據
  • 申請GIL
  • Python解釋器調用os的原生線程
  • os操作CPU執行運算
  • 當該線程的執行時間到了之后,無論是否執行完,GIL被釋放
  • 其他線程重復上面的操作
  • 其他進程執行完成后,切換到原來的線程(從記錄的上下文繼續執行)

 

3.6 線程鎖(Lock,RLock)

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。

import threading,time

def run(n):
    global num
    num += 1

num = 0
obj_list = []

for i in range(20000):
    t = threading.Thread(target=run,args=(i,))
    t.start()
    obj_list.append(t)

for obj in obj_list:
    obj.join()

print('num:',num)
臟數據:19999

 


3.6.1  互斥鎖(Lock) 

為了防止上面情況的發生,我們可以使用互斥鎖(Lock)來解決。

import threading,time

lock = threading.Lock() # 實例化一個鎖對象

def run(n):
    lock.acquire()  # 獲取鎖
    global num
    num += 1
    lock.release()   # 釋放鎖

num = 0
obj_list = []

for i in range(20000):

    t = threading.Thread(target=run,args=(i,))
    t.start()
    obj_list.append(t)

for obj in obj_list:
    obj.join()
print('num:',num)

 

3.6.2  遞歸鎖(RLock) 

RLock的用法和Lock一樣,只是他支持嵌套。在多個鎖沒有釋放的時候一般會使用Rlock類。

import threading,time

lock = threading.RLock() # 實例化一個鎖對象
num = 0
obj_list = []
def run(n):
    lock.acquire() # 獲取鎖 
    global num 
    num += 1 
    lock.release() # 釋放鎖 

for i in range(20000): 
    t = threading.Thread(target=run,args=(i,)) 
    t.start() 
    obj_list.append(t) 

for obj in obj_list: 
    obj.join() 
print('num:',num)

 

3.7 信號量Semaphore 

互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

  import threading,time

  lock = threading.BoundedSemaphore(6) # 實例化一個鎖對象

  def run(n):

    lock.acquire()  # 獲取鎖

    time.sleep(1)

    print('run the thread: %s' % n)

    lock.release()   # 釋放鎖

  num = 0

  for i in range(200):

    t = threading.Thread(target=run,args=(i,))

    t.start()

 

3.8事件(Event)

python線程的事件用於主線程控制其他線程的執行,事件是一個簡單的線程同步對象,主要提供了以下幾種方法:

方法

說明

clear()

將flag設置為“false”

set()

將flag設置為“true”

is_set()

判斷是否設置了flag

wait()

一直監聽flag,沒有檢測到會一直處於阻塞狀態

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞

 

import threading,time

event = threading.Event()  # 創建事件對象

def lighter():
    count = 0
    event.set()   #初始值為綠燈
    while 1:
        if 5 < count <= 10:
            event.clear() #紅燈,清楚標志位
            print('\33[41;1mred light is on...\033[0m')
        elif count > 10:
            event.set()   # 綠燈,設置標志位
            count = 0
        else:
            print('\33[41;1mred light is on...\033[0m')
        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():  # 判斷是否設置了標志位
            print("[%s] 綠燈亮,請行駛..." % name)
            time.sleep(1)
        else:
            print("[%s] 紅燈亮,請等待..." % name)
            event.wait()
            print("[%s] 綠燈亮,開始行駛..." % name)

light = threading.Thread(target=lighter,)

car = threading.Thread(target=car, args=('test',))
light.start()  
car.start()

 

 


3.9條件(Condition)
 

使得線程等待,只有滿足某條件時,才釋放n個線程。

互斥鎖是最簡單的線程同步機制,Python提供的Condition對象提供了對復雜線程同步問題的支持。Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,然后判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件后,通過notify方法通知其他線程,其他處於wait狀態的線程接到通知后會重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問題。

可以認為Condition對象維護了一個鎖(Lock/RLock)和一個waiting池。線程通過acquire獲得Condition對象,當調用wait方法時,線程會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個線程。當調用notify方法時,Condition對象會從waiting池中挑選一個線程,通知其調用acquire方法嘗試取到鎖。

Condition對象的構造函數可以接受一個Lock/RLock對象作為參數,如果沒有指定,則Condition對象會在內部自行創建一個RLock。

除了notify方法外,Condition對象還提供了notifyAll方法,可以通知waiting池中的所有線程嘗試acquire內部鎖。由於上述機制,處於waiting狀態的線程只能通過notify方法喚醒,所以notifyAll的作用在於防止有線程永遠處於沉默狀態。

演示條件變量同步的經典問題是生產者與消費者問題:假設有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來交互產品。生產者的”策略“是如果市場上剩余的產品少於1000個,那么就生產100個產品放到市場上;而消費者的”策略“是如果市場上剩余產品的數量多余100個,那么就消費3個產品。用Condition解決生產者與消費者問題的代碼如下:

import threading
import time

class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count > 1000:
                    con.wait()
                else:
                    count = count+100
                    msg = self.name+' produce 100, count=' + str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)


class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count < 100:
                    con.wait()
                else:
                    count = count-3
                    msg = self.name+' consume 3, count='+str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)

count = 500
con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()

    for i in range(5):
        c = Consumer()
        c.start()

if __name__ == '__main__':
    test()

 

  

4.多線程的爬蟲實例

'''使用多線程爬取1500個url中的圖片'''

import
threading,requests,time link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) start_time = time.time() class myThread(threading.Thread): def __init__(self,name,link_range): threading.Thread.__init__(self) self.name = name self.link_range = link_range self.userName = 0 def run(self): print('開始:',self.name) self.craw(self.name,self.link_range) print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在存儲文件 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,link_range): for i in range(link_range[0],link_range[len(link_range)-1]): self.writeImages(name,link_list[i]) obj_list = [] url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)] # 創建新線程 for i in range(5): t = myThread('Thread-'+str(i+1), url_list[i]) t.start() obj_list.append(t) # 等待所有線程執行完成 for url in url_list: url.join() end_time = time.time() print('爬蟲的運行時間為:',end_time - start_time)

 

5.隊列(queue)

queue是python中的標准庫,俗稱隊列。在python中,多個線程之間的數據是共享的,多個線程進行數據交換的時候,不能夠保證數據的安全性和一致性,所以當多個線程需要進行數據交換的時候,隊列就出現了,隊列可以完美解決線程間的數據交換,保證線程間數據的安全性和一致性。

PS: 在python2.x中,模塊名為Queue

queue模塊有三種隊列及構造函數

l  Python queue模塊的FIFO隊列先進先出。 queue.Queue(maxsize)

l  LIFO類似於堆,即先進后出。 queue.LifoQueue(maxsize)

l  還有一種是優先級隊列級別越低越先出來。 queue.PriorityQueue(maxsize)

queue模塊中的常用方法:

方法

說明

queue.qsize()

返回隊列的大小

queue.empty()

如果隊列為空,返回True,反之False

queue.full()

如果隊列滿了,返回True,反之False (queue.full 與 maxsize 大小對應)

queue.get([block[, timeout]])

獲取隊列,立即取出一個元素, timeout超時時間

queue.put(item[, timeout]])

寫入隊列,立即放入一個元素, timeout超時時間

queue.join()

阻塞調用線程,直到隊列中的所有任務被處理掉, 實際上意味着等到隊列為空,再執行別的操作

queue.task_done()

在完成一項工作之后,queue.task_done()函數向任務已經完成的隊列發送一個信號

 

 

5.1代碼實例

l   創建隊列

import queue
q = queue.Queue()

l   empty方法(如果隊列為空,返回True)

import queue
q = queue.Queue()
print(q.empty())
#輸出:True

l  full方法(如果隊列滿了,返回True)

import queue
q = queue.Queue(1) #指定隊列大小
q.put('a')
print(q.full())
#輸出:True

l  put方法和get方法

import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.get())
#輸出:a
print(q.get())
#輸出:b

l  qsize方法(返回隊列里元素個數)

import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.qsize())
#輸出:2


5.2
 生產者消費者 

import threading,time
import queue

q = queue.Queue(maxsize=10)

# 生產者
def Producer(name):
    count = 1
    while True:
        q.put("冠軍%s" % count)
        print("冠軍",count)
        count +=1
        time.sleep(0.1)

#消費者
def  Consumer(name):
    #while q.qsize()>0:
    while True:
        print("[%s] 取到[%s]..." %(name, q.get()))
        time.sleep(1)

p = threading.Thread(target=Producer,args=("IG",))
c = threading.Thread(target=Consumer,args=("LPL",))
c1 = threading.Thread(target=Consumer,args=("LCK",))
p.start()
c.start()
c1.start()

 

6.多線程和爬蟲實例。

import threading,requests,time
import queue

start_time = time.time()
obj_list = []
work_queue = queue.Queue(1500)
link_list = []
with open('url.txt','r') as f:
    for u in f.readlines():
        url = u.split()[0].replace('\n','')
        link_list.append(url)

# 填充隊列
for url in link_list:
    work_queue.put(url)

class myThread(threading.Thread):
    def __init__(self,name,q):
        threading.Thread.__init__(self)
        self.name = name
        self.userName = 0
        self.q = q

    def run(self):
        print('開始:',self.name)
        while True:
            try:
                self.craw(self.name,self.q)
            except Exception as e:
                break
        print('結束:',self.name)

    def writeImages(self, ThreadName, url):
        print("正在存儲文件 %s ..." % ThreadName+str(self.userName))
        path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png'
        file = open(path, 'wb')
        images = requests.get(url,timeout = 20).content
        file.write(images)
        file.close()
        self.userName += 1

    def craw(self,name,q):
        url = q.get(timeout = 2)
        try:
            self.writeImages(name, url)
        except Exception as e:
            print(q.qsize(),url,e)

# 創建新線程
for i in range(5):
    t = myThread('Thread-'+str(i+1), work_queue)
    t.start()
    obj_list.append(t)

# 等待所有線程執行完成
for url in obj_list:
    url.join()

end_time = time.time()
print('爬蟲的運行時間為:',end_time - start_time)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM