通過websocket抓取斗魚彈幕和禮物消息


1.斗魚彈幕協議

到斗魚官方開放平台看斗魚通訊協議,網址“https://open.douyu.com/source/api/63”,登錄后可查看

   

所以根據斗魚協議做編碼函數:

 1     def msg_encode(msg):
 2         #消息以 \0 結尾,並以utf-8編碼
 3         msg = msg + '\0'
 4         msg_bytes = msg.encode('utf-8')
 5         #消息長度 + 頭部長度8
 6         length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')
 7         #斗魚客戶端發送消息類型 689
 8         type = 689
 9         type_bytes = int.to_bytes(type, 2, byteorder='little')
10         # 加密字段與保留字段,默認 0 長度各 1
11         end_bytes = int.to_bytes(0, 1, byteorder='little')
12         #按順序相加  消息長度 + 消息長度 + 消息類型 + 加密字段 + 保留字段
13         head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes
14         #消息頭部拼接消息內容
15         data = head_bytes + msg_bytes
16         return data

然后根據斗魚協議做解碼函數:

 1     def msg_decode(msg_bytes):
 2         #定義一個游標位置
 3         cursor = 0
 4         msg = []
 5         while cursor < len(msg_bytes):
 6             #根據斗魚協議,報文 前四位與第二個四位,都是消息長度,取前四位,轉化成整型
 7             content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')
 8             #報文長度不包含前4位,從第5位開始截取消息長度的字節流,並扣除前8位的協議頭,取出正文,用utf-8編碼成字符串
 9             content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',
10                                                                                            errors='ignore')
11             msg.append(content)
12             cursor = (cursor + 4) + content_length
13         # print(msg)
14         return msg

解碼后的消息需要反序列化:

 1     def msg_format(msg_str):
 2         try:
 3             msg_dict = {}
 4             msg_list = msg_str.split('/')[0:-1]
 5             for msg in msg_list:
 6                 msg = msg.replace('@s', '/').replace('@A', '@')
 7                 msg_tmp = msg.split('@=')
 8                 msg_dict[msg_tmp[0]] = msg_tmp[1]
 9             return msg_dict
10         except Exception as e:
11             print(str(e))

2.抓包分析

建議通過chrome開發者工具抓包,或者使用fiddler抓包,網頁的東西,直接用chrome更方便

為什么不用斗魚開放平台提供的api直接用呢,因為api是面向注冊的開發者的,api都需要申請使用,普通用戶只能抓包分析通用接口

分析:

  wsproxy.douyu.com中交互消息可以獲得多個彈幕服務器地址,我們直接取一個就行,不需要在程序中實現實時分析

  然后分析danmuproxy.douyu.com連接中詳細交互信息,實現與服務器交互

  匿名登錄時彈幕中禮物消息可能默認被屏蔽,需在登錄時提交開啟指令,或登錄后提交開啟指令

  彈幕消息類型與指令非常豐富,這里舉例簡單的消息和指令,詳細可以繼續分析

結果如下:

  登錄:type@=loginreq/roomid@=74751/dfl@=/username@=visitor8243989/uid@=1132317461/ver@=20190610/aver@=218101901/ct@=0/

  ##匿名分配的username與uid可以從wsproxy.douyu.com中交互消息獲取,也可直接自定義提交就行

  退出登錄:type@=logout/

  心跳消息:type@=mrkl/

  加入組消息:type@=joingroup/rid@=74751/gid@=-9999/           #gid默認1,此處改成 - 9999 改成海量彈幕模式

  屏蔽禮物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/

  開啟禮物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0@AS@S/

  ##開啟與屏蔽禮物可以詳細控制,可以逐項嘗試分析

              

 

  注:登錄消息中有個字段/dfl@=,此處直接加入屏蔽禮物或開啟禮物指令中相應dfl@=后面的內容,即可在登錄時控制禮物屏蔽或開啟

 

3.禮物消息處理

獲取斗魚禮物類型,需抓包分析來源網址

  url1 = 'https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json'

  url2 = 'https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json'

將獲取到的禮物json處理一下,合並且去除不需要信息,做成字典:

 1     def get_gift_dict():
 2         gift_json = {}
 3         gift_json1 = requests.get('https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json').text
 4         gift_json2 = requests.get('https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json').text
 5         gift_json1=gift_json1.replace('DYConfigCallback(','')[0:-2]
 6         gift_json2=gift_json2.replace('DYConfigCallback(','')[0:-2]
 7         gift_json1 = json.loads(gift_json1)['data']['flashConfig']
 8         gift_json2= json.loads(gift_json2)['data']
 9         for gift in gift_json1:
10             gift_json[gift] = gift_json1[gift]['name']
11         for gift in gift_json2:
12             gift_json[gift] = gift_json2[gift]['name']
13         return gift_json

4.websocket連接

python環境先安裝websocket包    pip3 install websocket-client

引入websocket包后,使用websecket.WebSocketApp建立websecket連接客戶端,創建時在 on_open、on_error、on_message、on_close 4個參數中傳入相應事件發生時需要處理的方法

顧名思義,4個參數很好理解了

websocket.WebSocketApp(url, on_open=on_open, on_error=on_error,on_message=on_message, on_close=on_close)

5.效果與源碼

實際效果展示:

源碼(演示代碼,異常判斷,數據校驗就不做了):

  1 __author__ = 'admin'
  2 import websocket
  3 import threading
  4 import time
  5 import requests
  6 import json
  7 
  8 
  9 class DyDanmu:
 10     def __init__(self, roomid, url):
 11         self.gift_dict = self.get_gift_dict()
 12         self.gift_dict_keys = self.gift_dict.keys()
 13         self.room_id = roomid
 14         self.client = websocket.WebSocketApp(url, on_open=self.on_open, on_error=self.on_error,
 15                                              on_message=self.on_message, on_close=self.on_close)
 16         self.heartbeat_thread = threading.Thread(target=self.heartbeat)
 17 
 18     def start(self):
 19         self.client.run_forever()
 20 
 21     def stop(self):
 22         self.logout()
 23         self.client.close()
 24 
 25     def on_open(self):
 26         self.login()
 27         self.join_group()
 28         self.heartbeat_thread.setDaemon(True)
 29         self.heartbeat_thread.start()
 30 
 31 
 32     def on_error(self, error):
 33         print(error)
 34 
 35     def on_close(self):
 36         print('close')
 37 
 38     def send_msg(self, msg):
 39         msg_bytes = self.msg_encode(msg)
 40         self.client.send(msg_bytes)
 41 
 42     def on_message(self, msg):
 43         message = self.msg_decode(msg)
 44         # print(message)
 45         for msg_str in message:
 46             msg_dict = self.msg_format(msg_str)
 47             if msg_dict['type'] == 'chatmsg':
 48                 print(msg_dict['nn'] + ':' + msg_dict['txt'])
 49             if msg_dict['type'] == 'dgb':
 50                 if msg_dict['gfid'] in self.gift_dict_keys:
 51                     print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t個\t' + self.gift_dict[msg_dict['gfid']])
 52                 else:
 53                     print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t個\t' + msg_dict['gfid'] + '\t未知禮物')
 54                     # print(msg_dict)
 55 
 56     # 發送登錄信息
 57     def login(self):
 58         login_msg = 'type@=loginreq/roomid@=%s/' \
 59                     'dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0/' \
 60                     'username@=%s/uid@=%s/ltkid@=/biz@=/stk@=/devid@=8d8c22ce6093e6a7264f99da00021501/ct@=0/pt@=2/cvr@=0/tvr@=7/apd@=/rt@=1605498503/vk@=0afb8a90c2cb545e8459d60c760dc08b/' \
 61                     'ver@=20190610/aver@=218101901/dmbt@=chrome/dmbv@=78/' % (
 62                         self.room_id, 'visitor4444086', '1178849206'
 63                     )
 64         self.send_msg(login_msg)
 65 
 66     def logout(self):
 67         logout_msg = 'type@=logout/'
 68         self.send_msg(logout_msg)
 69 
 70     # 發送入組消息
 71     def join_group(self):
 72         join_group_msg = 'type@=joingroup/rid@=%s/gid@=-9999/' % (self.room_id)
 73         self.send_msg(join_group_msg)
 74 
 75     # 關閉禮物信息推送
 76     def close_gift(self):
 77         close_gift_msg = 'type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/'
 78         self.send_msg(close_gift_msg)
 79 
 80     # 保持心跳線程
 81     def heartbeat(self):
 82         while True:
 83             # 45秒發送一個心跳包
 84             self.send_msg('type@=mrkl/')
 85             print('發送心跳')
 86             time.sleep(45)
 87 
 88 
 89     def msg_encode(self, msg):
 90         # 消息以 \0 結尾,並以utf-8編碼
 91         msg = msg + '\0'
 92         msg_bytes = msg.encode('utf-8')
 93         #消息長度 + 頭部長度8
 94         length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')
 95         #斗魚客戶端發送消息類型 689
 96         type = 689
 97         type_bytes = int.to_bytes(type, 2, byteorder='little')
 98         # 加密字段與保留字段,默認 0 長度各 1
 99         end_bytes = int.to_bytes(0, 1, byteorder='little')
100         #按順序相加  消息長度 + 消息長度 + 消息類型 + 加密字段 + 保留字段
101         head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes
102         #消息頭部拼接消息內容
103         data = head_bytes + msg_bytes
104         return data
105 
106     def msg_decode(self, msg_bytes):
107         # 定義一個游標位置
108         cursor = 0
109         msg = []
110         while cursor < len(msg_bytes):
111             #根據斗魚協議,報文 前四位與第二個四位,都是消息長度,取前四位,轉化成整型
112             content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')
113             #報文長度不包含前4位,從第5位開始截取消息長度的字節流,並扣除前8位的協議頭,取出正文,用utf-8編碼成字符串
114             content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',
115                                                                                            errors='ignore')
116             msg.append(content)
117             cursor = (cursor + 4) + content_length
118         # print(msg)
119         return msg
120 
121     def msg_format(self, msg_str):
122         try:
123             msg_dict = {}
124             msg_list = msg_str.split('/')[0:-1]
125             for msg in msg_list:
126                 msg = msg.replace('@s', '/').replace('@A', '@')
127                 msg_tmp = msg.split('@=')
128                 msg_dict[msg_tmp[0]] = msg_tmp[1]
129             return msg_dict
130         except Exception as e:
131             print(str(e))
132 
133     def get_gift_dict(self):
134         gift_json = {}
135         gift_json1 = requests.get('https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json').text
136         gift_json2 = requests.get(
137             'https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json').text
138         gift_json1 = gift_json1.replace('DYConfigCallback(', '')[0:-2]
139         gift_json2 = gift_json2.replace('DYConfigCallback(', '')[0:-2]
140         gift_json1 = json.loads(gift_json1)['data']['flashConfig']
141         gift_json2 = json.loads(gift_json2)['data']
142         for gift in gift_json1:
143             gift_json[gift] = gift_json1[gift]['name']
144         for gift in gift_json2:
145             gift_json[gift] = gift_json2[gift]['name']
146         return gift_json
147 
148 
149 if __name__ == '__main__':
150     roomid = '74751'
151     url = 'wss://danmuproxy.douyu.com:8506/'
152     dy = DyDanmu(roomid, url)
153     dy.start()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM