We first search for Zhilian Zhaopin on Baidu and open the official site, only to find that a login is required before any job listings can be viewed.
With no way around it, we log in with an account. The page after logging in looks like this:
Enter a job title and click Search; the following page is shown:
Copy this URL: https://sou.zhaopin.com/?jl=765&kw=軟件測試&kt=3, log out, and then paste the copied URL into the browser's address bar.
To our surprise, the search results are displayed without logging in at all. Good, that is exactly what we need.
Next, let's analyze the page. Open the browser's developer tools, switch to the Network tab, filter by XHR, and refresh the page; several asynchronous requests appear.
Checking each request's response, we find one that returns, as JSON, exactly what we need: the total number of matching positions together with the job links and related information.
Click Headers to see this request's URL:
Copy the Request URL into the browser and open it; it is indeed the information we need:
Analyzing this URL: https://fe-api.zhaopin.com/c/i/sou?pageSize=60&cityId=765&workExperience=-1&education=-1&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw=軟件測試&kt=3
we can see that:
1. pageSize: the number of results per page, fixed at 60 here. Paging is done with a separate start parameter (the offset of the first result on the page): 0 for the first page, 60 for the second, 120 for the third, and so on.
2. cityId: the city code; passing the city name directly, e.g. 深圳, also works.
3. kw: the keyword entered in the search box, i.e. the job title.
The other fields can be left unchanged; a quick request sketch follows below.
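As a quick sanity check, here is a minimal sketch (using requests, with the parameter meanings analysed above; the concrete values and the shape of the JSON are assumptions based on what we saw in the developer tools) that fetches one page and prints the total number of matching jobs:

import requests

# Minimal sketch: call the search API directly and inspect the JSON it returns.
# start/pageSize control paging; the other parameters mirror the URL analysed above.
params = {
    'start': 0,            # offset of the first result (0, 60, 120, ...)
    'pageSize': 60,        # results per page
    'cityId': '深圳',      # city code or city name
    'workExperience': -1,
    'education': -1,
    'companyType': -1,
    'employmentType': -1,
    'jobWelfareTag': -1,
    'kw': '軟件測試',      # search keyword (job title)
    'kt': 3,
}
resp = requests.get('https://fe-api.zhaopin.com/c/i/sou', params=params)
data = resp.json()['data']
print(data['numFound'])    # total number of matching positions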
With the analysis done, we can start writing the code.
First, we define a logging module that writes the crawler's logs to both a file and the console:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
@project = Spider_zhilian
@file = log
@author = Easton Liu
@time = 2018/10/20 21:42
@Description: define the logger, writing to both a file and the console
"""
import logging
import os
from logging.handlers import TimedRotatingFileHandler

class Logger:
    def __init__(self, logger_name='easton'):
        self.logger = logging.getLogger(logger_name)
        logging.root.setLevel(logging.NOTSET)
        self.log_file_name = 'spider_zhilian.log'
        self.backup_count = 5
        # log output levels
        self.console_output_level = 'WARNING'
        self.file_output_level = 'DEBUG'
        # log output format
        pattern = '%(asctime)s - %(levelname)s - %(message)s'
        self.formatter = logging.Formatter(pattern)
        # log directory
        if not os.path.exists('log'):
            os.mkdir('log')
        self.log_path = os.path.join(os.getcwd(), 'log')

    def get_logger(self):
        """Add handlers to the logger and return it; if the logger already has handlers, return it directly."""
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(self.formatter)
            console_handler.setLevel(self.console_output_level)
            self.logger.addHandler(console_handler)
            # roll over to a new log file every day, keeping at most backup_count old files
            file_handler = TimedRotatingFileHandler(filename=os.path.join(self.log_path, self.log_file_name),
                                                    when='D',
                                                    interval=1,
                                                    backupCount=self.backup_count,
                                                    delay=True,
                                                    encoding='utf-8')
            file_handler.setFormatter(self.formatter)
            file_handler.setLevel(self.file_output_level)
            self.logger.addHandler(file_handler)
        return self.logger

logger = Logger().get_logger()
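Assuming the module above is saved as log.py (the @file = log header suggests that name), the rest of the crawler only needs to import the ready-made logger instance:

from log import logger   # assumes the logging module above lives in log.py

logger.info('goes to the log file only (file level is DEBUG)')
logger.warning('goes to both the log file and the console (console level is WARNING)')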
We use a simple approach to implement incremental crawling: hash each crawled URL with hashlib. MD5 returns 32 hex characters; to save memory we keep only the middle 16, which is still enough to give every distinct URL a distinct digest. The digests of all crawled URLs are kept in a set; when crawling finishes, the set is pickled to disk, and on the next run it is loaded back into memory so that URLs that have already been crawled are skipped. This gives us incremental crawling. (A short usage sketch follows the three helper functions below.)
URL hashing:

def hash_url(url):
    '''
    Hash a URL and keep the middle 16 characters of the digest
    :param url: a crawled URL
    :return: the hashed URL
    '''
    m = hashlib.md5()
    m.update(url.encode('utf-8'))
    return m.hexdigest()[8:-8]
Serialization:

def save_progress(data, path):
    '''
    Pickle the set of already-crawled URLs to a file
    :param data: the data to save
    :param path: the file path
    :return:
    '''
    try:
        with open(path, 'wb+') as f:
            pickle.dump(data, f)
        logger.info('save url file success!')
    except Exception as e:
        logger.error('save url file failed:%s', e)
Deserialization:

def load_progress(path):
    '''
    Load the pickled set of already-crawled URLs
    :param path: the file path
    :return: the set of hashed URLs (an empty set if the file does not exist)
    '''
    logger.info("load url file of already spider:%s" % path)
    try:
        with open(path, 'rb') as f:
            tmp = pickle.load(f)
            return tmp
    except:
        logger.info("not found url file of already spider!")
        return set()
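Put together, the incremental logic is simply: load the set of digests, skip any URL whose digest is already in the set, add new digests as pages are crawled, and save the set at the end. A minimal sketch (the file name old_url.txt matches the one used in main() later; the job URL is hypothetical):

old_url = load_progress('old_url.txt')      # digests of URLs crawled on previous runs

url = 'https://jobs.zhaopin.com/some_position.htm'   # hypothetical job URL
h = hash_url(url)
if h not in old_url:
    # ... crawl the page ...
    old_url.add(h)                           # remember it for next time

save_progress(old_url, 'old_url.txt')        # persist the set for the next run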
Get the total number of result pages: the numFound field in the JSON message is the total number of matching positions; divide it by 60 and round up to get the total number of pages.

def get_page_nums(cityname, jobname):
    '''
    Get the number of result pages for the given search
    :param cityname: city name
    :param jobname: job title
    :return: total number of pages
    '''
    url = r'https://fe-api.zhaopin.com/c/i/sou?pageSize=60&cityId={}&workExperience=-1&education=-1' \
          r'&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw={}&kt=3'.format(cityname, jobname)
    logger.info('start get job count...')
    try:
        rec = requests.get(url)
        if rec.status_code == 200:
            j = json.loads(rec.text)
            count_nums = j.get('data')['numFound']
            logger.info('get job count nums success:%s' % count_nums)
            page_nums = math.ceil(count_nums / 60)
            logger.info('page nums:%s' % page_nums)
            return page_nums
    except Exception as e:
        logger.error('get job count nums failed:%s', e)
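For example, assuming the API is reachable, calling the function for one city/keyword pair might look like this:

# Hypothetical usage: count the result pages for software-testing jobs in Shenzhen.
pages = get_page_nums('深圳', '軟件測試')
print('pages to crawl:', pages)   # e.g. numFound=1234 -> ceil(1234/60) = 21 pages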
Get the job links on each page: positionURL in the JSON message is the link to the job posting. While we are at it, we also grab the posting's creation time, update time, end date, and job highlights, and return everything as a list of dicts.

def get_urls(start, cityname, jobname):
    '''
    Get the detail-page URLs and part of the job information for one result page
    :param start: offset of the first result on the page
    :param cityname: city name
    :param jobname: job title
    :return: list of dicts
    '''
    url = r'https://fe-api.zhaopin.com/c/i/sou?start={}&pageSize=60&cityId={}&workExperience=-1&education=-1' \
          r'&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw={}&kt=3'.format(start, cityname, jobname)
    logger.info('spider start:%s', start)
    logger.info('get current page all job urls...')
    url_list = []
    try:
        rec = requests.get(url)
        if rec.status_code == 200:
            j = json.loads(rec.text)
            results = j.get('data').get('results')
            for job in results:
                empltype = job.get('emplType')  # position type: full-time or campus
                if empltype == '全職':
                    url_dict = {}
                    url_dict['positionURL'] = job.get('positionURL')  # job detail link
                    url_dict['createDate'] = job.get('createDate')    # posting creation time
                    url_dict['updateDate'] = job.get('updateDate')    # posting update time
                    url_dict['endDate'] = job.get('endDate')          # posting end date
                    positionLabel = job.get('positionLabel')
                    if positionLabel:
                        jobLight = re.search(r'"jobLight":\[(.*?|[\u4E00-\u9FA5]+)\]', job.get('positionLabel'))  # job highlights
                        url_dict['jobLight'] = jobLight.group(1) if jobLight else None
                    else:
                        url_dict['jobLight'] = None
                    url_list.append(url_dict)
            logger.info('get current page all job urls success:%s' % len(url_list))
            return url_list
    except Exception as e:
        logger.error('get current page all job urls failed:%s', e)
        return None
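As an aside, the regular expression suggests that positionLabel is itself a JSON string; if that assumption holds, the highlights could also be pulled out with json.loads instead of a regex. A hypothetical helper sketch:

import json

# Sketch under the assumption that positionLabel is a JSON string
# of the form {"jobLight": ["highlight1", "highlight2"], ...}.
def extract_job_light(position_label):
    if not position_label:
        return None
    try:
        lights = json.loads(position_label).get('jobLight')
        return ','.join(lights) if lights else None
    except (ValueError, TypeError):
        return None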
Open one of the job links in the browser and look at the page:
Here we use lxml to parse the page; each parsed result is yielded as a dict from a generator:

def get_job_info(url_list, old_url):
    '''
    Get the details of each job
    :param url_list: list of dicts returned by get_urls()
    :param old_url: set of hashed URLs that have already been crawled
    :return: generator of dicts
    '''
    if url_list:
        for job in url_list:
            url = job.get('positionURL')
            h_url = hash_url(url)
            if h_url not in old_url:
                logger.info('spider url:%s' % url)
                try:
                    response = requests.get(url)
                    if response.status_code == 200:
                        s = etree.HTML(response.text)
                        job_stat = s.xpath('//div[@class="main1 cl main1-stat"]')[0]
                        stat_li_first = job_stat.xpath('./div[@class="new-info"]/ul/li[1]')[0]
                        job_name = stat_li_first.xpath('./h1/text()')[0]                  # job title
                        salary = stat_li_first.xpath('./div/strong/text()')[0]            # monthly salary
                        stat_li_second = job_stat.xpath('./div[@class="new-info"]/ul/li[2]')[0]
                        company_url = stat_li_second.xpath('./div[1]/a/@href')[0]         # company URL
                        company_name = stat_li_second.xpath('./div[1]/a/text()')[0]       # company name
                        city_name = stat_li_second.xpath('./div[2]/span[1]/a/text()')[0]  # city
                        workingExp = stat_li_second.xpath('./div[2]/span[2]/text()')[0]   # work experience
                        eduLevel = stat_li_second.xpath('./div[2]/span[3]/text()')[0]     # education
                        amount = stat_li_second.xpath('./div[2]/span[4]/text()')[0]       # number of openings
                        job_text = s.xpath('//div[@class="pos-ul"]//text()')              # job requirements
                        job_desc = ''
                        for job_item in job_text:
                            job_desc = job_desc + job_item.replace('\xa0', '').strip('\n')
                        job_address_path = s.xpath('//p[@class="add-txt"]/text()')        # work address
                        job_address = job_address_path[0] if job_address_path else None
                        company_text = s.xpath('//div[@class="intro-content"]//text()')   # company information
                        company_info = ''
                        for item in company_text:
                            company_info = company_info + item.replace('\xa0', '').strip('\n')
                        promulgator = s.xpath('//ul[@class="promulgator-ul cl"]/li')
                        company_industry = promulgator[0].xpath('./strong//text()')[0]    # company industry
                        company_type = promulgator[1].xpath('./strong/text()')[0]         # company type: private, state-owned, listed
                        total_num = promulgator[2].xpath('./strong/text()')[0]            # company headcount
                        company_addr = promulgator[4].xpath('./strong/text()')[0].strip() # company address
                        logger.info('get job info success!')
                        old_url.add(h_url)
                        yield {
                            'job_name': job_name,                 # job title
                            'salary': salary,                     # monthly salary
                            'company_name': company_name,         # company name
                            'eduLevel': eduLevel,                 # education
                            'workingExp': workingExp,             # work experience
                            'amount': amount,                     # number of openings
                            'jobLight': job.get('jobLight'),      # job highlights
                            'city_name': city_name,               # city
                            'job_address': job_address,           # work address
                            'createDate': job.get('createDate'),  # creation time
                            'updateDate': job.get('updateDate'),  # update time
                            'endDate': job.get('endDate'),        # end date
                            'company_industry': company_industry, # company industry
                            'company_type': company_type,         # company type
                            'total_num': total_num,               # company headcount
                            'company_addr': company_addr,         # company address
                            'job_desc': job_desc,                 # job description
                            'job_url': url,                       # job link
                            'company_info': company_info,         # company information
                            'company_url': company_url            # company link
                        }
                except Exception as e:
                    logger.error('get job info failed:%s %s', url, e)
Output to CSV:

# CSV column headers; their order matches the key order of the dict yielded by get_job_info()
headers = ['job title', 'salary', 'company', 'education', 'experience', 'openings', 'job highlights',
           'city', 'work address', 'create date', 'update date', 'end date', 'industry', 'company type',
           'headcount', 'company address', 'job description', 'job url', 'company info', 'company url']

def write_csv_headers(csv_filename):
    with open(csv_filename, 'a', newline='', encoding='utf-8-sig') as f:
        f_csv = csv.DictWriter(f, headers)
        f_csv.writeheader()

def save_csv(csv_filename, data):
    with open(csv_filename, 'a+', newline='', encoding='utf-8-sig') as f:
        f_csv = csv.DictWriter(f, data.keys())
        f_csv.writerow(data)
Finally, the main function:

def main():
    if not os.path.exists(output_path):
        os.mkdir(output_path)
    for jobname in job_names:
        for cityname in city_names:
            logger.info('*' * 10 + 'start spider jobname:' + jobname + ' city:' + cityname + '*' * 10)
            total_page = get_page_nums(cityname, jobname)
            old_url = load_progress('old_url.txt')
            csv_filename = output_path + '/{0}_{1}.csv'.format(jobname, cityname)
            if not os.path.exists(csv_filename):
                write_csv_headers(csv_filename)
            for i in range(int(total_page)):
                urls = get_urls(i * 60, cityname, jobname)
                data = get_job_info(urls, old_url)
                for d in data:
                    save_csv(csv_filename, d)
                save_progress(old_url, 'old_url.txt')
            logger.info('*' * 10 + 'jobname:' + jobname + ' city:' + cityname + ' spider finished!' + '*' * 10)
Last come the configuration and the entry point, which also prints the total time the crawler took:
city_names = ['深圳', '廣州']          # cities to search
job_names = ['軟件測試', '數據分析']    # job keywords to search
output_path = 'output'                 # directory for the CSV output
if __name__ == '__main__':
    start_time = datetime.datetime.now()
    logger.info('*' * 20 + "start running spider!" + '*' * 20)
    main()
    end_time = datetime.datetime.now()
    logger.info('*' * 20 + "spider finished! Running time:%s" % (end_time - start_time) + '*' * 20)
    print("Running time:%s" % (end_time - start_time))
All of the code above has been uploaded to GitHub: https://github.com/Python3SpiderOrg/zhilianzhaopin