selenium爬取Twitter數據


注:這只是很多方法中的一種,當然也不是最好的一種,有其他好的方法,希望大家可以在評論區交流學習

1.需要爬取的數據

用戶主頁的Name、ID、Introduction、以及用戶關注的Following的用戶的同樣信息。
image

2.遇到的問題

twitter的用戶的following用戶界面使用的動態加載的方式,並非靜態的HTML界面,用戶的Following用戶的信息根據滾輪滑動動態進行加載,所以使用selenium中findf_elenment()的方法不能對需要爬取的信息進行定位。
在這里插入圖片描述

最后的解決方案:使用selenium的中webdriver模擬滾輪滑動,抓取response中所有帶有Following的數據包,對抓取到的包進行解析,最終得到我們想要的數據。
在這里插入圖片描述

將response中的數據復制下來,就是我們需要的Following的信息數據。Response中的數據如下圖所示:
在這里插入圖片描述

每一個entryId就是一個Following的詳細數據,只要能把response包抓下來,那一切是不是都能解決了,所以我們接下來的問題就是如何在使用selenium的webdriver模擬的瀏覽器的同時將數據包抓下來進行數據解析。
注:該方法的主要缺點是和網速有很大關系,本來selenium的爬取速度就已經令人捉急,現在為了模擬滾輪滑動有很多強制sleep()操作,整個程序的運行效率並不是很高,這也是后面需要改進的地方。

3.嘗試過的工具

requests:無法使用cookie登錄twitter,也就無法進行數據爬取,遂放棄。
scrapy:使用一次cookie登錄之后,后面就沒有辦法再登錄了,遂放棄。
mitmproxy:中間代理,用於抓取response數據包,配置太復雜,遂放棄。
browsermobproxy:針對http網站的數據轉包可行,但是Twitter需要https,需要先設置FQ代理,沒有辦法設置倆個代理,遂放棄。
requestium:是requests和selenium的結合體,特點是使用cookie將driver轉化為requests登錄,可以加快爬蟲速率,本質上還是requests使用cookie登錄twitter,網絡響應一直是443,沒有辦法進行連接,遂放棄。
selenium:使用webdriver模擬登陸twitter,獲取用戶主頁的各種數據,點擊Following界面並且模擬滾輪滾動,selenium的任務完成。
chrome devtool:Chrome的開發者工具,它可以獲取driver運行中網絡日志,我們根據網絡日志獲取resquestId,然后找到該requestId對應的response,最后對response中的數據進行解析,拿到我們需要的數據。

4.詳細過程

獲取登錄的cookie
首先安裝一個插件EditThisCookie,用於獲取登錄twitter之后的Cookie信息。接着登錄你的twitter賬號
在這里插入圖片描述

接着如下圖所示,點擊EditThisCookie插件--->然后點擊按鈕2將cookie進行復制,保存在一個json文件中,為后面模擬登陸做准備。
在這里插入圖片描述

1).模擬登陸

由於twitter的訪問需要FQ,所以最好在爬蟲之前保證自己的電腦有對應的軟件。接着我們介紹如何使用selenium中的webdriver模擬twitter登陸。
在這里插入圖片描述

注解:

  1. 你自己本地保存Cookie文件的位置。
  2. 設置的FQ代理。
  3. 需要打開Chrome用於獲取network的設置,
  4. 和3的作用是一樣的,但是我在Chrome v95的下嘗試后發現,該操作會將selenium的所有操作轉化為dict,無法進行正常的操作。
  5. 先在沒有cookie的情況下get到url,然后加載了cookie之后,再嘗試get,這樣才能保證正常登錄。
    在這里插入圖片描述

很多之前的博客介紹打開Chrome的network工具的時候,要么只介紹4中提到的方法,要么將倆種方法都提及到,這個地方讓我花了好長時間去弄清楚。

2).爬取主頁信息

一般的推特用戶的主頁是"https://twitter/com/"+"uid",這個uid是指用戶主頁名字下面“@”后面的字符,所以只要拿到uid就可以訪問用戶主頁。用戶主頁的格式比較固定,所以可以使用Xpath定位的方法進行數據爬取。
在這里插入圖片描述

  1. Xpath可以在chrome 中點擊需要查看的元素--->單擊右鍵--->檢查--->在源碼區域再次右鍵單擊--->Copy--->Copy Xpath,對於所有定位的元素都可以這樣操作。
  2. webdriver中用於檢測定位元素是否加載出來的方法,TIMEOUT是一個自己設定的值,不同於sleep(TIMEOUT)操作,WebDriverWait()會不斷檢測元素時候加載出來,如果元素已經加載出來,便直接執行,不再等待,可以提高速率。

3).爬取Following數據

爬取Following數據是整個工作中花費時間最多的地方,最開始直接使用Xpath定位方法,按照Following數量從1開始進行定位的方式,后面在實際試驗中發現,由於Following界面是動態加載的,導致后面某一個Following的Xpath和前面的並不是按照序號連着的,而且也沒有辦法知道表現出什么規律,最后只能放棄。
接着想到,可以抓取Response的包,把需要的Following的數據直接抓下來離線解析,會省去很多操作。在選擇什么方法進行抓包,也花了很多時間探索。首先使用的想到的mitmproxy,但是后來看了一下需要配置的東西有點多,學的東西也有點多,不想搞了就放棄了。接着是browsermobproxy,這個是比較簡單適用的,但是由於twitter是需要FQ的,但是browsermobproxy只能設置一個代理,后面也就放棄了。后面看博客,了解到可以使用Chrome開發工具得到network的log日志文件,得到request-response對應的requestId,然后根據requestId得到對應的response的包,直接解析我們需要的數據,完美!
由於Chrome的日志訪問一次之后,就會將起那么的清零,所有我們首先使用selenium模擬滾輪滑動到界面最下面,然后再一次性把所有的數據都抓取到。
a.模擬頁面滾動
在這里插入圖片描述

注:由於頁面加載需要一些時間,中間需要強制使用sleep(),這是程序中花費時間比較多的地方,也是我目前沒有解決的地方。

b.獲取所有的Following對應的response
在這里插入圖片描述
c.解析response包
在這里插入圖片描述

4).迭代爬取

我們按照廣度搜索的方式,一層一層的爬取數據,可以設定總共的爬取數據的數量,循環進行。
然后對數據進行保存。
在這里插入圖片描述

使用try{}except{}結構,如果程序遇到不可控的錯誤,可以想把之前已經爬到的數據保存下面,然后最后的user開始重新進行爬取數據。

5.完整代碼

import json
import pandas as pd
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

TIMEOUT = 40   ### 測試的時候,各個地方的timeout可以設置的小一點,在實際程序運行的時候需要設置的更大一點
ALLPARENTSNUMBER = 10000
USERINFOLIST = []
TRUSTLIST = []
baseUrl = 'https://www.twitter.com'

def init(url):
    cookieFile = './cookies/twitterCookie.json'
    with open(cookieFile, 'r') as f:
        cookie = json.load(f)
    chromeOptions = webdriver.ChromeOptions()

    chromeOptions.add_argument("--proxy-server=http://127.0.0.1:7890")
    chromeOptions.add_argument('headless')

    capabilities = DesiredCapabilities.CHROME
    capabilities['goog:loggingPrefs'] = {"performance": "ALL"}  # newer: goog:loggingPrefs
    # 還有一種錯誤的做法
    # chrome_options.add_experimental_option('w3c', False)

    driver = webdriver.Chrome(options=chromeOptions, desired_capabilities=capabilities)
    while 1:
        try:
            driver.get(url)
            break
        except:
            sleep(TIMEOUT // 10)
    for item in cookie:
        driver.add_cookie(item)
    driver.get(url)
    print('Initialize Success!')
    return driver

def getUserInfo(driver,userPageUrl):
    driver.get(userPageUrl)
    nameXpath = '//*[@id="react-root"]/div/div/div[2]/main/div/div/div/div[1]/div/div[2]/div/div/div[1]/div/div[2]/div/div/div[1]/div/span[1]/span'
    uidXpath = '//*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[2]/div/div/div[1]/div/div[2]/div/div/div[2]/div/span'
    introXpath = '//*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[2]/div/div/div[1]/div/div[3]/div/div[1]/span'
    followingNumXpath = '//*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[2]/div/div/div[1]/div/div[5]/div[1]/a/span[1]/span'

    name = WebDriverWait(driver, TIMEOUT).until(EC.presence_of_element_located((By.XPATH, nameXpath))).text
    uid = driver.find_element(By.XPATH,uidXpath).text
    try: intro = driver.find_element(By.XPATH,introXpath).text
    except: intro = ''
    try: followingNum = int((driver.find_element(By.XPATH,followingNumXpath).text).replace(',','').replace('.','').replace('K','000'))
    except: followingNum = 1000
    return [name,uid,intro,followingNum]


def getFollowingInfo(driver,userInfo):
    userFollowingPageUrl = baseUrl+'/'+userInfo[1]+'/following'
    driver.get(userFollowingPageUrl)
    sleep(TIMEOUT//10)
    scrollUntilLoaded(driver)
    targetUserName = userInfo[0]
    sleep(TIMEOUT//10)
    trueFollowerNum = getFollowingResponse(targetUserName,driver)
    return trueFollowerNum


def getFollowingResponse(targetUserName,driver):
    tmpTrueFollowerNum = 0
    for row in driver.get_log('performance'):
        log_data = row
        log_json = json.loads(log_data['message'])
        log = log_json['message']

        if log['method'] == 'Network.responseReceived' and 'Following' in log['params']['response']['url']:
            requestId = log['params']['requestId']
            try:
                responseBody = driver.execute_cdp_cmd("Network.getResponseBody", {"requestId": requestId})['body']
                oneResponseNum = decodeFollowingReponse(targetUserName,responseBody)
                tmpTrueFollowerNum += oneResponseNum
            except:
                pass
    print('\nfollowingNumbers:\t',tmpTrueFollowerNum)
    return tmpTrueFollowerNum

def decodeFollowingReponse(targetUserName,responseBody):
    responseBody = json.loads(responseBody)
    allInstructions = responseBody['data']['user']['result']['timeline']['timeline']['instructions']
    for instruction in  allInstructions:
        if instruction['type']=='TimelineAddEntries':
            allEntries = instruction['entries']
            break
    verifiedEntries = 0
    for ids in range(len(allEntries)-2):
        result = allEntries[ids]['content']['itemContent']['user_results']['result']
        if result.get('legacy'):
            userContent = result['legacy']
            name = userContent['name']
            intro = userContent['description']
            uid = userContent['screen_name']
            isVerified = userContent['verified']
            if isVerified:
                verifiedEntries +=1
                TRUSTLIST.append([targetUserName,name])
                USERINFOLIST.append([name,uid,intro,0])
    return verifiedEntries


def scrollUntilLoaded(driver):
    last_height = driver.execute_script("return document.body.scrollHeight")

    while True:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        sleep(TIMEOUT//6)
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            break
        last_height = new_height


def save():
    userInfoFrame = pd.DataFrame(USERINFOLIST,columns=['name','uid','intro','followingNum'])
    trustFrame = pd.DataFrame(TRUSTLIST,columns=['followee','follower'])

    fileDir = "Twitter_"+str(ALLPARENTSNUMBER)+'/'
    if not os.path.exists(fileDir):
        os.makedirs(fileDir)
    userInfoFrame.to_csv(fileDir+"userInfo.csv",sep='\t')
    trustFrame.to_csv(fileDir+"trusts.csv",sep='\t')

if __name__ == '__main__':
    driver = init(baseUrl)
    startUserUrl = baseUrl+'/POTUS45'
    # 對第一個用戶的處理
    startUserInfo = getUserInfo(driver, startUserUrl)
    USERINFOLIST.append(startUserInfo)
    startUserFollowerNum = getFollowingInfo(driver,startUserInfo)
    USERINFOLIST[0][-1] = startUserFollowerNum

    # 后續用戶重復處理
    levelCount = 0
    parentsUserInfo = USERINFOLIST[0]
    parentsUserNum, allUserNum = 1,1
    try:
        while True:
            nextUserInfo = USERINFOLIST[parentsUserNum]
            trueFollowerNum = getFollowingInfo(driver,nextUserInfo)
            USERINFOLIST[allUserNum][-1] = trueFollowerNum
            allUserNum += trueFollowerNum
            parentsUserNum += 1
            print('number\t:',parentsUserNum)
            if parentsUserNum==ALLPARENTSNUMBER:
                break
        savedUserInfoLen = len(USERINFOLIST)
        for restUser in range(parentsUserNum+1,savedUserInfoLen):
            userUrl = baseUrl+'/'+USERINFOLIST[restUser][1]
            trueFollowerNum = getUserInfo(driver,userUrl)
            USERINFOLIST[restUser][-1] = trueFollowerNum
            print('number\t:',restUser)
    except:
        save()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM