【JS 逆向百例】WebSocket 協議爬蟲,智慧樹掃碼登錄案例分析


關注微信公眾號:K哥爬蟲,持續分享爬蟲進階、JS/安卓逆向等技術干貨!

聲明

本文章中所有內容僅供學習交流,抓包內容、敏感網址、數據接口均已做脫敏處理,嚴禁用於商業用途和非法用途,否則由此產生的一切后果均與作者無關,若有侵權,請聯系我立即刪除!

逆向目標

  • 目標:智慧樹掃碼登錄,接口使用了 WebSocket 通信協議
  • 主頁:aHR0cHM6Ly9wYXNzcG9ydC56aGlodWlzaHUuY29tL2xvZ2luI3FyQ29kZUxvZ2lu

WebSocket 簡介

WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,WebSocket 使得客戶端和服務器之間的數據交換變得更加簡單。在 WebSocket API 中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。

WebSocket 協議簡稱為 WS 或者 WSS(WebSocket Secure),其發送請求的 URL 以 ws:// 或者 wss:// 開頭,WSS 是 WS 的加密版本,類似於 HTTP 與 HTTPS。

WebSocket 協議的最大特點就是:服務器可以主動向客戶端推送信息,客戶端也可以主動向服務器發送信息,是真正的雙向平等對話,屬於服務器推送技術的一種。與 HTTP 的對比如下圖所示:

01.png

抓包分析

來到智慧樹的掃碼登錄頁面,抓包選中 WS,用來篩選 WebSocket 請求,如下圖所示:

02.png

其中有一些比較特別的參數,是 HTTP/ HTTPS 請求中沒有的:

  • Upgrade: websocket:表明這是 WebSocket 類型請求;
  • Sec-WebSocket-Version:告訴服務器所使用的 Websocket Draft(協議版本),必須是 13;
  • Sec-WebSocket-Extensions:協議擴展,某類協議可能支持多個擴展,通過它可以實現協議增強;
  • Sec-WebSocket-Key:是 WebSocket 客戶端發送的一個 base64 編碼的密文,是瀏覽器隨機生成的,要求服務端必須返回一個對應加密的 Sec-WebSocket-Accept 應答,否則客戶端會拋出 Error during WebSocket handshake 錯誤,並關閉連接。

我們先掃碼登錄一遍,再選擇 Messages 選項卡,可以看到有一些數據交互,其中綠色的箭頭是客戶端發送給服務器的數據,紅色箭頭是服務器響應返回給客戶端的數據,如下圖所示:

03.png

我們觀察一下整個交互過程,當我們打開二維碼頁面后,也就是二維碼加載出來的同時,WebSocket 連接就建立了,每隔8秒左右,客戶端就主動發送一串字符串,服務端也返回相同的字符串,只不過是字典格式,當我們掃碼成功時,服務端就返回掃碼成功的信息,當我們點擊登陸時,客戶端又會返回掃碼結果,如果成功,就有一個一次性密碼 oncePassword 和一個 uuid,這兩個參數肯定在后續的請求中會用到的。如果長時間不掃碼的話,過段時間就會返回二維碼已失效的信息,每隔8秒發送一次消息,正是為了保持連接以及獲取二維碼狀態消息。

那么到這里就出現了兩個問題:

  1. 在來回交互發送的那串字符串,是怎么得來的?

  2. 在 Python 中應該如何實現 WebSocket 請求?

  3. 如何實現客戶端每隔 8 秒發送一次數據的同時,實時接收服務端的信息?(觀察請求掃碼結果實時返回的,所以不能每隔 8 秒才接收一次)

參數獲取

首先解決第一個問題,客戶端發送的那串字符串是怎么來的,這里尋找加密字符串的方式和 HTTP/HTTPS 請求是一樣的,在本例中,我們可以直接搜索這個字符串,發現是通過一個接口傳過來的,其中 img 就是二維碼圖片的 base64 值,qrToken 就是客戶端發送的那串字符串,如下圖所示:

04.png

這里需要注意的是,並不是所有的 WebSocket 請求都是如此的簡單的,有的客戶端發送的數據是 Binary Message(二進制數據)、或者更復雜的加密參數,直接搜索無法獲取,針對這種情況,我們也有解決方法:

  1. 已知創建 WebSocket 對象的語句為:var Socket = new WebSocket(url, [protocol] );,所以我們可以搜索 new WebSocket 定位到建立請求的位置。

  2. 已知一個 WebSocket 對象有以下相關事件,我們可以搜索對應事件處理程序代碼來定位:

事件 事件處理程序 描述
open Socket.onopen 連接建立時觸發
message Socket.onmessage 客戶端接收服務端數據時觸發
error Socket.onerror 通信發生錯誤時觸發
close Socket.onclose 連接關閉時觸發
  1. 已知一個 WebSocket 對象有以下相關方法,我們可以搜索對應方法來定位:
方法 描述
Socket.send() 使用連接發送數據
Socket.close() 關閉連接

Python 實現 WebSocket 請求

接着前面說,第二個問題,在 Python 中應該如何實現 WebSocket 請求?Python 庫中用於連接 WebSocket 的有很多,比較常用、穩定的有 websocket-client(非異步)、websockets(異步)、aiowebsocket(異步)。在本案例中使用 websocket-client,這里還要注意第三個問題,對於客戶端來說,要每隔 8 秒發送一次數據,對於服務端,我們需要實時接收服務端的信息,可以觀察請求,掃碼的結果是實時返回的,如果我們也每隔 8 秒才接收一次數據的話,有可能會丟失數據,而且也會使得整個程序的響應也不及時,效率變低。

在 websocket-client 官方文檔中給我們提供了一個長連接的 demo,它實現了連續發送三次數據,並實時監聽服務端返回的數據,其中的 websocket.enableTrace(True) 表示是否顯示連接詳細信息:

import websocket
import _thread
import time


def on_message(ws, message):
    print(message)


def on_error(ws, error):
    print(error)


def on_close(ws, close_status_code, close_msg):
    print("### closed ###")


def on_open(ws):
    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")
    _thread.start_new_thread(run, ())


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        "ws://echo.websocket.org/", on_open=on_open,
        on_message=on_message, on_error=on_error, on_close=on_close
    )

    ws.run_forever()

我們將其適當改造一下,客戶端在 run 方法里,依然是每隔 8 秒發送一次 qr_token,實時接收服務端的消息,當“掃碼成功”字樣出現在消息里時,將得到的 oncePassworduuid 存起來,然后關閉連接,邏輯代碼如下所示,后續只要將二維碼的獲取邏輯接入就行了。(已脫敏處理,不能直接運行)

import json
import time
import _thread
import websocket


web_socket_url = "wss://appcomm-user.脫敏處理.com/app-commserv-user/websocket?qrToken=%s"
qr_token = "ca6e6cfb70de4f2f915b968aefcad404"
once_password = ""
uuid = ""


def wss_on_message(ws, message):
    print("=============== [message] ===============")
    message = json.loads(message)
    print(message)
    if "掃碼成功" in message["msg"]:
        global once_password, uuid
        once_password = message["oncePassword"]
        uuid = message["uuid"]
        ws.close()


def wss_on_error(ws, error):
    print("=============== [error] ===============")
    print(error)
    ws.close()


def wss_on_close(ws, close_status_code, close_msg):
    print("=============== [closed] ===============")
    print(close_status_code)
    print(close_msg)


def wss_on_open(ws):
    def run(*args):
        while True:
            ws.send(qr_token)
            time.sleep(8)
    _thread.start_new_thread(run, (qr_token,))


def wss():
    # websocket.enableTrace(True)  # 是否顯示連接詳細信息
    ws = websocket.WebSocketApp(
        web_socket_url % qr_token, on_open=wss_on_open,
        on_message=wss_on_message, on_error=wss_on_error,
        on_close=wss_on_close
    )
    ws.run_forever()

實現掃碼登錄

最重要的 WebSocket 請求部分已經解決了,掃碼拿到 oncePassworduuid 后,后續的處理步驟就比較簡單了,現在來理一下完整的步驟:

  1. 請求首頁,第一次獲取 cookie,包含:INGRESSCOOKIE、JSESSIONID、SERVERID、acw_tc;
  2. 請求獲取二維碼接口,得到二維碼的 base64 值和 qrToken;
  3. 建立 WebSocket 連接,掃描二維碼,獲取一次性密碼 oncePassword 和 uuid(好像沒什么用);
  4. 請求一個登錄接口,302 重定向,需要攜帶一次性密碼,第二次獲取 cookie,包含:CASLOGC、CASTGC,同時更新 SERVERID;
  5. 請求第 4 步 302 重定向地址,第三次獲取 cookie,包含:SESSION;
  6. 攜帶完整 cookie,請求用戶信息接口,獲取真實用戶名等信息。

實際上 WebSocket 連接結束后,有很多請求,看起來都比較可以,但是經過 K 哥測試,只有兩個重定向比較有用,抓包如下:

05.png

完整代碼

GitHub 關注 K 哥爬蟲,持續分享爬蟲相關代碼!歡迎 star !https://github.com/kgepachong/

以下只演示部分關鍵代碼,不能直接運行! 完整代碼倉庫地址:https://github.com/kgepachong/crawler/

Python 登錄代碼

import time
import json
import base64
import _thread
import requests
import websocket
from PIL import Image


web_socket_url = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
get_login_qr_img_url = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
login_url = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
user_info_url = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"

headers = {
    "Host": "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler",
    "Pragma": "no-cache",
    "Referer": "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36"
}

qr_token = ""
once_password = ""
uuid = ""
cookie = {}


def get_cookies_first():
    response = requests.get(url=login_url, headers=headers)
    global cookie
    cookie = response.cookies.get_dict()


def get_login_qr_img():
    response = requests.get(url=get_login_qr_img_url, headers=headers, cookies=cookie).json()
    qr_img = response["img"]
    global qr_token
    qr_token = response["qrToken"]
    with open('code.png', 'wb') as f:
        f.write(base64.b64decode(qr_img))
    image = Image.open('code.png')
    image.show()
    print("請掃描驗證碼! ")


def wss_on_message(ws, message):
    print("=============== [message] ===============")
    message = json.loads(message)
    print(message)
    if "掃碼成功" in message["msg"]:
        global once_password, uuid
        once_password = message["oncePassword"]
        uuid = message["uuid"]
        ws.close()


def wss_on_error(ws, error):
    print("=============== [error] ===============")
    print(error)
    ws.close()


def wss_on_close(ws, close_status_code, close_msg):
    print("=============== [closed] ===============")
    print(close_status_code)
    print(close_msg)


def wss_on_open(ws):
    def run(*args):
        while True:
            ws.send(qr_token)
            time.sleep(8)
    _thread.start_new_thread(run, (qr_token,))


def wss():
    # websocket.enableTrace(True)  # 是否顯示連接詳細信息
    ws = websocket.WebSocketApp(
        web_socket_url % qr_token, on_open=wss_on_open,
        on_message=wss_on_message, on_error=wss_on_error,
        on_close=wss_on_close
    )
    ws.run_forever()


def get_cookie_second():
    global cookie
    params = {
        "pwd": once_password,
        "service": "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    }
    headers["Host"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    headers["Referer"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    response = requests.get(url=login_url, params=params, headers=headers, cookies=cookie, allow_redirects=False)
    cookie.update(response.cookies.get_dict())
    location = response.headers.get("Location")
    return location


def get_cookie_third(location):
    global cookie
    headers["Host"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    headers["Referer"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    response = requests.get(url=location, headers=headers, cookies=cookie, allow_redirects=False)
    cookie.update(response.cookies.get_dict())
    location = response.headers.get("Location")
    return location


def get_login_user_info():
    headers["Host"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    headers["Origin"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    headers["Referer"] = "脫敏處理,完整代碼關注 GitHub:https://github.com/kgepachong/crawler"
    params = {"time": str(int(time.time() * 1000))}
    response = requests.get(url=user_info_url, headers=headers, cookies=cookie, params=params)
    print(response.text)


def main():
    # 第一次獲取 cookie,包含 INGRESSCOOKIE、JSESSIONID、SERVERID、acw_tc
    get_cookies_first()
    # 獲取二維碼
    get_login_qr_img()
    # websocket 掃碼登錄,返回一次性密碼
    wss()
    # 第二次獲取 cookie,更新 SERVERID、獲取 CASLOGC、CASTGC
    location1 = get_cookie_second()
    # 第三次獲取 cookie,獲取 SESSION
    get_cookie_third(location1)
    # 獲取登錄用戶信息
    get_login_user_info()


if __name__ == '__main__':
    main()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM