好久也沒寫過博客了,距離上一寫的博文到現在也過去了四年。這段時間Urumqi yq突然爆發,單位暫時也不讓回。一個人宅着沒事就刷刷抖音看看短視頻,作為一位有故事的男人【狗頭】,抖音推薦的視頻還是挺符合個人口味的,於是就萌生了把這些好看的視頻全部保存的想法。之前喜歡用一些免費的公眾號小程序去下載無水印的視頻,可是沒過多久這些小程序不是失效就是需要變相付費下載,最為一名資深白嫖黨【狗頭】豈是能忍的,然后就各種查資料,一頓操作猛如虎后,也沒有什么簡便方法。還是自己寫程序把,可是自從工作以來最多就用用office三件套,編程都荒廢了,用python寫個hello world都會把單詞拼錯【汗】,沒關系我們可以學,大不了從頭再來,個人自學、在B站上學、面向github編程學。以下省略千余字學習過程。
PS:本人也是編程新手寫作此文純當個人學習記錄,代碼只是滿足個人需求,以下內容若有不嚴謹之處希望各位看官老爺們不吝嗇賜教。
好了不廢話了開始進入正題
- 獲得抖音首頁推薦視頻信息然后下載視頻(無水印)
- 獲取某抖主發布的全部視頻信息然后下載視頻(無水印)
- 獲取自己或他人喜歡視頻的信息然后下載視頻(無水印)
- windows環境下搭建好python編程環境,本人使用python3.8.3版本
- 好用的編輯器如pycharm、vscode等
- 安卓模擬器、或者實體機也行
- 抖音app本人使用抖音極速版(v10.8.0)
- https抓包工具charles(可以不用)、 mitmproxy [詳情]
- 假設你已經在pc端和模擬器上配置好charles和mitmproxy證書,確保已經可以正常
抓取http及https數據包 - 一顆不厭其煩的心
mitmproxy is a free and open source interactive HTTPS proxy.
安裝:pip install mitmproxy
安裝后有3個命令行工具:mitmproxy, mitmdump, mitmweb
本文只使用mitmdump查看流量數據(請求與響應)、執行自定義腳本
在配置好mitmproxy之后,在控制台上輸入mitmdump並在模擬器上打開抖音app,mitmdump會出現手機上的所有請求。經過一番分析實踐,查找資料發現有如下三個api接口,分別請求用戶發布的視頻信息、
請求首頁推薦視頻信息,請求用戶喜歡的視頻的信息。
post_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/post/......' feed_api = 'https://aweme-lq.snssdk.com/aweme/v2/feed/......' favo_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/......'
然后這三個請求對應的response里面就是我們需要的內容,通過分析response我們發現post、favo
返回是json格式的數據包。feed返回的是protobuf的數據包。所以我們只需要在自定義的mitmproxy
腳本中根據app所以請求中的關鍵字獲取相對應的response數據包即可。
通過分析獲取到的數據包、然后問題出現了:json格式的數據包python很方便操作,主要是如何解析protobuf的數據包。
然后又經過一番各種Google,結果發現:protobuf 有一套自己的語法。不了解 Protobuf 協議語法和用法的話也無法反解數據。也就是說在沒有那個抖音自定義的( .proto 文件)情況下,基本上是無法逆序列化解析。經過一番資料查找各種踩坑之后,借助工具,使用google提供的的protoc編譯工具,這個工具提供反解析參數。這樣就能獲取protobuf數據包的大致信息。
protoc –decode_raw < douyin_feed.bin > 1.txt
我們可以對比解析前和解析后的數據對比。
解析前
解析后
諸如此類的url地址。我們大致可以確定這很可能就是視頻的分享地址,通過一番實踐論證后果真如此,
那么,我們直接請求這些分享地址,在瀏覽器中打開按F12打開開發者模式,觀察里面的ajax異步請求
的數據包,復制視頻play_addr,再打開后發現播放的視頻仍然是有水印的。然后又經過一番查找資料,
無果。好吧在網上找了個某第三方抖音分享視頻下載網站,簡單分析了它的接口,照着網站js加密參數 。后來偶然間發現了一篇帖子(不好意思
那部分自己也能正常獲取請求結果,返回視頻無水印下載地址
實在是想不起了。。。)大致意思是把里面鏈接playwm改成play,用手機端UA就能獲得無水印地址了,試了一下果真如此。
經過上面的分析之后,我們開始碼代碼把。
然后問題出現了,由於mitmdump只是加載我們的腳本程序,如果把所有的代碼都堆在腳本程序里
會造成腳本運行緩慢阻塞網絡請求,開始想在腳本里寫個多線程去執行耗時操作,然后就被教育了。
好吧那么把我們需要的信息寫入數據庫中,然后再寫個程序讀取數據庫然后再去下載視頻總沒問題吧。
然后經過一番實踐發現,獲取的視頻鏈接都有時效性,過段時間就會失效。那么就沒有什么辦法一遍
瀏覽視頻,一邊下載視頻嗎?答案是肯定的,后來想到用socket套接字在兩個程序中進行通信。
用我們的腳本程序A獲取視頻信息用套接字發送到視頻下載程序B,然后在程序B中開個線程用於接收
套接字信息,再開幾個線程下載視頻,程序B中不同線程之間用Queue()隊列實現生產消費模式。
ok,主體框架搭好了,我們開始吧。talk is cheap show me the code…
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 12:03 FileName :mitmproxy_douyin_get_url_scripts.py ==================================================== """ import mitmproxy.http import json import time import struct from socket import * post_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/post/' feed_api = 'https://aweme-lq.snssdk.com/aweme/v2/feed/' favo_api = 'https://aweme-lq.snssdk.com/aweme/v1/aweme/favorite/' def send_data_to_server(header_dict, type): ''' :param header_dict 獲取到的數據包字典 :param type 原視頻類型,feed,post,favo 與服務端通信發送數據,使用自定義協議 每次調用就創建一個套接字,用完就關閉 ''' tcp_client_socket = None host = '127.0.0.1' port = 9527 address = (host, port) try: tcp_client_socket = socket(AF_INET, SOCK_STREAM) tcp_client_socket.connect(address) if type == 'post' or type == 'favo': json_data = json.dumps(header_dict) json_bytes = json_data.encode('utf-8') tcp_client_socket.send(struct.pack('i', len(json_bytes))) tcp_client_socket.send(json_bytes) #print(header_dict) elif type == 'feed': #先發送協議頭用struct打包,包含要發送的數據大小 data_len = header_dict['size'] byte_arr = header_dict['content'] new_dict = { 'type': 'feed', 'size': data_len } json_data = json.dumps(new_dict) json_bytes = json_data.encode('utf-8') tcp_client_socket.send(struct.pack('i', len(json_bytes))) tcp_client_socket.send(json_bytes) chunk_size = 1024 start = 0 end = 1 * chunk_size #print('new_dict...........................:', new_dict) #發送protubuf數據,每次發送1024個字節 while True: if data_len // chunk_size > 0: read_bytes = byte_arr[start : end] start = end end += chunk_size data_len -= chunk_size tcp_client_socket.send(read_bytes) #print(read_bytes) else: read_bytes = byte_arr[start : ] tcp_client_socket.send(read_bytes) break except: pass if tcp_client_socket: tcp_client_socket.close() def get_local_time(create_time): ''' :param create_time 原視頻的發布時間,linux時間戳 :return: 返回年月日格式的日期 ''' time_local = time.localtime(int(create_time)) pub_date = time.strftime("%Y-%m-%d", time_local) return pub_date def response(flow: mitmproxy.http.HTTPFlow): if flow.request.url.startswith(post_api) or flow.request.url.startswith(favo_api): if flow.response.status_code == 200: url_json = json.loads(flow.response.text) if url_json and url_json['aweme_list']: for aweme_list in url_json['aweme_list']: aweme_id = aweme_list['aweme_id'] create_time = aweme_list['create_time'] create_time = get_local_time(create_time) type = 'post' if flow.request.url.startswith(post_api) else 'favo' header_dict = { 'type': type, 'aweme_id_create_time': aweme_id + '_' + create_time, 'nickname': aweme_list['author']['nickname'], 'play_url': aweme_list['video']['play_addr']['url_list'][0] } send_data_to_server(header_dict, type) elif flow.request.url.startswith(feed_api): if flow.response.status_code == 200: procbuf = flow.response.content feed_dict = { 'type': "feed", 'content': procbuf, 'size': len(procbuf) } #print('procbuf len................', len(procbuf)) send_data_to_server(feed_dict, 'feed') addons = { response() }
4.2 輔助程序
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 FileName :parase_data.py ==================================================== """ import os import re import json import time import requests import random import hashlib from lxml import etree import math from decimal import Decimal import warnings warnings.filterwarnings('ignore') ''' class Get_real_play_addr(object): def __init__(self): self.request_url = 'http://3g.gljlw.com/diy/ttxs_dy2.php?' self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36' } def parase_play_addr(self, url): paly_url = '' r = str(random.random())[2:] s = hashlib.md5((url + '@&^' + r).encode()).hexdigest() params = { 'url': url, 'r': r, 's': s } try: response = requests.get(url = self.request_url, headers = self.headers, params = params) if response.status_code == 200: content = response.content.decode('utf-8') html = etree.HTML(content) paly_url = html.xpath('//source/@src')[0] if paly_url: return paly_url except: print("network error cannot parase play_addr...") return None ''' # 打開protobuf文件,用正則表達式匹配出所有的分享鏈接地址 class Get_url_from_protobuf(object): def __init__(self): self.pat = r'(?<=\")https://www.iesdouyin.com/share/video/.*(?=\")' self.command = r' --decode_raw <' def get_url(self, exe_path, file_path): try: fp = os.popen(exe_path + self.command + file_path) if fp: src = fp.read() fp.close() url_list = re.findall(self.pat, src) url_list = set(url_list) return url_list except: print('decode protobuf failed...') return None def get_local_time(create_time): time_local = time.localtime(int(create_time)) pub_date = time.strftime("%Y-%m-%d", time_local) return pub_date # 獲取分享視頻的下載地址 def Get_real_play_addr_by_web(aweme_id): headers = { 'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36' } api_url = 'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids=' + aweme_id response = requests.get(url = api_url, headers = headers, verify = False) if response.status_code == 200: response_json = response.json() play_addr = response_json['item_list'][0]['video']['play_addr']['url_list'][0] create_time = response_json['item_list'][0]['create_time'] create_time = get_local_time(create_time) play_addr = play_addr.replace('playwm', 'play', 1) # 返回下載地址和視頻的發布時間 return (play_addr, create_time) return None, None def Get_file_size(e: int) -> str: if e <= 0: return '' t = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"] n = math.floor(math.log2(e) / math.log2(1024)) return str(Decimal(e / math.pow(1024, n)).quantize(Decimal("0.00"))) + t[n] #'https://www.iesdouyin.com/share/video/6854870744690625805/?region=CN&mid=6854870758414781191' #print(Get_real_play_addr_by_web("6854870744690625805"))
4.3 下載程序
""" =================================================== -*- coding:utf-8 -*- Author :GadyPu E_mail :Gadypy@gmail.com Time :2020/8/ 0004 下午 FileName :douyin_video_downloads.py ==================================================== """ import requests import json import os import time import sys import threading import struct from queue import Queue from socket import * from parase_data import Get_url_from_protobuf from parase_data import Get_real_play_addr_by_web from parase_data import Get_file_size import warnings warnings.filterwarnings("ignore") headers = { 'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; ZTE BA520 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/55.0.2883.77 Mobile Safari/537.36' } que = Queue() lock = threading.Lock() chunk_size = 1024 #下載線程 def Download(path, index): while True: global que if que.empty(): print("No.{} thread is waiting for data...".format(index)) data = que.get() dir_name = data['type'] file_name = data['aweme_id_create_time'] dir_path = '' if dir_name == 'feed': play_url, create_time = Get_real_play_addr_by_web(data['aweme_id_create_time']) if (not play_url) or (not create_time): continue file_name = file_name + '_' + create_time dir_path = os.path.join(path, dir_name) else: dir_path = os.path.join(path, dir_name, data['nickname']) play_url = data['play_url'] global lock with lock: if not os.path.exists(dir_path): os.makedirs(dir_path) file_path = os.path.join(dir_path, file_name + '.mp4') read_size = 0 with lock: if os.path.exists(file_path): continue try: response = requests.get(url = play_url, headers = headers, verify = False) if response.status_code == 200: #print(response.headers) total_szie = int(response.headers['Content-Length']) print("NO.{} thread is downloading... {} filesize:{}".format(index, data['aweme_id_create_time'] + '.mp4', Get_file_size(total_szie))) t_1 = time.time() with open(file_path, "wb") as fp: for data in response.iter_content(chunk_size = chunk_size): if data: fp.write(data) read_size += chunk_size #print('No.{} threading is downloading: {} ...: {}%'.format(index, file_path, str(round(read_size / total_szie * 100, 2)))) print("No.{} thread finshed! total cost: {}s".format(index, str(round(time.time() - t_1, 2)))) time.sleep(0.2) else: print("cannot conneted with the servers...") except: print("downloading %s failed... network error please try againg"%play_url) #que.put(data) # 服務端用於接收mitm腳本發送的數據 def run(exe_path, file_path): PORT = 9527 HOST = '' address = (HOST, PORT) tcp_server_socket = socket(AF_INET, SOCK_STREAM) tcp_server_socket.bind(address) print("the server is lunching, listeing the port {}...".format(address[1])) tcp_server_socket.listen(5) while True: try: client_socket, client_address = tcp_server_socket.accept() print('the client{} linked:{}'.format(client_address, time.asctime(time.localtime(time.time())))) data = client_socket.recv(4) header_size = struct.unpack('i', data)[0] header_bytes = client_socket.recv(header_size) header_json = json.loads(header_bytes.decode('utf-8')) if header_json['type'] == 'post' or header_json['type'] == 'favo': que.put(header_json) else: chunk_size = 1024 read_size = 0 total_size = header_json['size'] with open(file_path, 'wb') as fp: while read_size < total_size: data = client_socket.recv(chunk_size) if data: fp.write(data) read_size += len(data) probuf = Get_url_from_protobuf() url_list = probuf.get_url(exe_path, file_path) for url in url_list: try: feed_dict = { 'type': 'feed', 'feed_url': url, 'aweme_id_create_time': url[38: 57] #只是視頻的id,並沒有發布時間 } que.put(feed_dict) except: continue client_socket.close() except: tcp_server_socket.close() print("never run here...") if __name__ == "__main__": dir_path = sys.argv[1] if dir_path.endswith('/'): dir_path += '/' if not os.path.exists(dir_path): os.makedirs(dir_path) #dir_path = r'C:\Users\Administrator\Desktop\pytho_src\douyin\videos' thread_list = [] for i in range(4): if i == 0: thread_list.append(threading.Thread(target = run, args = (r'.\protobuf\protoc.exe', r'.\probuf.bin', ))) else: thread_list.append(threading.Thread(target = Download, args = (dir_path , i + 1, ))) thread_list[i].setDaemon = True [i.start() for i in thread_list] [i.join() for i in thread_list] # #run(r'.\protobuf\protoc.exe', r'.\probuf.bin') # print("finish!")
五、使用方法
5.2 下載程序用法
六、運行效果
七、參考鏈接
1、https://blog.csdn.net/doctor_who2004/article/details/105718889
2、https://www.jianshu.com/p/af381ef134e2
3、https://blog.csdn.net/mp624183768/article/details/80956368
八、下一步計划
差不多就這么多,之前還打算用Appiun實現自動滑動,可又要下一大堆軟件,自己的小電腦跑個模擬器已經不堪重負想想還是算了吧。受條件所限,自己的手機上沒有進行測試(不然躺床上刷視頻pc端程序掛后台運行,手機端瀏覽的視頻基本上都能下載),只是在模擬器上運行,然后就沒然后了。
大家有啥疑問的歡迎在評論區留言。