python3 多線程獲取數據實例


import requests
import json
from retrying import retry
from lxml import etree
from queue import Queue
import threading

class QiuShi:
  def __init__(self):
    # 定義三個隊列
    self.url_queue = Queue()
    self.html_queue = Queue()
    self.content_list_queue = Queue()
    self.headers = {
    "User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36"
    }

  def get_url_list(self):
    url_list = ['https://www.qiushibaike.com/8hr/page/{}/'.format(i) for i in range(1, 14)]
    for url in url_list:
      # 將url使用put方法放入隊列
      self.url_queue.put(url)

  @retry(stop_max_attempt_number=3)
  def _parse_url(self, url):
    response = requests.get(url, headers=self.headers, timeout=3)
    assert response.status_code == 200
    return etree.HTML(response.content)

  def parse_url(self):
  # 這里需要一個url。
  # 因為url = self.url_queue.get()只會從隊列里取一次url.所以這里需要加while True循環來取。
  # 當url隊列里沒有url的時候這里會堵塞等待,只要有就取。
  # 但是取過后隊列的基數並沒有減1(並沒有減去剛取走的url),所以要在下面使用task_done()
  while True:
    url = self.url_queue.get()
    print(url)
    try:
      html = self._parse_url(url)
    except:
      html = None
    # 將html添加到隊列里
    self.html_queue.put(html)
    self.url_queue.task_done()

  def get_content_list(self):
    # 和上面一樣
    while True:
      html = self.html_queue.get()
      if html is not None:
        div_list = html.xpath('//div[@id="content-left"]/div')
        content_list = []
        for div in div_list:
          item = {}
          item['name'] = div.xpath('.//h2/text()')[0].replace("\n", "") if len(div.xpath('.//h2/text()')) > 0 else None
          item['content'] = div.xpath('.//div[@class="content"]/span/text()')[0].replace("\n", "") if len(div.xpath('.//div[@class="content"]/span/text()')) > 0 else None
          item['comment'] = div.xpath('.//i[@class="number"]/text()')[1] if len(div.xpath('.//i[@class="number"]/text()')) > 0 else None
          item['img'] = div.xpath('.//img/@src') if len(div.xpath('.//img/@src')) > 0 else None

          content_list.append(item)
         self.content_list_queue.put(content_list)
      self.html_queue.task_done()

  def save_content_list(self):
    while True:
      content_list = self.content_list_queue.get()

      with open("qiubai.json", "a", encoding="utf-8") as f:
        for content in content_list:
          json.dump(content, f, ensure_ascii=False, indent=2)
          f.write(',\n')

      self.content_list_queue.task_done()

  def run(self):
    thread_list = []
    # 創建一個提取url的線程
    t_url = threading.Thread(target=self.get_url_list)
    thread_list.append(t_url)
    # 因為發送請求比較耗時,這里我們就用多線程來做
    for i in range(5):
      t_parse = threading.Thread(target=self.parse_url)
      thread_list.append(t_parse)
    # 提取數據也比較耗時,這里我們也使用多線程
    for i in range(3):
      t_get_content_list = threading.Thread(target=self.get_content_list)
      thread_list.append(t_get_content_list)
    # 保存數據必須用一個線程要數據就會亂
    t_save = threading.Thread(target=self.save_content_list)
    thread_list.append(t_save)
    for t in thread_list:
      t.setDaemon(True) # 守護線程
      t.start()
    # 當所有隊列里沒有數據,基數都等於0的時候主線程結束。否則一直堵塞在q.join()
    for q in [self.content_list_queue, self.html_queue, self.url_queue]:
      q.join()

if __name__ == '__main__':
qiubai = QiuShi()
qiubai.run()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM