[Fangjia.com House-Price Scraper] Scraping 400,000 house-price records site-wide in parallel, with a switchable target city


Foreword

This scraper collects house-price listings; the goal is to practice handling 100,000+ records and scraping an entire site.

The most immediate consequence of scaling up the data is a higher bar for function logic: data structures have to be chosen carefully with Python's characteristics in mind. In a small scrape, duplicated logic, dense I/O requests, and deeply nested loops cost at most 1-2 s; as the data scale grows, that 1-2 s gap can stretch into 1-2 h.

So for a site with this much data to scrape, there are two angles for cutting the time cost:

1) Optimize the function logic and pick appropriate data structures, following Pythonic habits. For example, merging strings with join() uses less memory than '+' (see the sketch after this list).

2) Choose multithreading or multiprocessing based on whether the work is I/O-bound or CPU-bound, and run the tasks in parallel to raise throughput.
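For instance, a rough sketch of the join() point (timings vary by machine, and CPython does optimize '+=' on strings in some cases, but join() remains the idiomatic choice):

import timeit

parts = ['x'] * 10000

def concat_plus():
    s = ''
    for p in parts:
        s += p  # may build a new string on each iteration
    return s

def concat_join():
    return ''.join(parts)  # a single allocation for the final string

print(timeit.timeit(concat_plus, number=1000))
print(timeit.timeit(concat_join, number=1000))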

 

1. Fetching the index

Wrap the request and set a timeout:

# Fetch a listing page
def get_page(url):
    headers = {
        'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
        'Referer': r'http://bj.fangjia.com/ershoufang/',
        'Host': r'bj.fangjia.com',
        'Connection': 'keep-alive'
    }
    timeout = 60
    socket.setdefaulttimeout(timeout)  # set the (global) socket timeout
    req = request.Request(url, headers=headers)
    response = request.urlopen(req).read()
    page = response.decode('utf-8')
    return page
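One caveat: socket.setdefaulttimeout() changes the timeout globally for every socket in the process. urlopen() also accepts a per-call timeout, so a variant like the hypothetical get_page_safe below (not part of the original script) keeps the setting local and returns None on failure instead of raising:

from urllib import request, error
import socket

def get_page_safe(url, headers, timeout=60):
    # per-call timeout; returns None on failure so the caller can skip or retry
    req = request.Request(url, headers=headers)
    try:
        with request.urlopen(req, timeout=timeout) as response:
            return response.read().decode('utf-8')
    except (error.URLError, socket.timeout):
        return None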

 

First-level location: district information

 

 

Second-level location: block information (derived from each district and stored in a dict as key-value pairs)

 

Storing it in a dict makes looking up a target fast. -> {'朝陽': {'工體', '安貞', '健翔橋', ...}}

 

Third-level location: subway information (for searching listings around subway lines)

 

Append each location's subway lines to the dict. -> {'朝陽': {'工體': {'5號線', '10號線', '13號線'}, '安貞', '健翔橋', ...}}

Corresponding url: http://bj.fangjia.com/ershoufang/--r-%E6%9C%9D%E9%98%B3%7Cw-5%E5%8F%B7%E7%BA%BF%7Cb-%E6%83%A0%E6%96%B0%E8%A5%BF%E8%A1%97

Decoded url: http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-惠新西街
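The percent-encoded segments are just the UTF-8 bytes of the filter keywords; urllib.parse shows the round trip:

from urllib import parse

encoded = parse.quote('朝阳')  # '%E6%9C%9D%E9%98%B3', as it appears in the url above
assert parse.unquote(encoded) == '朝阳'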

 

Given the url's parameter pattern, there are two ways to obtain the target urls:

1) Derive the target urls by walking the index paths

# Collect the listing-url records (nested-dict traversal)
def get_info_list(search_dict, layer, tmp_list, search_list):
    layer += 1  # descend one dict level
    for i in range(len(search_dict)):
        tmp_key = list(search_dict.keys())[i]  # key at the current level
        tmp_list.append(tmp_key)   # append the key to the index path
        tmp_value = search_dict[tmp_key]
        if isinstance(tmp_value, str):   # the value is a url (leaf)
            tmp_list.append(tmp_value)   # append the url to the path
            search_list.append(copy.deepcopy(tmp_list))   # record the finished path + url
            tmp_list = tmp_list[:layer]  # truncate the path back to this level
        elif tmp_value == '':   # empty value: skip it
            layer -= 2           # back out of this level
            tmp_list = tmp_list[:layer]   # truncate the path accordingly
        else:
            get_info_list(tmp_value, layer, tmp_list, search_list)  # the value is a dict: recurse
            tmp_list = tmp_list[:layer]
    return search_list
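A toy run on hypothetical data shows the shape of the output: every leaf url becomes one [district, block, subway line, url] record.

demo_dict = {'朝陽': {'工體': {'5號線': 'http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體'}}}
print(get_info_list(demo_dict, -1, [], []))
# [['朝陽', '工體', '5號線', 'http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體']]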

2) Compose the url from the dict entries

{'朝陽': {'工體': {'5號線'}}}

Parameters:

- r-朝陽

- b-工體

- w-5號線

Composed url: http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體

# Compose a url from the filter parameters
def get_compose_url(compose_tmp_url, tag_args, key_args):
    compose_tmp_url_list = [compose_tmp_url, '|' if tag_args != 'r-' else '', tag_args, parse.quote(key_args)]
    compose_url = ''.join(compose_tmp_url_list)
    return compose_url
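As a sketch of how the three parameters chain together (this loop is illustrative, not part of the original script):

url = base_url + '--'  # 'http://bj.fangjia.com/ershoufang/--'
for tag, key in (('r-', '朝陽'), ('w-', '5號線'), ('b-', '工體')):
    url = get_compose_url(url, tag, key)
# -> http://bj.fangjia.com/ershoufang/--r-...|w-...|b-... with each keyword percent-encoded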

 

2. Getting the maximum page count of each index page

# Build the per-page url list for each index page
def get_info_pn_list(search_list):
    fin_search_list = []
    for i in range(len(search_list)):
        print('>>> scraping %s' % search_list[i][:3])
        search_url = search_list[i][3]
        try:
            page = get_page(search_url)
        except:
            print('timed out fetching the page')
            continue
        soup = BS(page, 'lxml')
        # read the maximum page number
        pn_num = soup.select('span[class="mr5"]')[0].get_text()
        rule = re.compile(r'\d+')
        max_pn = int(rule.findall(pn_num)[1])
        # assemble the paged urls
        for pn in range(1, max_pn+1):
            print('************************ scraping page %s ************************' % pn)
            pn_rule = re.compile('[|]')
            fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
            tmp_url_list = copy.deepcopy(search_list[i][:3])
            tmp_url_list.append(fin_url)
            fin_search_list.append(tmp_url_list)
    return fin_search_list
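The paging parameter e-<pn> is spliced in by replacing the first '|' of the filter url, which is exactly what pn_rule.sub(..., 1) does above. For example:

search_url = 'http://bj.fangjia.com/ershoufang/--r-朝陽|w-5號線|b-工體'
print(re.compile('[|]').sub('|e-2|', search_url, 1))
# http://bj.fangjia.com/ershoufang/--r-朝陽|e-2|w-5號線|b-工體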

 

3. Scraping the listing tags

These are the tags we want to scrape:

['District', 'Block', 'Subway', 'Title', 'Address', 'Area', 'Layout', 'Floor', 'Total price', 'Price per m²']

# Scrape the tag fields
def get_info(fin_search_list, process_i):
    print('process %s started' % process_i)
    fin_info_list = []
    for i in range(len(fin_search_list)):
        url = fin_search_list[i][3]
        try:
            page = get_page(url)
        except:
            print('timed out fetching tags')
            continue
        soup = BS(page, 'lxml')
        title_list = soup.select('a[class="h_name"]')
        address_list = soup.select('span[class="address"]')
        attr_list = soup.select('span[class="attribute"]')
        price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"})  # select() cannot match attribute values containing spaces; find_all(attrs={}) works instead
        for num in range(20):  # 20 listings per index page
            tag_tmp_list = []
            try:
                title = title_list[num].attrs["title"]
                print(r'************************ fetching %s ************************' % title)
                address = re.sub('\n', '', address_list[num].get_text())
                area = re.search(r'\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
                layout = re.search(r'\d[^0-9]\d.', attr_list[num].get_text()).group(0)
                floor = re.search(r'\d/\d', attr_list[num].get_text()).group(0)
                price = re.search(r'\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
                unit_price = re.search(r'\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
                tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
                for tag in [title, address, area, layout, floor, price, unit_price]:
                    tag_tmp_list.append(tag)
                fin_info_list.append(tag_tmp_list)
            except:
                print('[scrape failed]')
                continue
    print('process %s finished' % process_i)
    return fin_info_list
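To make the extraction rules concrete, here is how each regex behaves on made-up listing strings (the sample text is hypothetical, shaped like the site's attribute and price fields):

attr = '86平米 2室1廳 3/6層'   # hypothetical attribute text
price = '400萬 46512元/平米'   # hypothetical price text

re.search(r'\d+[\u4E00-\u9FA5]{2}', attr).group(0)   # '86平米'  -> area
re.search(r'\d[^0-9]\d.', attr).group(0)             # '2室1廳'  -> layout
re.search(r'\d/\d', attr).group(0)                   # '3/6'     -> floor
re.search(r'\d+[\u4E00-\u9FA5]', price).group(0)     # '400萬'   -> total price
re.search(r'\d+[\u4E00-\u9FA5]/.', price).group(0)   # '46512元/平' -> unit price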

 

4. Assigning tasks and scraping in parallel

Slice the task list, set up a process pool, and scrape in parallel.

# Assign the tasks
def assignment_search_list(fin_search_list, project_num):  # project_num = tasks per process; the smaller it is, the more processes
    assignment_list = []
    fin_search_list_len = len(fin_search_list)
    for i in range(0, fin_search_list_len, project_num):
        start = i
        end = i + project_num
        assignment_list.append(fin_search_list[start:end])  # take one slice of the list
    return assignment_list

The main flow then sets up the pool and dispatches the slices:

p = Pool(4)  # set up the process pool
assignment_list = assignment_search_list(fin_info_pn_list, 3)  # split the tasks for multiprocessing
result = []  # async result handles, one per dispatched slice
for i in range(len(assignment_list)):
    result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
p.close()
p.join()
for result_i in range(len(result)):
    fin_info_result_list = result[result_i].get()
    fin_save_list.extend(fin_info_result_list)  # merge the lists returned by each process

With the process pool scraping in parallel, the running time drops to about 1/3 of the single-process time, roughly 3 h in total.

The machine has 4 cores; in testing, a batch of 3 tasks per process ran most efficiently on it.
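Since the workload is dominated by network I/O rather than CPU, the same fan-out can also be written with the standard library's concurrent.futures; a minimal sketch (run_parallel is a hypothetical wrapper, reusing get_info and assignment_search_list from above):

from concurrent.futures import ProcessPoolExecutor

def run_parallel(fin_info_pn_list, project_num=3, workers=4):
    chunks = assignment_search_list(fin_info_pn_list, project_num)
    fin_save_list = []
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # map(get_info, chunks, indices) zips the two iterables into call arguments
        for chunk_result in executor.map(get_info, chunks, range(len(chunks))):
            fin_save_list.extend(chunk_result)
    return fin_save_list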

5. Saving the results to Excel for later visualization

# Save the scraped results
def save_excel(fin_info_list, file_name):
    tag_name = ['District', 'Block', 'Subway', 'Title', 'Address', 'Area', 'Layout', 'Floor', 'Total price', 'Price per m²']
    book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xlsx' % file_name)  # saved to the desktop by default
    tmp = book.add_worksheet()
    tmp.write_row('A1', tag_name)  # header row
    for i in range(len(fin_info_list)):
        tmp.write_row('A%s' % (i + 2), fin_info_list[i])  # data rows start below the header
    book.close()
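Note that xlsxwriter only writes xlsx-format workbooks, which is why the file gets an .xlsx extension. Saving is then a single call, for example save_excel(fin_save_list, 'bj_ershoufang') (the file name here is just a placeholder).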

 

Full source code:

# -*- coding: utf-8 -*-
# Function: house-price survey
# Author: 蘭茲

from urllib import parse, request
from bs4 import BeautifulSoup as BS
from multiprocessing import Pool
import re
import lxml
import datetime
import cProfile
import socket
import copy
import xlsxwriter


starttime = datetime.datetime.now()

base_url = r'http://bj.fangjia.com/ershoufang/'


test_search_dict = {'昌平': {'霍營': {'13號線': 'http://bj.fangjia.com/ershoufang/--r-%E6%98%8C%E5%B9%B3|w-13%E5%8F%B7%E7%BA%BF|b-%E9%9C%8D%E8%90%A5'}}}

search_list = []  # listing-url records
tmp_list = []  # staging list for one index path
layer = -1


# Fetch a listing page
def get_page(url):
    headers = {
        'User-Agent': r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      r'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
        'Referer': r'http://bj.fangjia.com/ershoufang/',
        'Host': r'bj.fangjia.com',
        'Connection': 'keep-alive'
    }
    timeout = 60
    socket.setdefaulttimeout(timeout)  # set the (global) socket timeout
    req = request.Request(url, headers=headers)
    response = request.urlopen(req).read()
    page = response.decode('utf-8')
    return page


# Build the dict of search keywords -> urls for one filter level
def get_search(page, key):
    soup = BS(page, 'lxml')
    search_list = soup.find_all(href=re.compile(key), target='')
    search_dict = {}
    for i in range(len(search_list)):
        soup = BS(str(search_list[i]), 'lxml')
        key = soup.select('a')[0].get_text()
        value = soup.a.attrs['href']
        search_dict[key] = value
    return search_dict


# Collect the listing-url records (nested-dict traversal)
def get_info_list(search_dict, layer, tmp_list, search_list):
    layer += 1  # descend one dict level
    for i in range(len(search_dict)):
        tmp_key = list(search_dict.keys())[i]  # key at the current level
        tmp_list.append(tmp_key)   # append the key to the index path
        tmp_value = search_dict[tmp_key]
        if isinstance(tmp_value, str):   # the value is a url (leaf)
            tmp_list.append(tmp_value)   # append the url to the path
            search_list.append(copy.deepcopy(tmp_list))   # record the finished path + url
            tmp_list = tmp_list[:layer]  # truncate the path back to this level
        elif tmp_value == '':   # empty value: skip it
            layer -= 2           # back out of this level
            tmp_list = tmp_list[:layer]   # truncate the path accordingly
        else:
            get_info_list(tmp_value, layer, tmp_list, search_list)  # the value is a dict: recurse
            tmp_list = tmp_list[:layer]
    return search_list


# Build the per-page url list for each index page
def get_info_pn_list(search_list):
    fin_search_list = []
    for i in range(len(search_list)):
        print('>>> scraping %s' % search_list[i][:3])
        search_url = search_list[i][3]
        try:
            page = get_page(search_url)
        except:
            print('timed out fetching the page')
            continue
        soup = BS(page, 'lxml')
        # read the maximum page number
        pn_num = soup.select('span[class="mr5"]')[0].get_text()
        rule = re.compile(r'\d+')
        max_pn = int(rule.findall(pn_num)[1])
        # assemble the paged urls
        for pn in range(1, max_pn+1):
            print('************************ scraping page %s ************************' % pn)
            pn_rule = re.compile('[|]')
            fin_url = pn_rule.sub(r'|e-%s|' % pn, search_url, 1)
            tmp_url_list = copy.deepcopy(search_list[i][:3])
            tmp_url_list.append(fin_url)
            fin_search_list.append(tmp_url_list)
    return fin_search_list


# Scrape the tag fields
def get_info(fin_search_list, process_i):
    print('process %s started' % process_i)
    fin_info_list = []
    for i in range(len(fin_search_list)):
        url = fin_search_list[i][3]
        try:
            page = get_page(url)
        except:
            print('timed out fetching tags')
            continue
        soup = BS(page, 'lxml')
        title_list = soup.select('a[class="h_name"]')
        address_list = soup.select('span[class="address"]')
        attr_list = soup.select('span[class="attribute"]')
        price_list = soup.find_all(attrs={"class": "xq_aprice xq_esf_width"})  # select() cannot match attribute values containing spaces; find_all(attrs={}) works instead
        for num in range(20):  # 20 listings per index page
            tag_tmp_list = []
            try:
                title = title_list[num].attrs["title"]
                print(r'************************ fetching %s ************************' % title)
                address = re.sub('\n', '', address_list[num].get_text())
                area = re.search(r'\d+[\u4E00-\u9FA5]{2}', attr_list[num].get_text()).group(0)
                layout = re.search(r'\d[^0-9]\d.', attr_list[num].get_text()).group(0)
                floor = re.search(r'\d/\d', attr_list[num].get_text()).group(0)
                price = re.search(r'\d+[\u4E00-\u9FA5]', price_list[num].get_text()).group(0)
                unit_price = re.search(r'\d+[\u4E00-\u9FA5]/.', price_list[num].get_text()).group(0)
                tag_tmp_list = copy.deepcopy(fin_search_list[i][:3])
                for tag in [title, address, area, layout, floor, price, unit_price]:
                    tag_tmp_list.append(tag)
                fin_info_list.append(tag_tmp_list)
            except:
                print('[scrape failed]')
                continue
    print('process %s finished' % process_i)
    return fin_info_list


# Assign the tasks
def assignment_search_list(fin_search_list, project_num):  # project_num = tasks per process; the smaller it is, the more processes
    assignment_list = []
    fin_search_list_len = len(fin_search_list)
    for i in range(0, fin_search_list_len, project_num):
        start = i
        end = i + project_num
        assignment_list.append(fin_search_list[start:end])  # take one slice of the list
    return assignment_list


# Save the scraped results
def save_excel(fin_info_list, file_name):
    tag_name = ['District', 'Block', 'Subway', 'Title', 'Address', 'Area', 'Layout', 'Floor', 'Total price', 'Price per m²']
    book = xlsxwriter.Workbook(r'C:\Users\Administrator\Desktop\%s.xlsx' % file_name)  # saved to the desktop by default
    tmp = book.add_worksheet()
    tmp.write_row('A1', tag_name)  # header row
    for i in range(len(fin_info_list)):
        tmp.write_row('A%s' % (i + 2), fin_info_list[i])  # data rows start below the header
    book.close()


if __name__ == '__main__':
    file_name = input(r'Enter a file name for saving the results: ')
    fin_save_list = []  # accumulator for the scraped records
    # first-level filter
    page = get_page(base_url)
    search_dict = get_search(page, 'r-')
    # second-level filter
    for k in search_dict:
        print(r'************************ level 1: scraping [%s] ************************' % k)
        url = search_dict[k]
        second_page = get_page(url)
        second_search_dict = get_search(second_page, 'b-')
        search_dict[k] = second_search_dict
    # third-level filter
    for k in search_dict:
        second_dict = search_dict[k]
        for s_k in second_dict:
            print(r'************************ level 2: scraping [%s] ************************' % s_k)
            url = second_dict[s_k]
            third_page = get_page(url)
            third_search_dict = get_search(third_page, 'w-')
            print('%s>%s' % (k, s_k))
            second_dict[s_k] = third_search_dict
    fin_info_list = get_info_list(search_dict, layer, tmp_list, search_list)
    fin_info_pn_list = get_info_pn_list(fin_info_list)
    p = Pool(4)  # set up the process pool
    assignment_list = assignment_search_list(fin_info_pn_list, 2)  # split the tasks for multiprocessing
    result = []  # async result handles, one per dispatched slice
    for i in range(len(assignment_list)):
        result.append(p.apply_async(get_info, args=(assignment_list[i], i)))
    p.close()
    p.join()
    for result_i in range(len(result)):
        fin_info_result_list = result[result_i].get()
        fin_save_list.extend(fin_info_result_list)  # merge the lists returned by each process
    save_excel(fin_save_list, file_name)
    endtime = datetime.datetime.now()
    time = (endtime - starttime).seconds
    print('total time: %s s' % time)

 

Summary:

The larger the scraping task, the more rigor the program logic demands and the more fluent your Python needs to be. Writing more Pythonic code takes continuous learning and practice.

Recommended reading: 編寫高質量代碼:改善Python程序的91個建議 (Writing High-Quality Code: 91 Suggestions for Improving Your Python Programs).

Give the scrape a try yourself; analyzing house-price trends is quite fun ლ(^o^ლ)

Comments are welcome; please credit the source when reposting~ (^ _ ^)/~~

For more Python scraper examples, visit: http://www.landsblog.com/blog/category/pachong

 

