import threading import time import requests import json from queue import Queue from lxml import etree class CrawlThread(threading.Thread): '''doc string for crawl thread''' def __init__(self, name, page_queue, data_queue): super(CrawlThread, self).__init__() self.name = name self.page_queue = page_queue self.data_queue = data_queue self.url = 'http://www.ifanjian.net/latest-{}' self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36", } def run(self): print('%s ------- 線程啟動' % self.name) while True: # 判斷采集線程何時結束 if self.page_queue.empty(): break # 從隊列中取出頁面 page = self.page_queue.get() print("===采集===開始第%s頁數據" % page) # 拼接url,發送請求 url = self.url.format(page) time.sleep(1) r = requests.get(url, headers=self.headers) # 將響應內容存放到data_queue self.data_queue.put({ "index": str(page), "text": r.text }) print("===采集===結束第%s頁數據" % page) print('%s ------- 線程結束' % self.name) class ParserThread(threading.Thread): '''doc string for parse thread''' def __init__(self, name, data_queue, fp, lock): super(ParserThread, self).__init__() self.name = name self.data_queue = data_queue self.fp = fp self.lock = lock def run(self): print('%s ------- 線程啟動' % self.name) while True: try: # 從data_queue中取出數據 data = self.data_queue.get(True, 10) print("===解析===開始第%s頁數據" % data["index"]) # 解析內容 self.parse_content(data['text']) print("===解析===開始第%s頁數據" % data["index"]) except Exception: break print('%s ------- 線程結束' % self.name) # 解析內容 def parse_content(self, data): tree = etree.HTML(data) # 先找出所有的li,然后再從li中找到所有的標題和圖片 li_list = tree.xpath("//ul[@class='cont-list']/li") items = [] for li in li_list: # 獲取標題 title = li.xpath(".//h2[@class='cont-list-title']/a/text()")[0] # 獲取圖片 img_url = li.xpath( ".//div[contains(@class,'cont-list-main')]//img/@data-src") item = { '標題': title, '圖片鏈接': img_url } items.append(item) # 寫到文件中 self.lock.acquire() self.fp.write(json.dumps(items, ensure_ascii=False) + '\n') self.lock.release() # 用來存放采集線程 g_crawl_list = [] # 用來存放解析線程 g_parser_list = [] def create_crawl_thread(page_queue, data_queue): crawl_names = ['采集線程1', '采集線程2', '采集線程3'] for name in crawl_names: tcrawl = CrawlThread(name, page_queue, data_queue) g_crawl_list.append(tcrawl) def create_parser_thread(data_queue, fp, lock): parse_names = ['解析線程1', '解析線程2', '解析線程3'] for name in parse_names: tparse = ParserThread(name, data_queue, fp, lock) g_parser_list.append(tparse) def create_queue(): page_queue = Queue() for page in range(1, 10): page_queue.put(page) data_queue = Queue() return page_queue, data_queue def main(): # 創建隊列 page_queue, data_queue = create_queue() # 打開文件 fp = open('jian.json', 'a', encoding='utf-8') # 創建鎖 lock = threading.Lock() # 創建采集線程 create_crawl_thread(page_queue, data_queue) # 創建解析線程 create_parser_thread(data_queue, fp, lock) # 啟動所有采集線程 for tcrawl in g_crawl_list: tcrawl.start() # 啟動所有解析線程 for tparser in g_parser_list: tparser.start() # 主線程等待子線程結束 for tcrawl in g_crawl_list: tcrawl.join() for tparser in g_parser_list: tparser.join() # 關閉文件 fp.close() print("主線程和子線程全部結束.....") if __name__ == "__main__": main()